// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use crate::{ ack::{ ack_eliciting_transmission::{AckElicitingTransmission, AckElicitingTransmissionSet}, ack_ranges::{AckRanges, AckRangesError}, ack_transmission_state::AckTransmissionState, }, contexts::WriteContext, processed_packet::ProcessedPacket, transmission, }; use s2n_quic_core::{ ack, counter::{Counter, Saturating}, event::{ self, builder::{AckAction, AckProcessed}, IntoEvent as _, }, frame::{ack::EcnCounts, Ack, Ping}, packet::number::{PacketNumber, PacketNumberSpace}, time::{timer, Timer, Timestamp}, varint::VarInt, }; //= https://www.rfc-editor.org/rfc/rfc9000#section-13.2 //# Endpoints acknowledge all packets they receive and process. However, //# only ack-eliciting packets cause an ACK frame to be sent within the //# maximum ack delay. Packets that are not ack-eliciting are only //# acknowledged when an ACK frame is sent for other reasons. //# //# When sending a packet for any reason, an endpoint SHOULD attempt to //# include an ACK frame if one has not been sent recently. Doing so //# helps with timely loss detection at the peer. //# //# In general, frequent feedback from a receiver improves loss and //# congestion response, but this has to be balanced against excessive //# load generated by a receiver that sends an ACK frame in response to //# every ack-eliciting packet. The guidance offered below seeks to //# strike this balance. #[derive(Clone, Debug)] pub struct AckManager { /// Time at which the AckManager will wake and transmit an ACK ack_delay_timer: Timer, /// Used to track the ACK-eliciting transmissions sent from the AckManager ack_eliciting_transmissions: AckElicitingTransmissionSet, /// All of the processed AckRanges that need to be ACKed pub(super) ack_ranges: AckRanges, /// Locally configured AckSettings pub ack_settings: ack::Settings, /// The largest packet number that we've acked - used for pn decoding largest_received_packet_number_acked: PacketNumber, /// The time at which we received the largest pn largest_received_packet_number_at: Option, /// The number of processed packets since transmission processed_packets_since_transmission: Counter, /// The number of transmissions since the last ACK-eliciting packet was sent transmissions_since_elicitation: Counter, /// Used to transition through transmission/retransmission states transmission_state: AckTransmissionState, /// Explicit Congestion Notification counts from processed packets ecn_counts: EcnCounts, } impl AckManager { pub fn new(packet_space: PacketNumberSpace, ack_settings: ack::Settings) -> Self { Self { ack_delay_timer: Timer::default(), ack_eliciting_transmissions: AckElicitingTransmissionSet::default(), ack_settings, ack_ranges: AckRanges::new(ack_settings.ack_ranges_limit as usize), largest_received_packet_number_acked: packet_space .new_packet_number(VarInt::from_u8(0)), largest_received_packet_number_at: None, processed_packets_since_transmission: Counter::new(0), transmissions_since_elicitation: Counter::new(0), transmission_state: AckTransmissionState::default(), ecn_counts: EcnCounts::default(), } } /// Called when an outgoing packet is being assembled pub fn on_transmit(&mut self, context: &mut W) -> bool { let constraint = context.transmission_constraint(); let mode = context.transmission_mode(); let has_ranges = !self.ack_ranges.is_empty(); let should_transmit = self .transmission_state .should_transmit(constraint, mode, has_ranges); if !should_transmit { return false; } let ack_delay = self.ack_delay(context.current_time()); //= https://www.rfc-editor.org/rfc/rfc9000#section-13.4.1 //# Even if an endpoint does not set an ECT field on packets it sends, //# the endpoint MUST provide feedback about ECN markings it receives, if //# these are accessible. context .write_ack_frame(&Ack { ack_delay, ack_ranges: &self.ack_ranges, ecn_counts: self.ecn_counts.as_option(), }) .is_some() } /// Called after an outgoing packet is assembled and `on_transmit` returned `true` pub fn on_transmit_complete(&mut self, context: &mut W) { debug_assert!( self.transmission_state.should_transmit( context.transmission_constraint(), context.transmission_mode(), !self.ack_ranges.is_empty() ), "`on_transmit_complete` was called when `should_transmit` is false" ); // if we transmitted something no need to wake up again to transmit the same thing self.ack_delay_timer.cancel(); let mut is_ack_eliciting = context.ack_elicitation().is_ack_eliciting(); if !is_ack_eliciting { // check the timer and make sure we can still write a Ping frame before removing it // We send a ping even when constrained to retransmissions only, as a fast // retransmission that is not ack eliciting will not help us recover faster. if (context.transmission_constraint().can_transmit() || context.transmission_constraint().can_retransmit()) && self.transmissions_since_elicitation >= self.ack_settings.ack_elicitation_interval && context.write_frame(&Ping).is_some() { is_ack_eliciting = true; } else { self.transmissions_since_elicitation += 1; } } self.largest_received_packet_number_acked = self .ack_ranges .max_value() .expect("transmission_state should be Disabled while ack_ranges is empty"); if is_ack_eliciting { // reset the counter self.transmissions_since_elicitation = Counter::new(0); //= https://www.rfc-editor.org/rfc/rfc9000#section-13.2.4 //# When a packet containing an ACK frame is sent, the Largest //# Acknowledged field in that frame can be saved. self.ack_eliciting_transmissions .on_transmit(AckElicitingTransmission { sent_in_packet: context.packet_number(), largest_received_packet_number_acked: self.largest_received_packet_number_acked, }); } // record a transmission self.transmission_state .on_transmit(!self.ack_ranges.is_empty()); // reset the number of packets since transmission self.processed_packets_since_transmission = Counter::new(0); } /// Called when a set of packets was acknowledged pub fn on_packet_ack(&mut self, _timestamp: Timestamp, ack_set: &A) { if let Some(ack_range) = self.ack_eliciting_transmissions.on_update(ack_set) { self.ack_ranges .remove(ack_range) .expect("The range should always shrink the interval length"); // `self.transmission_state` will be automatically notified in `on_processed_packet` // so wait for that instead } } /// Called when a set of packets was reported lost pub fn on_packet_loss(&mut self, ack_set: &A) { if self .ack_eliciting_transmissions .on_update(ack_set) .is_some() { // transition to active mode when packet is lost self.transmission_state.on_update(&self.ack_ranges); self.transmission_state.activate(); } } /// Called after an RX packet has been processed pub fn on_processed_packet( &mut self, processed_packet: &ProcessedPacket, path: event::builder::Path, publisher: &mut Pub, ) { let packet_number = processed_packet.packet_number; let now = processed_packet.datagram.timestamp; // perform some checks before inserting into the ack_ranges let (is_ordered, is_largest) = self .ack_ranges .max_value() .and_then(|max_value| { // check to see if the packet number is the next one in the sequence let is_ordered = packet_number == max_value.next()?; // check to see if the packet number is the largest we've seen let is_largest = packet_number > max_value; Some((is_ordered, is_largest)) }) .unwrap_or((true, true)); // This will fail if `packet_number` is less than `ack_ranges.min_value()` // and `ack_ranges` is at capacity. // // Most likely, this packet is very old and the contents have already // been retransmitted by the peer. if let Err(err) = self.ack_ranges.insert_packet_number(packet_number) { match err { AckRangesError::RangeInsertionFailed { min, max } | AckRangesError::LowestRangeDropped { min, max } => { let start = self .ack_ranges .min_value() .expect("should be non empty") .into_event(); let end = self .ack_ranges .max_value() .expect("should be non empty") .into_event(); publisher.on_rx_ack_range_dropped(event::builder::RxAckRangeDropped { path, packet_number_range: min.into_event()..=max.into_event(), capacity: self.ack_ranges.interval_len(), stored_range: start..=end, }); publisher.on_ack_processed(AckProcessed { action: AckAction::RxAckRangeDropped { packet_number_range: min.into_event()..=max.into_event(), capacity: self.ack_ranges.interval_len(), stored_range: start..=end, }, path, }); } }; } //= https://www.rfc-editor.org/rfc/rfc9000#section-13.4.1 //# ECN counts are only incremented when QUIC packets from the received //# IP packet are processed. As such, duplicate QUIC packets are not //# processed and do not increase ECN counts; see Section 21.10 for //# relevant security concerns. self.ecn_counts.increment(processed_packet.datagram.ecn); // Notify the state that the ack_ranges have changed self.transmission_state.on_update(&self.ack_ranges); self.processed_packets_since_transmission += 1; //= https://www.rfc-editor.org/rfc/rfc9000#section-13.2.5 //# An endpoint measures the delays intentionally introduced between the //# time the packet with the largest packet number is received and the //# time an acknowledgment is sent. The endpoint encodes this //# acknowledgement delay in the ACK Delay field of an ACK frame; see //# Section 19.3. This allows the receiver of the ACK frame to adjust //# for any intentional delays, which is important for getting a better //# estimate of the path RTT when acknowledgments are delayed. if is_largest { self.largest_received_packet_number_at = Some(now); } if processed_packet.is_ack_eliciting() { let mut should_activate = false; //= https://www.rfc-editor.org/rfc/rfc9000#section-13.2.1 //# In order to assist loss detection at the sender, an endpoint SHOULD //# generate and send an ACK frame without delay when it receives an ack- //# eliciting packet either: //# //# * when the received packet has a packet number less than another //# ack-eliciting packet that has been received, or should_activate |= !is_largest; //= https://www.rfc-editor.org/rfc/rfc9000#section-13.2.1 //# * when the packet has a packet number larger than the highest- //# numbered ack-eliciting packet that has been received and there are //# missing packets between that packet and this packet. should_activate |= !is_ordered; //= https://www.rfc-editor.org/rfc/rfc9000#section-13.2.1 //# Similarly, packets marked with the ECN Congestion Experienced (CE) //# codepoint in the IP header SHOULD be acknowledged immediately, to //# reduce the peer's response time to congestion events. should_activate |= processed_packet.datagram.ecn.congestion_experienced(); // TODO update to draft link after published // https://github.com/quicwg/base-drafts/pull/3623 // An ACK frame SHOULD be generated for at least every 10th ack-eliciting packet // TODO support delayed ack proposal // https://tools.ietf.org/html/draft-iyengar-quic-delayed-ack-00 let packet_tolerance = 10; should_activate |= self.processed_packets_since_transmission >= packet_tolerance; //= https://www.rfc-editor.org/rfc/rfc9000#section-9.3.3 //# An endpoint that receives a PATH_CHALLENGE on an active path SHOULD //# send a non-probing packet in response. should_activate |= processed_packet.path_challenge_on_active_path; if should_activate { self.transmission_state.activate(); } else if !self.ack_delay_timer.is_armed() { //= https://www.rfc-editor.org/rfc/rfc9000#section-13.2 //# Endpoints acknowledge all packets they receive and process. However, //# only ack-eliciting packets cause an ACK frame to be sent within the //# maximum ack delay. Packets that are not ack-eliciting are only //# acknowledged when an ACK frame is sent for other reasons. self.ack_delay_timer .set(now + self.ack_settings.max_ack_delay) } } // To save on timer churn, check to see if we've already expired since the // last time we sent an ACK frame if self.ack_delay_timer.poll_expiration(now).is_ready() { self.transmission_state.activate(); } } /// Called when the connection timer expired pub fn on_timeout(&mut self, timestamp: Timestamp) { // NOTE: ack_elicitation_timer is not actively polled if self.ack_delay_timer.poll_expiration(timestamp).is_ready() { // transition to active transmission when we exceed the ack_delay self.transmission_state.activate(); } } /// Returns the largest received packet number that has been ACKed at least once pub fn largest_received_packet_number_acked(&self) -> PacketNumber { self.largest_received_packet_number_acked } /// Computes the ack_delay field for the current state fn ack_delay(&self, now: Timestamp) -> VarInt { let ack_delay = self .largest_received_packet_number_at .map(|prev| now.saturating_duration_since(prev)) .unwrap_or_default(); self.ack_settings.encode_ack_delay(ack_delay) } } impl timer::Provider for AckManager { #[inline] fn timers(&self, query: &mut Q) -> timer::Result { // NOTE: ack_elicitation_timer is not actively polled self.ack_delay_timer.timers(query)?; Ok(()) } } impl transmission::interest::Provider for AckManager { #[inline] fn transmission_interest( &self, query: &mut Q, ) -> transmission::interest::Result { self.transmission_state.transmission_interest(query) } } #[cfg(test)] mod tests { use super::{super::tests::*, *}; use crate::{ contexts::testing::{MockWriteContext, OutgoingFrameBuffer}, path::{path_event, testing::helper_path_server}, }; use core::{ iter::{empty, once}, time::Duration, }; use insta::assert_debug_snapshot; use s2n_quic_core::{ ack, connection, endpoint, event::testing::Publisher, frame::{ack_elicitation::AckElicitation, ping, Frame}, inet::{DatagramInfo, ExplicitCongestionNotification}, path, time::{clock::testing as time, Clock, NoopClock}, }; #[test] fn activate() { // Setup: let mut manager = AckManager::new(PacketNumberSpace::ApplicationData, ack::Settings::default()); let pn = PacketNumberSpace::ApplicationData.new_packet_number(VarInt::from_u8(1)); let datagram = DatagramInfo { ecn: Default::default(), payload_len: 1200, timestamp: NoopClock {}.get_time(), destination_connection_id: connection::LocalId::TEST_ID, source_connection_id: None, }; let mut processed_packet = ProcessedPacket::new(pn, &datagram); processed_packet.path_challenge_on_active_path = true; processed_packet.ack_elicitation = AckElicitation::Eliciting; assert!(!manager.transmission_state.is_active()); // Trigger: let path = helper_path_server(); let path_id = path::Id::test_id(); manager.on_processed_packet( &processed_packet, path_event!(path, path_id), &mut Publisher::snapshot(), ); // Expectation: assert!(manager.transmission_state.is_active()); } #[test] fn ecn_counts() { // Setup: let mut manager = AckManager::new(PacketNumberSpace::ApplicationData, ack::Settings::default()); assert_eq!(0, manager.ecn_counts.ect_0_count.as_u64()); assert_eq!(0, manager.ecn_counts.ect_1_count.as_u64()); assert_eq!(0, manager.ecn_counts.ce_count.as_u64()); // Process a packet in an Ect0 datagram let pn = PacketNumberSpace::ApplicationData.new_packet_number(VarInt::from_u8(1)); let datagram = helper_datagram_info(ExplicitCongestionNotification::Ect0); let path = helper_path_server(); let path_id = path::Id::test_id(); let mut publisher = Publisher::snapshot(); manager.on_processed_packet( &ProcessedPacket::new(pn, &datagram), path_event!(path, path_id), &mut publisher, ); assert_eq!(1, manager.ecn_counts.ect_0_count.as_u64()); assert_eq!(0, manager.ecn_counts.ect_1_count.as_u64()); assert_eq!(0, manager.ecn_counts.ce_count.as_u64()); // Process a couple packets in an Ect1 datagram let pn1 = PacketNumberSpace::ApplicationData.new_packet_number(VarInt::from_u8(2)); let pn2 = PacketNumberSpace::ApplicationData.new_packet_number(VarInt::from_u8(3)); let datagram = helper_datagram_info(ExplicitCongestionNotification::Ect1); manager.on_processed_packet( &ProcessedPacket::new(pn1, &datagram), path_event!(path, path_id), &mut publisher, ); manager.on_processed_packet( &ProcessedPacket::new(pn2, &datagram), path_event!(path, path_id), &mut publisher, ); assert_eq!(1, manager.ecn_counts.ect_0_count.as_u64()); assert_eq!(2, manager.ecn_counts.ect_1_count.as_u64()); assert_eq!(0, manager.ecn_counts.ce_count.as_u64()); // Process a packet in an Ce datagram let pn = PacketNumberSpace::ApplicationData.new_packet_number(VarInt::from_u8(4)); let datagram = helper_datagram_info(ExplicitCongestionNotification::Ce); manager.on_processed_packet( &ProcessedPacket::new(pn, &datagram), path_event!(path, path_id), &mut publisher, ); assert_eq!(1, manager.ecn_counts.ect_0_count.as_u64()); assert_eq!(2, manager.ecn_counts.ect_1_count.as_u64()); assert_eq!(1, manager.ecn_counts.ce_count.as_u64()); // Process a packet in a NotEct datagram let pn = PacketNumberSpace::ApplicationData.new_packet_number(VarInt::from_u8(5)); let datagram = helper_datagram_info(ExplicitCongestionNotification::NotEct); manager.on_processed_packet( &ProcessedPacket::new(pn, &datagram), path_event!(path, path_id), &mut publisher, ); assert_eq!(1, manager.ecn_counts.ect_0_count.as_u64()); assert_eq!(2, manager.ecn_counts.ect_1_count.as_u64()); assert_eq!(1, manager.ecn_counts.ce_count.as_u64()); } /// Helper function to construct `DatagramInfo` with the given `ExplicitCongestionNotification` fn helper_datagram_info(ecn: ExplicitCongestionNotification) -> DatagramInfo { DatagramInfo { ecn, payload_len: 1200, timestamp: NoopClock {}.get_time(), destination_connection_id: connection::LocalId::TEST_ID, source_connection_id: None, } } #[test] fn on_transmit_complete_transmission_constrained() { let mut manager = AckManager::new(PacketNumberSpace::ApplicationData, ack::Settings::default()); let mut frame_buffer = OutgoingFrameBuffer::new(); let mut write_context = MockWriteContext::new( time::now(), &mut frame_buffer, transmission::Constraint::None, transmission::Mode::Normal, endpoint::Type::Server, ); manager.ack_ranges = AckRanges::default(); assert!(manager .ack_ranges .insert_packet_number( PacketNumberSpace::ApplicationData.new_packet_number(VarInt::from_u8(1)), ) .is_ok()); manager.transmission_state = AckTransmissionState::Active { retransmissions: 0 }; manager.transmissions_since_elicitation = Counter::new(ack::Settings::EARLY.ack_elicitation_interval); manager.on_transmit_complete(&mut write_context); assert_eq!( write_context .frame_buffer .pop_front() .expect("Frame is written") .as_frame(), Frame::Ping(ping::Ping), "Ping should be written when transmission is not constrained" ); manager.transmission_state = AckTransmissionState::Active { retransmissions: 0 }; manager.transmissions_since_elicitation = Counter::new(ack::Settings::EARLY.ack_elicitation_interval); write_context.frame_buffer.clear(); write_context.transmission_constraint = transmission::Constraint::CongestionLimited; manager.on_transmit_complete(&mut write_context); assert!( write_context.frame_buffer.is_empty(), "Ping should not be written when CongestionLimited" ); manager.transmission_state = AckTransmissionState::Active { retransmissions: 0 }; manager.transmissions_since_elicitation = Counter::new(ack::Settings::EARLY.ack_elicitation_interval); write_context.frame_buffer.clear(); write_context.transmission_constraint = transmission::Constraint::RetransmissionOnly; manager.on_transmit_complete(&mut write_context); assert_eq!( write_context .frame_buffer .pop_front() .expect("Frame is written") .as_frame(), Frame::Ping(ping::Ping), "Ping should be written when transmission is retransmission only" ); } #[test] fn on_transmit_complete_many_transmissions_since_elicitation() { let mut manager = AckManager::new(PacketNumberSpace::ApplicationData, ack::Settings::default()); let mut frame_buffer = OutgoingFrameBuffer::new(); let mut write_context = MockWriteContext::new( time::now(), &mut frame_buffer, transmission::Constraint::None, transmission::Mode::Normal, endpoint::Type::Server, ); write_context.transmission_constraint = transmission::Constraint::CongestionLimited; manager.ack_ranges = AckRanges::default(); assert!(manager .ack_ranges .insert_packet_number( PacketNumberSpace::ApplicationData.new_packet_number(VarInt::from_u8(1)), ) .is_ok()); manager.transmission_state = AckTransmissionState::Active { retransmissions: 0 }; manager.transmissions_since_elicitation = Counter::new(u8::max_value()); manager.on_transmit_complete(&mut write_context); assert_eq!( manager.transmissions_since_elicitation, Counter::new(u8::max_value()) ); } #[test] #[cfg(target_pointer_width = "64")] fn size_of_snapshots() { use core::mem::size_of; use insta::assert_debug_snapshot; assert_debug_snapshot!("AckManager", size_of::()); } #[test] fn client_sending_test() { assert_debug_snapshot!( "client_sending_test", Simulation { network: Network { client: Application::new( Endpoint::new(ack::Settings { max_ack_delay: Duration::from_millis(25), ack_delay_exponent: 1, ..Default::default() }), [Duration::from_millis(5)].iter().cycle().take(100).cloned(), ) .into(), server: Application::new( Endpoint::new(ack::Settings { max_ack_delay: Duration::from_millis(25), ack_delay_exponent: 1, ..Default::default() }), empty(), ) .into(), }, // pass all packets unchanged events: empty().collect(), delay: Duration::from_millis(0), } .run() ); } #[test] fn delayed_client_sending_test() { assert_debug_snapshot!( "delayed_client_sending_test", Simulation { network: Network { client: Application::new( Endpoint::new(ack::Settings { max_ack_delay: Duration::from_millis(25), ack_delay_exponent: 1, ..Default::default() }), [Duration::from_millis(5)].iter().cycle().take(100).cloned(), ) .into(), server: Application::new( Endpoint::new(ack::Settings { max_ack_delay: Duration::from_millis(25), ack_delay_exponent: 1, ..Default::default() }), empty(), ) .into(), }, // pass all packets unchanged events: empty().collect(), // delay sending each packet by 100ms delay: Duration::from_millis(100), } .run() ); } #[test] fn high_latency_test() { assert_debug_snapshot!( "high_latency_test", Simulation { network: Network { client: Application::new( Endpoint::new(ack::Settings { max_ack_delay: Duration::from_millis(25), ack_delay_exponent: 1, ..Default::default() }), [Duration::from_millis(5)].iter().cycle().take(100).cloned(), ) .into(), server: Application::new( Endpoint::new(ack::Settings { max_ack_delay: Duration::from_millis(100), ack_delay_exponent: 1, ..Default::default() }), [Duration::from_millis(5)].iter().cycle().take(100).cloned(), ) .into(), }, // pass all packets unchanged events: empty().collect(), // delay sending each packet by 1s delay: Duration::from_millis(1000), } .run() ); } #[test] fn lossy_network_test() { assert_debug_snapshot!( "lossy_network_test", Simulation { network: Network { client: Application::new( Endpoint::new(ack::Settings { max_ack_delay: Duration::from_millis(25), ack_delay_exponent: 1, ..Default::default() }), [Duration::from_millis(5)].iter().cycle().take(100).cloned(), ) .into(), server: Application::new( Endpoint::new(ack::Settings { max_ack_delay: Duration::from_millis(100), ack_delay_exponent: 1, ..Default::default() }), [Duration::from_millis(5)].iter().cycle().take(100).cloned(), ) .into(), }, // drop every 5th packet events: once(NetworkEvent::Pass) .cycle() .take(4) .chain(once(NetworkEvent::Drop)) .collect(), // delay sending each packet by 100ms delay: Duration::from_millis(0), } .run() ); } }