// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 //! Synchronizes a value of type `T` exactly once towards the remote peer. use crate::{ contexts::{OnTransmitError, WriteContext}, sync::{DeliveryState, InFlightDelivery, InflightPacketInfo, ValueToFrameWriter}, transmission, }; use core::task::Poll; use s2n_quic_core::{ack, stream::StreamId}; /// Synchronizes a value of type `T` exactly once towards the remote peer. /// /// Retransmissions of the value will be performed if it got lost. However it is /// not possible to replace the value with a newer value. /// /// `S` is of type [`ValueToFrameWriter`] and used to serialize the value /// into an outgoing frame. #[derive(Debug)] pub struct OnceSync<T, S> { delivery: DeliveryState<T>, writer: S, } impl<T, S: Default> Default for OnceSync<T, S> { fn default() -> Self { Self { delivery: DeliveryState::NotRequested, writer: S::default(), } } } impl<T: Copy + Clone + Eq + PartialEq, S: ValueToFrameWriter<T>> OnceSync<T, S> { /// Creates a new OnceSync pub fn new() -> Self { Self::default() } /// Returns `true` if the delivery is current in progress. /// A packet has been sent, but no acknowledgement has been retrieved so far. #[inline] pub fn is_inflight(&self) -> bool { self.delivery.is_inflight() } /// Returns `true` if the synchronization has been cancelled #[inline] pub fn is_cancelled(&self) -> bool { self.delivery.is_cancelled() } /// Requested delivery of the given value. pub fn request_delivery(&mut self, value: T) { if let DeliveryState::NotRequested = self.delivery { self.delivery = DeliveryState::Requested(value); } } /// Forces transmission of the given value. pub fn force_delivery(&mut self, value: T) { self.delivery = DeliveryState::Requested(value); } /// Stop to synchronize the value to the peer #[inline] pub fn stop_sync(&mut self) { self.delivery.cancel(); } /// This method gets called when a packet delivery got acknowledged pub fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A) -> Poll<()> { // If the packet containing the frame gets acknowledged, mark the delivery as // succeeded. if let DeliveryState::InFlight(in_flight) = self.delivery { if ack_set.contains(in_flight.packet.packet_nr) { self.delivery = DeliveryState::Delivered(in_flight.value); return Poll::Ready(()); } } Poll::Pending } /// This method gets called when a packet loss is reported pub fn on_packet_loss<A: ack::Set>(&mut self, ack_set: &A) { // If the packet containing the frame was lost, remove the in_flight information. // This will trigger resending it. if let DeliveryState::InFlight(in_flight) = self.delivery { if ack_set.contains(in_flight.packet.packet_nr) { self.delivery = DeliveryState::Lost(in_flight.value); } } } /// Queries the component for any outgoing frames that need to get sent #[inline] pub fn on_transmit<W: WriteContext>( &mut self, stream_id: StreamId, context: &mut W, ) -> Result<(), OnTransmitError> { if let Some(value) = self .delivery .try_transmit(context.transmission_constraint()) .cloned() { let packet_nr = self .writer .write_value_as_frame(value, stream_id, context) .ok_or(OnTransmitError::CouldNotWriteFrame)?; // Overwrite the information about the pending delivery self.delivery = DeliveryState::InFlight(InFlightDelivery { packet: InflightPacketInfo { packet_nr, timestamp: context.current_time(), }, value, }); } Ok(()) } } impl<T, S> transmission::interest::Provider for OnceSync<T, S> { #[inline] fn transmission_interest<Q: transmission::interest::Query>( &self, query: &mut Q, ) -> transmission::interest::Result { self.delivery.transmission_interest(query) } }