---
# CloudFormation template: launches an EMR cluster inside the VPC, security
# group and subnet that are passed in as parameters (the SageMaker Studio
# network).
AWSTemplateFormatVersion: '2010-09-09'
Description: >-
  Brings up EMR cluster in No-Auth mode, taking SageMaker vpc, security
  group, subnets as input.

Mappings:
  # Source bucket/prefix of the helper scripts copied into the stack bucket.
  Studio:
    s3params:
      S3Bucket: aws-ml-blog
      S3Key: artifacts/sma-milestone1/
  # Script file names (relative to S3Key above) used by the EMR cluster.
  ClusterConfigurations:
    emr:
      masterInstanceCount: 1
      BootStrapScriptFile: installpylibs-v2.sh
      StepScriptFile: configurekdc.sh

Parameters:
  SageMakerProjectName:
    Type: String
    Description: Name of the project

  SageMakerProjectId:
    Type: String
    Description: Service generated Id of the project.

  EmrClusterName:
    Type: String
    Description: EMR cluster Name.

  MasterInstanceType:
    Type: String
    Description: Instance type of the EMR master node.
    Default: m5.xlarge
    AllowedValues:
      - m5.xlarge
      - m5.2xlarge
      - m5.4xlarge

  CoreInstanceType:
    Type: String
    Description: Instance type of the EMR core nodes.
    Default: m5.xlarge
    AllowedValues:
      - m5.xlarge
      - m5.2xlarge
      - m5.4xlarge
      - m3.medium
      - m3.large
      - m3.xlarge
      - m3.2xlarge

  CoreInstanceCount:
    Type: String
    Description: Number of core instances in the EMR cluster.
    Default: '2'
    AllowedValues:
      - '2'
      - '5'
      - '10'

  EmrReleaseVersion:
    Type: String
    Description: The release version of EMR to launch.
    Default: emr-5.33.1
    AllowedValues:
      - emr-5.33.1
      - emr-6.4.0

  IdleTimeout:
    Type: Number
    Description: Terminate EMR cluster automatically after x amount of seconds of inactivity.
    Default: 7200
    # Enforce the range promised by ConstraintDescription; without these
    # bounds an out-of-range value is only rejected when the cluster is created.
    MinValue: 60
    MaxValue: 604800
    ConstraintDescription: Must be between 60 seconds - 7 days (604800 seconds)

  # This can be hardcoded by DevOps Admin
  VpcId:
    Type: String
    Description: 'SageMaker Studio VPCId'

  # This can be hardcoded by DevOps Admin
  SagemakerStudioSecurityGroupId:
    Type: String
    Description: Security group id.

  # This can be hardcoded by DevOps Admin
  SagemakerStudioSubnetId:
    Type: String
    Description: Subnet id.

Resources:
  # ---------------------------------------------------------------------
  # Working bucket: receives the copied scripts and the EMR logs.
  # The name suffix is the 5th dash-separated token of the stack id GUID.
  # ---------------------------------------------------------------------
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Join ["-", ["sm-emr-sc-blog", !Select [4, !Split ["-", !Select [2, !Split ["/", !Ref AWS::StackId]]]]]]

  # ---------------------------------------------------------------------
  # Security groups for the EMR master, core ("slave") and service access.
  # ---------------------------------------------------------------------
  masterSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: EMR Master SG
      SecurityGroupEgress:
        # IpProtocol is a string property; "-1" means all protocols.
        - IpProtocol: "-1"
          FromPort: -1
          ToPort: -1
          CidrIp: 0.0.0.0/0
      VpcId: !Ref VpcId

  slaveSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: EMR Slave SG
      SecurityGroupEgress:
        - IpProtocol: "-1"
          FromPort: -1
          ToPort: -1
          CidrIp: 0.0.0.0/0
      VpcId: !Ref VpcId

  emrServiceSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: EMR Service Access SG
      SecurityGroupEgress:
        - IpProtocol: "-1"
          FromPort: -1
          ToPort: -1
          CidrIp: 0.0.0.0/0
      VpcId: !Ref VpcId

  # --- Master SG ingress: ICMP/TCP/UDP from itself and from the core SG ---
  emrMasterIngressSelfICMP:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: icmp
      FromPort: -1
      ToPort: -1
      SourceSecurityGroupId: !Ref masterSecurityGroup

  emrMasterIngressSlaveICMP:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: icmp
      FromPort: -1
      ToPort: -1
      SourceSecurityGroupId: !Ref slaveSecurityGroup

  emrMasterIngressSelfAllTcp:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: tcp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref masterSecurityGroup

  emrMasterIngressSlaveAllTcp:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: tcp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref slaveSecurityGroup

  emrMasterIngressSelfAllUdp:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: udp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref masterSecurityGroup

  emrMasterIngressSlaveAllUdp:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: udp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref slaveSecurityGroup

  # --- Master SG ingress from the SageMaker Studio security group ---
  # Port 8998 (rule name: Livy).
  emrMasterIngressLivySG:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: tcp
      FromPort: 8998
      ToPort: 8998
      SourceSecurityGroupId: !Ref SagemakerStudioSecurityGroupId

  # Port 10000 (rule name: Hive).
  emrMasterIngressHiveSG:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: tcp
      FromPort: 10000
      ToPort: 10000
      SourceSecurityGroupId: !Ref SagemakerStudioSecurityGroupId

  # --- Traffic between the service-access SG and the cluster SGs ---
  emrMasterIngressServiceSg:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: tcp
      FromPort: 8443
      ToPort: 8443
      SourceSecurityGroupId: !Ref emrServiceSecurityGroup

  emrServiceIngressMasterSg:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref emrServiceSecurityGroup
      IpProtocol: tcp
      FromPort: 9443
      ToPort: 9443
      SourceSecurityGroupId: !Ref masterSecurityGroup

  emrServiceEgressMaster:
    Type: AWS::EC2::SecurityGroupEgress
    Properties:
      GroupId: !Ref emrServiceSecurityGroup
      IpProtocol: tcp
      FromPort: 8443
      ToPort: 8443
      DestinationSecurityGroupId: !Ref masterSecurityGroup

  emrServiceEgressSlave:
    Type: AWS::EC2::SecurityGroupEgress
    Properties:
      GroupId: !Ref emrServiceSecurityGroup
      IpProtocol: tcp
      FromPort: 8443
      ToPort: 8443
      DestinationSecurityGroupId: !Ref slaveSecurityGroup

  # --- Core SG ingress: ICMP/TCP/UDP from itself and from the master SG ---
  emrSlaveIngressSelfICMP:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref slaveSecurityGroup
      IpProtocol: icmp
      FromPort: -1
      ToPort: -1
      SourceSecurityGroupId: !Ref slaveSecurityGroup

  emrSlaveIngressMasterICMP:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref slaveSecurityGroup
      IpProtocol: icmp
      FromPort: -1
      ToPort: -1
      SourceSecurityGroupId: !Ref masterSecurityGroup

  emrSlaveIngressSelfAllTcp:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref slaveSecurityGroup
      IpProtocol: tcp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref slaveSecurityGroup

  emrSlaveIngressMasterAllTcp:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref slaveSecurityGroup
      IpProtocol: tcp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref masterSecurityGroup

  emrSlaveIngressSelfAllUdp:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref slaveSecurityGroup
      IpProtocol: udp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref slaveSecurityGroup

  emrSlaveIngressMasterAllUdp:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref slaveSecurityGroup
      IpProtocol: udp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref masterSecurityGroup

  emrSlaveIngressServiceSg:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref slaveSecurityGroup
      IpProtocol: tcp
      FromPort: 8443
      ToPort: 8443
      SourceSecurityGroupId: !Ref emrServiceSecurityGroup

  # ---------------------------------------------------------------------
  # IAM roles used by the EMR service and by the cluster's EC2 instances.
  # ---------------------------------------------------------------------
  EMRClusterServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - elasticmapreduce.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole
      Path: "/"

  EMRClusterinstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Path: "/"
      Roles:
        - !Ref EMRClusterinstanceProfileRole

  EMRClusterinstanceProfileRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${AWS::StackName}-EMRClusterinstanceProfileRole"
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - ec2.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role
      Path: "/"

  # ---------------------------------------------------------------------
  # Custom resource that empties S3Bucket when the stack is deleted.
  # ---------------------------------------------------------------------
  CleanUpBucketonDelete:
    DependsOn: CleanUpBucketonDeleteLambda
    Type: Custom::emptybucket
    Properties:
      ServiceToken: !GetAtt CleanUpBucketonDeleteLambda.Arn
      inputBucketName: !Ref S3Bucket

  CleanUpBucketonDeleteLambda:
    DependsOn:
      - S3Bucket
      - CleanUpBucketonDeleteLambdaRole
    Type: AWS::Lambda::Function
    Properties:
      Description: Empty bucket on delete
      Handler: index.lambda_handler
      Role: !GetAtt CleanUpBucketonDeleteLambdaRole.Arn
      # python3.7 is a deprecated Lambda runtime; use a supported one.
      Runtime: python3.12
      Timeout: 60
      Code:
        ZipFile: |
          import json
          import boto3
          import urllib3
          # ClientError must be imported, otherwise the except clause below
          # raises NameError instead of handling a missing bucket.
          from botocore.exceptions import ClientError

          def empty_bucket(bucket_name):
              print("Attempting to empty the bucket {0}".format(bucket_name))
              s3_client = boto3.client('s3')
              s3 = boto3.resource('s3')

              try:
                  s3.Bucket(bucket_name).load()
              except ClientError:
                  print("Bucket {0} does not exist".format(bucket_name))
                  return
              # Confirm if versioning is enabled
              version_status = s3_client.get_bucket_versioning(Bucket=bucket_name)
              status = version_status.get('Status','')
              if status == 'Enabled':
                  version_status = s3_client.put_bucket_versioning(Bucket=bucket_name,
                                                                   VersioningConfiguration={'Status': 'Suspended'})
              version_paginator = s3_client.get_paginator('list_object_versions')
              version_iterator = version_paginator.paginate(
                  Bucket=bucket_name
              )

              for page in version_iterator:
                  print(page)
                  if 'DeleteMarkers' in page:
                      delete_markers = page['DeleteMarkers']
                      if delete_markers is not None:
                          for delete_marker in delete_markers:
                              key = delete_marker['Key']
                              versionId = delete_marker['VersionId']
                              s3_client.delete_object(Bucket=bucket_name, Key=key, VersionId=versionId)
                  if 'Versions' in page and page['Versions'] is not None:
                      versions = page['Versions']
                      for version in versions:
                          print(version)
                          key = version['Key']
                          versionId = version['VersionId']
                          s3_client.delete_object(Bucket=bucket_name, Key=key, VersionId=versionId)
              object_paginator = s3_client.get_paginator('list_objects_v2')
              object_iterator = object_paginator.paginate(
                  Bucket=bucket_name
              )
              for page in object_iterator:
                  if 'Contents' in page:
                      for content in page['Contents']:
                          s3_client.delete_object(Bucket=bucket_name, Key=content['Key'])
              print("Successfully emptied the bucket {0}".format(bucket_name))


          def lambda_handler(event, context):
              # Only Delete requests empty the bucket; every request type
              # is answered so CloudFormation does not wait for a timeout.
              try:
                  bucket = event['ResourceProperties']['inputBucketName']
                  if event['RequestType'] == 'Delete':
                      empty_bucket(bucket)
                  sendResponse(event, context, "SUCCESS")
              except Exception as e:
                  print(e)
                  sendResponse(event, context, "FAILED")

          def sendResponse(event, context, status):
              http = urllib3.PoolManager()
              response_body = {'Status': status,
                               'Reason': 'Log stream name: ' + context.log_stream_name,
                               'PhysicalResourceId': context.log_stream_name,
                               'StackId': event['StackId'],
                               'RequestId': event['RequestId'],
                               'LogicalResourceId': event['LogicalResourceId'],
                               'Data': json.loads("{}")}
              http.request('PUT', event['ResponseURL'], body=json.dumps(response_body))

  CleanUpBucketonDeleteLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: !Sub CleanUpBucketonDeleteLambdaPolicy-${AWS::StackName}
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:*
                Resource:
                  - !GetAtt S3Bucket.Arn
                  - !Sub "${S3Bucket.Arn}/"
                  - !Sub "${S3Bucket.Arn}/*"
              # The function may empty the bucket but never delete it.
              - Effect: Deny
                Action:
                  - s3:DeleteBucket
                Resource: "*"
              - Effect: Allow
                Action:
                  - logs:*
                Resource: "*"

  # --- More master SG ingress from the SageMaker Studio security group ---
  # Ports 88, 749 and 464 (rule names: KDC, KDC admin, kinit).
  emrMasterIngressKDCSG:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: tcp
      FromPort: 88
      ToPort: 88
      SourceSecurityGroupId: !Ref SagemakerStudioSecurityGroupId

  emrMasterIngressKDCAdminSG:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: tcp
      FromPort: 749
      ToPort: 749
      SourceSecurityGroupId: !Ref SagemakerStudioSecurityGroupId

  emrMasterIngressKinit464SG:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref masterSecurityGroup
      IpProtocol: tcp
      FromPort: 464
      ToPort: 464
      SourceSecurityGroupId: !Ref SagemakerStudioSecurityGroupId

  # Role assumable by the cluster's instance-profile role, granting access
  # to the stack bucket.
  allowEMRFSAccessForUser1:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${AWS::StackName}-allowEMRFSAccessForUser1"
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              # GetAtt (rather than a hand-built ARN string) makes
              # CloudFormation create the instance-profile role first; IAM
              # rejects a trust policy whose principal does not exist yet.
              AWS: !GetAtt EMRClusterinstanceProfileRole.Arn
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: !Sub "${AWS::StackName}-emrFS-user1"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Action:
                  - s3:ListBucket
                Resource:
                  - !Sub arn:aws:s3:::${S3Bucket}
                Effect: Allow
              - Action:
                  - s3:*
                Resource:
                  - !Sub arn:aws:s3:::${S3Bucket}/*
                Effect: Allow

  # ---------------------------------------------------------------------
  # Custom resource that copies the bootstrap/step scripts from the source
  # bucket (see Mappings) into S3Bucket, and removes them on stack delete.
  # ---------------------------------------------------------------------
  CopyZips:
    Type: Custom::CopyZips
    Properties:
      ServiceToken: !GetAtt CopyZipsFunction.Arn
      DestBucket: !Ref S3Bucket
      SourceBucket: !FindInMap [Studio, s3params, S3Bucket]
      Prefix: !FindInMap [Studio, s3params, S3Key]
      Objects:
        - !FindInMap [ClusterConfigurations, emr, BootStrapScriptFile]
        - !FindInMap [ClusterConfigurations, emr, StepScriptFile]

  CopyZipsRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Path: "/"
      Policies:
        - PolicyName: lambda-copier
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: "*"
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:DeleteObject
                Resource:
                  - !Sub arn:aws:s3:::${S3Bucket}/*

  CopyZipsFunction:
    Type: AWS::Lambda::Function
    Properties:
      Description: Copies objects from a source S3 bucket to a destination
      Handler: index.handler
      # python3.8 is a deprecated Lambda runtime; use a supported one.
      Runtime: python3.12
      Role: !GetAtt CopyZipsRole.Arn
      Timeout: 900
      Code:
        ZipFile: |
          import json
          import logging
          import threading
          import boto3
          import cfnresponse

          def copy_objects(source_bucket, dest_bucket, prefix, objects):
              s3 = boto3.client('s3')
              for o in objects:
                  key = prefix + o
                  copy_source = {
                      'Bucket': source_bucket,
                      'Key': key
                  }
                  print('copy_source: %s' % copy_source)
                  print('dest_bucket = %s'%dest_bucket)
                  print('key = %s' %key)
                  s3.copy_object(CopySource=copy_source, Bucket=dest_bucket,
                                 Key=key)

          def delete_objects(bucket, prefix, objects):
              s3 = boto3.client('s3')
              objects = {'Objects': [{'Key': prefix + o} for o in objects]}
              s3.delete_objects(Bucket=bucket, Delete=objects)

          def timeout(event, context):
              logging.error('Execution is about to time out, sending failure response to CloudFormation')
              cfnresponse.send(event, context, cfnresponse.FAILED, {}, None)

          def handler(event, context):
              # make sure we send a failure to CloudFormation if the function
              # is going to timeout
              timer = threading.Timer((context.get_remaining_time_in_millis()
                                       / 1000.00) - 0.5, timeout, args=[event, context])
              timer.start()
              print('Received event: %s' % json.dumps(event))
              status = cfnresponse.SUCCESS
              try:
                  source_bucket = event['ResourceProperties']['SourceBucket']
                  dest_bucket = event['ResourceProperties']['DestBucket']
                  prefix = event['ResourceProperties']['Prefix']
                  objects = event['ResourceProperties']['Objects']
                  if event['RequestType'] == 'Delete':
                      delete_objects(dest_bucket, prefix, objects)
                  else:
                      copy_objects(source_bucket, dest_bucket, prefix, objects)
              except Exception as e:
                  logging.error('Exception: %s' % e, exc_info=True)
                  status = cfnresponse.FAILED
              finally:
                  timer.cancel()
                  cfnresponse.send(event, context, status, {}, None)

  # ---------------------------------------------------------------------
  # The EMR cluster. DependsOn CopyZips so the scripts exist before launch,
  # and on CleanUpBucketonDelete so that, on stack deletion, the cluster is
  # removed before the bucket is emptied.
  # ---------------------------------------------------------------------
  EMRCluster:
    DependsOn:
      - CopyZips
      - CleanUpBucketonDelete
    Type: AWS::EMR::Cluster
    Properties:
      Name: !Ref EmrClusterName
      Applications:
        - Name: Spark
        - Name: Hive
        - Name: Livy
      BootstrapActions:
        - Name: Dummy bootstrap action
          ScriptBootstrapAction:
            Args:
              - dummy
              - parameter
            Path: !Sub s3://${S3Bucket}/artifacts/sma-milestone1/installpylibs-v2.sh
      AutoScalingRole: EMR_AutoScaling_DefaultRole
      Configurations:
        - Classification: livy-conf
          ConfigurationProperties:
            livy.server.session.timeout: 2h
      EbsRootVolumeSize: 100
      Instances:
        CoreInstanceGroup:
          EbsConfiguration:
            EbsBlockDeviceConfigs:
              - VolumeSpecification:
                  SizeInGB: '320'
                  VolumeType: gp2
                VolumesPerInstance: '1'
            EbsOptimized: 'true'
          InstanceCount: !Ref CoreInstanceCount
          InstanceType: !Ref CoreInstanceType
          Market: ON_DEMAND
          Name: coreNode
        MasterInstanceGroup:
          EbsConfiguration:
            EbsBlockDeviceConfigs:
              - VolumeSpecification:
                  SizeInGB: '320'
                  VolumeType: gp2
                VolumesPerInstance: '1'
            EbsOptimized: 'true'
          InstanceCount: 1
          InstanceType: !Ref MasterInstanceType
          Market: ON_DEMAND
          Name: masterNode
        Ec2SubnetId: !Ref SagemakerStudioSubnetId
        EmrManagedMasterSecurityGroup: !Ref masterSecurityGroup
        EmrManagedSlaveSecurityGroup: !Ref slaveSecurityGroup
        ServiceAccessSecurityGroup: !Ref emrServiceSecurityGroup
        TerminationProtected: false
      JobFlowRole: !Ref EMRClusterinstanceProfile
      LogUri: !Sub s3://${S3Bucket}/artifacts/sma-milestone1/
      ReleaseLabel: !Ref EmrReleaseVersion
      ServiceRole: !Ref EMRClusterServiceRole
      VisibleToAllUsers: true
      Steps:
        # Runs the copied configurekdc.sh through EMR's script-runner jar.
        - ActionOnFailure: CONTINUE
          HadoopJarStep:
            Args:
              - !Sub s3://${S3Bucket}/artifacts/sma-milestone1/configurekdc.sh
            Jar: !Sub s3://${AWS::Region}.elasticmapreduce/libs/script-runner/script-runner.jar
            MainClass: ''
          Name: run any bash or java job in spark
      AutoTerminationPolicy:
        IdleTimeout: !Ref IdleTimeout

Outputs:
  S3BucketName:
    Description: Bucket Name for Amazon S3 bucket
    Value: !Ref S3Bucket

  EMRMasterDNSName:
    Description: DNS Name of the EMR Master Node
    Value: !GetAtt EMRCluster.MasterPublicDNS