--- AWSTemplateFormatVersion: 2010-09-09 Description: AWS Step Functions sample project for running jobs on Amazon EMR. Resources: EMRJobManagerStateMachine: Type: AWS::StepFunctions::StateMachine Properties: RoleArn: !GetAtt [ EMRJobManagerStateMachineExecutionRole, Arn ] StateMachineName: "makeshift-demo-state-machine" DefinitionString: !Sub - |- { "Comment": "An example of the Amazon States Language for running jobs on Amazon EMR", "StartAt": "Start in parallel", "States": { "Start in parallel": { "Comment": "Start two executions of the same state machine in parallel", "Type": "Parallel", "End": true, "Branches": [{ "StartAt": "Pre-Process", "States": { "Pre-Process": { "Type": "Pass", "Parameters": { "FormattedInputsToEmr": { "Name.$": "$.requestId", "Tags": [ { "Arg0.$" : "$.tagCreatedBy", "Arg1.$" : "$.tagCostCenter" } ] } }, "Next": "Create an EMR cluster" }, "Create an EMR cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name.$": "$.FormattedInputsToEmr.Name", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.26.0", "Tags.$": "$.FormattedInputsToEmr.Tags[*][*]", "Applications": [{ "Name": "Spark" }], "ServiceRole": "${EMRServiceRole}", "JobFlowRole": "${EMREc2InstanceProfile}", "LogUri": "s3://${EMRLogS3Bucket}/logs/", "Instances": { "Ec2SubnetId" : "${EMRSubnetId}", "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [{ "Name": "MyMasterFleet", "InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [{ "InstanceType": "m5.xlarge" }] }, { "Name": "MyCoreFleet", "InstanceFleetType": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [{ "InstanceType": "m5.xlarge" }] } ] } }, "ResultPath": "$.EMR", "Next": "Run first step" }, "Run first step": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.EMR.ClusterId", "Step": { "Name": "$.APIGatewayData.cognitoID", "ActionOnFailure": "TERMINATE_JOB_FLOW", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", "--class", "com.awsblogs.makeshift.requests.process.AccidentsSparkJob", "s3://${EMRInputBucket}/example-spark-job-1.0-SNAPSHOT-jar-with-dependencies.jar" ] } } }, "Retry": [{ "ErrorEquals": ["States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 3, "BackoffRate": 2.0 }], "ResultPath": "$.firstStep", "Next": "Terminate Cluster" }, "Terminate Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster", "Parameters": { "ClusterId.$": "$.EMR.ClusterId" }, "End": true } } }, { "StartAt": "Initial DynamoDB update", "States": { "Initial DynamoDB update": { "Type": "Task", "Resource": "arn:aws:lambda:::function:makeshift-aws-blog-check-emr-status-lambda", "Next": "Wait 60 Seconds", "InputPath": "$", "ResultPath": "$.jobStatus", "Retry": [{ "ErrorEquals": ["States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 3, "BackoffRate": 2 }] }, "Wait 60 Seconds": { "Type": "Wait", "Seconds": 60, "Next": "Update DynamoDB with Job Status" }, "Update DynamoDB with Job Status": { "Type": "Task", "Resource": "arn:aws:lambda:::function:makeshift-aws-blog-check-emr-status-lambda", "Next": "Job Complete?", "InputPath": "$", "ResultPath": "$.jobStatus", "Retry": [{ "ErrorEquals": ["States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 3, "BackoffRate": 2 }] }, "Job Complete?": { "Type": "Choice", "Choices": [{ "Variable": "$.jobStatus", "StringEquals": "FAILED", "Next": "Job Failed" }, { "Variable": "$.jobStatus", "StringEquals": "COMPLETED", "Next": "Get Final Job Status" } ], "Default": "Wait 60 Seconds" }, "Job Failed": { "Type": "Fail", "Cause": "AWS Batch Job Failed", "Error": "DescribeJob returned FAILED" }, "Get Final Job Status": { "Type": "Task", "Resource": "arn:aws:lambda:::function:makeshift-aws-blog-check-emr-status-lambda", "InputPath": "$", "End": true, "Retry": [{ "ErrorEquals": ["States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 3, "BackoffRate": 2 }] } } } ] } } } - EMRServiceRole: !Ref EMRServiceRole EMRInputBucket: !ImportValue "MakeshiftDemoBucketName" EMREc2InstanceProfile: !Ref EMREc2InstanceProfile EMRLogS3Bucket: !Ref EMRLogS3Bucket EMRSubnetId: !ImportValue "makeshift-public-subnet-a" EMRJobManagerStateMachineExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: states.amazonaws.com Action: sts:AssumeRole Path: / Policies: - PolicyName: StatesExecutionPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - iam:PassRole Resource: - !GetAtt EMRServiceRole.Arn - !GetAtt EMREc2Role.Arn - Effect: Allow Action: - elasticmapreduce:RunJobFlow - elasticmapreduce:TerminateJobFlows - elasticmapreduce:DescribeCluster - elasticmapreduce:AddJobFlowSteps - elasticmapreduce:DescribeStep - lambda:InvokeFunction Resource: "*" EMREc2InstanceProfile: Type: AWS::IAM::InstanceProfile Properties: Path: / Roles: - Ref: EMREc2Role EMREc2Role: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2008-10-17 Statement: - Effect: Allow Principal: Service: !Sub ec2.${AWS::URLSuffix} Action: sts:AssumeRole Policies: - PolicyDocument: Statement: - Action: - s3:* Effect: Allow Resource: !GetAtt EMRLogS3Bucket.Arn Version: 2012-10-17 PolicyName: EMREc2RolePolicy - PolicyDocument: Statement: - Action: - s3:* Effect: Allow Resource: - !ImportValue "MakeshiftDemoBucketArn" - Fn::Join: - '' - - !ImportValue "MakeshiftDemoBucketArn" - '/*' Version: 2012-10-17 PolicyName: EMREc2RoleS3AccessPolicy EMRServiceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2008-10-17 Statement: - Effect: Allow Principal: Service: !Sub elasticmapreduce.${AWS::URLSuffix} Action: sts:AssumeRole ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AmazonElasticMapReduceRole Policies: - PolicyDocument: Statement: - Action: - s3:* Effect: Allow Resource: - !ImportValue "MakeshiftDemoBucketArn" - Fn::Join: - '' - - !ImportValue "MakeshiftDemoBucketArn" - '/*' Version: 2012-10-17 PolicyName: EMRServiceRoleS3AccessPolicy EMRLogS3Bucket: Type: AWS::S3::Bucket Properties: BucketName: Fn::Sub: makeshift-blog-emrlogs-${AWS::AccountId}-${AWS::Region} BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 Outputs: StateMachineArn: Value: !Ref EMRJobManagerStateMachine Export: Name: "StateMachineArn" ExecutionInput: Description: Sample input to StartExecution. Value: > {}