--- AWSTemplateFormatVersion: "2010-09-09" Description: >- (WWPS-GLS-WF-SFN-WORKFLOW-EXAMPLE-MAPPING) Creates an example AWS Step-Functions StateMachine that demonstrates how to use dynamic parallelism with the Map state for a genomics workflow. Mappings: TagMap: default: architecture: "genomics-workflows" solution: "step-functions" tags: - Key: "architecture" Value: "genomics-workflows" - Key: "solution" Value: "step-functions" Parameters: Namespace: Type: String Description: Namespace (e.g. project name) to use to label resources. S3BucketName: Description: S3 URI (e.g. s3://bucket/prefix) you are using for workflow inputs and outputs Type: String IAMStepFunctionsExecutionRole: Description: > ARN of the IAM role that allows StepFunctions to call other AWS services on your behalf. In this case it needs to be able to submit AWS Batch Jobs and create CloudWatch Event rules and targets. Type: String BatchJobQueue: Description: Name or ARN of the AWS Batch Job Queue the workflow will use by default Type: String BatchJobDefinitionBwa: Description: Name or ARN of the AWS Batch Job Definition for bwa Type: String BatchJobDefinitionSamtools: Description: Name or ARN of the AWS Batch Job Definition for samtool Type: String BatchJobDefinitionBcftools: Description: Name or ARN of the AWS Batch Job Definition for bcftools Type: String Resources: SfnStateMachine: Type: AWS::StepFunctions::StateMachine Properties: StateMachineName: !Sub example-genomics-workflow-${Namespace} RoleArn: !Ref IAMStepFunctionsExecutionRole Tags: !FindInMap ["TagMap", "default", "tags"] # In the state machine below each Task state `ResultPath` is set to `$.result`. # This will write the results returned from each task to this member of the Task # state's corresponding input. Since this is the same for all tasks, each task overwrites # the last task's result. The full result history is stored in the execution # event history. Doing this keeps the overall payload for each task state # within limits. You can reduce this further by setting `ResultPath` to `null` DefinitionString: !Sub |- { "Comment": "Simple Genomics Workflow with mapping", "StartAt": "Initialize", "States": { "Initialize": { "Type": "Pass", "InputPath": "$", "Parameters": { "workflow": { "name.$": "$$.StateMachine.Name", "execution.$": "$$.Execution.Name" }, "params.$": "$.params", "jobdefs": { "bwa": "${BatchJobDefinitionBwa}", "samtools": "${BatchJobDefinitionSamtools}", "bcftools": "${BatchJobDefinitionBcftools}" } }, "Next": "BwaMem" }, "BwaMem": { "Type": "Task", "InputPath": "$", "ResultPath": "$.result", "Resource": "arn:aws:states:::batch:submitJob.sync", "Parameters": { "JobName": "bwa-mem", "JobDefinition.$": "$.jobdefs.bwa", "JobQueue.$": "$.params.queue", "ContainerOverrides": { "Vcpus": 8, "Memory": 8000, "Environment": [ {"Name": "JOB_WORKFLOW_NAME", "Value.$": "$.workflow.name"}, {"Name": "JOB_WORKFLOW_EXECUTION", "Value.$": "$.workflow.execution"}, {"Name": "JOB_INPUTS", "Value": "s3://broad-references/hg38/v0/Homo_sapiens_assembly38.fasta* ${!SOURCE_DATA_PREFIX}/${!SAMPLE_ID}*"}, {"Name": "JOB_OUTPUTS", "Value": "*.sam"}, {"Name": "JOB_OUTPUT_PREFIX", "Value.$": "$.params.environment.JOB_OUTPUT_PREFIX"}, {"Name": "JOB_AWS_CLI_PATH", "Value.$": "$.params.environment.JOB_AWS_CLI_PATH"}, {"Name": "SOURCE_DATA_PREFIX", "Value.$": "$.params.environment.SOURCE_DATA_PREFIX"}, {"Name": "SAMPLE_ID", "Value.$": "$.params.environment.SAMPLE_ID"}, {"Name": "REFERENCE_NAME", "Value.$": "$.params.environment.REFERENCE_NAME"} ], "Command": [ "bwa mem -t 8 -p -o ${!SAMPLE_ID}.sam ${!REFERENCE_NAME}.fasta ${!SAMPLE_ID}_*1*.fastq.gz" ] } }, "Next": "SamtoolsSort" }, "SamtoolsSort": { "Type": "Task", "InputPath": "$", "ResultPath": "$.result", "Resource": "arn:aws:states:::batch:submitJob.sync", "Parameters": { "JobName": "samtools-sort", "JobDefinition.$": "$.jobdefs.samtools", "JobQueue.$": "$.params.queue", "ContainerOverrides": { "Vcpus": 4, "Memory": 8000, "Environment": [ {"Name": "JOB_WORKFLOW_NAME", "Value.$": "$.workflow.name"}, {"Name": "JOB_WORKFLOW_EXECUTION", "Value.$": "$.workflow.execution"}, {"Name": "JOB_INPUTS", "Value": "$JOB_OUTPUT_PREFIX/*.sam"}, {"Name": "JOB_OUTPUTS", "Value": "*.bam"}, {"Name": "JOB_OUTPUT_PREFIX", "Value.$": "$.params.environment.JOB_OUTPUT_PREFIX"}, {"Name": "JOB_AWS_CLI_PATH", "Value.$": "$.params.environment.JOB_AWS_CLI_PATH"}, {"Name": "SAMPLE_ID", "Value.$": "$.params.environment.SAMPLE_ID"} ], "Command": [ "samtools sort -@ 4 -o ${!SAMPLE_ID}.bam ${!SAMPLE_ID}.sam" ] } }, "Next": "SamtoolsIndex" }, "SamtoolsIndex": { "Type": "Task", "InputPath": "$", "ResultPath": "$.result", "Resource": "arn:aws:states:::batch:submitJob.sync", "Parameters": { "JobName": "samtools-index", "JobDefinition.$": "$.jobdefs.samtools", "JobQueue.$": "$.params.queue", "ContainerOverrides": { "Vcpus": 1, "Memory": 1024, "Environment": [ {"Name": "JOB_WORKFLOW_NAME", "Value.$": "$.workflow.name"}, {"Name": "JOB_WORKFLOW_EXECUTION", "Value.$": "$.workflow.execution"}, {"Name": "JOB_INPUTS", "Value": "$JOB_OUTPUT_PREFIX/*.bam"}, {"Name": "JOB_OUTPUTS", "Value": "*.bam*"}, {"Name": "JOB_OUTPUT_PREFIX", "Value.$": "$.params.environment.JOB_OUTPUT_PREFIX"}, {"Name": "JOB_AWS_CLI_PATH", "Value.$": "$.params.environment.JOB_AWS_CLI_PATH"}, {"Name": "SAMPLE_ID", "Value.$": "$.params.environment.SAMPLE_ID"} ], "Command": [ "samtools index ${!SAMPLE_ID}.bam" ] } }, "Next": "CallVariantsByChromosome" }, "CallVariantsByChromosome" : { "Type": "Map", "InputPath": "$", "ItemsPath": "$.params.chromosomes", "MaxConcurrency": 23, "ResultPath": "$.result", "Parameters": { "workflow.$": "$.workflow", "params.$": "$.params", "chromosome.$": "$$.Map.Item.Value", "jobdefs.$": "$.jobdefs" }, "Iterator": { "StartAt": "BcftoolsMPileup", "States": { "BcftoolsMPileup": { "Type": "Task", "InputPath": "$", "ResultPath": "$.result", "Resource": "arn:aws:states:::batch:submitJob.sync", "Parameters": { "JobName": "bcftools-mpileup", "JobDefinition.$": "$.jobdefs.bcftools", "JobQueue.$": "$.params.queue", "ContainerOverrides": { "Vcpus": 2, "Memory": 2048, "Environment": [ {"Name": "JOB_WORKFLOW_NAME", "Value.$": "$.workflow.name"}, {"Name": "JOB_WORKFLOW_EXECUTION", "Value.$": "$.workflow.execution"}, {"Name": "JOB_INPUTS", "Value": "s3://broad-references/hg38/v0/Homo_sapiens_assembly38.fasta* $JOB_OUTPUT_PREFIX/*.bam*"}, {"Name": "JOB_OUTPUTS", "Value": "*.mpileup.gz"}, {"Name": "JOB_OUTPUT_PREFIX", "Value.$": "$.params.environment.JOB_OUTPUT_PREFIX"}, {"Name": "JOB_AWS_CLI_PATH", "Value.$": "$.params.environment.JOB_AWS_CLI_PATH"}, {"Name": "SAMPLE_ID", "Value.$": "$.params.environment.SAMPLE_ID"}, {"Name": "REFERENCE_NAME", "Value.$": "$.params.environment.REFERENCE_NAME"}, {"Name": "CHROMOSOME", "Value.$": "$.chromosome"} ], "Command": [ "bcftools mpileup --threads 2 -r ${!CHROMOSOME} -Oz -f ${!REFERENCE_NAME}.fasta -o ${!SAMPLE_ID}.${!CHROMOSOME}.mpileup.gz ${!SAMPLE_ID}.bam" ] } }, "Next": "BcftoolsCall" }, "BcftoolsCall": { "Type": "Task", "InputPath": "$", "ResultPath": "$.result", "Resource": "arn:aws:states:::batch:submitJob.sync", "Parameters": { "JobName": "bcftools-call", "JobDefinition.$": "$.jobdefs.bcftools", "JobQueue.$": "$.params.queue", "ContainerOverrides": { "Vcpus": 2, "Memory": 1024, "Environment": [ {"Name": "JOB_WORKFLOW_NAME", "Value.$": "$.workflow.name"}, {"Name": "JOB_WORKFLOW_EXECUTION", "Value.$": "$.workflow.execution"}, {"Name": "JOB_INPUTS", "Value": "$JOB_OUTPUT_PREFIX/*.${!CHROMOSOME}.mpileup.gz"}, {"Name": "JOB_OUTPUTS", "Value": "*.vcf.gz"}, {"Name": "JOB_OUTPUT_PREFIX", "Value.$": "$.params.environment.JOB_OUTPUT_PREFIX"}, {"Name": "JOB_AWS_CLI_PATH", "Value.$": "$.params.environment.JOB_AWS_CLI_PATH"}, {"Name": "SAMPLE_ID", "Value.$": "$.params.environment.SAMPLE_ID"}, {"Name": "CHROMOSOME", "Value.$": "$.chromosome"} ], "Command": [ "bcftools call -m --threads 2 -t ${!CHROMOSOME} -Oz -o ${!SAMPLE_ID}.${!CHROMOSOME}.vcf.gz ${!SAMPLE_ID}.${!CHROMOSOME}.mpileup.gz" ] } }, "End": true } } }, "End": true } } } Outputs: StateMachine: Value: !Ref SfnStateMachine Description: Step-functions state-machine that demonstrates a simple genomics workflow with dynamic parallelism StateMachineInput: Description: Example input for the state machine Value: !Sub |- { "params": { "queue": "${BatchJobQueue}", "environment": { "REFERENCE_NAME": "Homo_sapiens_assembly38", "SAMPLE_ID": "NIST7035", "SOURCE_DATA_PREFIX": "s3://aws-batch-genomics-shared/secondary-analysis/example-files/fastq", "JOB_OUTPUT_PREFIX": "s3://${S3BucketName}", "JOB_AWS_CLI_PATH": "/opt/aws-cli/bin" }, "chromosomes": [ "chr19", "chr20", "chr21", "chr22" ] } } ...