AWSTemplateFormatVersion: '2010-09-09'
Description: 'Amazon Redshift cluster and supporting resources for the Redshift ML bring-your-own-model workshop'
Transform: AWS::SecretsManager-2020-07-23
Parameters:
  ClusterName:
    Description: Name of the Redshift cluster
    Type: String
    Default: redshift-cluster
  DatabaseName:
    Description: The name of the first database to be created when the cluster is created.
    Type: String
    Default: dev
    AllowedPattern: '([a-z]|[0-9])+'
  NumberOfNodes:
    Description: The number of compute nodes in the cluster. For multi-node clusters, the NumberOfNodes parameter must be greater than 1.
    Type: Number
    Default: '2'
  NodeType:
    Description: The type of node to be provisioned
    Type: String
    Default: ra3.xlplus
    AllowedValues:
      - ra3.xlplus
      - ra3.4xlarge
      - ra3.16xlarge
      - dc2.large
      - dc2.8xlarge
      - ds2.xlarge
      - ds2.8xlarge
  MasterUsername:
    Description: The user name that is associated with the master user account for the cluster that is being created.
    Type: String
    Default: awsuser
    AllowedPattern: '([a-z])([a-z]|[0-9])*'
    ConstraintDescription: must start with a-z and contain only a-z or 0-9.
  PreExistingS3BucketToGrantRedshiftAccess:
    Description: An existing Amazon S3 bucket in the same AWS Region that Redshift is allowed to access
    Type: String
    Default: 'redshiftbucket-ml-sagemaker'
  SetupSQLScriptS3Path:
    Description: S3 path of SQL scripts to be run for the initial setup
    Type: String
    Default: 's3://redshiftbucket-ml-sagemaker/workshop/setup_script.sql'
  OnPremisesCIDR:
    Description: IP range (CIDR notation) for your existing infrastructure to access the target and replica Redshift clusters
    Type: String
    Default: 72.21.0.0/16
    MinLength: '9'
    MaxLength: '18'
    AllowedPattern: "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})/(\\d{1,2})"
    ConstraintDescription: must be a valid IP CIDR range of the form x.x.x.x/x.
  VpcCIDR:
    Description: IP range (CIDR notation) for this VPC
    Type: String
    Default: 10.210.0.0/16
    MinLength: '9'
    MaxLength: '18'
    AllowedPattern: "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})/(\\d{1,2})"
    ConstraintDescription: must be a valid IP CIDR range of the form x.x.x.x/x.
  PublicSubnet1CIDR:
    Description: IP range (CIDR notation) for the public subnet in the first Availability Zone
    Type: String
    Default: 10.210.10.0/24
    MinLength: '9'
    MaxLength: '18'
    AllowedPattern: "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})/(\\d{1,2})"
    ConstraintDescription: must be a valid IP CIDR range of the form x.x.x.x/x.
  PublicSubnet2CIDR:
    Description: IP range (CIDR notation) for the public subnet in the second Availability Zone
    Type: String
    Default: 10.210.11.0/24
    MinLength: '9'
    MaxLength: '18'
    AllowedPattern: "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})/(\\d{1,2})"
    ConstraintDescription: must be a valid IP CIDR range of the form x.x.x.x/x.
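# Illustrative deployment example (not part of the template). The stack name, template
# file name, and parameter values are placeholders; CAPABILITY_NAMED_IAM is required
# because the template creates named IAM roles, and CAPABILITY_AUTO_EXPAND because it
# declares a Transform:
#
#   aws cloudformation deploy \
#     --template-file redshift-ml-byom.yaml \
#     --stack-name redshift-ml-workshop \
#     --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
#     --parameter-overrides NodeType=ra3.xlplus NumberOfNodes=2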
Mappings:
  Redshift: # static values related to the Redshift cluster
    Port:
      Number: 5439
    SnapshotRetention:
      Days: 10
    Accessible:
      Public: false
    Encrypted:
      Kms: true
    Password:
      Length: 32
    AuditLogging:
      ExpirationDays: 500
      TransitionDays: 60
    CPUUtilizationAlarm:
      Threshold: 95
    AQUA:
      Status: enabled
    AZ:
      Relocation: true
    VPC:
      EnhancedRouting: false
Conditions:
  RedshiftSingleNodeClusterCondition:
    Fn::Equals:
      - Ref: NumberOfNodes
      - '1'
  IsPreExistingS3Bucket:
    Fn::Not:
      - Fn::Equals:
          - 'N/A'
          - Ref: PreExistingS3BucketToGrantRedshiftAccess
  IsRA3:
    Fn::Equals:
      - !Select [0, !Split [".", !Ref NodeType]]
      - 'ra3'
  IsAquaCompatible: !Or [!Equals [!Ref "NodeType", "ra3.16xlarge"], !Equals [!Ref "NodeType", "ra3.4xlarge"]]
Resources:
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref VpcCIDR
      EnableDnsSupport: true
      EnableDnsHostnames: true
  InternetGateway:
    Type: AWS::EC2::InternetGateway
  InternetGatewayAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      InternetGatewayId: !Ref InternetGateway
      VpcId: !Ref VPC
  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [0, !GetAZs '']
      CidrBlock: !Ref PublicSubnet1CIDR
      MapPublicIpOnLaunch: true
  PublicSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [1, !GetAZs '']
      CidrBlock: !Ref PublicSubnet2CIDR
      MapPublicIpOnLaunch: true
  PrimaryNatGatewayEIP:
    Type: AWS::EC2::EIP
    DependsOn: InternetGatewayAttachment
    Properties:
      Domain: vpc
  SecondaryNatGatewayEIP:
    Type: AWS::EC2::EIP
    DependsOn: InternetGatewayAttachment
    Properties:
      Domain: vpc
  PrimaryNatGateway:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt PrimaryNatGatewayEIP.AllocationId
      SubnetId: !Ref PublicSubnet1
  SecondaryNatGateway:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt SecondaryNatGatewayEIP.AllocationId
      SubnetId: !Ref PublicSubnet2
  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
  DefaultPublicRoute:
    Type: AWS::EC2::Route
    DependsOn: InternetGatewayAttachment
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway
  PublicSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet1
  PublicSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet2
  RedshiftS3Bucket:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      Tags:
        - Key: Name
          Value: !Join ['-', [!Ref 'AWS::StackName', 'RedshiftS3Bucket']]
  RedshiftAccessIamPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - redshift:GetClusterCredentials
            Resource:
              - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:${ClusterName}
              - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${ClusterName}/${DatabaseName}
              - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${ClusterName}/${MasterUsername}
          - Effect: Allow
            Action:
              - iam:PassRole
              - ec2:Describe*
              - redshift:RestoreFromClusterSnapshot
              - redshift:DescribeClusterSnapshots
              - redshift-data:ExecuteStatement
              - redshift-data:ListStatements
              - redshift-data:GetStatementResult
              - redshift-data:DescribeStatement
            Resource:
              - '*'
  RedshiftBucketAccessIamPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: 'Allow'
            Action:
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListMultipartUploadParts
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
            Resource:
              - !Sub "arn:aws:s3:::${RedshiftS3Bucket}"
              - !Sub "arn:aws:s3:::${RedshiftS3Bucket}/*"
              - !If
                - IsPreExistingS3Bucket
                - !Sub "arn:aws:s3:::${PreExistingS3BucketToGrantRedshiftAccess}"
                - !Ref 'AWS::NoValue'
              - !If
                - IsPreExistingS3Bucket
                - !Sub "arn:aws:s3:::${PreExistingS3BucketToGrantRedshiftAccess}/*"
                - !Ref 'AWS::NoValue'
          - Effect: 'Allow'
            Action:
              - s3:PutObject
            Resource:
              - !Sub "arn:aws:s3:::${RedshiftS3Bucket}/*"
              - !If
                - IsPreExistingS3Bucket
                - !Sub "arn:aws:s3:::${PreExistingS3BucketToGrantRedshiftAccess}/*"
                - !Ref 'AWS::NoValue'
  SageMakerAccessIamPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: 'Allow'
            Action:
              - sagemaker:CreateApp
              - sagemaker:DescribeApp
              - sagemaker:CreateTrainingJob
            Resource:
              - '*'
          - Effect: Allow
            Action:
              - iam:PassRole
              - iam:GetRole
              - sagemaker:DescribeTrainingJob
              - sagemaker:CreateModel
              - sagemaker:CreateEndpointConfig
              - sagemaker:DescribeEndpoint
              - sagemaker:CreateEndpoint
              - sagemaker:InvokeEndpoint
              - sagemaker:DeleteEndpoint
              - sagemaker:DescribeEndpointConfig
            Resource:
              - !Sub "arn:aws:iam::${AWS::AccountId}:role/${AWS::StackName}-Sagemaker-${AWS::AccountId}-${AWS::Region}"
              - !Sub "arn:aws:sagemaker:${AWS::Region}:${AWS::AccountId}:*"
          - Effect: Allow
            Action:
              - logs:DescribeLogStreams
            Resource:
              - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"
  IamRoleRedshiftCluster:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: 'Allow'
            Principal:
              Service:
                - redshift.amazonaws.com
                - sagemaker.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: '/'
      RoleName: !Sub "${AWS::StackName}-Redshift-${AWS::AccountId}-${AWS::Region}"
      ManagedPolicyArns:
        - !Ref RedshiftBucketAccessIamPolicy
      Policies:
        - PolicyName: !Join ['-', ['Spectrum-Glue-Access-Policy', !Ref 'AWS::StackName']]
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:*
                  - logs:*
                  - ec2:Describe*
                  - ec2:CreateNetworkInterface
                  - ec2:CreateNetworkInterfacePermission
                  - ec2:DeleteNetworkInterface
                  - ec2:DeleteNetworkInterfacePermission
                  - cloudwatch:PutMetricData
                  - ecr:BatchCheckLayerAvailability
                  - ecr:BatchGetImage
                  - ecr:GetAuthorizationToken
                  - ecr:GetDownloadUrlForLayer
                  - sagemaker:*Job*
                  - sagemaker:InvokeEndpoint
                Resource:
                  - '*'
              - Effect: Allow
                Action:
                  - iam:PassRole
                  - iam:GetRole
                Resource:
                  - !Sub "arn:aws:iam::${AWS::AccountId}:role/${AWS::StackName}-Redshift-${AWS::AccountId}-${AWS::Region}"
  SecurityGroupRedshift:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Security group associated with the Amazon Redshift cluster
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: !FindInMap [Redshift, Port, Number]
          ToPort: !FindInMap [Redshift, Port, Number]
          CidrIp: !Ref OnPremisesCIDR
          Description: 'Redshift access from the on-premises CIDR'
  SecurityGroupSageMakerNotebook:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: 'SageMaker notebook security group'
      SecurityGroupIngress:
        - CidrIp: !Ref VpcCIDR
          Description: Allow inbound access on the Redshift port
          IpProtocol: tcp
          FromPort: 5439
          ToPort: 5439
      VpcId: !Ref VPC
  SecurityGroupSelfReference:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      Description: Self-referencing rule
      IpProtocol: -1
      FromPort: -1
      ToPort: -1
      GroupId: !GetAtt [SecurityGroupRedshift, GroupId]
      SourceSecurityGroupId: !GetAtt [SecurityGroupRedshift, GroupId]
  RedshiftClusterParameterGroup:
    Type: AWS::Redshift::ClusterParameterGroup
    Properties:
      Description: Redshift cluster parameter group with Auto WLM
      ParameterGroupFamily: redshift-1.0
      Parameters:
        - ParameterName: enable_user_activity_logging
          ParameterValue: 'true'
        - ParameterName: require_ssl
          ParameterValue: 'true'
        - ParameterName: auto_analyze
          ParameterValue: 'true'
        - ParameterName: max_concurrency_scaling_clusters
          ParameterValue: '1'
        - ParameterName: 'wlm_json_configuration'
          ParameterValue: '[ { "query_group" : [ ],"query_group_wild_card" : 0,"user_group" : [ ],"user_group_wild_card" : 0,"concurrency_scaling" : "off","rules" : [ { "rule_name" : "DiskSpilling", "predicate" : [ { "metric_name" : "query_temp_blocks_to_disk", "operator" : ">", "value" : 100000 } ], "action" : "log"}, { "rule_name" : "QueryRunningMoreThan30min", "predicate" : [ { "metric_name" : "query_execution_time", "operator" : ">", "value" : 1800 } ], "action" : "log"} ],"priority" : "normal","queue_type" : "auto","auto_wlm" : true }, {"short_query_queue" : true } ]'
      Tags:
        - Key: Name
          Value: !Join ['-', [!Ref 'AWS::StackName', 'Primary Cluster Parameter group']]
  RedshiftClusterSubnetGroup:
    Type: 'AWS::Redshift::ClusterSubnetGroup'
    Properties:
      Description: Cluster subnet group
      SubnetIds:
        - !Ref PublicSubnet1
        - !Ref PublicSubnet2
  CMKeyRedshiftCluster:
    Type: AWS::KMS::Key
    Properties:
      Description: 'Customer managed key to be used for encryption at rest'
      Enabled: true
      EnableKeyRotation: true
      KeyPolicy:
        Version: '2012-10-17'
        Id: key-default-1
        Statement:
          - Sid: Enable KMS permissions for the root account user
            Effect: Allow
            Principal:
              AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
            Action: 'kms:*'
            Resource: '*'
          - Sid: Enable IAM user permissions
            Effect: Allow
            Principal:
              AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
            Action: 'kms:*'
            Resource: '*'
          - Sid: 'Allow access through Redshift for all principals in the account that are authorized to use Redshift'
            Effect: 'Allow'
            Principal:
              AWS: '*'
            Action:
              - 'kms:Encrypt'
              - 'kms:Decrypt'
              - 'kms:ReEncrypt*'
              - 'kms:GenerateDataKey*'
              - 'kms:CreateGrant'
              - 'kms:ListGrants'
              - 'kms:DescribeKey'
            Resource: '*'
            Condition:
              StringEquals:
                'kms:CallerAccount': !Sub '${AWS::AccountId}'
                'kms:ViaService': !Sub 'redshift.${AWS::Region}.amazonaws.com'
  SecretRedshiftMasterUser:
    Type: "AWS::SecretsManager::Secret"
    Properties:
      Description: "Secrets Manager secret that stores the Redshift master user credentials"
      GenerateSecretString:
        SecretStringTemplate: !Sub
          - '{"username": "${MasterUsername}"}'
          - {MasterUsername: !Ref MasterUsername}
        GenerateStringKey: "password"
        PasswordLength: !FindInMap [Redshift, Password, Length]
        ExcludePunctuation: true
  RedshiftCluster:
    Type: 'AWS::Redshift::Cluster'
    DeletionPolicy: 'Delete'
    UpdateReplacePolicy: 'Delete'
    Properties:
      ClusterType: !If [RedshiftSingleNodeClusterCondition, 'single-node', 'multi-node']
      ClusterIdentifier: !Ref ClusterName
      NumberOfNodes: !If [RedshiftSingleNodeClusterCondition, !Ref 'AWS::NoValue', !Ref NumberOfNodes]
      NodeType: !Ref NodeType
      DBName: !Ref DatabaseName
      KmsKeyId: !Ref CMKeyRedshiftCluster
      Encrypted: !FindInMap [Redshift, Encrypted, Kms]
      Port: !FindInMap [Redshift, Port, Number]
      MasterUsername: !Join ['', ['{{resolve:secretsmanager:', !Ref SecretRedshiftMasterUser, ':SecretString:username}}']]
      MasterUserPassword: !Join ['', ['{{resolve:secretsmanager:', !Ref SecretRedshiftMasterUser, ':SecretString:password}}']]
      ClusterParameterGroupName: !Ref RedshiftClusterParameterGroup
      AquaConfigurationStatus: !If [IsAquaCompatible, !FindInMap [Redshift, AQUA, Status], !Ref 'AWS::NoValue']
      AvailabilityZoneRelocation: !If [IsRA3, !FindInMap [Redshift, AZ, Relocation], !Ref 'AWS::NoValue']
      EnhancedVpcRouting: !FindInMap [Redshift, VPC, EnhancedRouting]
      VpcSecurityGroupIds:
        - !Ref SecurityGroupRedshift
      AutomatedSnapshotRetentionPeriod: !FindInMap [Redshift, SnapshotRetention, Days]
      PubliclyAccessible: !FindInMap [Redshift, Accessible, Public]
      ClusterSubnetGroupName: !Ref RedshiftClusterSubnetGroup
      IamRoles:
        - !GetAtt IamRoleRedshiftCluster.Arn
      Tags:
        - Key: Name
          Value: !Join ['-', [!Ref 'AWS::StackName', 'Redshift-Cluster']]
  SecretAttachmentRedshiftMasterUser:
    Type: "AWS::SecretsManager::SecretTargetAttachment"
    Properties:
      SecretId: !Ref SecretRedshiftMasterUser
      TargetId: !Ref RedshiftCluster
      TargetType: AWS::Redshift::Cluster
  SagemakerNotebookIAMRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - sagemaker.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      RoleName: !Sub "${AWS::StackName}-Sagemaker-${AWS::AccountId}-${AWS::Region}"
      ManagedPolicyArns:
        - !Ref RedshiftBucketAccessIamPolicy
        - !Ref RedshiftAccessIamPolicy
        - !Ref SageMakerAccessIamPolicy
        - arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
  NotebookInstance:
    Type: "AWS::SageMaker::NotebookInstance"
    Properties:
      InstanceType: "ml.t3.medium"
      RoleArn: !GetAtt SagemakerNotebookIAMRole.Arn
      DirectInternetAccess: Enabled
      RootAccess: Enabled
      SubnetId: !Ref PublicSubnet1
      VolumeSizeInGB: 20
      LifecycleConfigName: !GetAtt NotebookLifecycle.NotebookInstanceLifecycleConfigName
      SecurityGroupIds:
        - !Ref SecurityGroupSageMakerNotebook
  NotebookLifecycle:
    Type: "AWS::SageMaker::NotebookInstanceLifecycleConfig"
    Properties:
      OnCreate:
        - Content:
            Fn::Base64: |
              #!/usr/bin/env bash
              # Copy the workshop notebook into the SageMaker home directory on first start
              SAGEMAKER_DIR="/home/ec2-user/SageMaker"
              chown -R ec2-user:ec2-user "${SAGEMAKER_DIR}"
              cd "${SAGEMAKER_DIR}"
              /usr/bin/aws s3 cp s3://redshiftbucket-ml-sagemaker/workshop/redshiftml-bring-your-own-model-RCF.ipynb .
  AWSBatchIamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: batch.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
  EcsTaskExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - ecs-tasks.amazonaws.com
                - sagemaker.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy
        - arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
        - !Ref RedshiftBucketAccessIamPolicy
        - !Ref RedshiftAccessIamPolicy
        - !Ref SageMakerAccessIamPolicy
  RedshiftJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      Type: container
      RetryStrategy:
        Attempts: 1
      PlatformCapabilities:
        - FARGATE
      ContainerProperties:
        Command:
          - sh
          - -c
          - yum install -y awscli; aws s3 cp $BOOTSTRAP_SCRIPT ./bootstrap.sh; sh ./bootstrap.sh
        Image: public.ecr.aws/amazonlinux/amazonlinux
        NetworkConfiguration:
          AssignPublicIp: ENABLED
        ResourceRequirements:
          - Type: VCPU
            Value: 4
          - Type: MEMORY
            Value: 16384
        JobRoleArn: !GetAtt EcsTaskExecutionRole.Arn
        ExecutionRoleArn: !GetAtt EcsTaskExecutionRole.Arn
        LogConfiguration:
          LogDriver: awslogs
          Options:
            "awslogs-group": !Ref RedshiftConfigTestingLogGroup
            "awslogs-stream-prefix": "batch"
  RedshiftConfigTestingLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Join ['', ['/', !Ref 'AWS::StackName', '/log']]
      RetentionInDays: 14
  RedshiftJobQueue:
    Type: AWS::Batch::JobQueue
    Properties:
      Priority: 1
      ComputeEnvironmentOrder:
        - Order: 1
          ComputeEnvironment: !Ref ManagedComputeEnvironment
  ManagedComputeEnvironment:
    Type: AWS::Batch::ComputeEnvironment
    Properties:
      Type: MANAGED
      ComputeResources:
        Type: FARGATE
        MaxvCpus: 256
        Subnets:
          - !Ref PublicSubnet1
        SecurityGroupIds:
          - !Ref SecurityGroupRedshift
      ServiceRole: !GetAtt 'AWSBatchIamRole.Arn'
  RedshiftLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      Description: IAM role for the Lambda functions that invoke Redshift and AWS Batch
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - !Ref RedshiftBucketAccessIamPolicy
        - !Ref RedshiftAccessIamPolicy
      Path: /
      Policies:
        - PolicyName: RedshiftAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - batch:SubmitJob
                Resource:
                  - !Ref RedshiftJobDefinition
                  - !Ref RedshiftJobQueue
              - Effect: Allow
                Action:
                  - batch:DescribeJobs
                Resource: "*"
  LambdaRedshift:
    Type: "AWS::Lambda::Function"
    Properties:
      Description: Lambda function to load customer data into Amazon Redshift from Amazon S3
      Handler: index.handler
      Role: !GetAtt 'RedshiftLambdaRole.Arn'
      Runtime: python3.9
      Timeout: 400
      Environment:
        Variables:
          redshift_cluster_id: !Ref ClusterName
          redshift_database: !Ref DatabaseName
          redshift_user: !Ref MasterUsername
          redshift_iam_role: !GetAtt IamRoleRedshiftCluster.Arn
      Code:
        ZipFile: |
          import json
          import logging
          import os
          import time

          # install latest version of boto3 (left disabled)
          # from pip._internal import main
          # main(['install', '-I', '-q', 'boto3', '--target', '/tmp/', '--no-cache-dir', '--disable-pip-version-check'])
          # sys.path.insert(0, '/tmp/')
          import boto3

          # initiate redshift-data and s3 clients in boto3
          redshift_data_client = boto3.client('redshift-data')
          s3_client = boto3.client('s3')

          logging.basicConfig()
          logger = logging.getLogger(__name__)
          logger.setLevel(logging.INFO)

          def handler(event, context):
              # triggered by the S3 ObjectCreated notification on the customer_raw_data/ prefix
              logger.info(json.dumps(event))
              redshift_cluster_id = os.getenv('redshift_cluster_id')
              redshift_database = os.getenv('redshift_database')
              redshift_user = os.getenv('redshift_user')
              redshift_cluster_iam_role = os.getenv('redshift_iam_role')
              s3_bucket_name = event['Records'][0]['s3']['bucket']['name']
              sql_text = '''
              CREATE TABLE IF NOT EXISTS customer_raw_data (
                  customerid varchar(256),
                  record_date date,
                  monthlycharges numeric(18,8),
                  totalcharges numeric(18,8),
                  customerservicecalls integer,
                  state varchar(256),
                  city varchar(256),
                  zipcode integer,
                  latitude numeric(18,8),
                  longitude numeric(18,8),
                  gender varchar(256),
                  seniorcitizen varchar(256),
                  partner varchar(256),
                  dependents varchar(256),
                  tenuremonths varchar(256),
                  phoneservice varchar(256),
                  multiplelines varchar(256),
                  internetservice varchar(256),
                  onlinesecurity varchar(256),
                  onlinebackup varchar(256),
                  deviceprotection varchar(256),
                  techsupport varchar(256),
                  streamingtv varchar(256),
                  streamingmovies varchar(256),
                  contract varchar(256),
                  paperlessbilling varchar(256),
                  paymentmethod varchar(256),
                  cltv integer);
              COPY customer_raw_data from 's3://{}/customer_raw_data/' iam_role '{}' CSV IGNOREHEADER 1 DELIMITER ',';
              '''
              sql_text = sql_text.format(s3_bucket_name, redshift_cluster_iam_role)
              run_sql(redshift_data_client, sql_text, redshift_cluster_id, redshift_database, redshift_user)

          def run_sql(redshift_data_client, sql_text, redshift_cluster_id, redshift_database, redshift_user, with_event=True):
              # run the SQL through the Redshift Data API and log its status
              res = redshift_data_client.execute_statement(Database=redshift_database, DbUser=redshift_user, Sql=sql_text,
                                                           ClusterIdentifier=redshift_cluster_id, WithEvent=with_event)
              time.sleep(5)
              query_id = res["Id"]
              desc = redshift_data_client.describe_statement(Id=query_id)
              logger.info('Query Status: {} | Query Id: {}'.format(desc["Status"], query_id))

          def get_sql_script_from_s3(script_s3_path):
              # helper to read a SQL script from S3 (not used by this handler)
              bucket, key = script_s3_path.replace("s3://", "").split("/", 1)
              obj = s3_client.get_object(Bucket=bucket, Key=key)
              return obj['Body'].read().decode('utf-8')
  BucketPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: 'lambda:InvokeFunction'
      FunctionName: !Ref LambdaRedshift
      Principal: s3.amazonaws.com
      SourceAccount: !Ref 'AWS::AccountId'
      SourceArn: !Sub 'arn:aws:s3:::${RedshiftS3Bucket}'
  ApplyNotificationFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Path: /
      Policies:
        - PolicyName: S3BucketNotificationPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: AllowBucketNotification
                Effect: Allow
                Action:
                  - s3:PutBucketNotification
                  - s3:PutObject
                  - s3:ListBucket
                Resource:
                  - !Sub 'arn:aws:s3:::${RedshiftS3Bucket}'
                  - !Sub 'arn:aws:s3:::${RedshiftS3Bucket}/*'
  ApplyBucketNotificationFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Runtime: python3.9
      Role: !GetAtt 'ApplyNotificationFunctionRole.Arn'
      Timeout: 240
      Code:
        ZipFile: |
          import json
          import logging

          import boto3
          import cfnresponse

          s3Client = boto3.client('s3')
          logger = logging.getLogger()
          logger.setLevel(logging.DEBUG)

          def addBucketNotification(bucketName, notificationId, functionArn):
              # create the prefix folder so uploads have a target, then register the notification
              folder_name = "customer_raw_data"
              s3Client.put_object(Bucket=bucketName, Key=(folder_name + '/'))
              notificationResponse = s3Client.put_bucket_notification_configuration(
                  Bucket=bucketName,
                  NotificationConfiguration={
                      'LambdaFunctionConfigurations': [
                          {
                              'Id': notificationId,
                              'LambdaFunctionArn': functionArn,
                              'Events': ['s3:ObjectCreated:*'],
                              'Filter': {
                                  'Key': {
                                      'FilterRules': [
                                          {'Name': 'prefix', 'Value': 'customer_raw_data/'},
                                      ]
                                  }
                              }
                          },
                      ]
                  }
              )
              return notificationResponse

          def create(properties, physical_id):
              bucketName = properties['S3Bucket']
              notificationId = properties['NotificationId']
              functionArn = properties['FunctionARN']
              response = addBucketNotification(bucketName, notificationId, functionArn)
              logger.info('AddBucketNotification response: %s' % json.dumps(response))
              return cfnresponse.SUCCESS, physical_id

          def update(properties, physical_id):
              return cfnresponse.SUCCESS, None

          def delete(properties, physical_id):
              return cfnresponse.SUCCESS, None

          def handler(event, context):
              logger.info('Received event: %s' % json.dumps(event))
              status = cfnresponse.FAILED
              new_physical_id = None
              try:
                  properties = event.get('ResourceProperties')
                  physical_id = event.get('PhysicalResourceId')
                  status, new_physical_id = {
                      'Create': create,
                      'Update': update,
                      'Delete': delete
                  }.get(event['RequestType'], lambda x, y: (cfnresponse.FAILED, None))(properties, physical_id)
              except Exception as e:
                  logger.error('Exception: %s' % e)
                  status = cfnresponse.FAILED
              finally:
                  cfnresponse.send(event, context, status, {}, new_physical_id)
  StartUpLambda:
    Type: 'AWS::Lambda::Function'
    Properties:
      Handler: index.handler
      Runtime: python3.9
      Timeout: 900
      Role: !GetAtt RedshiftLambdaRole.Arn
      Environment:
        Variables:
          redshift_cluster_identifier: !Ref ClusterName
          redshift_database_name: !Ref DatabaseName
          redshift_database_user: !Ref MasterUsername
          redshift_iam_role: !GetAtt IamRoleRedshiftCluster.Arn
          redshift_s3_bucket: !Ref RedshiftS3Bucket
          redshift_cluster_endpoint: !Sub "${RedshiftCluster.Endpoint.Address}:${RedshiftCluster.Endpoint.Port}/${DatabaseName}"
          job_definition: !Ref RedshiftJobDefinition
          job_queue: !Ref RedshiftJobQueue
      Code:
        ZipFile: |
          import os
          import time
          import traceback
          from pathlib import Path

          import boto3
          import botocore.exceptions as be
          import cfnresponse

          # install latest version of boto3 (left disabled)
          # from pip._internal import main
          # main(['install', '-I', '-q', 'boto3', '--target', '/tmp/', '--no-cache-dir', '--disable-pip-version-check'])
          # sys.path.insert(0, '/tmp/')
          # print(boto3.__version__)

          redshift_client = boto3.client('redshift-data')
          s3_client = boto3.client('s3')

          def get_sql_script_from_s3(script_s3_path):
              bucket, key = script_s3_path.replace("s3://", "").split("/", 1)
              obj = s3_client.get_object(Bucket=bucket, Key=key)
              return obj['Body'].read().decode('utf-8')

          def batch_job_status(job_id):
              if job_id == "N/A":
                  return "FINISHED"
              else:
                  job_stats = boto3.client('batch').describe_jobs(jobs=[job_id]).get('jobs')[0]
                  if job_stats.get('status') == "FAILED":
                      raise Exception('Error:' + str(job_stats))
                  elif job_stats.get('status') == "SUCCEEDED":
                      return "FINISHED"
                  else:
                      return job_stats.get('status')

          def split_sql_statement(script_s3_path, redshift_s3_bucket, redshift_iam_role):
              # substitute the bucket and IAM role into the setup script and return it as one SQL batch
              script = get_sql_script_from_s3(script_s3_path).format(redshift_s3_bucket=redshift_s3_bucket, redshift_iam_role=redshift_iam_role)
              return script

          def run_batch_sql(redshift_config, sql_statements, statement_name):
              # execute the input SQL statement in the specified Amazon Redshift cluster
              api_response = redshift_client.execute_statement(
                  ClusterIdentifier=redshift_config["redshift_cluster_id"],
                  Database=redshift_config["redshift_database"],
                  DbUser=redshift_config["redshift_user"],
                  Sql=sql_statements,
                  StatementName=statement_name)
              sql_id = api_response["Id"]
              return sql_id

          def get_sql_status(sql_id):
              desc = redshift_client.describe_statement(Id=sql_id)
              status = desc["Status"]
              if status == "FAILED":
                  raise Exception('SQL query failed:' + sql_id + ": " + desc["Error"])
              return status.strip('"')

          def handler(event, context):
              print(event)
              res = {}
              api_response = {}
              try:
                  if event['RequestType'] != 'Delete':
                      redshift_config = {}
                      redshift_config["redshift_cluster_id"] = os.getenv('redshift_cluster_identifier')
                      redshift_config["redshift_database"] = os.getenv('redshift_database_name')
                      redshift_config["redshift_user"] = os.getenv('redshift_database_user')
                      redshift_s3_bucket = os.getenv('redshift_s3_bucket')
                      redshift_iam_role = os.getenv('redshift_iam_role')
                      redshift_cluster_endpoint = os.getenv('redshift_cluster_endpoint')
                      job_queue = os.getenv('job_queue')
                      job_definition = os.getenv('job_definition')
                      redshift_script_s3_path = 's3://' + redshift_s3_bucket + '/workshop/setup_script.sql'
                      bootstrap_script = 's3://' + redshift_s3_bucket + '/workshop/bootstrap_script.sh'
                      python_script = 's3://' + redshift_s3_bucket + '/workshop/rcf-byom.py'
                      sql_statements = split_sql_statement(redshift_script_s3_path, redshift_s3_bucket, redshift_iam_role)
                      statement_name = Path(redshift_script_s3_path).name
                      print(sql_statements)
                      time.sleep(120)
                      sql_id = run_batch_sql(redshift_config, sql_statements, statement_name)
                      print(sql_id)
                      response = boto3.client('batch').submit_job(
                          jobName='RunRCFModel',
                          jobQueue=job_queue,
                          jobDefinition=job_definition,
                          containerOverrides={
                              "command": ["sh", "-c", "yum install -y awscli; aws s3 cp $BOOTSTRAP_SCRIPT ./bootstrap.sh; sh ./bootstrap.sh"],
                              "environment": [
                                  {"name": "BOOTSTRAP_SCRIPT", "value": bootstrap_script},
                                  {"name": "PYTHON_SCRIPT", "value": python_script},
                                  {"name": "REDSHIFT_IAM_ROLE", "value": redshift_iam_role},
                                  {"name": "REDSHIFT_USER", "value": redshift_config["redshift_user"]},
                                  {"name": "REDSHIFT_ENDPOINT", "value": redshift_cluster_endpoint},
                                  {"name": "SAGEMAKER_S3_BUCKET", "value": redshift_s3_bucket}
                              ]
                          })
                      job_id = response['jobId']
                      print(job_id)
                      time.sleep(30)
                      sql_status = get_sql_status(sql_id)
                      batch_status = batch_job_status(job_id)
                      api_response = {'job_id': job_id, 'sql_id': sql_id, 'sql_status': sql_status, 'batch_status': batch_status}
                      print(api_response)
                  # always signal CloudFormation, including on Delete, so the stack does not hang
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, api_response)
              except Exception as e:
                  res = {'error': e.response['Error']['Code'] if isinstance(e, be.ClientError) else 'failed'}
                  print(traceback.format_exc())
                  cfnresponse.send(event, context, cfnresponse.FAILED, res)
                  raise
              return api_response
  StartUp:
    Type: Custom::StartUpLambda
    DependsOn:
      - RedshiftCluster
      - CopyRedshiftConfig
    Properties:
      ServiceToken: !GetAtt [StartUpLambda, Arn]
  LambdaFunctionS3Copy:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: index.handler
      Runtime: python3.9
      Role: !GetAtt 'RedshiftLambdaRole.Arn'
      Timeout: 60
      Code:
        ZipFile: |
          import traceback

          import boto3
          import cfnresponse

          def handler(event, context):
              print(event)
              source_folder_s3_path = event['ResourceProperties']['source_folder_s3_path']
              target_folder_s3_path = event['ResourceProperties']['target_folder_s3_path']
              s3 = boto3.resource('s3')
              if event['RequestType'] == 'Delete':
                  try:
                      # empty the workshop prefix so the bucket can be deleted with the stack
                      bucket_name = target_folder_s3_path.split('/')[2]
                      prefix = target_folder_s3_path.split('/')[3] + '/'
                      bucket = s3.Bucket(bucket_name)
                      bucket.objects.filter(Prefix=prefix).delete()
                  except Exception as e:
                      print(e)
                      print(traceback.format_exc())
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Delete complete'})
              else:
                  try:
                      source_bucket, source_prefix = source_folder_s3_path.replace("s3://", "").split("/", 1)
                      target_bucket, target_prefix = target_folder_s3_path.replace("s3://", "").split("/", 1)
                      # copy every object under the source prefix to the same relative key under the target prefix
                      for obj in s3.Bucket(source_bucket).objects.filter(Prefix=source_prefix):
                          source_object = {'Bucket': source_bucket, 'Key': obj.key}
                          new_key = obj.key.replace(source_prefix, target_prefix, 1)
                          target_object = s3.Bucket(target_bucket).Object(new_key)
                          target_object.copy(source_object)
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Copy complete'})
                  except Exception as e:
                      print(e)
                      print(traceback.format_exc())
                      cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': 'Copy failed'})
  CopyRedshiftConfig:
    Type: Custom::CopyRedshiftConfig
    Properties:
      ServiceToken: !GetAtt [LambdaFunctionS3Copy, Arn]
      source_folder_s3_path: s3://redshiftbucket-ml-sagemaker/workshop/
      target_folder_s3_path: !Sub s3://${RedshiftS3Bucket}/workshop/
  ApplyNotification:
    Type: Custom::ApplyNotification
    Properties:
      ServiceToken: !GetAtt 'ApplyBucketNotificationFunction.Arn'
      S3Bucket: !Ref 'RedshiftS3Bucket'
      FunctionARN: !GetAtt 'LambdaRedshift.Arn'
      NotificationId: S3ObjectCreatedEvent
Outputs:
  RedshiftS3Bucket:
    Value: !Ref RedshiftS3Bucket
  IamRoleRedshiftCluster:
    Value: !Ref IamRoleRedshiftCluster
  RedshiftClusterEndpoint:
    Value: !Sub "redshift://${RedshiftCluster.Endpoint.Address}:${RedshiftCluster.Endpoint.Port}/${DatabaseName}"
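# Post-deployment note (illustrative): the master user password is generated by the
# SecretRedshiftMasterUser resource and never appears in the template or outputs.
# Assuming the AWS CLI is configured for the same account and Region, the credentials
# can be read from Secrets Manager, e.g.:
#
#   aws secretsmanager get-secret-value \
#     --secret-id <ARN of the SecretRedshiftMasterUser resource> \
#     --query SecretString --output text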