# Poller workflow: AWS Step Functions state machine (Amazon States Language).
#
# Top-level flow:
#   PreparePollerJob -> ProcessPollerTasks (Map over $.tasks) -> PollerJobComplete
#
# Per-task flow inside the Map iterator:
#   ReportProgress -> ChooseNextAction
#     nextState == Started      -> FetchPayload -> OnPayloadSaved -> ReportProgress
#     nextState == PayloadSaved -> SendToSQS    -> OnSentToSQS    -> ReportProgress
#     nextState == Enqueued | Failed | anything else -> Done
#   A FetchPayload error is caught and routed through OnFailure -> ReportProgress.
#
# "${...}" values are placeholders; presumably they are substituted by the
# deployment template before the definition is created - confirm there.
StartAt: PreparePollerJob

States:
  PreparePollerJob:
    Type: Task
    Comment: >-
      Check the level of parallelism, split requests into chunks
      and invoke Lambdas
    Resource: "arn:aws:states:::lambda:invoke"
    InputPath: "$"
    # Only the Lambda's returned payload is passed on. The Map state below
    # reads $.tasks and $.taskStateMap from it.
    OutputPath: "$.Payload"
    Parameters:
      FunctionName: "${WorkflowPreparePollerJobFunction}"
      Payload.$: "$"
    Retry:
      # BackoffRate 1 keeps the delay constant at 1 second between attempts.
      - ErrorEquals:
          - RetriablePollerError
        IntervalSeconds: 1
        MaxAttempts: 2
        BackoffRate: 1
    Next: ProcessPollerTasks

  ProcessPollerTasks:
    Type: Map
    InputPath: "$"
    ItemsPath: "$.tasks"
    # At most 25 iterations run at the same time.
    MaxConcurrency: 25
    # Input handed to every iteration: the current task, the shared state-name
    # map, and the initial progress state (Started).
    Parameters:
      task.$: "$$.Map.Item.Value"
      taskStateMap.$: "$.taskStateMap"
      nextState.$: "$.taskStateMap.Started"
    Iterator:
      StartAt: ReportProgress
      States:
        # Terminal state of an iteration; its output becomes the task's entry
        # in the Map result array.
        Done:
          Type: Pass
          Parameters:
            jobId.$: "$.task.jobId"
            taskId.$: "$.task.taskId"
            state.$: "$.nextState"
            payloadLocation.$: "$.payloadLocation"
          End: true

        # Writes the task's current progress state ($.nextState) to the
        # tasks table. The service response is kept under $.ddb so the rest
        # of the input survives.
        ReportProgress:
          Type: Task
          Resource: "${DDBPutItem}"
          ResultPath: "$.ddb"
          Parameters:
            TableName: "${PollerTasksTable}"
            Item:
              JobId:
                S.$: "$.task.jobId"
              TaskId:
                S.$: "$.task.taskId"
              # NOTE(review): DynamoDB "N" attributes are normally sent as
              # strings - confirm $.task.ttl is already a string.
              TimeToLive:
                N.$: "$.task.ttl"
              State:
                S.$: "$.nextState"
          Next: ChooseNextAction

        # Dispatches on the state that was just reported.
        ChooseNextAction:
          Type: Choice
          Choices:
            - Variable: "$.nextState"
              StringEqualsPath: "$.taskStateMap.Started"
              Next: FetchPayload
            - Variable: "$.nextState"
              StringEqualsPath: "$.taskStateMap.PayloadSaved"
              Next: SendToSQS
            - Variable: "$.nextState"
              StringEqualsPath: "$.taskStateMap.Enqueued"
              Next: Done
            - Variable: "$.nextState"
              StringEqualsPath: "$.taskStateMap.Failed"
              Next: Done
          Default: Done

        FetchPayload:
          Type: Task
          Resource: "arn:aws:states:::lambda:invoke"
          # The Lambda's payload replaces the whole state, so OnPayloadSaved
          # relies on it containing taskStateMap, task and payloadLocation.
          OutputPath: "$.Payload"
          Parameters:
            FunctionName: "${WorkflowFetchPayloadFunction}"
            Payload.$: "$"
          Retry:
            # Waits of 2, 4, 8, 16, 32 and 64 seconds (126 s in total).
            # NOTE(review): that is close to the 150 s TimeoutSeconds of the
            # whole state machine - confirm the retry budget is intended.
            - ErrorEquals:
                - RetriablePollerError
              IntervalSeconds: 2
              MaxAttempts: 6
              BackoffRate: 2
          Catch:
            # Any remaining error is stored under $.error next to the
            # untouched state input and the task is marked as Failed.
            - ErrorEquals:
                - States.ALL
              ResultPath: "$.error"
              Next: OnFailure
          Next: OnPayloadSaved

        # Sends the whole iteration state as the message body; the response
        # is kept under $.sqs.
        SendToSQS:
          Type: Task
          Resource: "${SQSPutItem}"
          ResultPath: "$.sqs"
          Parameters:
            QueueUrl: "${ProcessPayloadQueueUrl}"
            MessageBody.$: "$"
            MessageAttributes:
              jobId:
                DataType: "String"
                StringValue.$: "$.task.jobId"
              taskId:
                DataType: "String"
                StringValue.$: "$.task.taskId"
          Next: OnSentToSQS

        # --- State-reporting transitions (grouped together) ---
        # Each Pass state rebuilds the iteration input with the next progress
        # state and loops back to ReportProgress.
        OnPayloadSaved:
          Type: Pass
          Parameters:
            taskStateMap.$: "$.taskStateMap"
            task.$: "$.task"
            nextState.$: "$.taskStateMap.PayloadSaved"
            payloadLocation.$: "$.payloadLocation"
          Next: ReportProgress

        OnSentToSQS:
          Type: Pass
          Parameters:
            taskStateMap.$: "$.taskStateMap"
            task.$: "$.task"
            nextState.$: "$.taskStateMap.Enqueued"
            payloadLocation.$: "$.payloadLocation"
          Next: ReportProgress

        OnFailure:
          Type: Pass
          Parameters:
            taskStateMap.$: "$.taskStateMap"
            task.$: "$.task"
            nextState.$: "$.taskStateMap.Failed"
            # This state is only reached from the FetchPayload Catch, whose
            # input (task, taskStateMap, nextState, ddb, error) never holds a
            # payloadLocation. Resolving "$.payloadLocation" here would raise
            # States.ParameterPathFailure, so a static empty value is used;
            # Done can then still emit a uniformly shaped result.
            payloadLocation: ""
          Next: ReportProgress
        # --- End of state-reporting transitions ---

  PollerJobComplete:
    Type: Pass
    Comment: Used for result aggregation
    End: true

# Upper bound, in seconds, for one execution of the whole state machine.
TimeoutSeconds: 150