// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 //! `StreamManager` manages the lifecycle of all `Stream`s inside a `Connection` use crate::{ connection, contexts::{ConnectionApiCallContext, OnTransmitError, WriteContext}, recovery::RttEstimator, stream::{ self, incoming_connection_flow_controller::IncomingConnectionFlowController, outgoing_connection_flow_controller::OutgoingConnectionFlowController, stream_container::{StreamContainer, StreamContainerIterationResult}, stream_events::StreamEvents, stream_impl::StreamConfig, StreamError, StreamTrait, }, transmission::{self, interest::Provider as _}, }; use core::{ task::{Context, Poll, Waker}, time::Duration, }; use futures_core::ready; use s2n_quic_core::{ ack, connection::error::Error, endpoint, frame::{ stream::StreamRef, DataBlocked, MaxData, MaxStreamData, MaxStreams, ResetStream, StopSending, StreamDataBlocked, StreamsBlocked, }, packet::number::PacketNumberSpace, stream::{iter::StreamIter, ops, StreamId, StreamType}, time::{timer, Timestamp}, transport::{self, parameters::InitialFlowControlLimits}, varint::VarInt, }; /// Holds one Stream ID of each type (initiator/stream type) #[derive(Debug)] pub(super) struct StreamIdSet { server_initiated_unidirectional: Option, client_initiated_unidirectional: Option, server_initiated_bidirectional: Option, client_initiated_bidirectional: Option, } impl StreamIdSet { /// Returns the `StreamIdSet` where each `StreamId` is initialized to its /// initial value. pub fn initial() -> Self { Self { server_initiated_bidirectional: Some(StreamId::initial( endpoint::Type::Server, StreamType::Bidirectional, )), client_initiated_bidirectional: Some(StreamId::initial( endpoint::Type::Client, StreamType::Bidirectional, )), server_initiated_unidirectional: Some(StreamId::initial( endpoint::Type::Server, StreamType::Unidirectional, )), client_initiated_unidirectional: Some(StreamId::initial( endpoint::Type::Client, StreamType::Unidirectional, )), } } /// Returns the reference to the `StreamId` inside the set for the given /// initiator and stream type pub fn get_mut( &mut self, initiator: endpoint::Type, stream_type: StreamType, ) -> &mut Option { match (initiator, stream_type) { (endpoint::Type::Server, StreamType::Unidirectional) => { &mut self.server_initiated_unidirectional } (endpoint::Type::Client, StreamType::Unidirectional) => { &mut self.client_initiated_unidirectional } (endpoint::Type::Server, StreamType::Bidirectional) => { &mut self.server_initiated_bidirectional } (endpoint::Type::Client, StreamType::Bidirectional) => { &mut self.client_initiated_bidirectional } } } } /// Stores all required state for accepting incoming Streams via the /// `accept()` method #[derive(Debug)] pub(super) struct AcceptState { /// The ID of the next bidirectional Stream that an `accept()` call /// should return. next_bidi_stream_to_accept: Option, /// The ID of the next unidirectional Stream that an `accept()` call /// should return. next_uni_stream_to_accept: Option, /// The `Waker` for the task which needs to get woken when the next /// bidirectional stream was accepted bidi_waker: Option, /// The `Waker` for the task which needs to get woken when the next /// unidirectional stream was accepted uni_waker: Option, } impl AcceptState { pub fn new(local_endpoint_type: endpoint::Type) -> AcceptState { let peer_type = local_endpoint_type.peer_type(); AcceptState { next_bidi_stream_to_accept: Some(StreamId::initial( peer_type, StreamType::Bidirectional, )), next_uni_stream_to_accept: Some(StreamId::initial( peer_type, StreamType::Unidirectional, )), bidi_waker: None, uni_waker: None, } } /// Returns a mutable reference to the `Waker` for the given Stream type pub fn waker_mut(&mut self, stream_type: StreamType) -> &mut Option { match stream_type { StreamType::Bidirectional => &mut self.bidi_waker, StreamType::Unidirectional => &mut self.uni_waker, } } /// Returns the ID of the next Stream that needs to get accepted through /// an `accept())` call. pub fn next_stream_id(&self, stream_type: StreamType) -> Option { match stream_type { StreamType::Bidirectional => self.next_bidi_stream_to_accept, StreamType::Unidirectional => self.next_uni_stream_to_accept, } } /// Returns a mutable reference to the ID of the next Stream that needs to /// get accepted through an `accept())` call. pub fn next_stream_mut(&mut self, stream_type: StreamType) -> &mut Option { match stream_type { StreamType::Bidirectional => &mut self.next_bidi_stream_to_accept, StreamType::Unidirectional => &mut self.next_uni_stream_to_accept, } } } /// Manages all active `Stream`s inside a connection #[derive(Debug)] pub struct StreamManagerState { /// Flow control credit manager for receiving data pub(super) incoming_connection_flow_controller: IncomingConnectionFlowController, /// Flow control credit manager for sending data pub(super) outgoing_connection_flow_controller: OutgoingConnectionFlowController, /// Controller for managing streams concurrency limits stream_controller: stream::Controller, /// A container which contains all Streams streams: StreamContainer, /// The next Stream ID which was not yet used for an initiated stream /// for each stream type pub(super) next_stream_ids: StreamIdSet, /// The type of our local endpoint (client or server) local_endpoint_type: endpoint::Type, /// The initial flow control limits which we advertised towards the peer /// via transport parameters initial_local_limits: InitialFlowControlLimits, /// The initial flow control limits we received from the peer via transport /// parameters initial_peer_limits: InitialFlowControlLimits, /// If the `StreamManager` was closed, this contains the error which was /// passed to the `close()` call close_reason: Option, /// All state for accepting remotely initiated connections pub(super) accept_state: AcceptState, /// Limits for the Stream manager. Since only Stream limits are utilized at /// the moment we only store those stream_limits: stream::Limits, } impl StreamManagerState { /// Performs the given transaction on the `StreamManagerState`. /// If an error occurs, all Streams will be reset with an internal reset. pub fn reset_streams_on_error(&mut self, func: F) -> Result where F: FnOnce(&mut Self) -> Result, { let result = func(self); if let Err(err) = result.as_ref() { self.close((*err).into(), false); } result } /// Inserts the `Stream` into the StreamContainer. /// /// This method does not perform any validation whether it is allowed to /// open the `Stream`. fn insert_stream(&mut self, stream_id: StreamId) { // The receive window is announced by us towards to the peer let initial_receive_window = self .initial_local_limits .stream_limits .max_data(self.local_endpoint_type, stream_id); // The send window is announced to us by the peer let initial_send_window = self .initial_peer_limits .stream_limits .max_data(self.local_endpoint_type.peer_type(), stream_id); // We pass the initial_receive_window also as the desired flow control // window. Thereby we will maintain the same flow control window over // the lifetime of the Stream. // If we would want to have another limit, we would need to have various // limits for the various combinations of unidirectional/bidirectional // Streams. Those would bloat up the config, and essentially just // duplicate the transport parameters. // We limit the initial data limit to u32::MAX (4GB), which far // exceeds the reasonable amount of data a connection is // initially allowed to send. // // By representing the flow control value as a u32, we save space // on the connection state. assert!( initial_receive_window <= VarInt::from_u32(core::u32::MAX), "Receive window must not exceed 32bit range" ); self.streams.insert_stream(S::new(StreamConfig { incoming_connection_flow_controller: self.incoming_connection_flow_controller.clone(), outgoing_connection_flow_controller: self.outgoing_connection_flow_controller.clone(), local_endpoint_type: self.local_endpoint_type, stream_id, initial_receive_window, desired_flow_control_window: initial_receive_window.as_u64() as u32, initial_send_window, max_send_buffer_size: self.stream_limits.max_send_buffer_size.as_u32(), })); } /// Opens a Stream which is referenced in a frame if it has not yet been /// opened so far. This will also open all unopened frames which a lower /// Stream ID of the same type, as required by the QUIC specification. fn open_stream_if_necessary(&mut self, stream_id: StreamId) -> Result<(), transport::Error> { // If the stream ID is higher than any Stream ID we observed so far, we // need open all Stream IDs of the same type. Otherwise we need to look // up the Stream ID the map. let first_unopened_id: StreamId = if let Some(first_unopened_id) = *self .next_stream_ids .get_mut(stream_id.initiator(), stream_id.stream_type()) { first_unopened_id } else { // All Streams for particular initiator end endpoint type have // already been opened. In this case we don't have to open a // Stream, and the referenced Stream ID can also not be higher // than a previous outgoing Stream ID we used. return Ok(()); }; if stream_id.initiator() != self.local_endpoint_type { if stream_id >= first_unopened_id { // This Stream ID is first referenced here. This means we have // to create a new Stream instance if self.close_reason.is_some() { return Err(transport::Error::NO_ERROR.with_reason("Connection was closed")); } //= https://www.rfc-editor.org/rfc/rfc9000#section-4.6 //# Endpoints MUST NOT exceed the limit set by their peer. An endpoint //# that receives a frame with a stream ID exceeding the limit it has //# sent MUST treat this as a connection error of type //# STREAM_LIMIT_ERROR; see Section 11 for details on error handling. let stream_iter = StreamIter::new(first_unopened_id, stream_id); // Validate that there is enough capacity to open all streams. self.stream_controller.on_open_remote_stream(stream_iter)?; // We must create ALL streams with a lower Stream ID too: // //= https://www.rfc-editor.org/rfc/rfc9000#section-3.2 //# Before a stream is created, all streams of the same type with lower- //# numbered stream IDs MUST be created. This ensures that the creation //# order for streams is consistent on both endpoints. for stream_id in stream_iter { self.insert_stream(stream_id); } //= https://www.rfc-editor.org/rfc/rfc9000#section-2.1 //# A QUIC //# endpoint MUST NOT reuse a stream ID within a connection. // Increase the next expected Stream ID. We might thereby exhaust // the Stream ID range, which means we can no longer accept a // further Stream. *self .next_stream_ids .get_mut(stream_id.initiator(), stream_id.stream_type()) = stream_id.next_of_type(); // Wake up the application if it is waiting on new incoming Streams if let Some(waker) = self.accept_state.waker_mut(stream_id.stream_type()).take() { waker.wake(); } } } else { // Check if the peer is sending us a frame for a local initiated Stream with // a higher Stream ID than we ever used. // In this case the peer seems to be time-travelling and know about // Future Stream IDs we might use. We also will not accept this and // close the connection. if stream_id >= first_unopened_id { return Err( transport::Error::STREAM_STATE_ERROR.with_reason("Stream was not yet opened") ); } } Ok(()) } fn poll_open_local_stream( &mut self, stream_type: StreamType, open_token: &mut connection::OpenToken, context: &Context, ) -> Poll> { let first_unopened_id = self .next_stream_ids .get_mut(self.local_endpoint_type, stream_type) .ok_or_else(connection::Error::stream_id_exhausted)?; //= https://www.rfc-editor.org/rfc/rfc9000#section-4.6 //# Endpoints MUST NOT exceed the limit set by their peer. // //= https://www.rfc-editor.org/rfc/rfc9000#section-19.11 //# An endpoint MUST NOT open more streams than permitted by the current //# stream limit set by its peer. let poll_open = self.stream_controller .poll_open_local_stream(stream_type, open_token, context); // returns Pending if there is no capacity available ready!(poll_open); self.insert_stream(first_unopened_id); Poll::Ready(Ok(first_unopened_id)) } fn close(&mut self, error: connection::Error, flush: bool) { if self.close_reason.is_some() { return; } self.close_reason = Some(error); self.streams .iterate_streams(&mut self.stream_controller, |stream| { // We have to wake inside the lock, since `StreamEvent`s has no capacity // to carry wakers in another iteration let mut events = StreamEvents::new(); if flush { stream.on_flush(error.into(), &mut events); } else { stream.on_internal_reset(error.into(), &mut events); } events.wake_all(); }); // If the connection gets closed we need to notify tasks which are blocked // on `accept()`. if let Some(waker) = self .accept_state .waker_mut(StreamType::Bidirectional) .take() { waker.wake(); } if let Some(waker) = self .accept_state .waker_mut(StreamType::Unidirectional) .take() { waker.wake(); } self.stream_controller.close(); } fn flush(&mut self, error: connection::Error) -> Poll<()> { self.close(error, true); // if we still have active streams, we're not done flushing if self.streams.nr_active_streams() > 0 { Poll::Pending } else { Poll::Ready(()) } } } /// Manages all active `Stream`s inside a connection. /// `AbstractStreamManager` is parameterized over the `Stream` type. #[derive(Debug)] pub struct AbstractStreamManager { pub(super) inner: StreamManagerState, last_blocked_sync_period: Duration, } // Sending the `AbstractStreamManager` between threads is safe, since we never expose the `Rc`s // outside of the container #[allow(unknown_lints, clippy::non_send_fields_in_send_ty)] unsafe impl Send for AbstractStreamManager {} impl AbstractStreamManager { fn accept_stream_with_type( &mut self, stream_type: StreamType, ) -> Result, connection::Error> { // Check if the Stream exists let next_id_to_accept = self .inner .accept_state .next_stream_id(stream_type) .ok_or_else(connection::Error::stream_id_exhausted)?; if self.inner.streams.contains(next_id_to_accept) { *self.inner.accept_state.next_stream_mut(stream_type) = next_id_to_accept.next_of_type(); Ok(Some(next_id_to_accept)) } else { Ok(None) } } /// Calculates the period for sending STREAMS_BLOCKED, STREAM_DATA_BLOCKED and /// DATA_BLOCKED frames when blocked, according to the idle timeout and latest RTT estimates fn blocked_sync_period(&self, rtt_estimator: &RttEstimator) -> Duration { //= https://www.rfc-editor.org/rfc/rfc9000#section-4.1 //# To keep the //# connection from closing, a sender that is flow control limited SHOULD //# periodically send a STREAM_DATA_BLOCKED or DATA_BLOCKED frame when it //# has no ack-eliciting packets in flight. // STREAMS_BLOCKED, DATA_BLOCKED, and STREAM_DATA_BLOCKED frames are // sent to prevent the connection from closing due to an idle timeout // when we are blocked from opening or sending on streams. We use a pto count // of 1 so the periodic components can track backoff independently. // For extremely low RTT networks, this will ensure we do not send blocked // frames too frequently. const MIN_BLOCKED_SYNC_PERIOD: Duration = Duration::from_millis(5); let pto = rtt_estimator.pto_period(1, PacketNumberSpace::ApplicationData); pto.max(MIN_BLOCKED_SYNC_PERIOD) } /// This method encapsulates all common actions for handling incoming frames /// which target a specific `Stream`. /// It will open unopened Streams, lookup the required `Stream`, /// and then call the provided function on the Stream. /// If this leads to a connection error it will reset all internal connections. fn handle_stream_frame( &mut self, stream_id: StreamId, mut func: F, ) -> Result<(), transport::Error> where F: FnMut(&mut S, &mut StreamEvents) -> Result<(), transport::Error>, { let mut events = StreamEvents::new(); let result = { // If Stream handling causes an error, trigger an internal reset self.inner.reset_streams_on_error(|state| { // Open streams if necessary state.open_stream_if_necessary(stream_id)?; // Apply the provided function on the Stream. // If the Stream does not exist it is no error. state .streams .with_stream(stream_id, &mut state.stream_controller, |stream| { func(stream, &mut events) }) .unwrap_or(Ok(())) }) }; // We wake `Waker`s outside of the Mutex to reduce contention. // TODO: This is now no longer outside the Mutex events.wake_all(); result } /// Executes an application API call on the given Stream if the Stream exists /// and returns the result of the API call. /// /// If the Stream does not exist `unknown_stream_result` will be returned. /// /// If the application call requires transmission of data, the QUIC connection /// thread will be notified through the [`WakeHandle`] in the provided [`ConnectionApiCallContext`]. fn perform_api_call( &mut self, stream_id: StreamId, unknown_stream_result: R, api_call_context: &mut ConnectionApiCallContext, func: F, ) -> R where F: FnOnce(&mut S) -> R, { let had_transmission_interest = self.inner.streams.has_transmission_interest(); let result = self .inner .streams .with_stream(stream_id, &mut self.inner.stream_controller, |stream| { func(stream) }) .unwrap_or(unknown_stream_result); // A wakeup is only triggered if the the transmission list is // now empty, but was previously not. The edge triggered behavior // minimizes the amount of necessary wakeups. let require_wakeup = !had_transmission_interest && self.inner.streams.has_transmission_interest(); // TODO: This currently wakes the connection task while inside the connection Mutex. // It will be better if we return the `Waker` instead and perform the wakeup afterwards. if require_wakeup { api_call_context.wakeup_handle().wakeup(); } result } } impl stream::Manager for AbstractStreamManager { fn new( connection_limits: &connection::Limits, local_endpoint_type: endpoint::Type, initial_local_limits: InitialFlowControlLimits, initial_peer_limits: InitialFlowControlLimits, ) -> Self { // We limit the initial data limit to u32::MAX (4GB), which far // exceeds the reasonable amount of data a connection is // initially allowed to send. // // By representing the flow control value as a u32, we save space // on the connection state. assert!( initial_local_limits.max_data <= VarInt::from_u32(core::u32::MAX), "Receive window must not exceed 32bit range" ); Self { inner: StreamManagerState { incoming_connection_flow_controller: IncomingConnectionFlowController::new( initial_local_limits.max_data, initial_local_limits.max_data.as_u64() as u32, ), outgoing_connection_flow_controller: OutgoingConnectionFlowController::new( initial_peer_limits.max_data, ), stream_controller: stream::Controller::new( local_endpoint_type, initial_peer_limits, initial_local_limits, connection_limits.stream_limits(), ), streams: StreamContainer::new(), next_stream_ids: StreamIdSet::initial(), local_endpoint_type, initial_local_limits, initial_peer_limits, close_reason: None, accept_state: AcceptState::new(local_endpoint_type), stream_limits: connection_limits.stream_limits(), }, last_blocked_sync_period: Duration::ZERO, } } fn incoming_bytes_progressed(&self) -> VarInt { self.inner .incoming_connection_flow_controller .acquired_window() } fn outgoing_bytes_progressed(&self) -> VarInt { self.inner .outgoing_connection_flow_controller .acquired_window() } fn poll_accept( &mut self, stream_type: Option, context: &Context, ) -> Poll, connection::Error>> { macro_rules! with_stream_type { (| $stream_type:ident | $block:stmt) => { if stream_type == None || stream_type == Some(StreamType::Bidirectional) { let $stream_type = StreamType::Bidirectional; $block } if stream_type == None || stream_type == Some(StreamType::Unidirectional) { let $stream_type = StreamType::Unidirectional; $block } }; } // Clear a stored Waker with_stream_type!(|stream_type| *self.inner.accept_state.waker_mut(stream_type) = None); // If the connection was closed we still allow the application to accept // Streams which are already known to the StreamManager. // This is done for 2 reasons: // 1. If the application doesn't interact with the Streams and observes // their close status, they won't get removed from StreamManager due // to missing finalization interest // 2. The streams might already have received all data from the peer at // this point, and for applications it can be helpful to act on this // data. with_stream_type!(|stream_type| if let Some(stream_id) = self.accept_stream_with_type(stream_type)? { return Ok(Some(stream_id)).into(); }); if let Some(close_reason) = self.inner.close_reason { match Error::into_accept_error(close_reason) { Ok(_) => return Ok(None).into(), Err(err) => return Err(err).into(), }; } // Store the `Waker` for notifying the application if we accept a Stream with_stream_type!( |stream_type| *self.inner.accept_state.waker_mut(stream_type) = Some(context.waker().clone()) ); Poll::Pending } fn poll_open_local_stream( &mut self, stream_type: StreamType, open_token: &mut connection::OpenToken, context: &Context, ) -> Poll> { // If StreamManager was closed, return the error if let Some(error) = self.inner.close_reason { return Err(error).into(); } let first_unopened_id = ready!(self .inner .poll_open_local_stream(stream_type, open_token, context))?; // Increase the next utilized Stream ID *self .inner .next_stream_ids .get_mut(self.inner.local_endpoint_type, stream_type) = first_unopened_id.next_of_type(); Ok(first_unopened_id).into() } fn on_packet_ack(&mut self, ack_set: &A) { self.inner .incoming_connection_flow_controller .on_packet_ack(ack_set); self.inner .outgoing_connection_flow_controller .on_packet_ack(ack_set); self.inner.stream_controller.on_packet_ack(ack_set); self.inner.streams.iterate_frame_delivery_list( &mut self.inner.stream_controller, |stream| { // We have to wake inside the lock, since `StreamEvent`s has no capacity // to carry wakers in another iteration let mut events = StreamEvents::new(); stream.on_packet_ack(ack_set, &mut events); events.wake_all(); }, ); } fn on_packet_loss(&mut self, ack_set: &A) { self.inner .incoming_connection_flow_controller .on_packet_loss(ack_set); self.inner .outgoing_connection_flow_controller .on_packet_loss(ack_set); self.inner.stream_controller.on_packet_loss(ack_set); self.inner.streams.iterate_frame_delivery_list( &mut self.inner.stream_controller, |stream| { // We have to wake inside the lock, since `StreamEvent`s has no capacity // to carry wakers in another iteration let mut events = StreamEvents::new(); stream.on_packet_loss(ack_set, &mut events); events.wake_all(); }, ); } fn on_rtt_update(&mut self, rtt_estimator: &RttEstimator) { let blocked_sync_period = self.blocked_sync_period(rtt_estimator); { let last_blocked_sync_period = self.last_blocked_sync_period.as_millis() as u64; let current_blocked_sync_period = blocked_sync_period.as_millis() as u64; /// The number of milliseconds to which the change comparison is configured /// /// Ideally this number is a power of 2 so the computation is efficient const SENSITIVITY_MS: u64 = 16; // If we haven't changed a significant amount, there's no point in updating everything if last_blocked_sync_period / SENSITIVITY_MS == current_blocked_sync_period / SENSITIVITY_MS { return; } } self.last_blocked_sync_period = blocked_sync_period; self.inner .stream_controller .update_blocked_sync_period(blocked_sync_period); self.inner .outgoing_connection_flow_controller .update_blocked_sync_period(blocked_sync_period); self.inner.streams.iterate_stream_flow_credits_list( &mut self.inner.stream_controller, |stream| { stream.update_blocked_sync_period(blocked_sync_period); StreamContainerIterationResult::Continue }, ); } fn on_timeout(&mut self, now: Timestamp) { self.inner.stream_controller.on_timeout(now); self.inner .outgoing_connection_flow_controller .on_timeout(now); self.inner.streams.iterate_stream_flow_credits_list( &mut self.inner.stream_controller, |stream| { stream.on_timeout(now); StreamContainerIterationResult::Continue }, ); } fn close(&mut self, error: connection::Error) { self.inner.close(error, false); } fn close_reason(&self) -> Option { self.inner.close_reason } fn flush(&mut self, error: connection::Error) -> Poll<()> { self.inner.flush(error) } fn on_transmit(&mut self, context: &mut W) -> Result<(), OnTransmitError> { self.inner .incoming_connection_flow_controller .on_transmit(context)?; self.inner .outgoing_connection_flow_controller .on_transmit(context)?; self.inner.stream_controller.on_transmit(context)?; // Due to an error we could not transmit all data. // We add streams which could not send data back into the // waiting_for_transmission list, so that they will be queried again // the next time transmission capacity is available. // We actually add those Streams to the end of the list, // since the earlier entries are from Streams which were not // able to write all the desired data and added themselves as // transmit interested again let mut transmit_result = Ok(()); if context.transmission_constraint().can_retransmit() { // ensure components only retransmit in this phase let mut retransmission_context = transmission::context::RetransmissionContext::new(context); // Prioritize retransmitting lost data self.inner.streams.iterate_retransmission_list( &mut self.inner.stream_controller, |stream: &mut S| { transmit_result = stream.on_transmit(&mut retransmission_context); if transmit_result.is_err() { StreamContainerIterationResult::BreakAndInsertAtBack } else { StreamContainerIterationResult::Continue } }, ); // return if there were any errors transmit_result?; } if context.transmission_constraint().can_transmit() { self.inner.streams.iterate_transmission_list( &mut self.inner.stream_controller, |stream: &mut S| { transmit_result = stream.on_transmit(context); if transmit_result.is_err() { StreamContainerIterationResult::BreakAndInsertAtBack } else { StreamContainerIterationResult::Continue } }, ); } // There is no `finalize_done_streams` here, since we do not expect to // perform an operation which brings us in a finalization state transmit_result } // Frame reception // These functions are called from the packet delivery thread fn on_data(&mut self, frame: &StreamRef) -> Result<(), transport::Error> { let stream_id = StreamId::from_varint(frame.stream_id); self.handle_stream_frame(stream_id, |stream, events| stream.on_data(frame, events)) } fn on_data_blocked(&mut self, _frame: DataBlocked) -> Result<(), transport::Error> { Ok(()) // This is currently ignored } fn on_stream_data_blocked( &mut self, frame: &StreamDataBlocked, ) -> Result<(), transport::Error> { let stream_id = StreamId::from_varint(frame.stream_id); self.handle_stream_frame(stream_id, |stream, events| { stream.on_stream_data_blocked(frame, events) }) } fn on_reset_stream(&mut self, frame: &ResetStream) -> Result<(), transport::Error> { let stream_id = StreamId::from_varint(frame.stream_id); self.handle_stream_frame(stream_id, |stream, events| stream.on_reset(frame, events)) } fn on_max_stream_data(&mut self, frame: &MaxStreamData) -> Result<(), transport::Error> { let stream_id = StreamId::from_varint(frame.stream_id); self.handle_stream_frame(stream_id, |stream, events| { stream.on_max_stream_data(frame, events) }) } fn on_stop_sending(&mut self, frame: &StopSending) -> Result<(), transport::Error> { let stream_id = StreamId::from_varint(frame.stream_id); self.handle_stream_frame(stream_id, |stream, events| { stream.on_stop_sending(frame, events) }) } fn on_max_data(&mut self, frame: MaxData) -> Result<(), transport::Error> { self.inner .outgoing_connection_flow_controller .on_max_data(frame); if self .inner .outgoing_connection_flow_controller .available_window() == VarInt::from_u32(0) { return Ok(()); } // Iterate over streams and allow them to grab credits from the // connection window. As soon as we run out of credits we stop // iterating and insert the remaining streams to the end of the list // again. let conn_flow = &mut self.inner.outgoing_connection_flow_controller; self.inner.streams.iterate_connection_flow_credits_list( &mut self.inner.stream_controller, |stream| { stream.on_connection_window_available(); if conn_flow.available_window() == VarInt::from_u32(0) { StreamContainerIterationResult::BreakAndInsertAtBack } else { StreamContainerIterationResult::Continue } }, ); Ok(()) } fn on_streams_blocked(&mut self, _frame: &StreamsBlocked) -> Result<(), transport::Error> { //= https://www.rfc-editor.org/rfc/rfc9000#section-4.6 //= type=TODO //= tracking-issue=244 //= feature=Stream concurrency Ok(()) // TODO: Implement me } fn on_max_streams(&mut self, frame: &MaxStreams) -> Result<(), transport::Error> { self.inner.stream_controller.on_max_streams(frame); Ok(()) } fn poll_request( &mut self, stream_id: StreamId, api_call_context: &mut ConnectionApiCallContext, request: &mut ops::Request, context: Option<&Context>, ) -> Result { self.perform_api_call( stream_id, Err(StreamError::invalid_stream()), api_call_context, |stream| stream.poll_request(request, context), ) } fn has_pending_streams(&self) -> bool { self.inner.streams.has_pending_streams() } } impl timer::Provider for AbstractStreamManager { #[inline] fn timers(&self, query: &mut Q) -> timer::Result { self.inner.stream_controller.timers(query)?; self.inner .outgoing_connection_flow_controller .timers(query)?; self.inner.streams.timers(query)?; Ok(()) } } impl transmission::interest::Provider for AbstractStreamManager { #[inline] fn transmission_interest( &self, query: &mut Q, ) -> transmission::interest::Result { self.inner.streams.transmission_interest(query)?; self.inner.stream_controller.transmission_interest(query)?; self.inner .incoming_connection_flow_controller .transmission_interest(query)?; self.inner .outgoing_connection_flow_controller .transmission_interest(query)?; Ok(()) } } impl connection::finalization::Provider for AbstractStreamManager { fn finalization_status(&self) -> connection::finalization::Status { if self.inner.close_reason.is_some() && self.inner.streams.nr_active_streams() == 0 { connection::finalization::Status::Final } else if self.inner.close_reason.is_some() && self.inner.streams.nr_active_streams() > 0 { connection::finalization::Status::Draining } else { connection::finalization::Status::Idle } } } // These are methods that StreamManager only exposes for test purposes. // // They might perform additional allocations, and may not be as safe to call // due to being allowed to panic! when invariants are violated. #[cfg(test)] impl AbstractStreamManager { /// Executes the given function using the outgoing flow controller pub fn with_outgoing_connection_flow_controller(&mut self, func: F) -> R where F: FnOnce(&mut OutgoingConnectionFlowController) -> R, { func(&mut self.inner.outgoing_connection_flow_controller) } /// Executes the given function using the stream controller pub fn with_stream_controller(&mut self, func: F) -> R where F: FnOnce(&mut stream::Controller) -> R, { func(&mut self.inner.stream_controller) } /// Asserts that a Stream with the given ID exists, and executes the provided /// function on it pub fn with_asserted_stream(&mut self, stream_id: StreamId, func: F) -> R where F: FnOnce(&mut S) -> R, { self.inner .streams .with_stream(stream_id, &mut self.inner.stream_controller, func) .expect("Stream is open") } /// Returns the list of Stream IDs which is currently tracked by the /// [`StreamManager`]. pub fn active_streams(&mut self) -> Vec { let mut results = Vec::new(); self.inner .streams .iterate_streams(&mut self.inner.stream_controller, |stream| { results.push(stream.stream_id()) }); results } /// Returns the list of Stream IDs for Streams which are waiting for /// connection flow control credits. pub fn streams_waiting_for_connection_flow_control_credits(&mut self) -> Vec { let mut results = Vec::new(); self.inner.streams.iterate_connection_flow_credits_list( &mut self.inner.stream_controller, |stream| { results.push(stream.stream_id()); StreamContainerIterationResult::Continue }, ); results } /// Returns the list of Stream IDs for Streams which are waiting for /// delivery notifications. pub fn streams_waiting_for_delivery_notifications(&mut self) -> Vec { let mut results = Vec::new(); self.inner.streams.iterate_frame_delivery_list( &mut self.inner.stream_controller, |stream| { results.push(stream.stream_id()); }, ); results } /// Returns the list of Stream IDs for Streams which are waiting for /// transmission. pub fn streams_waiting_for_transmission(&mut self) -> Vec { let mut results = Vec::new(); self.inner .streams .iterate_transmission_list(&mut self.inner.stream_controller, |stream| { results.push(stream.stream_id()); StreamContainerIterationResult::Continue }); results } /// Returns the list of Stream IDs for Streams which are waiting for /// retransmission. pub fn streams_waiting_for_retransmission(&mut self) -> Vec { let mut results = Vec::new(); self.inner.streams.iterate_retransmission_list( &mut self.inner.stream_controller, |stream| { results.push(stream.stream_id()); StreamContainerIterationResult::Continue }, ); results } } #[cfg(test)] mod tests;