// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use crate::{ ack::AckManager, connection::{self, ConnectionTransmissionContext, ProcessingError}, endpoint, path, path::{path_event, Path}, processed_packet::ProcessedPacket, recovery, space::{datagram, keep_alive::KeepAlive, HandshakeStatus, PacketSpace, TxPacketNumbers}, stream::Manager as _, sync::flag, transmission, transmission::interest::Provider, }; use core::{convert::TryInto, fmt, marker::PhantomData}; use once_cell::sync::OnceCell; use s2n_codec::EncoderBuffer; use s2n_quic_core::{ crypto::{application::KeySet, limited, tls, CryptoSuite}, event::{self, ConnectionPublisher as _, IntoEvent}, frame::{ ack::AckRanges, crypto::CryptoRef, datagram::DatagramRef, stream::StreamRef, Ack, ConnectionClose, DataBlocked, HandshakeDone, MaxData, MaxStreamData, MaxStreams, NewConnectionId, NewToken, PathChallenge, PathResponse, ResetStream, RetireConnectionId, StopSending, StreamDataBlocked, StreamsBlocked, }, inet::DatagramInfo, packet::{ encoding::{PacketEncoder, PacketEncodingError}, number::{PacketNumber, PacketNumberRange, PacketNumberSpace, SlidingWindow}, short::{CleartextShort, ProtectedShort, Short, SpinBit}, }, path::MaxMtu, time::{timer, Timestamp}, transport, }; pub struct ApplicationSpace<Config: endpoint::Config> { /// Transmission Packet numbers pub tx_packet_numbers: TxPacketNumbers, /// Ack manager pub ack_manager: AckManager, /// All streams that are managed through this connection pub stream_manager: Config::StreamManager, /// The current state of the Spin bit /// TODO: Spin me pub spin_bit: SpinBit, /// The crypto suite for application data /// TODO: What about ZeroRtt? //= https://www.rfc-editor.org/rfc/rfc9001#section-6.3 //# For this reason, endpoints MUST be able to retain two sets of packet //# protection keys for receiving packets: the current and the next. //= https://www.rfc-editor.org/rfc/rfc9001#section-6.1 //# An endpoint MUST NOT initiate a key update prior to having confirmed //# the handshake (Section 4.1.2). key_set: KeySet<<<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::OneRttKey>, header_key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::OneRttHeaderKey, ping: flag::Ping, keep_alive: KeepAlive, processed_packet_numbers: SlidingWindow, recovery_manager: recovery::Manager<Config>, pub datagram_manager: datagram::Manager<Config>, } impl<Config: endpoint::Config> fmt::Debug for ApplicationSpace<Config> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ApplicationSpace") .field("ack_manager", &self.ack_manager) .field("ping", &self.ping) .field("processed_packet_numbers", &self.processed_packet_numbers) .field("recovery_manager", &self.recovery_manager) .field("stream_manager", &self.stream_manager) .field("tx_packet_numbers", &self.tx_packet_numbers) .finish() } } impl<Config: endpoint::Config> ApplicationSpace<Config> { #[allow(clippy::too_many_arguments)] pub fn new( key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::OneRttKey, header_key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::OneRttHeaderKey, now: Timestamp, stream_manager: Config::StreamManager, ack_manager: AckManager, keep_alive: KeepAlive, max_mtu: MaxMtu, datagram_manager: datagram::Manager<Config>, ) -> Self { let key_set = KeySet::new(key, Self::key_limits(max_mtu)); Self { tx_packet_numbers: TxPacketNumbers::new(PacketNumberSpace::ApplicationData, now), ack_manager, spin_bit: SpinBit::Zero, stream_manager, key_set, header_key, ping: flag::Ping::default(), keep_alive, processed_packet_numbers: SlidingWindow::default(), recovery_manager: recovery::Manager::new(PacketNumberSpace::ApplicationData), datagram_manager, } } /// Returns true if the packet number has already been processed pub fn is_duplicate<Pub: event::ConnectionPublisher>( &self, packet_number: PacketNumber, path_id: path::Id, path: &path::Path<Config>, publisher: &mut Pub, ) -> bool { let packet_check = self.processed_packet_numbers.check(packet_number); if let Err(error) = packet_check { publisher.on_duplicate_packet(event::builder::DuplicatePacket { packet_header: event::builder::PacketHeader::new( packet_number, publisher.quic_version(), ), path: path_event!(path, path_id), error: error.into_event(), }); } match packet_check { Ok(()) => false, Err(_) => true, } } pub fn on_transmit<'a>( &mut self, context: &mut ConnectionTransmissionContext<Config>, transmission_constraint: transmission::Constraint, handshake_status: &mut HandshakeStatus, buffer: EncoderBuffer<'a>, ) -> Result<(transmission::Outcome, EncoderBuffer<'a>), PacketEncodingError<'a>> { let mut packet_number = self.tx_packet_numbers.next(); if self.recovery_manager.requires_probe() { //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4 //# If the sender wants to elicit a faster acknowledgement on PTO, it can //# skip a packet number to eliminate the acknowledgment delay. // TODO Does this interact negatively with persistent congestion detection, which // relies on consecutive packet numbers? packet_number = packet_number.next().unwrap(); } let packet_number_encoder = self.packet_number_encoder(); let mut outcome = transmission::Outcome::default(); let destination_connection_id = context.path().peer_connection_id; let timestamp = context.timestamp; let transmission_mode = context.transmission_mode; let min_packet_len = context.min_packet_len; let bytes_progressed = self.stream_manager.outgoing_bytes_progressed(); let payload = transmission::Transmission { config: PhantomData::<Config>, outcome: &mut outcome, packet_number, payload: transmission::application::Payload::<Config>::new( context.path_id, context.path_manager, context.local_id_registry, context.transmission_mode, &mut self.ack_manager, handshake_status, &mut self.ping, &mut self.stream_manager, &mut self.recovery_manager, &mut self.datagram_manager, ), timestamp, transmission_constraint, transmission_mode, tx_packet_numbers: &mut self.tx_packet_numbers, path_id: context.path_id, publisher: context.publisher, packet_interceptor: context.packet_interceptor, }; let spin_bit = self.spin_bit; let header_key = &self.header_key; let (_protected_packet, buffer) = self.key_set .encrypt_packet(buffer, |buffer, key, key_phase| { let packet = Short { spin_bit, key_phase, destination_connection_id, packet_number, payload, }; packet.encode_packet( key, header_key, packet_number_encoder, min_packet_len, buffer, ) })?; outcome.bytes_progressed += (self.stream_manager.outgoing_bytes_progressed() - bytes_progressed).as_u64() as usize; let app_limited = self.is_app_limited(context.path(), outcome.bytes_sent); let (recovery_manager, mut recovery_context) = self.recovery( handshake_status, context.local_id_registry, context.path_id, context.path_manager, ); recovery_manager.on_packet_sent( packet_number, outcome, context.timestamp, context.ecn, context.transmission_mode, Some(app_limited), &mut recovery_context, context.publisher, ); // reset the keep alive timer after sending an ack-eliciting packet if outcome.ack_elicitation.is_ack_eliciting() { self.keep_alive.reset(timestamp); } context .publisher .on_packet_sent(event::builder::PacketSent { packet_header: event::builder::PacketHeader::new( packet_number, context.publisher.quic_version(), ), packet_len: outcome.bytes_sent, }); Ok((outcome, buffer)) } pub(super) fn on_transmit_burst_complete( &mut self, active_path: &Path<Config>, timestamp: Timestamp, is_handshake_confirmed: bool, ) { self.recovery_manager.on_transmit_burst_complete( active_path, timestamp, is_handshake_confirmed, ); } pub(super) fn on_transmit_close<'a>( &mut self, context: &mut ConnectionTransmissionContext<Config>, connection_close: &ConnectionClose, buffer: EncoderBuffer<'a>, ) -> Result<(transmission::Outcome, EncoderBuffer<'a>), PacketEncodingError<'a>> { let packet_number = self.tx_packet_numbers.next(); let packet_number_encoder = self.packet_number_encoder(); let mut outcome = transmission::Outcome::default(); let destination_connection_id = context.path().peer_connection_id; let payload = transmission::Transmission { config: PhantomData::<Config>, outcome: &mut outcome, packet_number, payload: transmission::connection_close::Payload { connection_close, packet_number_space: PacketNumberSpace::ApplicationData, }, timestamp: context.timestamp, transmission_constraint: transmission::Constraint::None, transmission_mode: transmission::Mode::Normal, tx_packet_numbers: &mut self.tx_packet_numbers, path_id: context.path_id, publisher: context.publisher, packet_interceptor: context.packet_interceptor, }; let spin_bit = self.spin_bit; let min_packet_len = context.min_packet_len; let header_key = &self.header_key; let (_protected_packet, buffer) = self.key_set .encrypt_packet(buffer, |buffer, key, key_phase| { let packet = Short { spin_bit, key_phase, destination_connection_id, packet_number, payload, }; packet.encode_packet( key, header_key, packet_number_encoder, min_packet_len, buffer, ) })?; context .publisher .on_packet_sent(event::builder::PacketSent { packet_header: event::builder::PacketHeader::new( packet_number, context.publisher.quic_version(), ), packet_len: outcome.bytes_sent, }); Ok((outcome, buffer)) } /// Signals the connection was previously blocked by anti-amplification limits /// but is now no longer limited. pub fn on_amplification_unblocked( &mut self, path: &Path<Config>, timestamp: Timestamp, is_handshake_confirmed: bool, ) { debug_assert!( Config::ENDPOINT_TYPE.is_server(), "Clients are never in an anti-amplification state" ); //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.6 //# When a server is blocked by anti-amplification limits, receiving a //# datagram unblocks it, even if none of the packets in the datagram are //# successfully processed. In such a case, the PTO timer will need to //# be re-armed. self.recovery_manager .update_pto_timer(path, timestamp, is_handshake_confirmed); } /// Signals the handshake is confirmed pub fn on_handshake_confirmed( &mut self, path: &Path<Config>, local_id_registry: &mut connection::LocalIdRegistry, timestamp: Timestamp, ) { // Retire the local connection ID used during the handshake to reduce linkability local_id_registry.retire_handshake_connection_id(); //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1 //# A sender SHOULD restart its PTO timer every time an ack-eliciting //# packet is sent or acknowledged, or when Initial or Handshake keys are //# discarded (Section 4.9 of [QUIC-TLS]). //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1 //# An endpoint MUST NOT set its PTO timer for the Application Data //# packet number space until the handshake is confirmed. // Since we maintain a separate PTO timer for each packet space, we don't have to update // it when the Initial or Handshake keys are discarded. However, we do need to update the // PTO timer when the handshake is confirmed, as the Application space PTO timer is not // started until the handshake is confirmed. self.recovery_manager .update_pto_timer(path, timestamp, true) } /// Called when the connection timer expired pub fn on_timeout<Pub: event::ConnectionPublisher>( &mut self, path_manager: &mut path::Manager<Config>, handshake_status: &mut HandshakeStatus, local_id_registry: &mut connection::LocalIdRegistry, random_generator: &mut Config::RandomGenerator, timestamp: Timestamp, publisher: &mut Pub, ) { self.ack_manager.on_timeout(timestamp); self.key_set.on_timeout(timestamp); let (recovery_manager, mut context) = self.recovery( handshake_status, local_id_registry, path_manager.active_path_id(), path_manager, ); recovery_manager.on_timeout(timestamp, random_generator, &mut context, publisher); self.stream_manager.on_timeout(timestamp); if self.keep_alive.on_timeout(timestamp).is_ready() { publisher.on_keep_alive_timer_expired(event::builder::KeepAliveTimerExpired { timeout: self.keep_alive.period(), }); // send a ping after timing out self.ping(); } } /// Returns `true` if the recovery manager for this packet space requires a probe /// packet to be sent. pub fn requires_probe(&self) -> bool { self.recovery_manager.requires_probe() } pub fn ping(&mut self) { self.ping.send() } pub fn keep_alive(&mut self, enabled: bool) { self.keep_alive.update(enabled); } /// Returns the Packet Number to be used when encoding outgoing packets fn packet_number_encoder(&self) -> PacketNumber { self.tx_packet_numbers.largest_sent_packet_number_acked() } fn recovery<'a>( &'a mut self, handshake_status: &'a mut HandshakeStatus, local_id_registry: &'a mut connection::LocalIdRegistry, path_id: path::Id, path_manager: &'a mut path::Manager<Config>, ) -> ( &'a mut recovery::Manager<Config>, RecoveryContext<'a, Config>, ) { ( &mut self.recovery_manager, RecoveryContext { ack_manager: &mut self.ack_manager, handshake_status, ping: &mut self.ping, stream_manager: &mut self.stream_manager, local_id_registry, path_id, path_manager, tx_packet_numbers: &mut self.tx_packet_numbers, }, ) } /// Returns `true` if sending is limited by the application and not the congestion controller /// /// Sending is app limited if the application is not fully utilizing the available /// congestion window currently and there is no more application data remaining to send. fn is_app_limited(&self, path: &Path<Config>, bytes_sent: usize) -> bool { !path.is_congestion_limited(bytes_sent) && !self.has_transmission_interest() } /// Validate packets in the Application packet space pub fn validate_and_decrypt_packet<'a, Pub: event::ConnectionPublisher>( &mut self, protected: ProtectedShort<'a>, datagram: &DatagramInfo, path_id: path::Id, path: &path::Path<Config>, publisher: &mut Pub, ) -> Result<CleartextShort<'a>, ProcessingError> { let largest_acked = self.ack_manager.largest_received_packet_number_acked(); let packet = protected .unprotect(&self.header_key, largest_acked) .map_err(|err| { publisher.on_packet_dropped(event::builder::PacketDropped { reason: event::builder::PacketDropReason::UnprotectFailed { space: event::builder::KeySpace::OneRtt, path: path_event!(path, path_id), }, }); err })?; let packet_number = packet.packet_number; let packet_header = event::builder::PacketHeader::new(packet.packet_number, publisher.quic_version()); let decrypted = self.key_set.decrypt_packet( packet, largest_acked, //= https://www.rfc-editor.org/rfc/rfc9001#section-6.3 //# For a short period after a key //# update completes, up to the PTO, endpoints MAY defer generation of //# the next set of receive packet protection keys. This allows //# endpoints to retain only two sets of receive keys; see Section 6.5. //= https://www.rfc-editor.org/rfc/rfc9001#section-6.5 //# An endpoint MAY allow a period of approximately the Probe Timeout //# (PTO; see [QUIC-RECOVERY]) after promoting the next set of receive //# keys to be current before it creates the subsequent set of packet //# protection keys. datagram.timestamp + path .rtt_estimator .pto_period(1, PacketNumberSpace::ApplicationData), ); match decrypted { Ok((_, Some(generation))) => { publisher.on_key_update(event::builder::KeyUpdate { key_type: event::builder::KeyType::OneRtt { generation }, cipher_suite: self.key_set.cipher_suite().into_event(), }); } Ok(_) => {} Err(_) => { publisher.on_packet_dropped(event::builder::PacketDropped { reason: event::builder::PacketDropReason::DecryptionFailed { packet_header, path: path_event!(path, path_id), }, }); } } //= https://www.rfc-editor.org/rfc/rfc9001#section-9.5 //# For authentication to be //# free from side channels, the entire process of header protection //# removal, packet number recovery, and packet protection removal MUST //# be applied together without timing and other side channels. // We perform decryption prior to checking for duplicate to avoid short-circuiting // and maintain constant-time operation. if self.is_duplicate(packet_number, path_id, path, publisher) { return Err(ProcessingError::DuplicatePacket); } if decrypted.is_ok() { // reset the keep alive timer after receiving a packet self.keep_alive.reset(datagram.timestamp); } decrypted.map(|x| x.0) } fn key_limits(max_mtu: MaxMtu) -> limited::Limits { let mut limits = limited::Limits::default(); limits.max_mtu = max_mtu; // AEAD optimizations are currently in the testing phase so make them opt-in at runtime limits.sealer_optimization_threshold = { static THRESHOLD: OnceCell<u64> = OnceCell::new(); *THRESHOLD.get_or_init(|| { std::env::var("S2N_UNSTABLE_CRYPTO_OPT_TX") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(u64::MAX) }) }; limits.opener_optimization_threshold = { static THRESHOLD: OnceCell<u64> = OnceCell::new(); *THRESHOLD.get_or_init(|| { std::env::var("S2N_UNSTABLE_CRYPTO_OPT_RX") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(u64::MAX) }) }; limits } } impl<Config: endpoint::Config> timer::Provider for ApplicationSpace<Config> { #[inline] fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result { self.ack_manager.timers(query)?; self.recovery_manager.timers(query)?; self.key_set.timers(query)?; self.stream_manager.timers(query)?; self.keep_alive.timers(query)?; Ok(()) } } impl<Config: endpoint::Config> transmission::interest::Provider for ApplicationSpace<Config> { #[inline] fn transmission_interest<Q: transmission::interest::Query>( &self, query: &mut Q, ) -> transmission::interest::Result { self.ack_manager.transmission_interest(query)?; self.ping.transmission_interest(query)?; self.recovery_manager.transmission_interest(query)?; self.stream_manager.transmission_interest(query)?; self.datagram_manager.transmission_interest(query)?; Ok(()) } } impl<Config: endpoint::Config> connection::finalization::Provider for ApplicationSpace<Config> { fn finalization_status(&self) -> connection::finalization::Status { self.stream_manager.finalization_status() } } struct RecoveryContext<'a, Config: endpoint::Config> { ack_manager: &'a mut AckManager, handshake_status: &'a mut HandshakeStatus, ping: &'a mut flag::Ping, stream_manager: &'a mut Config::StreamManager, local_id_registry: &'a mut connection::LocalIdRegistry, path_id: path::Id, path_manager: &'a mut path::Manager<Config>, tx_packet_numbers: &'a mut TxPacketNumbers, } impl<'a, Config: endpoint::Config> recovery::Context<Config> for RecoveryContext<'a, Config> { const ENDPOINT_TYPE: endpoint::Type = Config::ENDPOINT_TYPE; fn is_handshake_confirmed(&self) -> bool { self.handshake_status.is_confirmed() } fn path(&self) -> &Path<Config> { &self.path_manager[self.path_id] } fn path_mut(&mut self) -> &mut Path<Config> { &mut self.path_manager[self.path_id] } fn path_by_id(&self, path_id: path::Id) -> &path::Path<Config> { &self.path_manager[path_id] } fn path_mut_by_id(&mut self, path_id: path::Id) -> &mut path::Path<Config> { &mut self.path_manager[path_id] } fn path_id(&self) -> path::Id { self.path_id } fn validate_packet_ack( &mut self, timestamp: Timestamp, packet_number_range: &PacketNumberRange, ) -> Result<(), transport::Error> { self.tx_packet_numbers .on_packet_ack(timestamp, packet_number_range) } fn on_new_packet_ack<Pub: event::ConnectionPublisher>( &mut self, packet_number_range: &PacketNumberRange, publisher: &mut Pub, ) { self.handshake_status .on_packet_ack(packet_number_range, publisher); self.ping.on_packet_ack(packet_number_range); self.stream_manager.on_packet_ack(packet_number_range); self.local_id_registry.on_packet_ack(packet_number_range); self.path_manager.on_packet_ack(packet_number_range); } fn on_packet_ack(&mut self, timestamp: Timestamp, packet_number_range: &PacketNumberRange) { self.ack_manager .on_packet_ack(timestamp, packet_number_range); } fn on_packet_loss<Pub: event::ConnectionPublisher>( &mut self, packet_number_range: &PacketNumberRange, publisher: &mut Pub, ) { self.ack_manager.on_packet_loss(packet_number_range); self.handshake_status .on_packet_loss(packet_number_range, publisher); self.ping.on_packet_loss(packet_number_range); self.stream_manager.on_packet_loss(packet_number_range); self.local_id_registry.on_packet_loss(packet_number_range); self.path_manager.on_packet_loss(packet_number_range); } fn on_rtt_update(&mut self) { // Update the stream manager if this RTT update was for the active path if self.path_manager.active_path_id() == self.path_id { self.stream_manager .on_rtt_update(&self.path_manager.active_path().rtt_estimator) } } } impl<Config: endpoint::Config> PacketSpace<Config> for ApplicationSpace<Config> { const INVALID_FRAME_ERROR: &'static str = "invalid frame in application space"; fn handle_crypto_frame<Pub: event::ConnectionPublisher>( &mut self, _frame: CryptoRef, _datagram: &DatagramInfo, _path: &mut Path<Config>, _publisher: &mut Pub, ) -> Result<(), transport::Error> { //= https://www.rfc-editor.org/rfc/rfc9000#section-7.5 //# Once the handshake completes, if an endpoint is unable to buffer all //# data in a CRYPTO frame, it MAY discard that CRYPTO frame and all //# CRYPTO frames received in the future, or it MAY close the connection //# with a CRYPTO_BUFFER_EXCEEDED error code. // we currently just discard CRYPTO frames post-handshake Ok(()) } fn handle_ack_frame<A: AckRanges, Pub: event::ConnectionPublisher>( &mut self, frame: Ack<A>, timestamp: Timestamp, path_id: path::Id, path_manager: &mut path::Manager<Config>, packet_number: PacketNumber, handshake_status: &mut HandshakeStatus, local_id_registry: &mut connection::LocalIdRegistry, random_generator: &mut Config::RandomGenerator, publisher: &mut Pub, ) -> Result<(), transport::Error> { let path = &mut path_manager[path_id]; path.on_peer_validated(); let (recovery_manager, mut context) = self.recovery(handshake_status, local_id_registry, path_id, path_manager); recovery_manager.on_ack_frame( timestamp, frame, packet_number, random_generator, &mut context, publisher, ) } fn handle_connection_close_frame( &mut self, _frame: ConnectionClose, _timestamp: Timestamp, _path: &mut Path<Config>, ) -> Result<(), transport::Error> { Ok(()) } fn handle_stream_frame( &mut self, frame: StreamRef, packet: &mut ProcessedPacket, ) -> Result<(), transport::Error> { let bytes_progressed = self.stream_manager.incoming_bytes_progressed(); self.stream_manager.on_data(&frame)?; packet.bytes_progressed += (self.stream_manager.incoming_bytes_progressed() - bytes_progressed).as_u64() as usize; Ok(()) } fn handle_datagram_frame( &mut self, path: s2n_quic_core::event::api::Path<'_>, frame: DatagramRef, ) -> Result<(), transport::Error> { self.datagram_manager.on_datagram_frame(path, frame); Ok(()) } fn handle_data_blocked_frame(&mut self, frame: DataBlocked) -> Result<(), transport::Error> { self.stream_manager.on_data_blocked(frame) } fn handle_max_data_frame(&mut self, frame: MaxData) -> Result<(), transport::Error> { self.stream_manager.on_max_data(frame) } fn handle_max_stream_data_frame( &mut self, frame: MaxStreamData, ) -> Result<(), transport::Error> { self.stream_manager.on_max_stream_data(&frame) } fn handle_max_streams_frame(&mut self, frame: MaxStreams) -> Result<(), transport::Error> { self.stream_manager.on_max_streams(&frame) } fn handle_reset_stream_frame(&mut self, frame: ResetStream) -> Result<(), transport::Error> { self.stream_manager.on_reset_stream(&frame) } fn handle_stop_sending_frame(&mut self, frame: StopSending) -> Result<(), transport::Error> { self.stream_manager.on_stop_sending(&frame) } fn handle_stream_data_blocked_frame( &mut self, frame: StreamDataBlocked, ) -> Result<(), transport::Error> { self.stream_manager.on_stream_data_blocked(&frame) } fn handle_streams_blocked_frame( &mut self, frame: StreamsBlocked, ) -> Result<(), transport::Error> { self.stream_manager.on_streams_blocked(&frame) } fn handle_new_token_frame(&mut self, frame: NewToken) -> Result<(), transport::Error> { //= https://www.rfc-editor.org/rfc/rfc9000#section-19.7 //# A server MUST treat receipt //# of a NEW_TOKEN frame as a connection error of type //# PROTOCOL_VIOLATION. if Config::ENDPOINT_TYPE.is_server() { return Err(transport::Error::PROTOCOL_VIOLATION .with_reason(Self::INVALID_FRAME_ERROR) .with_frame_type(frame.tag().into())); } // TODO add support for NEW_TOKEN_FRAMEs on the client Ok(()) } fn handle_new_connection_id_frame<Pub: event::ConnectionPublisher>( &mut self, frame: NewConnectionId, _datagram: &DatagramInfo, path_manager: &mut path::Manager<Config>, publisher: &mut Pub, ) -> Result<(), transport::Error> { if path_manager.active_path().peer_connection_id.is_empty() { //= https://www.rfc-editor.org/rfc/rfc9000#section-19.15 //# An endpoint that is sending packets with a zero-length Destination //# Connection ID MUST treat receipt of a NEW_CONNECTION_ID frame as a //# connection error of type PROTOCOL_VIOLATION. return Err(transport::Error::PROTOCOL_VIOLATION); } let peer_id = connection::PeerId::try_from_bytes(frame.connection_id) .expect("Length is validated when decoding the frame"); let sequence_number = frame .sequence_number .as_u64() .try_into() .map_err(|_err| transport::Error::PROTOCOL_VIOLATION)?; let retire_prior_to = frame .retire_prior_to .as_u64() .try_into() .map_err(|_err| transport::Error::PROTOCOL_VIOLATION)?; let stateless_reset_token = (*frame.stateless_reset_token).into(); path_manager.on_new_connection_id( &peer_id, sequence_number, retire_prior_to, &stateless_reset_token, publisher, ) } fn handle_retire_connection_id_frame( &mut self, frame: RetireConnectionId, datagram: &DatagramInfo, path: &mut Path<Config>, local_id_registry: &mut connection::LocalIdRegistry, ) -> Result<(), transport::Error> { let sequence_number = frame .sequence_number .as_u64() .try_into() .map_err(|_err| transport::Error::PROTOCOL_VIOLATION)?; //= https://www.rfc-editor.org/rfc/rfc9000#section-19.16 //# The sequence number specified in a RETIRE_CONNECTION_ID frame MUST //# NOT refer to the Destination Connection ID field of the packet in //# which the frame is contained. //= https://www.rfc-editor.org/rfc/rfc9000#section-19.16 //# The peer MAY treat this as a //# connection error of type PROTOCOL_VIOLATION. //= https://www.rfc-editor.org/rfc/rfc9000#section-19.16 //# Receipt of a RETIRE_CONNECTION_ID frame containing a sequence number //# greater than any previously sent to the peer MUST be treated as a //# connection error of type PROTOCOL_VIOLATION. local_id_registry .on_retire_connection_id( sequence_number, &datagram.destination_connection_id, path.rtt_estimator.smoothed_rtt(), datagram.timestamp, ) .map_err(|err| transport::Error::PROTOCOL_VIOLATION.with_reason(err.message())) } fn handle_path_challenge_frame( &mut self, frame: PathChallenge, path_id: path::Id, path_manager: &mut path::Manager<Config>, ) -> Result<(), transport::Error> { path_manager.on_path_challenge(path_id, &frame); Ok(()) } fn handle_path_response_frame<Pub: event::ConnectionPublisher>( &mut self, frame: PathResponse, path_manager: &mut path::Manager<Config>, publisher: &mut Pub, ) -> Result<(), transport::Error> { path_manager.on_path_response(&frame, publisher); Ok(()) } fn handle_handshake_done_frame<Pub: event::ConnectionPublisher>( &mut self, frame: HandshakeDone, timestamp: Timestamp, path: &mut Path<Config>, local_id_registry: &mut connection::LocalIdRegistry, handshake_status: &mut HandshakeStatus, publisher: &mut Pub, ) -> Result<(), transport::Error> { //= https://www.rfc-editor.org/rfc/rfc9000#section-19.20 //# A server MUST //# treat receipt of a HANDSHAKE_DONE frame as a connection error of type //# PROTOCOL_VIOLATION. if Config::ENDPOINT_TYPE.is_server() { return Err(transport::Error::PROTOCOL_VIOLATION .with_reason("Clients MUST NOT send HANDSHAKE_DONE frames") .with_frame_type(frame.tag().into())); } handshake_status.on_handshake_done_received(publisher); //= https://www.rfc-editor.org/rfc/rfc9001#section-4.1.2 //# At the //# client, the handshake is considered confirmed when a HANDSHAKE_DONE //# frame is received. self.on_handshake_confirmed(path, local_id_registry, timestamp); Ok(()) } fn on_processed_packet<Pub: event::ConnectionPublisher>( &mut self, processed_packet: ProcessedPacket, path_id: path::Id, path: &Path<Config>, publisher: &mut Pub, ) -> Result<(), transport::Error> { self.ack_manager.on_processed_packet( &processed_packet, path_event!(path, path_id), publisher, ); self.processed_packet_numbers .insert(processed_packet.packet_number) .expect("packet number was already checked"); Ok(()) } }