AWSTemplateFormatVersion: "2010-09-09" Parameters: OmicsImportSequenceLambdaArn: Type: String OmicsImportSequenceJobRoleArn: Type: String CheckOmicsTaskLambdaFunctionArn: Type: String OmicsWorkflowStartRunLambdaArn: Type: String OmicsWorkflowStartRunJobRoleArn: Type: String OmicsImportVariantLambdaArn: Type: String OmicsImportVariantJobRoleArn: Type: String ApplyS3LifecycleLambdaFunctionArn: Type: String ReferenceFastaFileS3Uri: Type: String Mills1000GIndelsVcf: Type: String DbSnpVcf: Type: String KnownIndelsVcf: Type: String RunDate: Type: String Description: Example run date Default: 2016-09-01T02:00:00+0200 PlatformName: Type: String Description: Example platform name Default: Illumina SequencingCenter: Type: String Description: Example sequencing center Default: ABCD OmicsVariantStoreName: Type: String Description: >- State Machine for ingesting FASTQs into Omics Sequence Store, running GATK workflow with input FASTQs, ingesting post-workflow outputs into Omics Sequence and Variamt Stores, and S3 file tagging to enable activation of S3 lifecycle policies on inputs and outputs Resources: AmazonOmicsStepFunction: Type: AWS::StepFunctions::StateMachine Properties: RoleArn: !GetAtt AmazonOmicsStepFunctionRole.Arn StateMachineName: AmazonOmicsEndToEndStepFunction DefinitionSubstitutions: OmicsImportSequenceLambdaArn: !Ref OmicsImportSequenceLambdaArn OmicsImportSequenceJobRoleArn: !Ref OmicsImportSequenceJobRoleArn CheckOmicsTaskLambdaFunctionArn: !Ref CheckOmicsTaskLambdaFunctionArn OmicsWorkflowStartRunLambdaArn: !Ref OmicsWorkflowStartRunLambdaArn OmicsWorkflowStartRunJobRoleArn: !Ref OmicsWorkflowStartRunJobRoleArn OmicsImportVariantJobRoleArn: !Ref OmicsImportVariantJobRoleArn OmicsImportVariantLambdaArn: !Ref OmicsImportVariantLambdaArn ApplyS3LifecycleLambdaFunctionArn: !Ref ApplyS3LifecycleLambdaFunctionArn DefinitionString: !Sub | { "Comment": "StateMachine to orchestrate end-to-end Omics Workflow", "StartAt": "IngestFastqToReadSet", "States": { "IngestFastqToReadSet": { "InputPath": "$", "Next": "WaitForFastqIngest", "Parameters": { "FunctionName": "${OmicsImportSequenceLambdaArn}", "Payload": { "FileType": "FASTQ", "Read1.$": "$.Read1", "Read2.$": "$.Read2", "ReferenceArn.$": "$.ReferenceArn", "SampleId.$": "$.SampleId", "SequenceStoreId.$": "$.SequenceStoreId", "SubjectId.$": "$.SubjectId", "RoleArn": "${OmicsImportSequenceJobRoleArn}" } }, "Resource": "arn:aws:states:::lambda:invoke", "ResultSelector": { "import_fastq.$": "$.Payload" }, "ResultPath": "$.import_fastq", "Type": "Task" }, "WaitForFastqIngest": { "Next": "CheckFastqIngest", "Seconds": 10, "Type": "Wait" }, "SuccessState": { "Type": "Succeed" }, "CheckFastqIngest": { "InputPath": "$", "Next": "FastqIngestDone?", "Parameters": { "FunctionName": "${CheckOmicsTaskLambdaFunctionArn}", "Payload": { "task_type": "GetReadSetImportJob", "task_params": { "id.$": "$.import_fastq.import_fastq.importReadSetJobId", "sequence_store_id.$": "$.SequenceStoreId" } } }, "Resource": "arn:aws:states:::lambda:invoke", "ResultPath": "$.import_fastq.import_fastq.status.message", "Type": "Task" }, "FastqIngestFailed": { "Cause": "Fastq Ingest Failed", "Error": "$.import_fastq.import_fastq.status.error", "Type": "Fail" }, "FastqIngestDone?": { "Choices": [ { "Next": "RunOmicsWorkflowLambda", "StringEquals": "COMPLETED", "Variable": "$.import_fastq.import_fastq.status.message.Payload.task_status" }, { "Next": "FastqIngestFailed", "StringEquals": "FAILED", "Variable": "$.import_fastq.import_fastq.status.message.Payload.task_status" } ], "Default": "WaitForFastqIngest", "Type": "Choice" }, "RunOmicsWorkflowLambda": { "InputPath": "$", "Next": "WaitForOmicsWorkflow", "Parameters": { "FunctionName": "${OmicsWorkflowStartRunLambdaArn}", "Payload": { "sample_name.$": "$.SampleId", "fastq_1.$": "$.Read1", "fastq_2.$": "$.Read2", "ref_fasta": "${ReferenceFastaFileS3Uri}", "readgroup_name.$": "$.SampleId", "library_name.$": "$.SampleId", "platform_name": "${PlatformName}", "run_date": "${RunDate}", "sequencing_center": "${SequencingCenter}", "dbSNP_vcf": "${DbSnpVcf}", "Mills_1000G_indels_vcf": "${Mills1000GIndelsVcf}", "known_indels_vcf": "${KnownIndelsVcf}", "scattered_calling_intervals_archive.$": "$.IntervalsS3Path", "gatk_docker.$": "$.GatkDockerUri", "gotc_docker.$": "$.GotcDockerUri", "WorkflowId.$": "$.WorkflowId", "JobRoleArn": "${OmicsWorkflowStartRunJobRoleArn}", "OutputS3Path.$": "$.WorkflowOutputS3Path" } }, "Resource": "arn:aws:states:::lambda:invoke", "ResultSelector": { "workflow.$": "$.Payload" }, "ResultPath": "$.workflow", "Type": "Task" }, "WaitForOmicsWorkflow": { "Next": "CheckOmicsWorkflow", "Seconds": 60, "Type": "Wait" }, "CheckOmicsWorkflow": { "InputPath": "$", "Next": "OmicsWorkflowDone?", "Parameters": { "FunctionName": "${CheckOmicsTaskLambdaFunctionArn}", "Payload": { "task_type": "GetRun", "task_params": { "id.$": "$.workflow.workflow.WorkflowRunId" } } }, "Resource": "arn:aws:states:::lambda:invoke", "ResultPath": "$.workflow.workflow.status.message", "Type": "Task" }, "OmicsWorkflowDone?": { "Choices": [ { "Next": "PostWorkflowIngest", "StringEquals": "COMPLETED", "Variable": "$.workflow.workflow.status.message.Payload.task_status" }, { "Next": "OmicsWorkflowFailed", "StringEquals": "FAILED", "Variable": "$.workflow.workflow.status.message.Payload.task_status" } ], "Default": "WaitForOmicsWorkflow", "Type": "Choice" }, "OmicsWorkflowFailed": { "Cause": "Omics Workflow Failed", "Error": "$.workflow.workflow.status.error", "Type": "Fail" }, "PostWorkflowIngest": { "Branches": [ { "StartAt": "IngestBamToReadSet", "States": { "BamIngestDone?": { "Choices": [ { "Next": "PostWorkflowBamIngestCompleted", "StringEquals": "COMPLETED", "Variable": "$.import_bam.import_bam.status.message.Payload.task_status" }, { "Next": "BamIngestFailed", "StringEquals": "FAILED", "Variable": "$.import_bam.import_bam.status.message.Payload.task_status" } ], "Default": "WaitForBamIngest", "Type": "Choice" }, "BamIngestFailed": { "Cause": "Post Workflow BAM Ingest Failed", "Error": "$.import_bam.import_bam.status.error", "Type": "Fail" }, "CheckBamIngest": { "InputPath": "$", "Next": "BamIngestDone?", "Parameters": { "FunctionName": "${CheckOmicsTaskLambdaFunctionArn}", "Payload": { "task_type": "GetReadSetImportJob", "task_params": { "id.$": "$.import_bam.import_bam.importReadSetJobId", "sequence_store_id.$": "$.SequenceStoreId" } } }, "Resource": "arn:aws:states:::lambda:invoke", "ResultPath": "$.import_bam.import_bam.status.message", "Type": "Task" }, "IngestBamToReadSet": { "InputPath": "$", "Next": "WaitForBamIngest", "Parameters": { "FunctionName": "${OmicsImportSequenceLambdaArn}", "Payload": { "FileType": "BAM", "Read1.$": "States.Format('{}/{}/out/analysis_ready_bam/{}.hg38.bam', $.WorkflowOutputS3Path, $.workflow.workflow.WorkflowRunId, $.SampleId)", "ReferenceArn.$": "$.ReferenceArn", "SampleId.$": "$.SampleId", "SequenceStoreId.$": "$.SequenceStoreId", "SubjectId.$": "$.SubjectId", "RoleArn": "${OmicsImportSequenceJobRoleArn}" } }, "Resource": "arn:aws:states:::lambda:invoke", "ResultSelector": { "import_bam.$": "$.Payload" }, "ResultPath": "$.import_bam", "Type": "Task" }, "PostWorkflowBamIngestCompleted": { "End": true, "Type": "Pass" }, "WaitForBamIngest": { "Next": "CheckBamIngest", "Seconds": 10, "Type": "Wait" } } }, { "StartAt": "IngestVcfToVariantStore", "States": { "CheckVcfIngest": { "InputPath": "$", "Next": "VcfIngestDone?", "Parameters": { "FunctionName": "${CheckOmicsTaskLambdaFunctionArn}", "Payload": { "task_type": "GetVariantImportJob", "task_params": { "job_id.$": "$.import_vcf.import_vcf.VariantImportJobId" } } }, "Resource": "arn:aws:states:::lambda:invoke", "ResultPath": "$.import_vcf.import_vcf.status.message", "Type": "Task" }, "IngestVcfToVariantStore": { "InputPath": "$", "Next": "WaitForVcfIngest", "Parameters": { "FunctionName": "${OmicsImportVariantLambdaArn}", "Payload": { "VariantStoreName": "${OmicsVariantStoreName}", "OmicsImportVariantRoleArn": "${OmicsImportVariantJobRoleArn}", "VcfS3Uri.$": "States.Format('{}/{}/out/output_vcf/{}.hg38.vcf.gz', $.WorkflowOutputS3Path, $.workflow.workflow.WorkflowRunId, $.SampleId)" } }, "Resource": "arn:aws:states:::lambda:invoke", "ResultSelector": { "import_vcf.$": "$.Payload" }, "ResultPath": "$.import_vcf", "Type": "Task" }, "PostWorkflowVcfIngestCompleted": { "End": true, "Type": "Pass" }, "VcfIngestDone?": { "Choices": [ { "Next": "PostWorkflowVcfIngestCompleted", "StringEquals": "COMPLETED", "Variable": "$.import_vcf.import_vcf.status.message.Payload.task_status" }, { "Next": "VcfIngestFailed", "StringEquals": "FAILED", "Variable": "$.import_vcf.import_vcf.status.message.Payload.task_status" } ], "Default": "WaitForVcfIngest", "Type": "Choice" }, "VcfIngestFailed": { "Cause": "Post Workflow VCF Ingest to Variant Store Failed", "Error": "$.import_vcf.import_vcf.status.error", "Type": "Fail" }, "WaitForVcfIngest": { "Next": "CheckVcfIngest", "Seconds": 10, "Type": "Wait" } } } ], "Next": "AddLifeCycleTags", "Type": "Parallel" }, "AddLifeCycleTags": { "Next": "SuccessState", "InputPath": "$.[0]", "Parameters": { "FunctionName": "${ApplyS3LifecycleLambdaFunctionArn}", "Payload": { "inputs": { "fastq.$": "States.Array($.Read1,$.Read2)" }, "outputs": { "vcf.$": "States.Array(States.Format('{}/{}/out/output_vcf/{}.hg38.vcf.gz', $.WorkflowOutputS3Path, $.workflow.workflow.WorkflowRunId, $.SampleId))", "bam.$": "States.Array(States.Format('{}/{}/out/analysis_ready_bam/{}.hg38.bam', $.WorkflowOutputS3Path, $.workflow.workflow.WorkflowRunId, $.SampleId))" } } }, "Resource": "arn:aws:states:::lambda:invoke", "Type": "Task" } } } AmazonOmicsStepFunctionRole: Type: AWS::IAM::Role Properties: RoleName: AmazonOmicsStepFunctionRole AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: !Sub 'states.${AWS::Region}.amazonaws.com' Action: 'sts:AssumeRole' Policies: - PolicyName: InvokeLambda PolicyDocument: Statement: - Effect: Allow Action: 'lambda:InvokeFunction' Resource: - !Ref OmicsImportSequenceLambdaArn - !Ref CheckOmicsTaskLambdaFunctionArn - !Ref OmicsWorkflowStartRunLambdaArn - !Ref OmicsImportVariantLambdaArn - !Ref ApplyS3LifecycleLambdaFunctionArn Outputs: AmazonOmicsStepFunctionArn: Value: !GetAtt AmazonOmicsStepFunction.Arn Export: Name: AmazonOmicsStepFunctionArn