StartAt: Initialize DDB
States:
  # Seed the config item in DynamoDB when any required attribute is missing.
  Initialize DDB:
    Type: Parallel
    Next: Get Config
    Branches:
      - StartAt: Check Config
        States:
          Check Config:
            Type: Task
            Resource: arn:aws:states:::dynamodb:getItem
            Parameters:
              # NOTE: the table name is hardcoded in this branch, while later
              # states use the templated table variable.
              TableName: demo-event-emitter_event-pusher-config
              Key:
                config_id:
                  S: event_emitter
            Next: Choice
          Choice:
            Type: Choice
            Choices:
              # Write defaults if any of the four numeric settings is absent.
              - Or:
                  - Not:
                      Variable: $.Item.worker_count.N
                      IsPresent: true
                  - Not:
                      Variable: $.Item.thread_count.N
                      IsPresent: true
                  - Not:
                      Variable: $.Item.min_wait.N
                      IsPresent: true
                  - Not:
                      Variable: $.Item.max_wait.N
                      IsPresent: true
                Next: Initialize Continuation
            Default: Initialization Success
          Initialize Continuation:
            Type: Task
            Resource: arn:aws:states:::dynamodb:putItem
            Parameters:
              TableName: demo-event-emitter_event-pusher-config
              Item:
                config_id:
                  S: event_emitter
                worker_count:
                  'N': '10'
                thread_count:
                  'N': '10'
                min_wait:
                  'N': '1'
                max_wait:
                  'N': '10'
                continue:
                  S: 'true'
            Next: Check Config
          Initialization Success:
            Type: Succeed
    ResultPath: null

  # Load the config and convert the numeric attributes from strings to numbers.
  Get Config:
    Type: Task
    Resource: arn:aws:states:::dynamodb:getItem
    Parameters:
      TableName: ${dynamodb_table_config}
      Key:
        config_id:
          S: event_emitter
    ResultSelector:
      worker_count.$: States.StringToJson($.Item.worker_count.N)
      thread_count.$: States.StringToJson($.Item.thread_count.N)
      min_wait.$: States.StringToJson($.Item.min_wait.N)
      max_wait.$: States.StringToJson($.Item.max_wait.N)
      continue.$: $.Item.continue.S
    ResultPath: $.config
    Next: Continue Distributed Map?

  # Stop the execution unless the "continue" flag is the string 'true'.
  Continue Distributed Map?:
    Type: Choice
    Choices:
      - Not:
          Variable: $.config.continue
          StringEquals: 'true'
        Next: End Execution
    Default: Generate Items

  # Build one array element per worker for the distributed map.
  Generate Items:
    Type: Pass
    Next: Distributed Map
    Parameters:
      items.$: States.ArrayRange(1,$.config.worker_count,1)
    ResultPath: $.worker_items

  Distributed Map:
    Type: Map
    ItemProcessor:
      ProcessorConfig:
        Mode: DISTRIBUTED
        ExecutionType: EXPRESS
      StartAt: Reload Config
      States:
        # Each worker re-reads the config on every loop iteration.
        Reload Config:
          Type: Task
          Resource: arn:aws:states:::dynamodb:getItem
          Parameters:
            TableName: ${dynamodb_table_config}
            Key:
              config_id:
                S: event_emitter
          ResultSelector:
            worker_count.$: States.StringToJson($.Item.worker_count.N)
            thread_count.$: States.StringToJson($.Item.thread_count.N)
            min_wait.$: States.StringToJson($.Item.min_wait.N)
            max_wait.$: States.StringToJson($.Item.max_wait.N)
            continue.$: $.Item.continue.S
          ResultPath: $.config
          Next: Continue Item Processor?
        Continue Item Processor?:
          Type: Choice
          Choices:
            - Not:
                Variable: $.config.continue
                StringEquals: 'true'
              Next: Item Processor Success
          Default: Set Parallelism
        # Build one array element per thread for the inline map.
        Set Parallelism:
          Type: Pass
          ResultPath: $.worker_config
          Parameters:
            items.$: States.ArrayRange(1,$.config.thread_count,1)
          Next: Parallel Work
        Parallel Work:
          Type: Map
          ItemProcessor:
            ProcessorConfig:
              Mode: INLINE
            StartAt: Determine Wait Time
            States:
              # Pick a random wait between min_wait and max_wait seconds.
              Determine Wait Time:
                Type: Pass
                Next: Wait
                Parameters:
                  wait_time.$: States.MathRandom($.config.min_wait,$.config.max_wait)
                  config.$: $.config
                  thread_id.$: $.thread_id
              Wait:
                Type: Wait
                SecondsPath: $.wait_time
                Next: Send Event
              # Send Event:
              #   Type: Task
              #   Resource: arn:aws:states:::events:putEvents
              #   Parameters:
              #     Entries:
              #       - Detail:
              #           Message: Hello from Step Functions!
              #         DetailType: MyDetailType
              #         EventBusName: Default
              #         Source: MySource
              #   Next: Success
              Send Event:
                Type: Task
                Resource: arn:aws:states:::sqs:sendMessage
                Parameters:
                  QueueUrl: ${sqs_queue_url}
                  MessageBody:
                    thread_id.$: $.thread_id
                Next: Success
                # Swallow all errors so a failed send does not fail the thread.
                Catch:
                  - ErrorEquals:
                      - States.ALL
                    Next: Success
              Success:
                Type: Succeed
          ItemsPath: $.worker_config.items
          ItemSelector:
            thread_id.$: $$.Map.Item.Index
            config.$: $.config
          ResultPath: null
          # Loop back so the worker keeps emitting until "continue" is cleared.
          Next: Reload Config
        Item Processor Success:
          Type: Succeed
    # After all workers finish, re-read the config and decide whether to restart.
    Next: Get Config
    Label: DistributedMap
    MaxConcurrency: 1000
    ItemsPath: $.worker_items.items
    ItemSelector:
      worker_id.$: $$.Map.Item.Index
    ToleratedFailurePercentage: 100
    ResultPath: null

  End Execution:
    Type: Succeed