AWSTemplateFormatVersion: "2010-09-09"
Description: "Service Catalog: Amazon Redshift Reference Architecture Template. This template builds an AWS Glue Job which can connect to a user-supplied Redshift Cluster and execute either a sample script to load TPC-DS data or a user-provided script. See https://github.com/aws-samples/amazon-redshift-commands-using-aws-glue for more info. (fdp-redshift002)"
Parameters:
  DatabaseHostName:
    Description: The hostname on which the cluster accepts incoming connections.
    Type: String
  MasterUsername:
    Description: The user name which will be used to execute the SQL script.
    Type: String
    AllowedPattern: "([a-z])([a-z]|[0-9])*"
  MasterUserPassword:
    Description: The password which will be used to execute the SQL script.
    Type: String
    NoEcho: 'true'
  PortNumber:
    Description: The port number on which the cluster accepts incoming connections.
    Type: Number
    Default: '5439'
  DatabaseName:
    Description: The name of the database which will be used to execute the SQL script.
    Type: String
    Default: 'dev'
    AllowedPattern: "([a-z]|[0-9])+"
  Script:
    Description: Enter the S3 location of an SQL script, located in your AWS Region, that you'd like to execute in Redshift. The default script will load a 100GB TPC-DS dataset.
    Type: String
    Default: 's3://redshift-demos/sql/redshift-tpcds.sql'
  ScriptParameters:
    Description: A comma-separated list of parameters required by the script in the form of ${n}. For the default script, enter a Role which is attached to the Redshift Cluster and which has S3 read access. See the following reference for more detail on creating a Role for Redshift https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-create-an-iam-role.html.
    Type: String
    Default: 'arn:aws:iam:::role/'
Conditions:
  IsDefault: !Equals
    - !Ref Script
    - s3://redshift-demos/sql/redshift-tpcds.sql
  IsNotDefault: !Not
    - !Equals
      - !Ref Script
      - s3://redshift-demos/sql/redshift-tpcds.sql
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "Connection Details"
        Parameters:
          - DatabaseHostName
          - MasterUsername
          - MasterUserPassword
          - PortNumber
          - DatabaseName
          - Script
Resources:
  # Versioned, SSE-encrypted bucket that holds the Glue job script and, for
  # the default run, a copy of the sample SQL.
  GlueBucket:
    Type: AWS::S3::Bucket
    Properties:
      VersioningConfiguration:
        Status: Enabled
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
  # Shared execution role for the CloudFormation custom-resource Lambdas below.
  LambdaCFNCustomRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
        - arn:aws:iam::aws:policy/AmazonVPCFullAccess
        - arn:aws:iam::aws:policy/AmazonRedshiftReadOnlyAccess
  # Role assumed by the Glue job; grants read access to the script bucket and
  # permission to fetch the connection secret.
  GlueLoadRedshiftRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: GlueGetSecretPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - secretsmanager:GetSecretValue
                Resource:
                  - Ref: Secret
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
  Secret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Description: Secret for Redshift Command Glue Job.
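      # The SecretString below resolves to a JSON document of this shape
      # (illustrative values): {"user": "awsuser", "password": "...",
      # "host": "examplecluster.abc123.us-east-1.redshift.amazonaws.com",
      # "database": "dev", "port": "5439"}. The Glue job fetches this secret
      # at run time to build its Redshift connection.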
      SecretString: !Sub
        - "{\"user\": \"${user}\", \"password\": \"${pass}\", \"host\": \"${host}\", \"database\": \"${db}\", \"port\": \"${port}\"}"
        - {user: !Ref MasterUsername, pass: !Ref MasterUserPassword, host: !Ref DatabaseHostName, db: !Ref DatabaseName, port: !Ref PortNumber}
  # Python-shell Glue job that runs the SQL script against the cluster through
  # the Glue connection created below.
  GlueJobRedshiftCommands:
    Type: AWS::Glue::Job
    DependsOn:
      - InitCreateGlueConnection
    Properties:
      Role: !GetAtt 'GlueLoadRedshiftRole.Arn'
      ExecutionProperty:
        MaxConcurrentRuns: 10
      Connections:
        Connections:
          - Ref: InitCreateGlueConnection
      Command:
        Name: pythonshell
        PythonVersion: 3
        ScriptLocation: !Sub
          - s3://${bucket}/RedshiftCommands.py
          - {bucket: !Ref GlueBucket}
      DefaultArguments:
        "--job-bookmark-option": "job-bookmark-disable"
        "--TempDir": !Sub
          - s3://${bucket}
          - {bucket: !Ref GlueBucket}
        "--enable-metrics": ""
  # Custom-resource Lambda that kicks off the Glue job once on stack creation.
  LambdaGlueJobRedshiftCommands:
    Type: "AWS::Lambda::Function"
    Properties:
      Role: !GetAtt 'LambdaCFNCustomRole.Arn'
      Timeout: 300
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse
          import logging

          logging.basicConfig()
          logger = logging.getLogger(__name__)
          logger.setLevel(logging.INFO)
          glue = boto3.client('glue')  # start_job_run

          def handler(event, context):
              logger.info(json.dumps(event))
              # Only trigger the job on Create; Update/Delete are no-ops.
              if event['RequestType'] != 'Create':
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'NA'})
              else:
                  try:
                      sqlScript = event['ResourceProperties']['sqlScript']
                      secret = event['ResourceProperties']['secret']
                      params = event['ResourceProperties']['params']
                      jobName = event['ResourceProperties']['jobName']
                      response = glue.start_job_run(
                          JobName=jobName,
                          Arguments={
                              '--SQLScript': sqlScript,
                              '--Secret': secret,
                              '--Params': params})
                      message = 'Glue triggered successfully.'
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': message})
                  except Exception as e:
                      message = 'Glue Job Issue'
                      logger.info(e)
                      cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': message})
      Handler: index.handler
      Runtime: python3.7
    DependsOn:
      - LambdaCFNCustomRole
  # Custom-resource Lambda that copies the job script (and sample SQL) into
  # GlueBucket on create and removes the copies on delete.
  LambdaFunctionS3Copy:
    Type: "AWS::Lambda::Function"
    Properties:
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse
          import logging

          logging.basicConfig()
          logger = logging.getLogger(__name__)
          logger.setLevel(logging.INFO)

          def handler(event, context):
              logger.info(json.dumps(event))
              s3 = boto3.client('s3')  # delete_object, copy_object
              s3BucketTarget = event['ResourceProperties']['s3BucketTarget']
              s3Bucket = event['ResourceProperties']['s3Bucket']
              s3Object = event['ResourceProperties']['s3Object']
              if event['RequestType'] == 'Delete':
                  try:
                      s3.delete_object(Bucket=s3BucketTarget, Key=s3Object)
                      s3.delete_object(Bucket=s3BucketTarget, Key=s3Object + '.temp')
                  except Exception as e:
                      logger.info(e)
                  # Always report success on delete so stack deletion is not blocked.
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Delete complete'})
              else:
                  try:
                      s3.delete_object(Bucket=s3BucketTarget, Key=s3Object)
                  except Exception as e:
                      logger.info(e)
                  try:
                      s3.copy_object(Bucket=s3BucketTarget, CopySource=s3Bucket + "/" + s3Object, Key=s3Object)
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Copy complete'})
                  except Exception as e:
                      logger.error(e)
                      cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': 'Copy failed'})
      Handler: index.handler
      Role:
        Fn::GetAtt: [LambdaCFNCustomRole, Arn]
      Runtime: python3.7
    DependsOn:
      - LambdaCFNCustomRole
  # Custom-resource Lambda that creates (and deletes) the Glue JDBC connection
  # used to place the job inside the cluster's VPC.
  LambdaCreateGlueConnection:
    Type: "AWS::Lambda::Function"
    Properties:
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse
          import logging

          logging.basicConfig()
          logger = logging.getLogger(__name__)
          logger.setLevel(logging.INFO)

          def handler(event, context):
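              # On Create: derive the cluster id from the hostname, look up the
              # cluster's AZ, security group, and a subnet in the same AZ, then
              # register a Glue JDBC connection there. The URL and credentials
              # below are placeholders; the Glue job reads the real values from
              # Secrets Manager at run time. On Delete: drop the connection.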
              logger.info(json.dumps(event))
              try:
                  glue = boto3.client('glue')  # delete_connection, create_connection
                  rs = boto3.client('redshift')  # describe_clusters, describe_cluster_subnet_groups
                  accountId = event['ResourceProperties']['Account']
                  if event['RequestType'] == 'Delete':
                      try:
                          glue.delete_connection(CatalogId=accountId, ConnectionName=event['PhysicalResourceId'])
                      except Exception as e:
                          logger.info(e)
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Delete complete'})
                  else:
                      databaseHostName = event['ResourceProperties']['DatabaseHostName']
                      requestId = event['RequestId']
                      clusterId = databaseHostName.split('.')[0]
                      cluster = rs.describe_clusters(ClusterIdentifier=clusterId)["Clusters"][0]
                      availabilityzone = cluster["AvailabilityZone"]
                      securitygroup = cluster["VpcSecurityGroups"][0]["VpcSecurityGroupId"]
                      subnetgroupname = cluster["ClusterSubnetGroupName"]
                      subnetgroup = rs.describe_cluster_subnet_groups(ClusterSubnetGroupName=subnetgroupname)["ClusterSubnetGroups"][0]
                      # Prefer a subnet in the same AZ as the cluster.
                      for subnet in subnetgroup["Subnets"]:
                          subnetid = subnet["SubnetIdentifier"]
                          if availabilityzone == subnet["SubnetAvailabilityZone"]["Name"]:
                              break
                      connectionInput = {
                          'Name': 'GlueRedshiftConnection-' + requestId,
                          'ConnectionType': 'JDBC',
                          'ConnectionProperties': {
                              'JDBC_CONNECTION_URL': 'jdbc:redshift://host:9999/db',
                              'USERNAME': 'user',
                              'PASSWORD': 'password'
                          },
                          'PhysicalConnectionRequirements': {
                              'SubnetId': subnetid,
                              'SecurityGroupIdList': [securitygroup],
                              'AvailabilityZone': availabilityzone
                          }
                      }
                      glue.create_connection(CatalogId=accountId, ConnectionInput=connectionInput)
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Create successful'}, 'GlueRedshiftConnection-' + requestId)
              except Exception as e:
                  logger.error(e)
                  cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': 'Failed'})
      Handler: index.handler
      Role:
        Fn::GetAtt: [LambdaCFNCustomRole, Arn]
      Runtime: python3.7
    DependsOn:
      - LambdaCFNCustomRole
  # Custom-resource Lambda that adds an S3 gateway VPC endpoint to the
  # cluster's VPC so the Glue job can reach S3 from private subnets.
  LambdaCreateS3Connection:
    Type: "AWS::Lambda::Function"
    Properties:
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse
          import logging

          logging.basicConfig()
          logger = logging.getLogger(__name__)
          logger.setLevel(logging.INFO)

          def handler(event, context):
              logger.info(json.dumps(event))
              try:
                  ec2 = boto3.client('ec2')  # delete_vpc_endpoints, create_vpc_endpoint, describe_route_tables
                  rs = boto3.client('redshift')  # describe_clusters, describe_cluster_subnet_groups
                  if event['RequestType'] == 'Delete':
                      try:
                          ec2.delete_vpc_endpoints(VpcEndpointIds=[event['PhysicalResourceId']])
                      except Exception as e:
                          logger.info(e)
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Delete complete'})
                  else:
                      databaseHostName = event['ResourceProperties']['DatabaseHostName']
                      clusterId = databaseHostName.split('.')[0]
                      cluster = rs.describe_clusters(ClusterIdentifier=clusterId)["Clusters"][0]
                      vpc = cluster["VpcId"]
                      availabilityzone = cluster["AvailabilityZone"]
                      subnetgroupname = cluster["ClusterSubnetGroupName"]
                      subnetgroup = rs.describe_cluster_subnet_groups(ClusterSubnetGroupName=subnetgroupname)["ClusterSubnetGroups"][0]
                      for subnet in subnetgroup["Subnets"]:
                          subnetid = subnet["SubnetIdentifier"]
                          if availabilityzone == subnet["SubnetAvailabilityZone"]["Name"]:
                              break
                      # Fall back to the first route table in the VPC if none is
                      # associated with the chosen subnet.
                      try:
                          routetable = ec2.describe_route_tables(Filters=[{'Name': 'association.subnet-id', 'Values': [subnetid]}])["RouteTables"][0]["RouteTableId"]
                      except:
                          routetable = ec2.describe_route_tables()["RouteTables"][0]["RouteTableId"]
                      region = event['ResourceProperties']['Region']
                      policyDocument = {
                          "Version": "2012-10-17",
                          "Statement": [{
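                              # Broad allow-all endpoint policy; it could be
                              # scoped down to the buckets the job actually reads.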
"Effect":"Allow", "Principal": "*", "Action":"*", "Resource":"*" }] } try: response = ec2.create_vpc_endpoint( VpcEndpointType='Gateway', RouteTableIds=[routetable], VpcId=vpc, ServiceName='com.amazonaws.'+region+'.s3', PolicyDocument=json.dumps(policyDocument) ) cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Create successful'}, response['VpcEndpoint']['VpcEndpointId']) except Exception as e: logger.error(e) if e.response["Error"]["Code"] == 'RouteAlreadyExists': cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Create successful'}) else: cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': 'Create Failed'}) except Exception as e: logger.error(e) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': 'Failed'}) Handler: index.handler Role: Fn::GetAtt: [LambdaCFNCustomRole, Arn] Runtime: python3.7 DependsOn: - LambdaCFNCustomRole LambdaCreateSecretConnection: Type: "AWS::Lambda::Function" Properties: Timeout: 30 Code: ZipFile: | import json import boto3 import cfnresponse import logging logging.basicConfig() logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def handler(event, context): logger.info(json.dumps(event)) try: ec2 = boto3.client('ec2') #delete_vpc_endpoints, create_vpc_endpoint rs = boto3.client('redshift') #describe_clusters, describe_cluster_subnet_groups if event['RequestType'] == 'Delete': try: ec2.delete_vpc_endpoints(VpcEndpointIds=[event['PhysicalResourceId']]) except Exception as e: logger.info(e) cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Delete complete'}) else: databaseHostName = event['ResourceProperties']['DatabaseHostName'] clusterId = databaseHostName.split('.')[0] cluster = rs.describe_clusters(ClusterIdentifier=clusterId)["Clusters"][0] vpc = cluster["VpcId"] availabilityzone = cluster["AvailabilityZone"] securitygroup = cluster["VpcSecurityGroups"][0]["VpcSecurityGroupId"] subnetgroupname = cluster["ClusterSubnetGroupName"] subnetgroup = rs.describe_cluster_subnet_groups(ClusterSubnetGroupName=subnetgroupname)["ClusterSubnetGroups"][0] for subnet in subnetgroup["Subnets"] : subnetid = subnet["SubnetIdentifier"] if (availabilityzone == subnet["SubnetAvailabilityZone"]["Name"]): break region = event['ResourceProperties']['Region'] try: response = ec2.create_vpc_endpoint( VpcEndpointType='Interface', SubnetIds=[subnetid], VpcId=vpc, ServiceName='com.amazonaws.'+region+'.secretsmanager', PrivateDnsEnabled=True, SecurityGroupIds=[securitygroup] ) cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Create successful'}, response['VpcEndpoint']['VpcEndpointId']) except Exception as e: if 'there is already a conflicting DNS domain' in e.response["Error"]["Message"]: cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Create successful'}) else: cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': 'Create failed'}) except Exception as e: logger.error(e) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': 'Failed'}) Handler: index.handler Role: Fn::GetAtt: [LambdaCFNCustomRole, Arn] Runtime: python3.7 DependsOn: - LambdaCFNCustomRole ImportPyScript: Type: Custom::CopyScript DependsOn: - LambdaFunctionS3Copy - GlueBucket Properties: ServiceToken: Fn::GetAtt : [LambdaFunctionS3Copy, Arn] s3BucketTarget: Ref: GlueBucket s3Bucket: 'redshift-immersionday-labs' s3Object: 'RedshiftCommands.py' InitGlueJobRedshiftCommands: Condition: IsNotDefault Type: Custom::InitGlueJobRedshiftCommands DependsOn: - LambdaGlueJobRedshiftCommands - GlueJobRedshiftCommands 
      - InitCreateS3Connection
      - InitCreateSecretConnection
      - ImportPyScript
    Properties:
      ServiceToken: !GetAtt 'LambdaGlueJobRedshiftCommands.Arn'
      sqlScript: !Ref Script
      secret: !Ref Secret
      params: !Ref ScriptParameters
      jobName: !Ref GlueJobRedshiftCommands
  InitGlueJobRedshiftCommandsDefault:
    Condition: IsDefault
    Type: Custom::InitGlueJobRedshiftCommands
    DependsOn:
      - LambdaGlueJobRedshiftCommands
      - GlueJobRedshiftCommands
      - InitCreateS3Connection
      - InitCreateSecretConnection
      - ImportPyScript
    Properties:
      ServiceToken: !GetAtt 'LambdaGlueJobRedshiftCommands.Arn'
      sqlScript: !Sub
        - s3://${bucket}/sql/redshift-tpcds.sql
        - {bucket: !Ref GlueBucket}
      secret: !Ref Secret
      params: !Ref ScriptParameters
      jobName: !Ref GlueJobRedshiftCommands
  ImportSqlScript:
    Type: Custom::CopyScript
    Condition: IsDefault
    DependsOn:
      - LambdaFunctionS3Copy
      - GlueBucket
    Properties:
      ServiceToken:
        Fn::GetAtt: [LambdaFunctionS3Copy, Arn]
      s3BucketTarget:
        Ref: GlueBucket
      s3Bucket: 'redshift-demos'
      s3Object: 'sql/redshift-tpcds.sql'
  InitCreateGlueConnection:
    Type: Custom::InitCreateGlueConnection
    DependsOn:
      - LambdaCreateGlueConnection
    Properties:
      ServiceToken:
        Fn::GetAtt: [LambdaCreateGlueConnection, Arn]
      DatabaseHostName:
        Ref: DatabaseHostName
      Account: !Sub "${AWS::AccountId}"
  InitCreateS3Connection:
    Type: Custom::InitCreateS3Connection
    DependsOn:
      - LambdaCreateS3Connection
    Properties:
      ServiceToken:
        Fn::GetAtt: [LambdaCreateS3Connection, Arn]
      DatabaseHostName:
        Ref: DatabaseHostName
      Region: !Sub "${AWS::Region}"
  InitCreateSecretConnection:
    Type: Custom::InitCreateSecretConnection
    DependsOn:
      - LambdaCreateSecretConnection
    Properties:
      ServiceToken:
        Fn::GetAtt: [LambdaCreateSecretConnection, Arn]
      DatabaseHostName:
        Ref: DatabaseHostName
      Region: !Sub "${AWS::Region}"
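# A minimal launch sketch (illustrative values only; substitute your own
# hostname, credentials, and Redshift role ARN):
#
#   aws cloudformation create-stack \
#     --stack-name redshift-commands \
#     --template-body file://<this-template>.yaml \
#     --capabilities CAPABILITY_IAM \
#     --parameters \
#       ParameterKey=DatabaseHostName,ParameterValue=examplecluster.abc123.us-east-1.redshift.amazonaws.com \
#       ParameterKey=MasterUsername,ParameterValue=awsuser \
#       ParameterKey=MasterUserPassword,ParameterValue=<password> \
#       ParameterKey=ScriptParameters,ParameterValue=arn:aws:iam::123456789012:role/myRedshiftRole
#
# CAPABILITY_IAM is required because the template creates IAM roles.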