// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use crate::{ contexts::{OnTransmitError, WriteContext}, stream::{ incoming_connection_flow_controller::IncomingConnectionFlowController, stream_events::StreamEvents, stream_interests::{StreamInterestProvider, StreamInterests}, StreamError, }, sync::{IncrementalValueSync, OnceSync, ValueToFrameWriter}, transmission::interest::Provider as _, }; use core::{ convert::TryFrom, task::{Context, Poll, Waker}, }; use s2n_quic_core::{ ack, application, buffer::{ ReceiveBuffer as StreamReceiveBuffer, ReceiveBufferError as StreamReceiveBufferError, }, frame::{stream::StreamRef, MaxStreamData, ResetStream, StopSending, StreamDataBlocked}, packet::number::PacketNumber, stream::{ops, StreamId}, transport, varint::VarInt, }; //= https://www.rfc-editor.org/rfc/rfc9000#section-3.2 //# o //# | Recv STREAM / STREAM_DATA_BLOCKED / RESET_STREAM //# | Create Bidirectional Stream (Sending) //# | Recv MAX_STREAM_DATA / STOP_SENDING (Bidirectional) //# | Create Higher-Numbered Stream //# v //# +-------+ //# | Recv | Recv RESET_STREAM //# | |-----------------------. //# +-------+ | //# | | //# | Recv STREAM + FIN | //# v | //# +-------+ | //# | Size | Recv RESET_STREAM | //# | Known |---------------------->| //# +-------+ | //# | | //# | Recv All Data | //# v v //# +-------+ Recv RESET_STREAM +-------+ //# | Data |--- (optional) --->| Reset | //# | Recvd | Recv All Data | Recvd | //# +-------+<-- (optional) ----+-------+ //# | | //# | App Read All Data | App Read Reset //# v v //# +-------+ +-------+ //# | Data | | Reset | //# | Read | | Read | //# +-------+ +-------+ /// Enumerates the possible states of the receiving side of a stream. /// These states are equivalent to the ones in the QUIC transport specification. #[derive(PartialEq, Debug, Clone)] pub(super) enum ReceiveStreamState { /// The stream is still receiving data from the remote peer. This state /// coverst the `Recv`, `Size Known` and `Data Recvd` state from the QUIC /// specification. These are modelled as a single state because the handling /// for the states is mostly identical. /// The parameter indicates the total size of the stream if it had already /// been signalled by the peer. Receiving, /// All data had been received from the peer and consumed by the user. /// This is the terminal state. DataRead, /// The application has requested the peer to STOP_SENDING and the stream is currently /// waiting for an ACK for the STOP_SENDING frame. Stopping { error: StreamError, missing_data: MissingData, }, /// The connection was reset. The flag indicates whether the reset status /// had already been observed by the user. Reset(StreamError), } /// Keeps track of any missing data in the `Stopping` state #[derive(PartialEq, Debug, Clone)] pub(super) struct MissingData { start: u64, end: u64, } impl MissingData { fn new(start: u64) -> Self { Self { start, end: u64::MAX, } } fn on_data(&mut self, frame: &StreamRef) -> Poll<()> { // We could track if we have any pending gaps and continue to send STOP_SENDING but // that would require keeping the receive buffer around, which isn't really useful // since the application has already closed the stream. // // Instead, we just use a simple range let frame_start = *frame.offset; let frame_end = *(frame.offset + frame.data.len()); let frame_range = frame_start..frame_end; // update the start if it overlaps the offset of the frame if frame_range.contains(&self.start) { self.start = frame_end; } // update the end if this is the last frame or if it contains the current end if frame.is_fin || frame_range.contains(&self.end) { self.end = self.end.min(frame_start); } // return if we've received everything if self.start >= self.end { Poll::Ready(()) } else { Poll::Pending } } } /// Writes the `MAX_STREAM_DATA` frames based on the streams flow control window. #[derive(Debug, Default)] pub(super) struct MaxStreamDataToFrameWriter {} impl ValueToFrameWriter for MaxStreamDataToFrameWriter { fn write_value_as_frame( &self, value: VarInt, stream_id: StreamId, context: &mut W, ) -> Option { context.write_frame(&MaxStreamData { stream_id: stream_id.into(), maximum_stream_data: value, }) } } /// Writes `STOP_SENDING` frames basd on `ApplicationErrorCode`s #[derive(Debug, Default)] pub(super) struct StopSendingToFrameWriter {} impl ValueToFrameWriter for StopSendingToFrameWriter { fn write_value_as_frame( &self, value: application::Error, stream_id: StreamId, context: &mut W, ) -> Option { context.write_frame(&StopSending { stream_id: stream_id.into(), application_error_code: value.into(), }) } } /// A composite flow controller for receiving data. /// The flow controller manages the Streams individual window as well as the /// connection flow control window. #[derive(Debug)] pub(super) struct ReceiveStreamFlowController { /// The connection flow controller pub(super) connection_flow_controller: IncomingConnectionFlowController, /// Synchronizes the read window to the remote peer pub(super) read_window_sync: IncrementalValueSync, /// The relative flow control window we want to maintain pub(super) desired_flow_control_window: u32, /// The amount of credits which had been acquired from the connection and /// stream window in total pub(super) acquired_connection_window: VarInt, /// The amount of credits which had been released in total pub(super) released_connection_window: VarInt, } impl ReceiveStreamFlowController { fn new( connection_flow_controller: IncomingConnectionFlowController, initial_window: VarInt, desired_flow_control_window: u32, ) -> Self { Self { connection_flow_controller, read_window_sync: IncrementalValueSync::new( VarInt::from_u32(desired_flow_control_window), initial_window, VarInt::from_u32(desired_flow_control_window / 10), ), acquired_connection_window: VarInt::from_u32(0), released_connection_window: VarInt::from_u32(0), desired_flow_control_window, } } /// Asserts that the flow control window up to the given offset is available /// /// This checks the Streams individual flow control limit as well as the /// connections flow control limit. /// For the connections limit the method will acquire the necessary remaining /// limit from the connetions flow controller. fn acquire_window_up_to( &mut self, offset: VarInt, source_frame_type: Option, ) -> Result<(), transport::Error> { // Step 1: Check the stream limit //= https://www.rfc-editor.org/rfc/rfc9000#section-19.10 //# The data sent on a stream MUST NOT exceed the largest maximum stream //# data value advertised by the receiver. An endpoint MUST terminate a //# connection with an error of type FLOW_CONTROL_ERROR if it receives //# more data than the largest maximum stream data that it has sent for //# the affected stream. This includes violations of remembered limits //# in Early Data; see Section 7.4.1. if offset > self.read_window_sync.latest_value() { //= https://www.rfc-editor.org/rfc/rfc9000#section-4.1 //# A receiver MUST close the connection with an error of type //# FLOW_CONTROL_ERROR if the sender violates the advertised connection //# or stream data limits; see Section 11 for details on error handling. return Err(transport::Error::FLOW_CONTROL_ERROR .with_reason("Stream flow control window exceeded") .with_frame_type(source_frame_type.unwrap_or_default().into())); } // Remark: Actually this read window might not have yet been // transmitted to the peer. In that case it might have now // successfully sent us data even though we didn't request // yet. However even if we knew we sent the MAX_STREAM_DATA frame // we wouldn't knew whether the peer actually received it and // send their data because of that. Therefore there exists // always some uncertainty around the window. The most // important part however is that the client can never send // us any data outside of a given window - which is still // enforced here. // Step 2: Check the connection limit // Take into account that we might already have acquired a higher // connection window than what is necessary for the given offset let additional_connection_window = offset.saturating_sub(self.acquired_connection_window); if additional_connection_window > VarInt::from_u32(0) { self.connection_flow_controller .acquire_window(additional_connection_window) .map_err(|err| { //= https://www.rfc-editor.org/rfc/rfc9000#section-4.1 //# A receiver MUST close the connection with an error of type //# FLOW_CONTROL_ERROR if the sender violates the advertised connection //# or stream data limits; see Section 11 for details on error handling. err.with_reason("Connection flow control window exceeded") .with_frame_type(source_frame_type.unwrap_or_default().into()) })?; // The connection window was acquired successfully self.acquired_connection_window += additional_connection_window; } Ok(()) } /// Notifies the flow controller that the relative amount of data had been /// consumed from the Stream. /// /// The flow controller will use the information to enqueue window updates /// if necessary fn release_window(&mut self, amount: VarInt) { self.released_connection_window += amount; //= https://www.rfc-editor.org/rfc/rfc9000#section-4.2 //# Therefore, a receiver MUST NOT wait for a //# STREAM_DATA_BLOCKED or DATA_BLOCKED frame before sending a //# MAX_STREAM_DATA or MAX_DATA frame; doing so could result in the //# sender being blocked for the rest of the connection. // Enqueue Stream window updates by increasing the latest value on // the read window synchronisation component self.read_window_sync.update_latest_value( self.released_connection_window .saturating_add(VarInt::from_u32(self.desired_flow_control_window)), ); // Notify the connection flow controller about the consumed data self.connection_flow_controller.release_window(amount); } /// Releases all flow credits which had been acquired but not yet released /// through previous [`release_window`] calls. fn release_outstanding_window(&mut self) { let unreleased = self.acquired_connection_window - self.released_connection_window; self.release_window(unreleased); } /// Stop to synchronize the Streams flow control window to the peer fn stop_sync(&mut self) { self.read_window_sync.stop_sync(); } /// Returns the low watermark for the current state of the flow controller fn watermark(&self) -> usize { // As we approach the flow controller window we want to wake the waiter a bit early // to ensure the application has enough time to read the data and release // additional credits for the peer to send more data. 50% may need to be // modified as additional test are performed. It also may be a good idea to make this // configurable in the future. // TODO possibly make this value configurable let watermark = self.desired_flow_control_window / 2; usize::try_from(watermark).unwrap_or(core::usize::MAX) } /// Returns the MAX_STREAM_DATA window that is currently synchronized /// towards the peer. #[cfg(test)] pub(super) fn current_stream_receive_window(&self) -> VarInt { self.read_window_sync.latest_value() } #[cfg(test)] pub(super) fn remaining_connection_receive_window(&self) -> VarInt { self.connection_flow_controller.remaining_window() } } /// The read half of a stream #[derive(Debug)] pub struct ReceiveStream { /// The current state of the stream pub(super) state: ReceiveStreamState, /// Buffer of already received data pub(super) receive_buffer: StreamReceiveBuffer, /// The composite flow controller for receiving data pub(super) flow_controller: ReceiveStreamFlowController, /// Synchronizes the `STOP_SENDING` flag towards the peer. pub(super) stop_sending_sync: OnceSync, /// The handle of a task that is currently waiting on new incoming data, along with the low /// watermark value. pub(super) read_waiter: Option<(Waker, usize)>, /// Whether the final state had already been observed by the application final_state_observed: bool, /// Marks the stream as detached from the application detached: bool, } impl ReceiveStream { pub fn new( is_closed: bool, connection_flow_controller: IncomingConnectionFlowController, initial_window: VarInt, desired_flow_control_window: u32, ) -> ReceiveStream { // If the stream is created in closed state directly move into the // terminal state. let state = if is_closed { ReceiveStreamState::DataRead } else { ReceiveStreamState::Receiving }; let mut result = ReceiveStream { state, receive_buffer: StreamReceiveBuffer::new(), flow_controller: ReceiveStreamFlowController::new( connection_flow_controller, initial_window, desired_flow_control_window, ), stop_sending_sync: OnceSync::new(), read_waiter: None, final_state_observed: is_closed, detached: is_closed, }; if is_closed { result.flow_controller.stop_sync(); result.stop_sending_sync.stop_sync(); } result } // These functions are called from the packet delivery thread pub fn on_data( &mut self, frame: &StreamRef, events: &mut StreamEvents, ) -> Result<(), transport::Error> { match self.state { ReceiveStreamState::Reset(_) => { // Since the stream already had been reset we ignore the data. // In this case we don't check for correctness - e.g. whether the // would actually have fitted within our flow-control window and // into the end-of-stream signal. We could add these checks, but // the main outcome would be to send connection errors. } ReceiveStreamState::Stopping { ref mut missing_data, .. } => { if missing_data.on_data(frame).is_ready() { self.stop_sending_sync.stop_sync(); self.final_state_observed = true; } } ReceiveStreamState::DataRead => { // We also ignore the data in this case. We could validate whether // it actually fitted into previously announced window, but // don't get any benefit from this. } ReceiveStreamState::Receiving => { // In this function errors are returned, but the Stream is left // intact. It will be task of the caller to fail the stream // with `trigger_internal_reset()`. // This decision was made since on a connection error all // Streams need to be failed. // If the size is known we check against the maximum size. // Otherwise we check against the flow control window let data_end = frame .offset .checked_add_usize(frame.data.len()) .ok_or_else(|| { transport::Error::FLOW_CONTROL_ERROR .with_reason("data size overflow") .with_frame_type(frame.tag().into()) })?; // If we don't know the final size then try acquiring flow control //= https://www.rfc-editor.org/rfc/rfc9000#section-4.5 //# The receiver MUST use the final size of the stream to //# account for all bytes sent on the stream in its connection level flow //# controller. if self.receive_buffer.final_size().is_none() { self.flow_controller .acquire_window_up_to(data_end, frame.tag().into())?; } // If this is the last frame then inform the receive_buffer so it can check for any // final size errors. let write_result = if frame.is_fin { self.receive_buffer.write_at_fin(frame.offset, frame.data) } else { self.receive_buffer.write_at(frame.offset, frame.data) }; write_result.map_err(|error| { match error { //= https://www.rfc-editor.org/rfc/rfc9000#section-19.9 //# An endpoint MUST terminate a connection with an error of type //# FLOW_CONTROL_ERROR if it receives more data than the maximum data //# value that it has sent. This includes violations of remembered //# limits in Early Data; see Section 7.4.1. StreamReceiveBufferError::OutOfRange => { transport::Error::FLOW_CONTROL_ERROR } //= https://www.rfc-editor.org/rfc/rfc9000#section-4.5 //# Once a final size for a stream is known, it cannot change. If a //# RESET_STREAM or STREAM frame is received indicating a change in the //# final size for the stream, an endpoint SHOULD respond with an error //# of type FINAL_SIZE_ERROR; see Section 11 for details on error //# handling. StreamReceiveBufferError::InvalidFin => transport::Error::FINAL_SIZE_ERROR, } .with_reason("data reception error") .with_frame_type(frame.tag().into()) })?; // wake the waiter if the buffer has data and the len has crossed the watermark let mut should_wake = self .read_waiter .as_ref() .map(|(_, low_watermark)| { let len = self.receive_buffer.len(); // make sure we have at least 1 byte available for reading if len == 0 { return false; } let watermark = (*low_watermark) // don't let the application-provided watermark exceed the flow // controller watermark .min(self.flow_controller.watermark()); // ensure the buffer has at least the watermark len >= watermark }) .unwrap_or(false); if frame.is_fin { // We don't have to transmit MAX_STREAM_DATA frames anymore. // If there is pending transmission/retransmission then remove it. // // This has a subtle side effect that the message which signaled // the higher flow control window might actually have never been // received by the peer (it's pending), and it still was able to send // the FIN and more data to us. Since we neither can prove the peer // right there is nothing we can do about this. //= https://www.rfc-editor.org/rfc/rfc9000#section-3.2 //# When a STREAM frame with a FIN bit is received, the final size of the //# stream is known; see Section 4.5. The receiving part of the stream //# then enters the "Size Known" state. In this state, the endpoint no //# longer needs to send MAX_STREAM_DATA frames, it only receives any //# retransmissions of stream data. self.flow_controller.stop_sync(); } if let Some(total_size) = self.receive_buffer.final_size() { // If we already have received all the data, there is no point // in transmitting STOP_SENDING anymore. // Note that this might not happen in the same frame where we // receive the FIN. We might receive the FIN before receiving // outstanding data. if self.receive_buffer.total_received_len() == total_size { self.stop_sending_sync.stop_sync(); // wake the waiter, even if we didn't cross the watermark, since the stream // is finished at this point should_wake = true; } // If the frame with the FIN contained no new data all // buffered data might already have been consumed. In this // case we directly go into [`ReceiveStreamState::DataRead`] if frame.is_fin && self.receive_buffer.consumed_len() == total_size { self.receive_buffer.reset(); self.state = ReceiveStreamState::DataRead; } } if should_wake { self.wake(events); } } } Ok(()) } /// This is called when a `STREAM_DATA_BLOCKED` frame had been received for /// this stream pub fn on_stream_data_blocked( &mut self, _frame: &StreamDataBlocked, _events: &mut StreamEvents, ) -> Result<(), transport::Error> { // There is currently no special handling implemented for this event. // In the future we might e.g. generate metrics for this. Ok(()) } /// This method gets called when a stream gets reset due to a reason that is /// not related to a frame. E.g. due to a connection failure. pub fn on_internal_reset(&mut self, error: StreamError, events: &mut StreamEvents) { let reset_result = self.init_reset(error, None, None); // Internal results should never fail debug_assert!(reset_result.is_ok()); // Return the waker to wake up potential users of the stream self.wake(events); } /// This is called when a `RESET_STREAM` frame had been received for /// this stream pub fn on_reset( &mut self, frame: &ResetStream, events: &mut StreamEvents, ) -> Result<(), transport::Error> { //= https://www.rfc-editor.org/rfc/rfc9000#section-3.5 //= type=exception //= reason=It's simpler to accept any RESET_STREAM frame instead of ignore //# An endpoint that sends a STOP_SENDING frame MAY ignore the //# error code in any RESET_STREAM frames subsequently received for that //# stream. let error = StreamError::stream_reset(frame.application_error_code.into()); self.init_reset(error, Some(frame.final_size), Some(frame.tag()))?; // We don't have to send `STOP_SENDING` anymore since the stream was reset by the peer self.stop_sending_sync.stop_sync(); // Return the waker to wake up potential users of the stream self.wake(events); Ok(()) } /// Starts the reset procedure if the Stream has not been in a RESET state /// before. fn init_reset( &mut self, error: StreamError, actual_size: Option, frame_tag: Option, ) -> Result<(), transport::Error> { //= https://www.rfc-editor.org/rfc/rfc9000#section-3.2 //# An implementation MAY //# interrupt delivery of stream data, discard any data that was not //# consumed, and signal the receipt of the RESET_STREAM. // Reset logic is only executed if the stream is neither reset nor if all // data had been already received. match self.state { ReceiveStreamState::Reset(_) | ReceiveStreamState::DataRead => return Ok(()), ReceiveStreamState::Receiving if self.receive_buffer.final_size().is_some() => { let total_size = self.receive_buffer.final_size().unwrap(); if let Some(actual_size) = actual_size { // If the stream size which is indicated through the reset // diverges from the stream size which had been communicated // before this is an error //= https://www.rfc-editor.org/rfc/rfc9000#section-4.5 //# Once a final size for a stream is known, it cannot change. If a //# RESET_STREAM or STREAM frame is received indicating a change in the //# final size for the stream, an endpoint SHOULD respond with an error //# of type FINAL_SIZE_ERROR; see Section 11 for details on error //# handling. A receiver SHOULD treat receipt of data at or beyond the //# final size as an error of type FINAL_SIZE_ERROR, even after a stream //# is closed. if Into::::into(actual_size) != total_size { return Err(transport::Error::FINAL_SIZE_ERROR .with_reason( "Final size in reset frame did not match previous final size", ) .with_frame_type(frame_tag.unwrap_or_default().into())); } } if self.receive_buffer.total_received_len() == total_size { // This equals the DataRecvd state from the specification. // We have received all data up to offset total_size and are // just waiting for the user to read it. // In this case we ignore the reset, since we don't require // any information from the peer anymore. return Ok(()); } } ReceiveStreamState::Receiving | ReceiveStreamState::Stopping { .. } => { if let Some(actual_size) = actual_size { // We have to acquire the flow control credits up up to the // offset which the peer indicates as the end of the Stream. // This is necessary since the peer will have reserved credits // up to this offset, and we need to send the necessary // flow control updates. self.flow_controller .acquire_window_up_to(actual_size, frame_tag)?; } } } // If the stream was reset by the peer we don't actually have to retransmit // outgoing flow control window anymore. self.flow_controller.stop_sync(); // Reset the stream receive buffer self.receive_buffer.reset(); // The data which was inside the receive buffer had actually not been // consumed. And if the peer signaled us a bigger final size than what // we actually received, we might not even had received the data yet for // which we acquired a connection flow control window. Nevertheless we // need to release the complete window in order not to starve other // streams on connection flow control credits. This is performed by the // the following call, which releases ALL credits which have not been // previously released. self.flow_controller.release_outstanding_window(); self.state = ReceiveStreamState::Reset(error); Ok(()) } /// This method gets called when a packet delivery got acknowledged pub fn on_packet_ack(&mut self, ack_set: &A) { self.flow_controller.read_window_sync.on_packet_ack(ack_set); let _ = self.stop_sending_sync.on_packet_ack(ack_set); } /// This method gets called when a packet loss is reported pub fn on_packet_loss(&mut self, ack_set: &A) { self.flow_controller .read_window_sync .on_packet_loss(ack_set); self.stop_sending_sync.on_packet_loss(ack_set); } /// Queries the component for any outgoing frames that need to get sent pub fn on_transmit( &mut self, stream_id: StreamId, context: &mut W, ) -> Result<(), OnTransmitError> { self.stop_sending_sync.on_transmit(stream_id, context)?; //= https://www.rfc-editor.org/rfc/rfc9000#section-4.2 //= type=TODO //= tracking-issue=334 //# To avoid blocking a sender, a receiver MAY send a MAX_STREAM_DATA or //# MAX_DATA frame multiple times within a round trip or send it early //# enough to allow time for loss of the frame and subsequent recovery. self.flow_controller .read_window_sync .on_transmit(stream_id, context) } /// Wakes up the application on progress updates /// /// If there is not a registered waker and the stream is in a terminal state, /// the stream will be finalized. fn wake(&mut self, events: &mut StreamEvents) { // Return the waker to wake up potential users of the stream if let Some((waker, _low_watermark)) = self.read_waiter.take() { events.store_read_waker(waker); return; } // If the stream is detached from the application, then try to make progress if self.detached { self.detach(); } } // These functions are called from the client API pub fn poll_request( &mut self, request: &mut ops::rx::Request, context: Option<&Context>, ) -> Result { let mut response = ops::rx::Response::default(); if let Some(error_code) = request.stop_sending { let error = StreamError::stream_reset(error_code); match self.state { //= https://www.rfc-editor.org/rfc/rfc9000#section-3.3 //# A receiver MAY send a STOP_SENDING frame in any state where it has //# not received a RESET_STREAM frame -- that is, states other than //# "Reset Recvd" or "Reset Read". //= https://www.rfc-editor.org/rfc/rfc9000#section-3.5 //# STOP_SENDING SHOULD only be sent for a stream that has not been reset //# by the peer. ReceiveStreamState::Reset(error) | ReceiveStreamState::Stopping { error, .. } => { response.status = ops::Status::Reset(error); return Ok(response); } // If we've already read everything, transition to the final state ReceiveStreamState::DataRead => { self.state = ReceiveStreamState::DataRead; self.final_state_observed = true; response.status = ops::Status::Finished; return Ok(response); } // If we've already buffered everything, transition to the final state ReceiveStreamState::Receiving if self.receive_buffer.is_writing_complete() => { self.state = ReceiveStreamState::DataRead; self.final_state_observed = true; response.status = ops::Status::Finished; return Ok(response); } //= https://www.rfc-editor.org/rfc/rfc9000#section-3.5 //# If the stream is in the "Recv" or "Size Known" states, the transport //# SHOULD signal this by sending a STOP_SENDING frame to prompt closure //# of the stream in the opposite direction. _ => { self.stop_sending_sync.request_delivery(error_code); let received_len = self.receive_buffer.total_received_len(); let missing_data = MissingData::new(received_len); // transition to the Stopping state so we can start shutting down self.state = ReceiveStreamState::Stopping { error, missing_data, }; } } // STOP_SENDING cannot be flushed so it natually operates in detached mode self.detach(); // We clear the receive buffer, to free up any buffer // space which had been allocated but not used self.receive_buffer.reset(); // Mark the stream as reset. Note that the request doesn't have a flush so there's // currently no way to wait for the reset to be acknowledged. response.status = ops::Status::Reset(error); return Ok(response); } if request.detached { self.detach(); } // Do some state checks here. Only read data when the client is still // allowed to read (not reset). let total_size = match self.state { ReceiveStreamState::Reset(error) => { // The reset is now known to have been read by the client. self.final_state_observed = true; self.read_waiter = None; return Err(error); } ReceiveStreamState::Stopping { error, .. } => { self.read_waiter = None; return Err(error); } ReceiveStreamState::DataRead => { // All stream data had been received and all buffered data was // already consumed self.final_state_observed = true; self.read_waiter = None; response.status = ops::Status::Finished; return Ok(response); } ReceiveStreamState::Receiving => self.receive_buffer.final_size(), }; let low_watermark = &mut request.low_watermark; let high_watermark = &mut request.high_watermark; let mut should_wake = false; // ensure the number of available bytes is at least the requested low watermark if self.receive_buffer.len() >= self.flow_controller.watermark().min(*low_watermark) { if let Some(chunks) = request.chunks.as_mut().filter(|chunks| !chunks.is_empty()) { // Make sure all of the placeholder chunks are empty. If it's not, it could lead to // replacing a chunk that was received in a previous request. // // We iterate over all of the chunks to make sure we don't do a partial write and // return an error (which would result in losing data). if chunks.iter().any(|chunk| !chunk.is_empty()) { return Err(StreamError::non_empty_output()); } while response.chunks.consumed < chunks.len() { if let Some(data) = self.receive_buffer.pop_watermarked(*high_watermark) { let data_len = data.len(); // Release the flow control window for the consumed chunk self.flow_controller.release_window( VarInt::try_from(data_len) .expect("chunk len should always be less than maximum VarInt"), ); *low_watermark = (*low_watermark).saturating_sub(data_len); *high_watermark = (*high_watermark).saturating_sub(data_len); // replace the placeholder with the actual data let placeholder = core::mem::replace( &mut chunks[response.chunks.consumed], data.freeze(), ); debug_assert!( placeholder.is_empty(), "the placeholder should never contain data" ); response.bytes.consumed += data_len; response.chunks.consumed += 1; } else { // wake the request if we didn't consume anything should_wake |= response.chunks.consumed == 0; break; } } } } else { // notify when we have at least the requested watermark should_wake = true; } // Check for the end of stream and transition to // [`ReceiveStreamState::DataRead`] if necessary. if let Some(total_size) = total_size { if total_size == self.receive_buffer.consumed_len() { // By the time we enter the final state all synchronization // should have been cancelled. debug_assert!(self.stop_sending_sync.is_cancelled()); debug_assert!(self.flow_controller.read_window_sync.is_cancelled()); // The client has consumed all data. The stream // is thereby finished. self.state = ReceiveStreamState::DataRead; // We clear the receive buffer, to free up any buffer // space which had been allocated but not used self.receive_buffer.reset(); // clear the waiter self.read_waiter = None; // mark the final state as observed - the caller is expected to cache the `Finished` status self.final_state_observed = true; // Indicate that all data has been read response.status = ops::Status::Finished; } else if total_size == self.receive_buffer.total_received_len() { // inform callers that the stream will not increase beyond its current size response.status = ops::Status::Finishing; } } let (available_bytes, available_chunks) = self.receive_buffer.report(); response.bytes.available = available_bytes; response.chunks.available = available_chunks; if should_wake { if let Some(context) = context { // Store the waker, in order to be able to wakeup the client when // data arrives later. self.read_waiter = Some((context.waker().clone(), request.low_watermark)); response.will_wake = true; } } Ok(response) } fn detach(&mut self) { debug_assert!( matches!( &self.state, ReceiveStreamState::DataRead | ReceiveStreamState::Reset(_) | ReceiveStreamState::Stopping { .. } ), "a receive stream should only detach in a finalizing state" ); self.detached = true; self.read_waiter = None; match &self.state { // if the application has read the entire stream, then we can finalize the stream ReceiveStreamState::DataRead => { self.final_state_observed = true; } // if the stream has been reset and the application isn't subscribed to updates ReceiveStreamState::Reset(_) => { self.final_state_observed = true; } _ => {} } } } impl StreamInterestProvider for ReceiveStream { #[inline] fn stream_interests(&self, interests: &mut StreamInterests) { if self.final_state_observed { return; } // let the stream container know we still have work to do interests.retained = true; interests.delivery_notifications |= self.stop_sending_sync.is_inflight() || self.flow_controller.read_window_sync.is_inflight(); interests.with_transmission(|query| { self.stop_sending_sync.transmission_interest(query)?; self.flow_controller .read_window_sync .transmission_interest(query)?; Ok(()) }); } } #[cfg(test)] mod tests;