# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Description: >-
  CloudFormation template that performs pre-processing of VCF files (samples) on EMR for
  genomics analysis.

Parameters:
  InputS3Path:
    Description: "Input S3 path containing the VCF (sample) files in .bgz compressed format or uncompressed format"
    Type: String
    Default: s3://redshift-demos/genomics/sample_vcfs/
  SamplesPerCluster:
    Description: "Number of VCF files (samples) to process per EMR cluster. Based on this value, the number of EMR clusters launched equals the total number of samples divided by the samples per cluster."
    Type: Number
    Default: 100
  OutputS3Path:
    Description: "S3 path where output samples are written in Parquet format after pre-processing on EMR"
    Type: String
  HailScriptPath:
    Description: "(Optional) S3 path to the PySpark script used to convert VCF files into Parquet format. Leave the default to use the provided conversion script."
    Type: String
    Default: s3://redshift-demos/genomics/vcfToParquetTransform.py
  HailBucketName:
    Description: "S3 bucket name in your account to store EMR logs"
    Type: String
  EMRClusterName:
    Description: "EMR cluster name with Hail installed to pre-process samples"
    Type: String
    Default: HailTestEMR
  EMRConfigs:
    Description: "S3 bucket path containing custom configurations for the EMR cluster that pre-processes the sample files (leave as default if you do not have specific requirements to configure EMR). See https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html"
    Type: String
    Default: s3://redshift-demos/genomics/emr_config.json
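  # EMR cluster sizing. The same instance-type list is offered for the master, core, and task
  # instance groups; leave the defaults to use the pre-configured setup.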
  EmrMasterInstanceSize:
    AllowedValues: ["c1.medium", "c1.xlarge", "c3.xlarge", "c3.2xlarge", "c3.4xlarge", "c3.8xlarge",
      "c4.large", "c4.xlarge", "c4.2xlarge", "c4.4xlarge", "c4.8xlarge", "c5.xlarge", "c5.2xlarge",
      "c5.4xlarge", "c5.9xlarge", "c5.18xlarge", "cc2.8xlarge", "cg1.4xlarge", "cr1.8xlarge",
      "d2.xlarge", "d2.2xlarge", "d2.4xlarge", "d2.8xlarge", "g2.2xlarge", "hi1.4xlarge",
      "hs1.8xlarge", "i2.xlarge", "i2.2xlarge", "i2.4xlarge", "i2.8xlarge", "i3.xlarge",
      "i3.2xlarge", "i3.4xlarge", "i3.8xlarge", "i3.16xlarge", "m1.medium", "m1.large", "m1.xlarge",
      "m2.xlarge", "m2.2xlarge", "m2.4xlarge", "m3.xlarge", "m3.2xlarge", "m4.large", "m4.xlarge",
      "m4.2xlarge", "m4.4xlarge", "m4.10xlarge", "m4.16xlarge", "m5.xlarge", "m5.2xlarge",
      "m5.4xlarge", "m5.12xlarge", "m5.24xlarge", "p2.xlarge", "p2.8xlarge", "p2.16xlarge",
      "p3.2xlarge", "p3.8xlarge", "p3.16xlarge", "r3.xlarge", "r3.2xlarge", "r3.4xlarge",
      "r3.8xlarge", "r4.xlarge", "r4.2xlarge", "r4.4xlarge", "r4.8xlarge", "r4.16xlarge",
      "r5.xlarge", "r5.2xlarge", "r5.4xlarge", "r5.12xlarge", "r5a.xlarge", "r5a.2xlarge",
      "r5a.4xlarge", "r5a.12xlarge", "r5a.24xlarge", "r5d.xlarge", "r5d.2xlarge", "r5d.4xlarge",
      "r5d.12xlarge", "r5d.24xlarge"]
    Default: "m5.xlarge"
    Type: "String"
  EmrCoreInstanceSize:
    AllowedValues: ["c1.medium", "c1.xlarge", "c3.xlarge", "c3.2xlarge", "c3.4xlarge", "c3.8xlarge",
      "c4.large", "c4.xlarge", "c4.2xlarge", "c4.4xlarge", "c4.8xlarge", "c5.xlarge", "c5.2xlarge",
      "c5.4xlarge", "c5.9xlarge", "c5.18xlarge", "cc2.8xlarge", "cg1.4xlarge", "cr1.8xlarge",
      "d2.xlarge", "d2.2xlarge", "d2.4xlarge", "d2.8xlarge", "g2.2xlarge", "hi1.4xlarge",
      "hs1.8xlarge", "i2.xlarge", "i2.2xlarge", "i2.4xlarge", "i2.8xlarge", "i3.xlarge",
      "i3.2xlarge", "i3.4xlarge", "i3.8xlarge", "i3.16xlarge", "m1.medium", "m1.large", "m1.xlarge",
      "m2.xlarge", "m2.2xlarge", "m2.4xlarge", "m3.xlarge", "m3.2xlarge", "m4.large", "m4.xlarge",
      "m4.2xlarge", "m4.4xlarge", "m4.10xlarge", "m4.16xlarge", "m5.xlarge", "m5.2xlarge",
      "m5.4xlarge", "m5.12xlarge", "m5.24xlarge", "p2.xlarge", "p2.8xlarge", "p2.16xlarge",
      "p3.2xlarge", "p3.8xlarge", "p3.16xlarge", "r3.xlarge", "r3.2xlarge", "r3.4xlarge",
      "r3.8xlarge", "r4.xlarge", "r4.2xlarge", "r4.4xlarge", "r4.8xlarge", "r4.16xlarge",
      "r5.xlarge", "r5.2xlarge", "r5.4xlarge", "r5.12xlarge", "r5a.xlarge", "r5a.2xlarge",
      "r5a.4xlarge", "r5a.12xlarge", "r5a.24xlarge", "r5d.xlarge", "r5d.2xlarge", "r5d.4xlarge",
      "r5d.12xlarge", "r5d.24xlarge"]
    Default: "r5.xlarge"
    Description: "Instance type for the EMR core nodes"
    Type: "String"
  EmrCoreInstanceCount:
    Default: 2
    Description: "Must be 1 or greater. Number of core nodes to launch for EMR; leave the default value to launch EMR with the pre-configured setup."
    MinValue: 1 # Cannot be 0.
    Type: "Number"
  EmrTaskInstanceSize:
    AllowedValues: ["c1.medium", "c1.xlarge", "c3.xlarge", "c3.2xlarge", "c3.4xlarge", "c3.8xlarge",
      "c4.large", "c4.xlarge", "c4.2xlarge", "c4.4xlarge", "c4.8xlarge", "c5.xlarge", "c5.2xlarge",
      "c5.4xlarge", "c5.9xlarge", "c5.18xlarge", "cc2.8xlarge", "cg1.4xlarge", "cr1.8xlarge",
      "d2.xlarge", "d2.2xlarge", "d2.4xlarge", "d2.8xlarge", "g2.2xlarge", "hi1.4xlarge",
      "hs1.8xlarge", "i2.xlarge", "i2.2xlarge", "i2.4xlarge", "i2.8xlarge", "i3.xlarge",
      "i3.2xlarge", "i3.4xlarge", "i3.8xlarge", "i3.16xlarge", "m1.medium", "m1.large", "m1.xlarge",
      "m2.xlarge", "m2.2xlarge", "m2.4xlarge", "m3.xlarge", "m3.2xlarge", "m4.large", "m4.xlarge",
      "m4.2xlarge", "m4.4xlarge", "m4.10xlarge", "m4.16xlarge", "m5.xlarge", "m5.2xlarge",
      "m5.4xlarge", "m5.12xlarge", "m5.24xlarge", "p2.xlarge", "p2.8xlarge", "p2.16xlarge",
      "p3.2xlarge", "p3.8xlarge", "p3.16xlarge", "r3.xlarge", "r3.2xlarge", "r3.4xlarge",
      "r3.8xlarge", "r4.xlarge", "r4.2xlarge", "r4.4xlarge", "r4.8xlarge", "r4.16xlarge",
      "r5.xlarge", "r5.2xlarge", "r5.4xlarge", "r5.12xlarge", "r5a.xlarge", "r5a.2xlarge",
      "r5a.4xlarge", "r5a.12xlarge", "r5a.24xlarge", "r5d.xlarge", "r5d.2xlarge", "r5d.4xlarge",
      "r5d.12xlarge", "r5d.24xlarge"]
    Default: "r5.xlarge"
    Description: "Instance type for the EMR task nodes"
    Type: "String"
  EmrTaskInstanceCount:
    Default: 1
    Description: "Must be 1 or greater. Number of task nodes to launch for EMR; leave the default value to launch EMR with the pre-configured setup."
    MinValue: 1 # Cannot be 0.
    Type: "Number"
  VPCTarget:
    Type: "String"
    Description: "Choose \"new\" to use the AWS VPC Quick Start to create a new VPC with public and private subnets across two Availability Zones. If you choose \"existing\", the VPCId and VPCSubnetId network parameters are required."
    AllowedValues:
      - "new"
      - "existing"
    Default: "existing"
  VPCCIDR:
    Type: "String"
    AllowedPattern: "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\\/(1[6-9]|2[0-8]))$"
    Description: "Required for a new VPC. A /16 address space is recommended for a new VPC."
    Default: "10.0.0.0/16"
  VPCId:
    Description: "Required for an existing VPC target."
    Type: AWS::EC2::VPC::Id
    Default: ""
  VPCSubnetId:
    Description: "Required for an existing VPC target. Subnet ID in the existing VPC in which the EMR cluster will be launched. A private subnet is recommended."
    Type: AWS::EC2::Subnet::Id
    Default: ""
    ConstraintDescription: "must be a subnet from the selected VPC"
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "Required"
        Parameters:
          - "InputS3Path"
          - "OutputS3Path"
          - "HailBucketName"
          - "VPCTarget"
          - "VPCCIDR"
          - "VPCId"
          - "VPCSubnetId"
      - Label:
          default: "Optional"
        Parameters:
          - "HailScriptPath"
          - "SamplesPerCluster"
          - "EMRClusterName"
          - "EMRConfigs"
          - "EmrMasterInstanceSize"
          - "EmrCoreInstanceSize"
          - "EmrCoreInstanceCount"
          - "EmrTaskInstanceSize"
          - "EmrTaskInstanceCount"
    ParameterLabels:
      InputS3Path:
        default: "Input S3 path to Sample files"
      SamplesPerCluster:
        default: "Number of Samples per EMR cluster"
      HailScriptPath:
        default: "Pyspark script S3 path"
      OutputS3Path:
        default: "Output S3 path for pre-processed samples"
      HailBucketName:
        default: "EMR Logs S3 Bucket name"
      EMRClusterName:
        default: "EMR Cluster name with Hail 0.2 installed"
      EMRConfigs:
        default: "EMR Custom Configurations S3 Bucket path"
      EmrMasterInstanceSize:
        default: "EMR Master Instance type"
      EmrCoreInstanceSize:
        default: "EMR Core instance type"
      EmrCoreInstanceCount:
        default: "EMR Core instance count"
      EmrTaskInstanceSize:
        default: "EMR Task instance type"
      EmrTaskInstanceCount:
        default: "EMR Task instance count"
      VPCTarget:
        default: "Target VPC"
      VPCCIDR:
        default: "New VPC CIDR"
      VPCId:
        default: "Existing VPC Id"
      VPCSubnetId:
        default: "Existing Subnet Id"
Conditions:
  createVpc: !Equals [!Ref VPCTarget, "new"]
Mappings:
  Region:
    eu-north-1:
      ami: 'ami-0097c8916181505c5'
    ap-south-1:
      ami: 'ami-0cc18a6e8cf105185'
    eu-west-3:
      ami: 'ami-09f35326ba84d2ee0'
    eu-west-2:
      ami: 'ami-04bbc6780b6719abe'
    eu-west-1:
      ami: 'ami-05adfeb1ffea4f488'
    ap-northeast-2:
      ami: 'ami-0fac2662a22702e92'
    ap-northeast-1:
      ami: 'ami-0a2a15ed71805f23d'
    sa-east-1:
      ami: 'ami-0ea74a00f1109fe14'
    ca-central-1:
      ami: 'ami-052c9e8e247ad39b1'
    ap-southeast-1:
      ami: 'ami-07124736552a4152b'
    ap-southeast-2:
      ami: 'ami-0fa25f9d65099152c'
    eu-central-1:
      ami: 'ami-0a9294d79a555d742'
    us-east-1:
      ami: 'ami-0f33e21674eed03c6'
    us-east-2:
      ami: 'ami-03cc99a0a57b9a8f4'
    us-west-1:
      ami: 'ami-0ed287d132c16a457'
    us-west-2:
      ami: 'ami-083d074beb4c62cfc'
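# Resources overview:
#   * EMRGenomics - Lambda function invoked once by the Primerinvoke custom resource when the
#     stack is created. It lists the input VCF files in batches of SamplesPerCluster and
#     launches one transient EMR cluster (with Hail) per batch to convert the files to Parquet.
#   * EventEMRGenomics - an event-driven variant of the same launcher that reads an S3 object
#     notification record. Note that this template does not create the S3 bucket notification
#     or the Lambda invoke permission; those have to be configured separately.
#   * TestLambdaExecutionRole, emr*HailEMR roles, instanceProfileEmrEc2 - IAM roles for the
#     Lambda functions and for the EMR service, EC2 instance profile, and autoscaling.
#   * vpc - optional nested AWS VPC Quick Start stack, created only when VPCTarget is "new".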
Resources:
  EMRGenomics:
    Type: 'AWS::Lambda::Function'
    Properties:
      Handler: index.handler
      Role: !GetAtt
        - TestLambdaExecutionRole
        - Arn
      Code:
        ZipFile: !Join
          - |+

          - - from __future__ import print_function
            - import json
            - import subprocess
            - import boto3
            - import time
            - import os
            - import cfnresponse
            - paginator = boto3.client('s3').get_paginator('list_objects_v2')
            - s3 = boto3.client('s3')
            - 'def handler(event, context):'
            - '    input_s3 = os.environ["INPUT_S3"]'
            - '    s3_script_path = os.environ["S3_SCRIPT"]'
            - '    samples_per_cluster = os.environ["SAMPLES_PER_CLUSTER"]'
            - '    output_s3 = os.environ["OUTPUT_S3"]'
            - '    hail_bucket = os.environ["HAIL_BUCKET"]'
            - '    cluster_name = os.environ["CLUSTER_NAME"]'
            - '    emr_config = os.environ["EMR_CONFIG"]'
            - '    master_type = os.environ["MASTER_TYPE"]'
            - '    core_type = os.environ["CORE_TYPE"]'
            - '    core_count = os.environ["CORE_COUNT"]'
            - '    task_type = os.environ["TASK_TYPE"]'
            - '    task_count = os.environ["TASK_COUNT"]'
            - '    subnetid = os.environ["SUBNET_ID"]'
            - '    customAmiId = os.environ["CUSTOM_AMI"]'
            - '    service_role = os.environ["EMR_SERVICE"]'
            - '    emr_ec2 = os.environ["EMR_EC2_ROLE"]'
            - '    autoscaling_role = os.environ["EMR_AUTOSCALING_ROLE"]'
            - '    # emr_config and input_s3 are s3://bucket/key paths; split them into bucket and key parts'
            - '    bucket = emr_config.replace("s3://","",1).split("/",1)[0]'
            - '    key = emr_config.replace("s3://","",1).split("/",1)[1]'
            - '    inpbucket = input_s3.replace("s3://","",1).split("/",1)[0]'
            - '    inpkey = input_s3.replace("s3://","",1).split("/",1)[1].rsplit("/",1)[0]'
            - '    outputData = {}'
            - '    try:'
            - '        response = s3.get_object(Bucket = bucket, Key = key)'
            - '        content = response["Body"]'
            - '        jsonObject = json.loads(content.read())'
            - '        maxkey=int(samples_per_cluster)'
            - '        pages = paginator.paginate(Bucket=inpbucket,Prefix=inpkey,MaxKeys=maxkey)'
            - "        params=''"
            - '        # each page holds up to samples_per_cluster objects; launch one transient EMR cluster per page'
            - '        for page in pages:'
            - '            for obj in page["Contents"]:'
            - '                if "bgz" in str(obj["Key"]):'
            - '                    params=params+str(obj["Key"]).split("/")[-1]+","'
            - '            params=params[:-1]'
            - '            print(params)'
            - "            cluster_id=boto3.client('emr').run_job_flow(Name=cluster_name,LogUri=f's3://{hail_bucket}/elasticmapreduce/',ReleaseLabel='emr-5.29.0',"
            - "                BootstrapActions=[{'Name':'pythonDateutilReinstall','ScriptBootstrapAction':{'Path':'file:/usr/bin/sudo','Args':['python3','-m','pip','install','-I','python-dateutil']}}],"
            - "                Instances={'InstanceGroups':[{'Name':'Master Instance Group','Market':'ON_DEMAND','InstanceRole':'MASTER','InstanceType':master_type,'InstanceCount':1,'EbsConfiguration':{'EbsBlockDeviceConfigs':[{'VolumeSpecification':{'VolumeType':'gp2','SizeInGB':32},'VolumesPerInstance':2}]}},"
            - "                    {'Name':'Core Instance Group','Market':'ON_DEMAND','InstanceRole':'CORE','InstanceType':core_type,'InstanceCount':int(core_count),'EbsConfiguration':{'EbsBlockDeviceConfigs':[{'VolumeSpecification':{'VolumeType':'gp2','SizeInGB':1500},'VolumesPerInstance':1}]}},"
            - "                    {'Name':'Task Instance Group','Market':'ON_DEMAND','InstanceRole':'TASK','InstanceType':task_type,'InstanceCount':int(task_count),'EbsConfiguration':{'EbsBlockDeviceConfigs':[{'VolumeSpecification':{'VolumeType':'gp2','SizeInGB':1500},'VolumesPerInstance':1}]}}],"
            - "                    'KeepJobFlowAliveWhenNoSteps':False,'Ec2SubnetId':subnetid},"
            - "                Applications=[{'Name':'Ganglia'},{'Name':'Hadoop'},{'Name':'Hive'},{'Name':'Livy'},{'Name':'Spark'}],Configurations=jsonObject,ServiceRole=service_role,VisibleToAllUsers=True,JobFlowRole=emr_ec2,"
            - "                Tags=[{'Key':'owner','Value':''},{'Key':'environment','Value':'development'},{'Key':'allow-emr-ssm','Value':'false'},{'Key':'Name','Value':'hailtest'}],"
            - "                Steps=[{'Name':'hailApachePlotDir','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['sudo','mkdir','-p','/var/www/html/plots']}},"
            - "                    {'Name':'hailMainPlotDir','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['sudo','ln','-s','/var/www/html/plots','/plots']}},"
            - "                    {'Name':'hailLivyPlotOwnership','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['sudo','chown','livy:livy','/var/www/html/plots']}},"
            - "                    {'Name':'vepOwnership','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['sudo','bash','-c','if test -d /opt/vep/; then chown -R hadoop:hadoop /opt/vep; fi']}},"
            - "                    {'Name':'clusterManifestToS3','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['/usr/local/bin/cluster_manifest.sh']}},"
            - "                    {'Name':'Spark Application','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['spark-submit','--conf','spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2','--conf','spark.sql.parquet.filterPushdown=true','--conf','spark.sql.parquet.fs.optimized.committer.optimization-enabled=true','--conf','spark.sql.catalogImplementation=hive','--conf','spark.executor.cores=1','--conf','spark.executor.instances=1','--conf','spark.sql.shuffle.partitions=1000','--conf','spark.executor.memory=10g','--jars','/opt/hail/hail-all-spark.jar',s3_script_path,params,input_s3,output_s3]}}],"
            - "                AutoScalingRole=autoscaling_role,ScaleDownBehavior='TERMINATE_AT_TASK_COMPLETION',CustomAmiId=customAmiId,EbsRootVolumeSize=100)"
            - "            print(cluster_id['JobFlowId'])"
            - "            outputData['Result'] = cluster_id['JobFlowId']"
            - '            cfnresponse.send(event, context, cfnresponse.SUCCESS, outputData, {})'
            - "            params=''"
            - '            time.sleep(10)'
            - '    except Exception as e:'
            - '        print(e)'
            - "        cfnresponse.send(event, context, cfnresponse.FAILED, { 'error': str(e) }, {})"
            - '    return'
      Timeout: '60'
      MemorySize: '256'
      Environment:
        Variables:
          INPUT_S3: !Ref InputS3Path
          S3_SCRIPT: !Ref HailScriptPath
          SAMPLES_PER_CLUSTER: !Ref SamplesPerCluster
          OUTPUT_S3: !Ref OutputS3Path
          HAIL_BUCKET: !Ref HailBucketName
          CLUSTER_NAME: !Ref EMRClusterName
          EMR_CONFIG: !Ref EMRConfigs
          MASTER_TYPE: !Ref EmrMasterInstanceSize
          CORE_TYPE: !Ref EmrCoreInstanceSize
          CORE_COUNT: !Ref EmrCoreInstanceCount
          TASK_TYPE: !Ref EmrTaskInstanceSize
          TASK_COUNT: !Ref EmrTaskInstanceCount
          SUBNET_ID: !If [createVpc, !GetAtt 'vpc.Outputs.PrivateSubnet1AID', !Ref VPCSubnetId]
          CUSTOM_AMI: !FindInMap [ Region, !Ref "AWS::Region", ami ]
          EMR_SERVICE: !Ref emrclusterHailEMR
          EMR_EC2_ROLE: !Ref instanceProfileEmrEc2
          EMR_AUTOSCALING_ROLE: !Ref emrautoscalingHailEMR
      Runtime: python3.7
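  # Event-driven variant of EMRGenomics. It expects an S3 object-created event record, derives
  # the input prefix from the uploaded object's key, and launches one EMR cluster for up to 10
  # objects under that prefix. Wiring this function to a bucket notification is left to the
  # operator; no event source is defined in this template.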
  EventEMRGenomics:
    Type: 'AWS::Lambda::Function'
    Properties:
      Handler: index.handler
      Role: !GetAtt
        - TestLambdaExecutionRole
        - Arn
      Code:
        ZipFile: !Join
          - |+

          - - from __future__ import print_function
            - import json
            - import subprocess
            - import boto3
            - import time
            - import os
            - import cfnresponse
            - import logging
            - import traceback
            - paginator = boto3.client('s3').get_paginator('list_objects_v2')
            - s3 = boto3.client('s3')
            - 'def load_log_config():'
            - '    root = logging.getLogger()'
            - '    root.setLevel(logging.INFO)'
            - '    return root'
            - 'logger = load_log_config()'
            - 'def handler(event, context):'
            - '    s3_script_path = os.environ["S3_SCRIPT"]'
            - '    output_s3 = os.environ["OUTPUT_S3"]'
            - '    hail_bucket = os.environ["HAIL_BUCKET"]'
            - '    cluster_name = os.environ["CLUSTER_NAME"]'
            - '    emr_config = os.environ["EMR_CONFIG"]'
            - '    master_type = os.environ["MASTER_TYPE"]'
            - '    core_type = os.environ["CORE_TYPE"]'
            - '    core_count = os.environ["CORE_COUNT"]'
            - '    task_type = os.environ["TASK_TYPE"]'
            - '    task_count = os.environ["TASK_COUNT"]'
            - '    subnetid = os.environ["SUBNET_ID"]'
            - '    customAmiId = os.environ["CUSTOM_AMI"]'
            - '    service_role = os.environ["EMR_SERVICE"]'
            - '    emr_ec2 = os.environ["EMR_EC2_ROLE"]'
            - '    autoscaling_role = os.environ["EMR_AUTOSCALING_ROLE"]'
            - '    bucket = emr_config.replace("s3://","",1).split("/",1)[0]'
            - '    key = emr_config.replace("s3://","",1).split("/",1)[1]'
            - '    try:'
            - '        response = s3.get_object(Bucket = bucket, Key = key)'
            - '        content = response["Body"]'
            - '        jsonObject = json.loads(content.read())'
            - "        lambda_message = event['Records'][0]"
            - "        inpbucket = lambda_message['s3']['bucket']['name']"
            - "        inpkey = lambda_message['s3']['object']['key']"
            - '        p_full_path = inpkey'
            - '        p_base_file_name = os.path.basename(p_full_path)'
            - '        print(p_base_file_name)'
            - "        prefix = inpkey.split('/')[0]"
            - "        input_s3 = 's3://' + inpbucket + '/' + prefix + '/'"
            - '        maxkey=10'
            - "        # only act on object events; keys ending with '/' are folder placeholders"
            - "        if not inpkey.endswith('/'):"
            - '            pages = paginator.paginate(Bucket=inpbucket,Prefix=prefix,MaxKeys=maxkey)'
            - "            params=''"
            - '            for page in pages:'
            - "                for obj in page['Contents']:"
            - "                    params=params+str(obj['Key']).split('/')[1]+','"
            - '                params=params[:-1]'
            - '                print(params)'
            - "                cluster_id=boto3.client('emr').run_job_flow(Name=cluster_name,LogUri=f's3://{hail_bucket}/elasticmapreduce/',ReleaseLabel='emr-5.29.0',"
            - "                    BootstrapActions=[{'Name':'pythonDateutilReinstall','ScriptBootstrapAction':{'Path':'file:/usr/bin/sudo','Args':['python3','-m','pip','install','-I','python-dateutil']}}],"
            - "                    Instances={'InstanceGroups':[{'Name':'Master Instance Group','Market':'ON_DEMAND','InstanceRole':'MASTER','InstanceType':master_type,'InstanceCount':1,'EbsConfiguration':{'EbsBlockDeviceConfigs':[{'VolumeSpecification':{'VolumeType':'gp2','SizeInGB':32},'VolumesPerInstance':2}]}},"
            - "                        {'Name':'Core Instance Group','Market':'ON_DEMAND','InstanceRole':'CORE','InstanceType':core_type,'InstanceCount':int(core_count),'EbsConfiguration':{'EbsBlockDeviceConfigs':[{'VolumeSpecification':{'VolumeType':'gp2','SizeInGB':1500},'VolumesPerInstance':1}]}},"
            - "                        {'Name':'Task Instance Group','Market':'ON_DEMAND','InstanceRole':'TASK','InstanceType':task_type,'InstanceCount':int(task_count),'EbsConfiguration':{'EbsBlockDeviceConfigs':[{'VolumeSpecification':{'VolumeType':'gp2','SizeInGB':1500},'VolumesPerInstance':1}]}}],"
            - "                        'KeepJobFlowAliveWhenNoSteps':False,'Ec2SubnetId':subnetid},"
            - "                    Applications=[{'Name':'Ganglia'},{'Name':'Hadoop'},{'Name':'Hive'},{'Name':'Livy'},{'Name':'Spark'}],Configurations=jsonObject,ServiceRole=service_role,VisibleToAllUsers=True,JobFlowRole=emr_ec2,"
            - "                    Tags=[{'Key':'owner','Value':''},{'Key':'environment','Value':'development'},{'Key':'allow-emr-ssm','Value':'false'},{'Key':'Name','Value':'hailtest'}],"
            - "                    Steps=[{'Name':'hailApachePlotDir','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['sudo','mkdir','-p','/var/www/html/plots']}},"
            - "                        {'Name':'hailMainPlotDir','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['sudo','ln','-s','/var/www/html/plots','/plots']}},"
            - "                        {'Name':'hailLivyPlotOwnership','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['sudo','chown','livy:livy','/var/www/html/plots']}},"
            - "                        {'Name':'vepOwnership','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['sudo','bash','-c','if test -d /opt/vep/; then chown -R hadoop:hadoop /opt/vep; fi']}},"
            - "                        {'Name':'clusterManifestToS3','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['/usr/local/bin/cluster_manifest.sh']}},"
            - "                        {'Name':'Spark Application','ActionOnFailure':'CONTINUE','HadoopJarStep':{'Jar':'command-runner.jar','Args':['spark-submit','--conf','spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2','--conf','spark.sql.parquet.filterPushdown=true','--conf','spark.sql.parquet.fs.optimized.committer.optimization-enabled=true','--conf','spark.sql.catalogImplementation=hive','--conf','spark.executor.cores=1','--conf','spark.executor.instances=1','--conf','spark.sql.shuffle.partitions=1000','--conf','spark.executor.memory=10g','--jars','/opt/hail/hail-all-spark.jar',s3_script_path,params,input_s3,output_s3]}}],"
            - "                    AutoScalingRole=autoscaling_role,ScaleDownBehavior='TERMINATE_AT_TASK_COMPLETION',CustomAmiId=customAmiId,EbsRootVolumeSize=100)"
            - "                print(cluster_id['JobFlowId'])"
            - "                params=''"
            - '                time.sleep(10)'
            - '        else:'
            - "            logger.info('Event was triggered by a folder')"
            - "            logger.info('bucket: '+inpbucket)"
            - "            logger.info('key: '+inpkey)"
            - '    except Exception as e:'
            - '        print(e)'
            - '        track = traceback.format_exc()'
            - '        message = {'
            - '            "ErrorMessage" : str(e),'
            - '            "StackTrace" : track,'
            - '            "BaseFileName" : p_base_file_name'
            - '        }'
            - '    return'
      Timeout: '60'
      MemorySize: '256'
      Environment:
        Variables:
          S3_SCRIPT: !Ref HailScriptPath
          OUTPUT_S3: !Ref OutputS3Path
          HAIL_BUCKET: !Ref HailBucketName
          CLUSTER_NAME: !Ref EMRClusterName
          EMR_CONFIG: !Ref EMRConfigs
          MASTER_TYPE: !Ref EmrMasterInstanceSize
          CORE_TYPE: !Ref EmrCoreInstanceSize
          CORE_COUNT: !Ref EmrCoreInstanceCount
          TASK_TYPE: !Ref EmrTaskInstanceSize
          TASK_COUNT: !Ref EmrTaskInstanceCount
          SUBNET_ID: !If [createVpc, !GetAtt 'vpc.Outputs.PrivateSubnet1AID', !Ref VPCSubnetId]
          CUSTOM_AMI: !FindInMap [ Region, !Ref "AWS::Region", ami ]
          EMR_SERVICE: !Ref emrclusterHailEMR
          EMR_EC2_ROLE: !Ref instanceProfileEmrEc2
          EMR_AUTOSCALING_ROLE: !Ref emrautoscalingHailEMR
      Runtime: python3.7
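  # Custom resource that invokes the EMRGenomics function when the stack is created (the handler
  # does not inspect the request type, so it also runs on stack update and delete).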
  Primerinvoke:
    Type: AWS::CloudFormation::CustomResource
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt EMRGenomics.Arn
  TestLambdaExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: cwlogs
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: 'logs:CreateLogGroup'
                Resource: !Join
                  - ':'
                  - - 'arn:aws:logs'
                    - !Ref 'AWS::Region'
                    - !Ref 'AWS::AccountId'
                    - '*'
              - Effect: Allow
                Action:
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource:
                  - !Join
                    - ':'
                    - - 'arn:aws:logs'
                      - !Ref 'AWS::Region'
                      - !Ref 'AWS::AccountId'
                      - log-group
                      - /aws/lambda/*
                      - '*'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
        - 'arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess'
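  # IAM roles passed to run_job_flow: the EMR service role, the EC2 instance profile role, and
  # the autoscaling role. The role names are fixed, so only one copy of this stack can exist per
  # account and region, and the stack must be created with the CAPABILITY_NAMED_IAM capability.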
  emrclusterHailEMR:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2008-10-17"
        Statement:
          - Sid: ""
            Effect: "Allow"
            Principal:
              Service: "elasticmapreduce.amazonaws.com"
            Action: "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"
      RoleName: "emr-cluster-EMRHail17"
  emrec2HailEMR:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2008-10-17"
        Statement:
          - Sid: ""
            Effect: "Allow"
            Principal:
              Service: "ec2.amazonaws.com"
            Action: "sts:AssumeRole"
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role"
        - "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
      Path: "/"
      Policies:
        - PolicyName: "s3ManifestPutObject"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: "s3:PutObject"
                Resource: !Join [ "", ["arn:aws:s3:::", !Ref HailBucketName, "/manifests/*"]]
        - PolicyName: "cloudformationDescribeStack"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: "cloudformation:DescribeStacks"
                Resource: !Ref "AWS::StackId"
      RoleName: 'emr-ec2-EMRHail17'
  emrautoscalingHailEMR:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - elasticmapreduce.amazonaws.com
                - application-autoscaling.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforAutoScalingRole'
      RoleName: 'emr-autoscaling-EMRHail17'
  instanceProfileEmrEc2:
    Type: "AWS::IAM::InstanceProfile"
    Properties:
      Path: "/"
      Roles:
        - !Ref emrec2HailEMR
      InstanceProfileName: !Ref emrec2HailEMR
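  # Optional networking: launches the AWS VPC Quick Start as a nested stack across two
  # Availability Zones when VPCTarget is "new". The EMR cluster is then placed in
  # PrivateSubnet1A via the SUBNET_ID environment variable of the Lambda functions above.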
  vpc:
    Type: "AWS::CloudFormation::Stack"
    Condition: createVpc
    Properties:
      TemplateURL: !Sub
        - 'https://aws-quickstart.s3.${S3Region}.${AWS::URLSuffix}/quickstart-hail/submodules/quickstart-aws-vpc/templates/aws-vpc.template'
        - S3Region: !Ref 'AWS::Region'
      Parameters:
        AvailabilityZones: !Join
          - ','
          - - !Select
              - 0
              - Fn::GetAZs: !Ref 'AWS::Region'
            - !Select
              - 1
              - Fn::GetAZs: !Ref 'AWS::Region'
        VPCCIDR: !Ref VPCCIDR
        PublicSubnet1CIDR: !Select [ 0, !Cidr [ !Ref VPCCIDR, 6, 12 ]]
        PrivateSubnet1ACIDR: !Select [ 1, !Cidr [ !Ref VPCCIDR, 6, 12 ]]
        PrivateSubnet1BCIDR: !Select [ 2, !Cidr [ !Ref VPCCIDR, 6, 12 ]]
        PublicSubnet2CIDR: !Select [ 3, !Cidr [ !Ref VPCCIDR, 6, 12 ]]
        PrivateSubnet2ACIDR: !Select [ 4, !Cidr [ !Ref VPCCIDR, 6, 12 ]]
        PrivateSubnet2BCIDR: !Select [ 5, !Cidr [ !Ref VPCCIDR, 6, 12 ]]
      TimeoutInMinutes: 10
Outputs:
  Result:
    Value: !GetAtt
      - Primerinvoke
      - Result
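# Example deployment (a sketch only, not part of the template; the stack, bucket, VPC, and
# subnet names below are placeholders to replace with your own values):
#
#   aws cloudformation create-stack \
#     --stack-name vcf-preprocessing-emr \
#     --template-body file://vcf_preprocessing_emr.yaml \
#     --capabilities CAPABILITY_NAMED_IAM \
#     --parameters ParameterKey=OutputS3Path,ParameterValue=s3://my-output-bucket/genomics/parquet/ \
#                  ParameterKey=HailBucketName,ParameterValue=my-emr-logs-bucket \
#                  ParameterKey=VPCId,ParameterValue=vpc-0123456789abcdef0 \
#                  ParameterKey=VPCSubnetId,ParameterValue=subnet-0123456789abcdef0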