//! EventBridge bus implementation //! //! Bus implementation using the AWS SDK for EventBridge. use super::EventBus; use crate::{Error, Event}; use async_trait::async_trait; use aws_sdk_eventbridge::Client; use futures::future::join_all; use tracing::{info, instrument}; mod ext; use ext::EventExt; /// EventBridge bus implementation. pub struct EventBridgeBus { client: Client, bus_name: String, } impl EventBridgeBus { pub fn new(client: Client, bus_name: String) -> Self { Self { client, bus_name } } } #[async_trait] impl EventBus for EventBridgeBus { type E = Event; /// Publish an event to the event bus. #[instrument(skip(self))] async fn send_event(&self, event: &Self::E) -> Result<(), Error> { info!("Publishing event to EventBridge"); self.client .put_events() .entries(event.to_eventbridge(&self.bus_name)) .send() .await?; Ok(()) } /// Publish a batch of events to the event bus. #[instrument(skip(self, events))] async fn send_events(&self, events: &[Self::E]) -> Result<(), Error> { // Send batches of 10 events at a time // // EventBridge has a limit of 10 events per `put_events()` request. // // `send()` returns a Future, so we can use `join_all` to wait for all of the // futures to complete. This means we can send all batches at the same time // and not have to wait for each batch to complete before sending the next one. info!("Publishing events to EventBridge"); let res = join_all(events.iter().collect::>().chunks(10).map(|chunk| { self.client .put_events() .set_entries(Some( chunk .iter() .map(|e| e.to_eventbridge(&self.bus_name)) .collect::>(), )) .send() })) .await; // Retrieve errors from the response vector // // If any of the responses contained an error, we'll return an error. res.into_iter().collect::, _>>()?; Ok(()) } } #[cfg(test)] mod tests { use super::*; use crate::{Event, Product}; use aws_sdk_eventbridge::{Client, Config, Credentials, Region}; use aws_smithy_client::{erase::DynConnector, test_connection::TestConnection}; use aws_smithy_http::body::SdkBody; // Config for mocking EventBridge async fn get_mock_config() -> Config { let cfg = aws_config::from_env() .region(Region::new("eu-west-1")) .credentials_provider(Credentials::new( "accesskey", "privatekey", None, None, "dummy", )) .load() .await; Config::new(&cfg) } fn get_request_builder() -> http::request::Builder { http::Request::builder() .header("content-type", "application/x-amz-json-1.1") .uri(http::uri::Uri::from_static( "https://events.eu-west-1.amazonaws.com/", )) } #[tokio::test] async fn test_send_event() -> Result<(), Error> { // GIVEN a mock EventBridge client let conn = TestConnection::new(vec![( get_request_builder() .header("x-amz-target", "AWSEvents.PutEvents") .body(SdkBody::from( r#"{"Entries":[{"Source":"rust-products","Resources":["test-id"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id\",\"name\":\"test-name\",\"price\":10.0}}","EventBusName":"test-bus"}]}"#, )) .unwrap(), http::Response::builder() .status(200) .body(SdkBody::from("{}")) .unwrap(), )]); let client = Client::from_conf_conn(get_mock_config().await, DynConnector::new(conn.clone())); let event_bus = EventBridgeBus::new(client, "test-bus".to_string()); // WHEN we send an event let event = Event::Created { product: Product { id: "test-id".to_string(), name: "test-name".to_string(), price: 10.0, }, }; event_bus.send_event(&event).await?; // THEN the request should have been sent to EventBridge assert_eq!(conn.requests().len(), 1); conn.assert_requests_match(&vec![]); Ok(()) } #[tokio::test] async fn test_send_events() -> Result<(), Error> { // GIVEN a mock EventBridge client let conn = TestConnection::new(vec![( get_request_builder() .header("x-amz-target", "AWSEvents.PutEvents") .body(SdkBody::from( r#"{"Entries":[{"Source":"rust-products","Resources":["test-id"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id\",\"name\":\"test-name\",\"price\":10.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-2"],"DetailType":"ProductDeleted","Detail":"{\"type\":\"Deleted\",\"product\":{\"id\":\"test-id-2\",\"name\":\"test-name-2\",\"price\":20.0}}","EventBusName":"test-bus"}]}"#, )) .unwrap(), http::Response::builder() .status(200) .body(SdkBody::from("{}")) .unwrap(), )]); let client = Client::from_conf_conn(get_mock_config().await, DynConnector::new(conn.clone())); let event_bus = EventBridgeBus::new(client, "test-bus".to_string()); // WHEN we send a batch of events let events = vec![ Event::Created { product: Product { id: "test-id".to_string(), name: "test-name".to_string(), price: 10.0, }, }, Event::Deleted { product: Product { id: "test-id-2".to_string(), name: "test-name-2".to_string(), price: 20.0, }, }, ]; event_bus.send_events(&events).await?; // THEN the request should have been sent to EventBridge assert_eq!(conn.requests().len(), 1); conn.assert_requests_match(&vec![]); Ok(()) } #[tokio::test] async fn test_send_events0() -> Result<(), Error> { // GIVEN a mock EventBridge client let conn: TestConnection = TestConnection::new(vec![]); let client = Client::from_conf_conn(get_mock_config().await, DynConnector::new(conn.clone())); let event_bus = EventBridgeBus::new(client, "test-bus".to_string()); // WHEN we send zero events event_bus.send_events(&vec![]).await?; // THEN no request should have been sent to EventBridge assert_eq!(conn.requests().len(), 0); conn.assert_requests_match(&vec![]); Ok(()) } #[tokio::test] async fn test_send_events15() -> Result<(), Error> { // GIVEN a mock EventBridge client let conn = TestConnection::new(vec![( get_request_builder() .header("x-amz-target", "AWSEvents.PutEvents") .body(SdkBody::from( r#"{"Entries":[{"Source":"rust-products","Resources":["test-id-0"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-0\",\"name\":\"test-name-0\",\"price\":10.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-1"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-1\",\"name\":\"test-name-1\",\"price\":11.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-2"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-2\",\"name\":\"test-name-2\",\"price\":12.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-3"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-3\",\"name\":\"test-name-3\",\"price\":13.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-4"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-4\",\"name\":\"test-name-4\",\"price\":14.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-5"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-5\",\"name\":\"test-name-5\",\"price\":15.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-6"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-6\",\"name\":\"test-name-6\",\"price\":16.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-7"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-7\",\"name\":\"test-name-7\",\"price\":17.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-8"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-8\",\"name\":\"test-name-8\",\"price\":18.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-9"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-9\",\"name\":\"test-name-9\",\"price\":19.0}}","EventBusName":"test-bus"}]}"#, )) .unwrap(), http::Response::builder() .status(200) .body(SdkBody::from("{}")) .unwrap(), ), ( get_request_builder() .header("x-amz-target", "AWSEvents.PutEvents") .body(SdkBody::from( r#"{"Entries":[{"Source":"rust-products","Resources":["test-id-10"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-10\",\"name\":\"test-name-10\",\"price\":20.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-11"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-11\",\"name\":\"test-name-11\",\"price\":21.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-12"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-12\",\"name\":\"test-name-12\",\"price\":22.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-13"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-13\",\"name\":\"test-name-13\",\"price\":23.0}}","EventBusName":"test-bus"},{"Source":"rust-products","Resources":["test-id-14"],"DetailType":"ProductCreated","Detail":"{\"type\":\"Created\",\"product\":{\"id\":\"test-id-14\",\"name\":\"test-name-14\",\"price\":24.0}}","EventBusName":"test-bus"}]}"#, )) .unwrap(), http::Response::builder() .status(200) .body(SdkBody::from("{}")) .unwrap(), )]); let client = Client::from_conf_conn(get_mock_config().await, DynConnector::new(conn.clone())); let event_bus = EventBridgeBus::new(client, "test-bus".to_string()); // WHEN we send 15 events let events = (0..15) .map(|i| Event::Created { product: Product { id: format!("test-id-{}", i), name: format!("test-name-{}", i), price: 10.0 + i as f64, }, }) .collect::>(); event_bus.send_events(&events).await?; // THEN two requests should have been sent to EventBridge assert_eq!(conn.requests().len(), 2); conn.assert_requests_match(&vec![]); Ok(()) } }