// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use crate::{ ack::AckManager, connection::{self, ConnectionTransmissionContext, ProcessingError}, endpoint, path, path::{path_event, Path}, processed_packet::ProcessedPacket, recovery, space::{CryptoStream, HandshakeStatus, PacketSpace, TxPacketNumbers}, transmission, }; use core::{fmt, marker::PhantomData}; use s2n_codec::EncoderBuffer; use s2n_quic_core::{ connection::PeerId, crypto::{tls, CryptoSuite, InitialKey}, event::{self, ConnectionPublisher as _, IntoEvent}, frame::{ack::AckRanges, crypto::CryptoRef, Ack, ConnectionClose}, inet::DatagramInfo, packet::{ encoding::{PacketEncoder, PacketEncodingError}, initial::{CleartextInitial, Initial, ProtectedInitial}, number::{PacketNumber, PacketNumberRange, PacketNumberSpace, SlidingWindow}, }, time::{timer, Timestamp}, transport, }; use smallvec::SmallVec; pub struct InitialSpace { pub ack_manager: AckManager, //= https://www.rfc-editor.org/rfc/rfc9001#section-4 //# If QUIC needs to retransmit that data, it MUST use //# the same keys even if TLS has already updated to newer keys. pub key: <::Session as CryptoSuite>::InitialKey, pub header_key: <::Session as CryptoSuite>::InitialHeaderKey, //= https://www.rfc-editor.org/rfc/rfc9001#section-4.9 //# If packets from a lower encryption level contain //# CRYPTO frames, frames that retransmit that data MUST be sent at the //# same encryption level. pub crypto_stream: CryptoStream, pub tx_packet_numbers: TxPacketNumbers, pub received_hello_message: bool, //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.5.3 //# Subsequent Initial packets from the client include the connection ID //# and token values from the Retry packet. retry_token: Vec, processed_packet_numbers: SlidingWindow, recovery_manager: recovery::Manager, } impl fmt::Debug for InitialSpace { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("InitialSpace") .field("ack_manager", &self.ack_manager) .field("tx_packet_numbers", &self.tx_packet_numbers) .field("processed_packet_numbers", &self.processed_packet_numbers) .field("recovery_manager", &self.recovery_manager) .finish() } } impl InitialSpace { pub fn new( key: <::Session as CryptoSuite>::InitialKey, header_key: <::Session as CryptoSuite>::InitialHeaderKey, now: Timestamp, ack_manager: AckManager, ) -> Self { Self { ack_manager, key, header_key, crypto_stream: CryptoStream::new(), tx_packet_numbers: TxPacketNumbers::new(PacketNumberSpace::Initial, now), received_hello_message: false, retry_token: Vec::new(), processed_packet_numbers: SlidingWindow::default(), recovery_manager: recovery::Manager::new(PacketNumberSpace::Initial), } } /// This method gets called when a Retry packet is processed. /// /// Reset the TLS stack and recover state when the first Retry packet is processed. /// Also regenerate the Initial keys based on the new retry_source_connection_id. pub fn on_retry_packet( &mut self, path: &mut path::Path, path_id: path::Id, retry_source_connection_id: &PeerId, retry_token: &[u8], publisher: &mut Pub, ) { debug_assert!(Config::ENDPOINT_TYPE.is_client()); self.retry_token = retry_token.to_vec(); //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.5.2 //# Changing the Destination Connection ID field also results in //# a change to the keys used to protect the Initial packet. let (initial_key, initial_header_key) = <::Session as CryptoSuite>::InitialKey::new_client( retry_source_connection_id.as_bytes(), ); self.key = initial_key; self.header_key = initial_header_key; //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.5.3 //# Other than updating the Destination Connection ID and Token fields, //# the Initial packet sent by the client is subject to the same //# restrictions as the first Initial packet. A client MUST use the same //# cryptographic handshake message it included in this packet. self.crypto_stream.on_retry_packet(); // Reset the recovery state; discarding any previous Initial packets that // might have been sent/lost. self.recovery_manager .on_retry_packet(path, path_id, publisher); } /// Returns true if the packet number has already been processed pub fn is_duplicate( &self, packet_number: PacketNumber, path_id: path::Id, path: &path::Path, publisher: &mut Pub, ) -> bool { let packet_check = self.processed_packet_numbers.check(packet_number); if let Err(error) = packet_check { publisher.on_duplicate_packet(event::builder::DuplicatePacket { packet_header: event::builder::PacketHeader::new( packet_number, publisher.quic_version(), ), path: path_event!(path, path_id), error: error.into_event(), }); } match packet_check { Ok(()) => false, Err(_) => true, } } pub fn on_transmit<'a>( &mut self, context: &mut ConnectionTransmissionContext, transmission_constraint: transmission::Constraint, handshake_status: &HandshakeStatus, buffer: EncoderBuffer<'a>, ) -> Result<(transmission::Outcome, EncoderBuffer<'a>), PacketEncodingError<'a>> { let mut packet_number = self.tx_packet_numbers.next(); if self.recovery_manager.requires_probe() { //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4 //# If the sender wants to elicit a faster acknowledgement on PTO, it can //# skip a packet number to eliminate the acknowledgment delay. // TODO Does this interact negatively with persistent congestion detection, which // relies on consecutive packet numbers? packet_number = packet_number.next().unwrap(); } let packet_number_encoder = self.packet_number_encoder(); let mut outcome = transmission::Outcome::default(); let destination_connection_id = context.path().peer_connection_id; let payload = transmission::Transmission { config: PhantomData::, outcome: &mut outcome, packet_number, payload: transmission::early::Payload { ack_manager: &mut self.ack_manager, crypto_stream: &mut self.crypto_stream, packet_number_space: PacketNumberSpace::Initial, recovery_manager: &mut self.recovery_manager, }, timestamp: context.timestamp, transmission_constraint, transmission_mode: context.transmission_mode, tx_packet_numbers: &mut self.tx_packet_numbers, path_id: context.path_id, publisher: context.publisher, packet_interceptor: context.packet_interceptor, }; let packet = Initial { version: context.quic_version, destination_connection_id, source_connection_id: context.path_manager[context.path_id].local_connection_id, token: self.retry_token.as_slice(), packet_number, payload, }; let (_protected_packet, buffer) = packet.encode_packet( &self.key, &self.header_key, packet_number_encoder, context.min_packet_len, buffer, )?; let time_sent = context.timestamp; let path_id = context.path_id; let (recovery_manager, mut recovery_context) = self.recovery(handshake_status, path_id, context.path_manager); recovery_manager.on_packet_sent( packet_number, outcome, time_sent, context.ecn, context.transmission_mode, None, &mut recovery_context, context.publisher, ); context .publisher .on_packet_sent(event::builder::PacketSent { packet_header: event::builder::PacketHeader::new( packet_number, context.publisher.quic_version(), ), packet_len: outcome.bytes_sent, }); Ok((outcome, buffer)) } pub(super) fn on_transmit_burst_complete( &mut self, active_path: &Path, timestamp: Timestamp, is_handshake_confirmed: bool, ) { self.recovery_manager.on_transmit_burst_complete( active_path, timestamp, is_handshake_confirmed, ); } pub(super) fn on_transmit_close<'a>( &mut self, context: &mut ConnectionTransmissionContext, connection_close: &ConnectionClose, buffer: EncoderBuffer<'a>, ) -> Result<(transmission::Outcome, EncoderBuffer<'a>), PacketEncodingError<'a>> { let packet_number = self.tx_packet_numbers.next(); let packet_number_encoder = self.packet_number_encoder(); let mut outcome = transmission::Outcome::default(); let destination_connection_id = context.path().peer_connection_id; let payload = transmission::Transmission { config: PhantomData::, outcome: &mut outcome, packet_number, payload: transmission::connection_close::Payload { connection_close, packet_number_space: PacketNumberSpace::Initial, }, timestamp: context.timestamp, transmission_constraint: transmission::Constraint::None, transmission_mode: transmission::Mode::Normal, tx_packet_numbers: &mut self.tx_packet_numbers, path_id: context.path_id, publisher: context.publisher, packet_interceptor: context.packet_interceptor, }; let packet = Initial { version: context.quic_version, destination_connection_id, source_connection_id: context.path_manager[context.path_id].local_connection_id, token: &[0u8; 0][..], packet_number, payload, }; let (_protected_packet, buffer) = packet.encode_packet( &self.key, &self.header_key, packet_number_encoder, context.min_packet_len, buffer, )?; context .publisher .on_packet_sent(event::builder::PacketSent { packet_header: event::builder::PacketHeader::new( packet_number, context.publisher.quic_version(), ), packet_len: outcome.bytes_sent, }); Ok((outcome, buffer)) } /// Signals the connection was previously blocked by anti-amplification limits /// but is now no longer limited. pub fn on_amplification_unblocked( &mut self, path: &Path, timestamp: Timestamp, is_handshake_confirmed: bool, ) { debug_assert!( Config::ENDPOINT_TYPE.is_server(), "Clients are never in an anti-amplification state" ); //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.6 //# When a server is blocked by anti-amplification limits, receiving a //# datagram unblocks it, even if none of the packets in the datagram are //# successfully processed. In such a case, the PTO timer will need to //# be re-armed. self.recovery_manager .update_pto_timer(path, timestamp, is_handshake_confirmed); } /// Called when the connection timer expired pub fn on_timeout( &mut self, handshake_status: &HandshakeStatus, path_id: path::Id, path_manager: &mut path::Manager, random_generator: &mut Config::RandomGenerator, timestamp: Timestamp, publisher: &mut Pub, ) { self.ack_manager.on_timeout(timestamp); let (recovery_manager, mut context) = self.recovery(handshake_status, path_id, path_manager); recovery_manager.on_timeout(timestamp, random_generator, &mut context, publisher); } /// Called before the Initial packet space is discarded pub fn on_discard( &mut self, path: &mut Path, path_id: path::Id, publisher: &mut Pub, ) { publisher.on_key_space_discarded(event::builder::KeySpaceDiscarded { space: event::builder::KeySpace::Initial, }); self.recovery_manager .on_packet_number_space_discarded(path, path_id, publisher); } pub fn requires_probe(&self) -> bool { self.recovery_manager.requires_probe() } /// Returns the Packet Number to be used when decoding incoming packets pub fn packet_number_decoder(&self) -> PacketNumber { self.ack_manager.largest_received_packet_number_acked() } /// Returns the Packet Number to be used when encoding outgoing packets fn packet_number_encoder(&self) -> PacketNumber { self.tx_packet_numbers.largest_sent_packet_number_acked() } fn recovery<'a>( &'a mut self, handshake_status: &'a HandshakeStatus, path_id: path::Id, path_manager: &'a mut path::Manager, ) -> ( &'a mut recovery::Manager, RecoveryContext<'a, Config>, ) { ( &mut self.recovery_manager, RecoveryContext { ack_manager: &mut self.ack_manager, crypto_stream: &mut self.crypto_stream, tx_packet_numbers: &mut self.tx_packet_numbers, handshake_status, config: PhantomData, path_id, path_manager, }, ) } /// Validate packets in the Initial packet space pub fn validate_and_decrypt_packet<'a, Pub: event::ConnectionPublisher>( &self, protected: ProtectedInitial<'a>, path_id: path::Id, path: &path::Path, publisher: &mut Pub, ) -> Result, ProcessingError> { let packet_number_decoder = self.packet_number_decoder(); let packet = protected .unprotect(&self.header_key, packet_number_decoder) .map_err(|err| { publisher.on_packet_dropped(event::builder::PacketDropped { reason: event::builder::PacketDropReason::UnprotectFailed { space: event::builder::KeySpace::Initial, path: path_event!(path, path_id), }, }); err })?; if self.is_duplicate(packet.packet_number, path_id, path, publisher) { return Err(ProcessingError::DuplicatePacket); } let packet_header = event::builder::PacketHeader::new(packet.packet_number, publisher.quic_version()); let decrypted = packet.decrypt(&self.key).map_err(|err| { publisher.on_packet_dropped(event::builder::PacketDropped { reason: event::builder::PacketDropReason::DecryptionFailed { packet_header, path: path_event!(path, path_id), }, }); err })?; if Config::ENDPOINT_TYPE.is_client() && !decrypted.token.is_empty() { //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.2 //# Initial packets sent by the server MUST set the Token Length field //# to 0; clients that receive an Initial packet with a non-zero Token //# Length field MUST either discard the packet or generate a //# connection error of type PROTOCOL_VIOLATION. publisher.on_packet_dropped(event::builder::PacketDropped { reason: event::builder::PacketDropReason::NonEmptyRetryToken { path: path_event!(path, path_id), }, }); return Err(ProcessingError::NonEmptyRetryToken); } Ok(decrypted) } #[inline] fn parse_client_hello( &mut self, publisher: &mut Pub, ) -> Result<(), transport::Error> { debug_assert!(Config::ENDPOINT_TYPE.is_server()); if let Some(payload) = self.parse_hello(tls::HandshakeType::ClientHello)? { publisher.on_tls_client_hello(event::builder::TlsClientHello { payload: &payload }); } Ok(()) } #[inline] fn parse_server_hello( &mut self, publisher: &mut Pub, ) -> Result<(), transport::Error> { debug_assert!(Config::ENDPOINT_TYPE.is_client()); if let Some(payload) = self.parse_hello(tls::HandshakeType::ServerHello)? { publisher.on_tls_server_hello(event::builder::TlsServerHello { payload: &payload }); } Ok(()) } #[inline] fn parse_hello( &mut self, msg_type: tls::HandshakeType, ) -> Result>, transport::Error> { debug_assert!( !self.received_hello_message, "should only be called before the hello is parsed" ); debug_assert_eq!( self.crypto_stream.rx.consumed_len(), 0, "should not consume any crypto data" ); let chunks = self.crypto_stream.rx.iter(); let total_received_len = self.crypto_stream.rx.total_received_len(); let mut chunks = chunks.peekable(); let empty_chunk = &[][..]; let header_chunk = chunks.peek().unwrap_or(&empty_chunk); // TODO make this configurable: // https://github.com/aws/s2n-quic/issues/1001 const MAX_HELLO_SIZE: u64 = 2 << 16; let outcome = <::Session as tls::Session>::parse_hello( msg_type, header_chunk, total_received_len, MAX_HELLO_SIZE, )?; if let Some(offsets) = outcome { // record that we've received the hello message self.received_hello_message = true; let payload = offsets.trim_chunks(chunks).collect(); Ok(Some(payload)) } else { // we are still waiting on more data Ok(None) } } } impl timer::Provider for InitialSpace { #[inline] fn timers(&self, query: &mut Q) -> timer::Result { self.ack_manager.timers(query)?; self.recovery_manager.timers(query)?; Ok(()) } } impl transmission::interest::Provider for InitialSpace { #[inline] fn transmission_interest( &self, query: &mut Q, ) -> transmission::interest::Result { self.ack_manager.transmission_interest(query)?; self.crypto_stream.transmission_interest(query)?; self.recovery_manager.transmission_interest(query)?; Ok(()) } } impl connection::finalization::Provider for InitialSpace { fn finalization_status(&self) -> connection::finalization::Status { // there's nothing in here that hold up finalizing a connection connection::finalization::Status::Idle } } struct RecoveryContext<'a, Config: endpoint::Config> { ack_manager: &'a mut AckManager, crypto_stream: &'a mut CryptoStream, tx_packet_numbers: &'a mut TxPacketNumbers, handshake_status: &'a HandshakeStatus, config: PhantomData, path_id: path::Id, path_manager: &'a mut path::Manager, } impl<'a, Config: endpoint::Config> recovery::Context for RecoveryContext<'a, Config> { const ENDPOINT_TYPE: endpoint::Type = Config::ENDPOINT_TYPE; fn is_handshake_confirmed(&self) -> bool { self.handshake_status.is_confirmed() } fn path(&self) -> &Path { &self.path_manager[self.path_id] } fn path_mut(&mut self) -> &mut Path { &mut self.path_manager[self.path_id] } fn path_by_id(&self, path_id: path::Id) -> &path::Path { &self.path_manager[path_id] } fn path_mut_by_id(&mut self, path_id: path::Id) -> &mut path::Path { &mut self.path_manager[path_id] } fn path_id(&self) -> path::Id { self.path_id } fn validate_packet_ack( &mut self, timestamp: Timestamp, packet_number_range: &PacketNumberRange, ) -> Result<(), transport::Error> { self.tx_packet_numbers .on_packet_ack(timestamp, packet_number_range) } fn on_new_packet_ack( &mut self, packet_number_range: &PacketNumberRange, _publisher: &mut Pub, ) { self.crypto_stream.on_packet_ack(packet_number_range); } fn on_packet_ack(&mut self, timestamp: Timestamp, packet_number_range: &PacketNumberRange) { self.ack_manager .on_packet_ack(timestamp, packet_number_range); } fn on_packet_loss( &mut self, packet_number_range: &PacketNumberRange, _publisher: &mut Pub, ) { self.crypto_stream.on_packet_loss(packet_number_range); self.ack_manager.on_packet_loss(packet_number_range); } fn on_rtt_update(&mut self) {} } //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.2 //# The payload of an Initial packet includes a CRYPTO frame (or frames) //# containing a cryptographic handshake message, ACK frames, or both. //# PING, PADDING, and CONNECTION_CLOSE frames of type 0x1c are also //# permitted. An endpoint that receives an Initial packet containing //# other frames can either discard the packet as spurious or treat it as //# a connection error. impl PacketSpace for InitialSpace { const INVALID_FRAME_ERROR: &'static str = "invalid frame in initial space"; fn handle_crypto_frame( &mut self, frame: CryptoRef, _datagram: &DatagramInfo, _path: &mut Path, publisher: &mut Pub, ) -> Result<(), transport::Error> { self.crypto_stream.on_crypto_frame(frame)?; // try to parse out the hello message if we haven't yet if !self.received_hello_message { match Config::ENDPOINT_TYPE { endpoint::Type::Server => self.parse_client_hello(publisher)?, endpoint::Type::Client => self.parse_server_hello(publisher)?, } } Ok(()) } fn handle_ack_frame( &mut self, frame: Ack, timestamp: Timestamp, path_id: path::Id, path_manager: &mut path::Manager, packet_number: PacketNumber, handshake_status: &mut HandshakeStatus, _local_id_registry: &mut connection::LocalIdRegistry, random_generator: &mut Config::RandomGenerator, publisher: &mut Pub, ) -> Result<(), transport::Error> { let (recovery_manager, mut context) = self.recovery(handshake_status, path_id, path_manager); recovery_manager.on_ack_frame( timestamp, frame, packet_number, random_generator, &mut context, publisher, ) } fn handle_connection_close_frame( &mut self, frame: ConnectionClose, _timestamp: Timestamp, _path: &mut Path, ) -> Result<(), transport::Error> { //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.2 //# CONNECTION_CLOSE frames of type 0x1c are also //# permitted. if frame.tag() != 0x1c { return Err(transport::Error::PROTOCOL_VIOLATION); } Ok(()) } fn on_processed_packet( &mut self, processed_packet: ProcessedPacket, path_id: path::Id, path: &Path, publisher: &mut Pub, ) -> Result<(), transport::Error> { self.ack_manager.on_processed_packet( &processed_packet, path_event!(path, path_id), publisher, ); self.processed_packet_numbers .insert(processed_packet.packet_number) .expect("packet number was already checked"); Ok(()) } }