Description: >
  This template will deploy an EMR cluster and the necessary security groups.

Parameters:
  EnvironmentName:
    Description: An environment name that will be prefixed to resource names
    Type: String
    Default: workshop

  VPC:
    Type: String
    Description: Choose which VPC the EMR cluster and SageMaker notebook should be deployed to

  PublicSubnet:
    Description: Choose which subnets the EMR Cluster should be deployed to
    Type: String

  OutputS3Bucket:
    Description: Logs S3 Bucket for EMR
    Type: String

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "EMR Configuration"
        Parameters:
          - OutputS3Bucket
      - Label:
          default: "Environment Configuration"
        Parameters:
          - EnvironmentName
          - VPC
          - PublicSubnet
    ParameterLabels:
      EnvironmentName:
        default: "Environment Name"
      OutputS3Bucket:
        default: "EMR Logs S3 Bucket"

Resources:
  MasterSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupName: !Sub '${EnvironmentName}-MasterSecurityGroup'
      GroupDescription: Master security group for EMR
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: '-1'
          CidrIp: 0.0.0.0/0
      SecurityGroupEgress:
        - IpProtocol: '-1'
          CidrIp: 0.0.0.0/0

  SlaveSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupName: !Sub '${EnvironmentName}-SlaveSecurityGroup'
      GroupDescription: Slave security group for EMR
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: '-1'
          CidrIp: 0.0.0.0/0
      SecurityGroupEgress:
        - IpProtocol: '-1'
          CidrIp: 0.0.0.0/0

  EmrCluster:
    Type: AWS::EMR::Cluster
    Properties:
      Applications:
        - Name: Hive
        - Name: Spark
        - Name: Zeppelin
        - Name: Ganglia
        - Name: MXNet
        - Name: Presto
        - Name: Livy
        - Name: JupyterHub
        - Name: Hue
        - Name: Tez
      Instances:
        AdditionalMasterSecurityGroups:
          - !Ref EmrAdditionalSecurityGroup
        AdditionalSlaveSecurityGroups:
          - !Ref EmrAdditionalSecurityGroup
        Ec2SubnetId: !Ref PublicSubnet
        EmrManagedMasterSecurityGroup: !Ref MasterSecurityGroup
        EmrManagedSlaveSecurityGroup: !Ref SlaveSecurityGroup
        MasterInstanceGroup:
          EbsConfiguration:
            EbsBlockDeviceConfigs:
              - VolumeSpecification:
                  SizeInGB: 32
                  VolumeType: gp2
          InstanceCount: 1
          InstanceType: m4.2xlarge
          Market: ON_DEMAND
          Name: Master instance group
        CoreInstanceGroup:
          EbsConfiguration:
            EbsBlockDeviceConfigs:
              - VolumeSpecification:
                  SizeInGB: 32
                  VolumeType: gp2
          InstanceCount: 4
          InstanceType: m4.2xlarge
          Market: ON_DEMAND
          Name: Core instance group
      JobFlowRole: !Ref EmrInstanceProfile
      LogUri: !Join [ '', [ 's3://', !Ref OutputS3Bucket, '/emrlogs/' ] ]
      Name: !Sub ${EnvironmentName} EMR Cluster
      ReleaseLabel: emr-5.20.0
      ScaleDownBehavior: TERMINATE_AT_TASK_COMPLETION
      ServiceRole: !Ref EmrServiceRole
      Tags:
        - Key: Name
          Value: !Sub ${EnvironmentName} EMR Cluster
      VisibleToAllUsers: true
      Configurations:
        - Classification: spark
          ConfigurationProperties:
            "maximizeResourceAllocation": "true"
        - Classification: hive-site
          ConfigurationProperties:
            "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        - Classification: presto-connector-hive
          ConfigurationProperties:
            "hive.metastore.glue.datacatalog.enabled": "true"
        - Classification: spark-hive-site
          ConfigurationProperties:
            "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
      EbsRootVolumeSize: 10

  EmrIamRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "ec2.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role"
        - "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"

  EmrServiceRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "elasticmapreduce.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"

  EmrInstanceProfile:
    Type: "AWS::IAM::InstanceProfile"
    Properties:
      Roles:
        - !Ref EmrIamRole

  EmrAdditionalSecurityGroup:
    Type: "AWS::EC2::SecurityGroup"
    Properties:
      GroupName: EmrAdditionalSecurityGroup
      GroupDescription: Allow SSH and Sagemaker access
      SecurityGroupIngress:
        - CidrIp: '0.0.0.0/0'
          Description: Sagemaker
          IpProtocol: tcp
          FromPort: 8998
          ToPort: 8998
        - FromPort: 8890
          ToPort: 8890
          IpProtocol: tcp
          CidrIp: '0.0.0.0/0'
      Tags:
        - Key: Name
          Value: !Sub ${EnvironmentName} EMR Additional Security Group
      VpcId: !Ref VPC

  CleanupSecurityGroups:
    Type: Custom::CleanupSecurityGroups
    Properties:
      ServiceToken: !GetAtt [CleanupSecurityGroupsFunction, Arn]

  CleanupSecurityGroupsFunction:
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to cleanup security groups
      Handler: index.handler
      Role: !GetAtt [LambdaEMRExecutionRole, Arn]
      Runtime: python2.7
      Timeout: 30
      Environment:
        Variables:
          MASTER_SECURITY_GROUP: !GetAtt MasterSecurityGroup.GroupId
          SLAVE_SECURITY_GROUP: !GetAtt SlaveSecurityGroup.GroupId
      Code:
        ZipFile: |
          import logging
          import json
          import zipfile
          import os
          import boto3
          from botocore.exceptions import ClientError
          from botocore.client import Config
          from urllib2 import build_opener, HTTPHandler, Request

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          ec2 = boto3.resource('ec2')

          def send_response(event, context, response_status, response_data):
              '''Send a resource manipulation status response to CloudFormation'''
              response_body = json.dumps({
                  "Status": response_status,
                  "Reason": "See the details in CloudWatch Log Stream: " + context.log_stream_name,
                  "PhysicalResourceId": context.log_stream_name,
                  "StackId": event['StackId'],
                  "RequestId": event['RequestId'],
                  "LogicalResourceId": event['LogicalResourceId'],
                  "Data": response_data
              })

              LOGGER.info('ResponseURL: %s', event['ResponseURL'])
              LOGGER.info('ResponseBody: %s', response_body)

              opener = build_opener(HTTPHandler)
              request = Request(event['ResponseURL'], data=response_body)
              request.add_header('Content-Type', '')
              request.add_header('Content-Length', len(response_body))
              request.get_method = lambda: 'PUT'
              response = opener.open(request)
              LOGGER.info("Status code: %s", response.getcode())
              LOGGER.info("Status message: %s", response.msg)

          def handler(event, context):
              '''Handle Lambda event from AWS'''
              client = boto3.client('ec2')
              master = os.environ['MASTER_SECURITY_GROUP']
              slave = os.environ['SLAVE_SECURITY_GROUP']
              try:
                  LOGGER.info('REQUEST RECEIVED:\n %s', event)
                  LOGGER.info('REQUEST RECEIVED:\n %s', context)
                  if event['RequestType'] == 'Create':
                      LOGGER.info('CREATE!')
                      send_response(event, context, "SUCCESS",
                                    {"Message": "Resource creation successful!"})
                  elif event['RequestType'] == 'Update':
                      LOGGER.info('UPDATE!')
                      send_response(event, context, "SUCCESS",
                                    {"Message": "Resource update successful!"})
                  elif event['RequestType'] == 'Delete':
                      LOGGER.info('DELETE! of master ' + master + ' and slave ' + slave)
                      revoke_security_group_ingress(master)
                      revoke_security_group_ingress(slave)
                      send_response(event, context, "SUCCESS",
                                    {"Message": "Resource deletion successful!"})
                  else:
                      LOGGER.info('FAILED!')
                      send_response(event, context, "FAILED",
                                    {"Message": "Unexpected event received from CloudFormation"})
              except: #pylint: disable=W0702
                  LOGGER.info('FAILED!')
                  send_response(event, context, "FAILED", {
                      "Message": "Exception during processing"})

          def revoke_security_group_ingress(group_id):
              try:
                  security_group = ec2.SecurityGroup(group_id)
                  deleteIngress = security_group.revoke_ingress(IpPermissions=security_group.ip_permissions)
              except (ClientError, AttributeError) as e:
                  error = e.response['Error']['Code']
                  print(error)

  LambdaEMRExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: 'ec2:*'
                Resource: '*'
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*

Outputs:
  EMRMasterNodeDNS:
    Description: Public DNS name of the master EMR instance
    Value: !GetAtt EmrCluster.MasterPublicDNS

  EMRClusterId:
    Description: EMR Cluster Id
    Value: !Ref EmrCluster