########################################################################### # Author: Bruna Gomes # Date: Aug-2022 # Version: 1.0 # AWS Customer Agreement: https://aws.amazon.com/agreement # Description: AWS APG Cloud Formation - Sqoop to EMR env ########################################################################### AWSTemplateFormatVersion: 2010-09-09 Description: Deploys a dynamic conversion flow - sqoop scripts to EMR Metadata: AWS::CloudFormation::Interface: ParameterLabels: pInstanceType: default: EMR instance type pSecretName: default: Secret Name pDBUsername: default: user name DB pDBPassword: default: password DB pReleaseEMR: default: release EMR pCoreInstanceQtt: default: Core instance quantity pTaskInstanceQtt: default: Task instance quantity pS3Bootstrap: default: custom bootstrap file location on S3 pS3Jars: default: Jars location on S3 pS3Vault: default: Hadoop Vault Script location pS3VaultFile: default: name of Hadoop Vault Script pS3BucketPathSource: default: source files location on S3 pS3BucketPathMapping: default: mapping file location on S3 ParameterGroups: - Label: default: DB and EMR Parameters Parameters: - pInstanceType - pSecretName - pDBUsername - pDBPassword - pReleaseEMR - pCoreInstanceQtt - pTaskInstanceQtt - Label: default: S3 Parameters Parameters: - pS3Bootstrap - pS3Jars - pS3Vault - pS3VaultFile - pS3BucketPathSource - pS3BucketPathMapping Parameters: pInstanceType: Type: String Description: EMR instance type - master, worker and task Default: m5.xlarge AllowedValues: [m5.xlarge, m5.2xlarge, m5.4xlarge, m5.8xlarge, m5.12xlarge, m5.16xlarge, m5.24xlarge, m5a.xlarge, m5a.2xlarge, m5a.4xlarge, m5a.8xlarge, m5a.12xlarge, m5a.16xlarge, m5a.24xlarge, m5d.xlarge, m5d.2xlarge, m5d.4xlarge, m5d.8xlarge, m5d.12xlarge, m5d.16xlarge, m5d.24xlarge, m5zn.xlarge, m5zn.2xlarge, m5zn.3xlarge, m5zn.6xlarge, m5zn.12xlarge, m6g.xlarge, m6g.2xlarge, m6g.4xlarge, m6g.8xlarge, m6g.12xlarge, m6g.16xlarge, m6gd.xlarge, m6gd.2xlarge, m6gd.4xlarge, m6gd.8xlarge, m6gd.12xlarge, m6gd.16xlarge] pS3Bootstrap: Type: String Description: Complete S3 path for the bootstrap script pS3Jars: Type: String Description: Complete S3 path for the folder containing the jars pS3Vault: Type: String Description: Complete S3 path for the hadoop vault script pS3VaultFile: Type: String Description: Hadoop vault script name pS3BucketPathSource: Type: String Description: source files location on S3 pS3BucketPathMapping: Type: String Description: mapping file location on S3 pSecretName: Type: String Description: Name of the secret for the source database pDBUsername: Type: String AllowedPattern: "[a-zA-Z][a-zA-Z0-9]*" Default: admin Description: Username for the source database MaxLength: 16 MinLength: 1 pDBPassword: Type: String AllowedPattern: "^[a-zA-Z0-9]*$" Description: Password for the source database NoEcho: true MaxLength: 40 MinLength: 1 pReleaseEMR: Type: String Description: EMR release to be used on stack Default: emr-6.6.0 AllowedValues: [emr-6.6.0, emr-6.5.0, emr-6.4.0, emr-6.3.1, emr-6.3.0, emr-6.2.1, emr-6.2.0, emr-6.1.1, emr-6.1.0, emr-6.0.1, emr-6.0.0] pCoreInstanceQtt: Type: Number Description: Quantity of core instances on EMR Default: 1 pTaskInstanceQtt: Type: Number Description: Quantity of task instances on EMR Default: 1 Resources: rS3Bucket: Type: AWS::S3::Bucket DeletionPolicy: Retain Properties: BucketName: !Sub "sqoop-migration-${AWS::AccountId}" NotificationConfiguration: EventBridgeConfiguration: EventBridgeEnabled: true rEventRule: Type: AWS::Events::Rule Properties: Description: "Event rule" EventPattern: source: - "aws.s3" detail-type: - "Object Created" detail: bucket: name: - !Ref rS3Bucket object: key: - prefix: !Ref pS3BucketPathSource State: "ENABLED" Targets: - Arn: !GetAtt rLambdaSqoopConversion.Arn Id: "TargetLambda" rPermissionForEventsToInvokeLambda: Type: AWS::Lambda::Permission Properties: FunctionName: !Ref rLambdaSqoopConversion Action: "lambda:InvokeFunction" Principal: "events.amazonaws.com" SourceArn: !GetAtt rEventRule.Arn rEMRCluster: Type: AWS::EMR::Cluster Properties: Instances: MasterInstanceGroup: InstanceCount: 1 InstanceType: !Ref pInstanceType Market: ON_DEMAND Name: cfnMaster CoreInstanceGroup: InstanceCount: !Ref pCoreInstanceQtt InstanceType: !Ref pInstanceType Market: ON_DEMAND Name: cfnCore TaskInstanceGroups: - InstanceCount: !Ref pCoreInstanceQtt InstanceType: !Ref pInstanceType Market: ON_DEMAND Name: cfnTask TerminationProtected: false Applications: - Name: Sqoop - Name: Hive - Name: Hadoop - Name: Spark BootstrapActions: - Name: bootstrap ScriptBootstrapAction: Args: - !Ref pS3Jars - !Ref pS3Vault Path: !Ref pS3Bootstrap Name: CFN-EMR-SQOOP JobFlowRole: !Ref rEmrEc2InstanceProfile Steps: - ActionOnFailure: CANCEL_AND_WAIT HadoopJarStep: Jar: "command-runner.jar" Args: - "python3" - !Sub "/home/hadoop/${pS3VaultFile}" - !Ref pSecretName Name: hadoop_vault ServiceRole: !Ref rEmrRole ReleaseLabel: !Ref pReleaseEMR VisibleToAllUsers: true LogUri: !Sub "s3://${rS3Bucket}/emr-logs/" rEmrRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2008-10-17 Statement: - Sid: '' Effect: Allow Principal: Service: elasticmapreduce.amazonaws.com Action: 'sts:AssumeRole' Path: / ManagedPolicyArns: - 'arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole' rEmrEc2Role: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2008-10-17 Statement: - Sid: '' Effect: Allow Principal: Service: ec2.amazonaws.com Action: 'sts:AssumeRole' Path: / Policies: - PolicyDocument: Statement: - Action: - secretsmanager:GetSecretValue Effect: Allow Resource: - !Ref rMySecretBD PolicyName: secrets ManagedPolicyArns: - 'arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role' rEmrEc2InstanceProfile: Type: AWS::IAM::InstanceProfile Properties: Path: / Roles: - !Ref rEmrEc2Role rMySecretBD: Type: 'AWS::SecretsManager::Secret' Properties: Name: !Ref pSecretName Description: Secret SecretString: !Sub '{"username":"${pDBUsername}","password":"${pDBPassword}"}' rIamLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - lambda.amazonaws.com Version: 2012-10-17 Path: / Policies: - PolicyDocument: Statement: - Action: - s3:GetObject - s3:PutObject - s3:ListBucket Effect: Allow Resource: - !Sub "arn:aws:s3:::${rS3Bucket}/*" - !Sub "arn:aws:s3:::${rS3Bucket}" - Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Effect: Allow Resource: - "*" Version: 2012-10-17 PolicyName: lambda-role rLambdaSqoopConversion: Type: AWS::Lambda::Function Properties: FunctionName: Lambda-sqoop-conv Code: ZipFile: !Sub | import boto3 import configparser import re bucket_name = '${rS3Bucket}' source_path = '${pS3BucketPathSource}' mapping_filename = '${pS3BucketPathMapping}' s3 = boto3.resource('s3') bucket = s3.Bucket(bucket_name) def read_obj_s3(bucket, key): return s3.Object(bucket, key).get()['Body'].read().decode() def read_config(): config = configparser.ConfigParser(allow_no_value=True, delimiters=('?')) config.optionxform = str mapping = read_obj_s3(bucket_name, mapping_filename) config.read_string(mapping) return config def modify_file_insert(mapping, obj_content): for k in mapping['insert']: if k.startswith("-Dhadoop.security"): pos = max(obj_content.find('import '), obj_content.find('export '), obj_content.find('eval ')) obj_content = obj_content[:pos+7] + k + " " + obj_content[pos+7:] else: obj_content = obj_content + ' ' + k return obj_content def modify_file_replace(mapping, obj_content): for k in mapping['replace'].keys(): de = mapping['replace'][k].split('::')[0] para = mapping['replace'][k].split('::')[1] pos_ini_k = obj_content.find(k) pos_end_k = obj_content.find(" ", pos_ini_k) pos_end_v = obj_content.find(" --", pos_end_k) if pos_end_v == -1: pos_end_v = obj_content.find(" -m ", pos_end_k) if pos_end_v == -1: pos_end_v = obj_content.find(" -e ", pos_end_k) if pos_end_v == -1: pos_end_v = obj_content.find(" -z ", pos_end_k) if pos_end_v == -1: obj_content = obj_content[:pos_end_k+1] + obj_content[pos_end_k+1:].replace(de, para) else: obj_content = obj_content[:pos_end_k+1] + obj_content[pos_end_k+1:pos_end_v].replace(de, para) + obj_content[pos_end_v:] return obj_content def modify_file_rmvusr(mapping, obj_content): pos_pwdalias_ini = obj_content.find('--password-alias') if pos_pwdalias_ini != -1: pos_pwdalias_end = obj_content.find(' --', pos_pwdalias_ini) obj_content = obj_content[:pos_pwdalias_ini] + obj_content[pos_pwdalias_end:] pos_pwd_ini = obj_content.find('--password') if pos_pwd_ini != -1: pos_pwd_end = obj_content.find(' --', pos_pwd_ini) obj_content = obj_content[:pos_pwd_ini] + obj_content[pos_pwd_end:] pos_usr_ini = obj_content.find('--username') if pos_usr_ini != -1: pos_usr_end = obj_content.find(' --', pos_usr_ini) obj_content = obj_content[:pos_usr_ini+10] + " ${!param}" + obj_content[pos_usr_end:] else: obj_content = obj_content + ' --username ${!param}' return obj_content def save_obj_s3(bucket_name, name_file, new_content): filename_pos = len(name_file.split('/')) name_file_txt = name_file.split('/')[filename_pos-1] object = s3.Object(bucket_name,f'target/{name_file_txt}') result = object.put(Body=new_content) res = result.get('ResponseMetadata') if res.get('HTTPStatusCode') == 200: print(f'{name_file_txt} was uploaded successfully in the following path: s3://{bucket_name}/target') else: print(f'{name_file_txt} was not uploaded') def lambda_handler(event, context): mapping = read_config() source_bucket_list = bucket.objects.filter(Prefix=source_path) for obj in source_bucket_list: if obj.key.split('/')[1] != '': obj_content = read_obj_s3(bucket_name, obj.key) obj_content = obj_content.replace('\r\n', ' ').replace('\n', ' ').replace('\t',' ').replace(' \\', ' ').replace('. ', ' ') obj_content = modify_file_replace(mapping, obj_content) obj_content = modify_file_rmvusr(mapping, obj_content) obj_content = modify_file_insert(mapping, obj_content) obj_content = re.sub(' +', ' ', obj_content) save_obj_s3(bucket_name, obj.key, obj_content.replace('sqoop', 'param=$1\nsqoop') .replace(' --', ' \\\n--') .replace(' -Dhadoop', ' \\\n-Dhadoop') .replace(' -m ', ' \\\n-m ') .replace(' -e ', ' \\\n-e ') .replace(' -z ', ' \\\n-z ')) Handler: index.lambda_handler Role: !GetAtt rIamLambdaRole.Arn Runtime: python3.9 Timeout: 900 Outputs: oStackName: Description: 'Stack name' Value: !Sub '${AWS::StackName}' Export: Name: !Sub '${AWS::StackName}'