AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Globals:
  Function:
    Runtime: nodejs14.x
    Handler: index.handler
    Timeout: 600
    MemorySize: 256
Description: >
  Web-based event-driven architecture
Parameters:
  RedshiftClusterEndpoint:
    Description: Redshift cluster endpoint including port number and database name
    Type: String
    Default: redshift-cluster.xxxxxx.region.redshift.amazonaws.com:5439/dev
  DbUsername:
    Description: Redshift database user name that has permission to run the SQL scripts.
    Type: String
    AllowedPattern: "([a-z])([a-z]|[0-9])*"
    Default: 'awsuser'
  DDBTableName:
    Type: String
    Default: client_connections
    Description: Name of the new DynamoDB table that stores connection identifiers for each connected client. You may keep the default value, client_connections.
    MinLength: 3
    MaxLength: 50
    AllowedPattern: ^[A-Za-z_]+$
    ConstraintDescription: Required. May contain only letters and underscores. No numbers or special characters allowed.
  WebSocketEndpointSSMParameterName:
    Type: String
    Default: REDSHIFT_WSS_ENDPOINT
    Description: Name of the SSM Parameter Store parameter that stores the WebSocket endpoint URI. You may keep the default value, REDSHIFT_WSS_ENDPOINT.
    MinLength: 3
    MaxLength: 50
    AllowedPattern: ^[A-Za-z_]+$
    ConstraintDescription: Required. May contain only letters and underscores. No numbers or special characters allowed.
  RestApiEndpointSSMParameterName:
    Type: String
    Default: REDSHIFT_REST_API_ENDPOINT
    Description: Name of the SSM Parameter Store parameter that stores the REST endpoint URI. You may keep the default value, REDSHIFT_REST_API_ENDPOINT.
    MinLength: 3
    MaxLength: 50
    AllowedPattern: ^[A-Za-z_]+$
    ConstraintDescription: Required. May contain only letters and underscores. No numbers or special characters allowed.
  DDLScriptS3Path:
    Description: S3 URI of the SQL setup script file. Note that this template grants read access to the S3 bucket that contains your SQL script files.
    Type: String
    Default: s3:///setupscript.sql
  QueryScriptS3Path:
    Description: S3 URI of the SQL query script file.
    Type: String
    Default: s3:///queryscript.sql
  UserPoolARN:
    Description: Cognito user pool ARN.
    Type: String
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Input Parameters
        Parameters:
          - RedshiftClusterEndpoint
          - DbUsername
          - DDBTableName
          - WebSocketEndpointSSMParameterName
          - RestApiEndpointSSMParameterName
          - DDLScriptS3Path
          - QueryScriptS3Path
Resources:
  ApplicationWebSocket:
    Type: AWS::ApiGatewayV2::Api
    Properties:
      Name: ApplicationWebSocket
      ProtocolType: WEBSOCKET
      RouteSelectionExpression: $request.body.action
  ConnectRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId:
        Ref: ApplicationWebSocket
      RouteKey: $connect
      AuthorizationType: NONE
      OperationName: ConnectRoute
      Target:
        Fn::Join:
          - /
          - - integrations
            - Ref: ConnectInteg
  ConnectInteg:
    Type: AWS::ApiGatewayV2::Integration
    Properties:
      ApiId:
        Ref: ApplicationWebSocket
      Description: Connect Integration
      IntegrationType: AWS_PROXY
      IntegrationUri:
        Fn::Sub: arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnConnectFunction.Arn}/invocations
  DisconnectRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId:
        Ref: ApplicationWebSocket
      RouteKey: $disconnect
      AuthorizationType: NONE
      OperationName: DisconnectRoute
      Target:
        Fn::Join:
          - /
          - - integrations
            - Ref: DisconnectInteg
  DisconnectInteg:
    Type: AWS::ApiGatewayV2::Integration
    Properties:
      ApiId:
        Ref: ApplicationWebSocket
      Description: Disconnect Integration
      IntegrationType: AWS_PROXY
      IntegrationUri:
        Fn::Sub: arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnDisconnectFunction.Arn}/invocations
  SendRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId:
        Ref: ApplicationWebSocket
      RouteKey: $default
      AuthorizationType: NONE
      OperationName: SendRoute
      Target:
        Fn::Join:
          - /
          - - integrations
            - Ref: SendInteg
  SendInteg:
    Type: AWS::ApiGatewayV2::Integration
    Properties:
      ApiId:
        Ref: ApplicationWebSocket
      Description: Send Integration
      IntegrationType: AWS_PROXY
      # The $default route reuses OnConnectFunction, whose handler processes MESSAGE events.
      IntegrationUri:
        Fn::Sub: arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnConnectFunction.Arn}/invocations
  Deployment:
    Type: AWS::ApiGatewayV2::Deployment
    DependsOn:
      - ConnectRoute
      - DisconnectRoute
      - SendRoute
    Properties:
      ApiId:
        Ref: ApplicationWebSocket
  Stage:
    Type: AWS::ApiGatewayV2::Stage
    Properties:
      StageName: Prod
      Description: Prod Stage
      DeploymentId:
        Ref: Deployment
      ApiId:
        Ref: ApplicationWebSocket
  ApplicationRestAPI:
    Type: AWS::ApiGateway::RestApi
    Properties:
      Name: ApplicationRestApi
      Description: Rest API for GetResults from Redshift
      EndpointConfiguration:
        Types:
          - REGIONAL
  TestAuthorizer:
    Type: AWS::ApiGateway::Authorizer
    Properties:
      IdentitySource: method.request.header.authorization
      Name: CognitoAuthorizer
      ProviderARNs:
        - !Ref UserPoolARN
      RestApiId: !Ref ApplicationRestAPI
      Type: COGNITO_USER_POOLS
  ApplicationRestApiGatewayMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      AuthorizationType: COGNITO_USER_POOLS
      AuthorizerId: !Ref TestAuthorizer
      HttpMethod: GET
      Integration:
        IntegrationHttpMethod: POST
        Type: AWS_PROXY
        Uri: !Sub
          - arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaArn}/invocations
          - LambdaArn: !GetAtt GetResultsFunction.Arn
        IntegrationResponses:
          - StatusCode: 200
            ResponseTemplates: {"application/json": "$input.json('$.body')"}
      ResourceId: !GetAtt ApplicationRestAPI.RootResourceId
      RestApiId: !Ref ApplicationRestAPI
      MethodResponses:
        - StatusCode: 200
          ResponseModels: { "application/json": "Empty" }
  ApplicationRestApiGatewayDeployment:
    Type: AWS::ApiGateway::Deployment
    DependsOn:
      - ApplicationRestApiGatewayMethod
    Properties:
      RestApiId: !Ref ApplicationRestAPI
  ApplicationRestApiStage:
    Type: AWS::ApiGateway::Stage
    Properties:
      DeploymentId: !Ref ApplicationRestApiGatewayDeployment
      RestApiId: !Ref ApplicationRestAPI
      StageName: Prod
  ConnectionsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: connectionId
          AttributeType: S
        - AttributeName: statementId
          AttributeType: S
        - AttributeName: topicName
          AttributeType: S
      KeySchema:
        - AttributeName: connectionId
          KeyType: HASH
        - AttributeName: topicName
          KeyType: RANGE
      GlobalSecondaryIndexes:
        - IndexName: "GSI"
          KeySchema:
            - AttributeName: "statementId"
              KeyType: "HASH"
          Projection:
            ProjectionType: "KEYS_ONLY"
          ProvisionedThroughput:
            ReadCapacityUnits: 5
            WriteCapacityUnits: 5
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
      SSESpecification:
        SSEEnabled: true
      TableName:
        Ref: DDBTableName
  LambdaIAMRole:
    Type: AWS::IAM::Role
    DependsOn:
      - ApplicationWebSocket
    Properties:
      Description: LambdaStartWhatIfIAMRole
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: QueryScriptS3AccessPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:GetBucketLocation'
                  - 's3:GetObject'
                  - 's3:ListBucket'
                Resource:
                  - !Sub
                    - arn:aws:s3:::${QueryScriptS3}/*
                    - {QueryScriptS3: !Select [2, !Split ["/", !Ref QueryScriptS3Path]]}
                  - !Sub
                    - arn:aws:s3:::${QueryScriptS3}
                    - {QueryScriptS3: !Select [2, !Split ["/", !Ref QueryScriptS3Path]]}
        - PolicyName: LambdaInvokePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: "Allow"
                Action:
                  - dynamodb:GetItem
                  - dynamodb:DeleteItem
                  - dynamodb:PutItem
                  - dynamodb:Scan
                  - dynamodb:Query
                  - dynamodb:UpdateItem
                  - dynamodb:BatchWriteItem
                  - dynamodb:BatchGetItem
                  - dynamodb:DescribeTable
                  - dynamodb:ConditionCheckItem
                Resource:
                  - !Sub "arn:${AWS::Partition}:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${DDBTableName}"
                  - !Sub "arn:${AWS::Partition}:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${DDBTableName}/index/*"
              - Effect: "Allow"
                Action:
                  - execute-api:ManageConnections
                Resource:
                  - !Sub "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${ApplicationWebSocket}/*"
              - Effect: Allow
                Action:
                  - redshift-data:ListStatements
                  - redshift-data:GetStatementResult
                  - redshift-data:DescribeStatement
                  - ssm:GetParameters
                  - ssm:GetParameter
                  - ssm:DescribeParameters
                Resource:
                  - '*'
              - Effect: Allow
                Action:
                  - redshift:GetClusterCredentials
                  - redshift-data:ExecuteStatement
                Resource:
                  - !Sub
                    - arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:${SourceRedshiftClusterIdentifier}
                    - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]}
                  - !Sub
                    - "arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${SourceRedshiftClusterIdentifier}/${RedshiftDatabaseName}"
                    - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]],RedshiftDatabaseName: !Select [1, !Split ["/", !Ref RedshiftClusterEndpoint]]}
                  - !Sub
                    - "arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${SourceRedshiftClusterIdentifier}/${DbUsername}"
                    - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]}
  SetupLambdaRedshiftDataAPIRole:
    Type: 'AWS::IAM::Role'
    Properties:
      Description: IAM Role for lambda to access Redshift, S3 and execute the Data API
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - 'sts:AssumeRole'
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
        Version: 2012-10-17
      Path: /
      Policies:
        - PolicyName: SetupRedshiftDataApiPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'redshift-data:GetStatementResult'
                  - 'redshift-data:DescribeStatement'
                Resource: '*'
              - Effect: Allow
                Action:
                  - 'redshift:GetClusterCredentials'
                  - 'redshift-data:ExecuteStatement'
                Resource:
                  - !Sub
                    - arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:${SourceRedshiftClusterIdentifier}
                    - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]}
                  - !Sub
                    - arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${SourceRedshiftClusterIdentifier}/${RedshiftDatabaseName}
                    - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".",
                        !Ref RedshiftClusterEndpoint]],RedshiftDatabaseName: !Select [1, !Split ["/", !Ref RedshiftClusterEndpoint]]}
                  - !Sub
                    - arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${SourceRedshiftClusterIdentifier}/${DbUsername}
                    - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]}
        - PolicyName: DDLScriptS3AccessPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:GetBucketLocation'
                  - 's3:GetObject'
                  - 's3:ListBucket'
                Resource:
                  - !Sub
                    - arn:aws:s3:::${DDLScriptS3}/*
                    - {DDLScriptS3: !Select [2, !Split ["/", !Ref DDLScriptS3Path]]}
                  - !Sub
                    - arn:aws:s3:::${DDLScriptS3}
                    - {DDLScriptS3: !Select [2, !Split ["/", !Ref DDLScriptS3Path]]}
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AWSLambdaExecute'
  PermissionForRedshiftEventToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName:
        Ref: "SendMessageFunction"
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn:
        Fn::GetAtt:
          - "EventBridgeRedshiftEventRule"
          - "Arn"
  OnConnectPermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - ApplicationWebSocket
    Properties:
      Action: lambda:InvokeFunction
      FunctionName:
        Ref: OnConnectFunction
      Principal: apigateway.amazonaws.com
  SendMessagePermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - ApplicationWebSocket
    Properties:
      Action: lambda:InvokeFunction
      FunctionName:
        Ref: SendMessageFunction
      Principal: apigateway.amazonaws.com
  GetResultsPermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - ApplicationWebSocket
    Properties:
      Action: lambda:InvokeFunction
      FunctionName:
        Ref: GetResultsFunction
      Principal: apigateway.amazonaws.com
  OnDisconnectPermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - ApplicationWebSocket
    Properties:
      Action: lambda:InvokeFunction
      FunctionName:
        Ref: OnDisconnectFunction
      Principal: apigateway.amazonaws.com
  EventBridgeRedshiftEventRule:
    Type: "AWS::Events::Rule"
    Properties:
      EventPattern: !Sub "{\"source\": [\"aws.redshift-data\"],\"detail\": {\"principal\": [\"arn:aws:sts::${AWS::AccountId}:assumed-role/${LambdaIAMRole}/${OnConnectFunction}\"]}}"
      Description: Respond to Redshift-data events
      State: "ENABLED"
      Targets:
        - Arn: !GetAtt 'SendMessageFunction.Arn'
          Id: EventBridgeRedshiftEventRule
          InputTransformer:
            InputPathsMap: {"body":"$.detail"}
            # <body> is the placeholder defined in InputPathsMap; it expands to the event detail.
            InputTemplate: !Sub "{\"action\":\"sendmessage\", \"data\":<body>}"
  RedshiftDataApiWebSocketEndpoint:
    Type: AWS::SSM::Parameter
    Properties:
      Name: !Ref WebSocketEndpointSSMParameterName
      Type: String
      Value:
        Fn::Join:
          - ''
          - - Ref: ApplicationWebSocket
            - .execute-api.
            - Ref: AWS::Region
            - .amazonaws.com/
            - Ref: Stage
  RedshiftDataApiRestApiEndpoint:
    Type: AWS::SSM::Parameter
    Properties:
      Name: !Ref RestApiEndpointSSMParameterName
      Type: String
      Value:
        Fn::Join:
          - ''
          - - Ref: ApplicationRestAPI
            - .execute-api.
            - Ref: AWS::Region
            - .amazonaws.com/
            - Ref: ApplicationRestApiStage
  SetupRedshiftLambdaFunction:
    Type: "AWS::Serverless::Function"
    Properties:
      Description: SetupRedshiftLambdaFunction
      Handler: index.handler
      Role: !GetAtt 'SetupLambdaRedshiftDataAPIRole.Arn'
      Environment:
        Variables:
          REDSHIFT_ENDPOINT:
            Ref: RedshiftClusterEndpoint
          REDSHIFT_USER:
            Ref: DbUsername
          DDL_SCRIPT_S3_URI:
            Ref: DDLScriptS3Path
      InlineCode: |
        const AWS = require('aws-sdk');
        const cfnResponse = require('cfn-response');
        exports.handler = async (event, context) => {
          console.log(event);
          try {
            const redshiftClusterId = process.env.REDSHIFT_ENDPOINT.split('.')[0];
            const redshiftDatabaseName = process.env.REDSHIFT_ENDPOINT.split('/')[1];
            const s3Client = new AWS.S3({apiVersion: '2006-03-01'}); // https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/s3-example-creating-buckets.html
            const ddlScriptS3Path = process.env.DDL_SCRIPT_S3_URI;
            const script = await runSQLScriptFromS3(s3Client, ddlScriptS3Path);
            await executeRedshiftSql(event, context, redshiftClusterId, redshiftDatabaseName, process.env.REDSHIFT_USER, script);
            await sendCfnRes(event, context, cfnResponse.SUCCESS);
          } catch (e) {
            // Report the failure so CloudFormation does not wait for the custom resource to time out.
            console.error(e);
            await sendCfnRes(event, context, cfnResponse.FAILED);
          }
        };
        // cfn-response ends the invocation itself once the response is sent, so this promise is never resolved.
        const sendCfnRes = (event, context, status, data) => {
          return new Promise(() => cfnResponse.send(event, context, status, {'Data': data}));
        }
        const runSQLScriptFromS3 = async (s3Client, scriptS3Path) => {
          if (scriptS3Path.length === 0){
            console.error('setup script s3 path is empty.');
          }
          // Split "s3://bucket/key/parts" into the bucket name and the object key.
          let path = scriptS3Path.replace("s3://","").split("/");
          const bucket = path[0];
          path.shift();
          const key = path.join('/')
          try {
            const params = {
              Bucket: bucket,
              Key: key
            }
            const data = await s3Client.getObject(params).promise();
            return data.Body.toString('utf-8');
          } catch (e) {
            throw new Error(`Could not retrieve file from S3: ${e.message}`)
          }
        }
        const executeRedshiftSql = async (event, context, redshiftClusterId, redshiftDatabaseName, redshiftUser, script) => {
          // Split after each semicolon and drop blank fragments such as trailing whitespace.
          const sqlStatements = script.split(/(?<=\;)/).map(sql => sql.trim()).filter(sql => sql.length > 0);
          const redshiftDataClient =
            new AWS.RedshiftData({region: process.env.AWS_REGION});
          let res;
          // Run the statements one at a time, waiting for each to finish.
          for (const sql of sqlStatements) {
            res = await executeSqlDataApi(event, context, redshiftDataClient, redshiftClusterId, redshiftDatabaseName, redshiftUser, sql, true);
          }
          return res;
        }
        const executeSqlDataApi = async (event, context, redshiftDataApiClient, redshiftClusterId, redshiftDatabaseName, redshiftUser, query, isSynchronous) => {
          let queryId = '';
          const executeStatementInput = {
            ClusterIdentifier: redshiftClusterId,
            Database: redshiftDatabaseName,
            DbUser: redshiftUser,
            Sql: query
          };
          // Calling Redshift Data API with executeStatement()
          await redshiftDataApiClient.executeStatement(executeStatementInput).promise()
            .then((response) => {
              queryId = response.Id;
            })
            .catch((error) => {
              console.log('ExecuteStatement has failed.');
              throw new Error(error);
            });
          let { Status: queryStatus } = await getDescribeStatement(redshiftDataApiClient, queryId);
          console.log(`Query Status: ${queryStatus} | QueryId: ${queryId}`);
          if (isSynchronous) {
            queryStatus = await executeSynchronousWait(event, context, redshiftDataApiClient, queryStatus, queryId);
          }
          return queryStatus;
        };
        const executeSynchronousWait = async (event, context, redshiftDataApiClient, queryStatus, queryId) => {
          let attempts = 0;
          let describeStatementInfo = {};
          // Poll every 10 seconds until the statement reaches a terminal state.
          while (true) {
            attempts++;
            await sleep(10);
            ({ Status: queryStatus, ...describeStatementInfo } = await getDescribeStatement(redshiftDataApiClient, queryId));
            // ABORTED is also terminal; without this check the loop would run until the Lambda times out.
            if (queryStatus === 'FAILED' || queryStatus === 'ABORTED') {
              console.log(`SQL query failed: ${queryId}: \n Error: ${describeStatementInfo.Error}`)
              await sendCfnRes(event, context, cfnResponse.FAILED);
              throw new Error(`SQL query failed: ${queryId}: \n Error: ${describeStatementInfo.Error}`);
            } else if (queryStatus === 'FINISHED') {
              console.log(`Query status is: ${queryStatus} for query id: ${queryId} `);
              break;
            } else {
              console.log(`Currently working... query status is ${queryStatus}`);
            }
          }
          return queryStatus;
        };
        const getDescribeStatement = async (redshiftDataApiClient, queryId) => redshiftDataApiClient
          .describeStatement({ Id: queryId })
          .promise()
          .then(response => {
            return response;
          })
          .catch((error) => {
            console.log('DescribeStatement has failed.');
            throw new Error(error);
          });
        const sleep = (seconds) => {
          return new Promise(resolve => setTimeout(resolve, seconds * 1000));
        };
  OnConnectFunction:
    Type: "AWS::Serverless::Function"
    Properties:
      Description: OnConnectFunction
      Role: !GetAtt 'LambdaIAMRole.Arn'
      Environment:
        Variables:
          TABLE_NAME:
            Ref: DDBTableName
          REDSHIFT_ENDPOINT:
            Ref: RedshiftClusterEndpoint
          REDSHIFT_USER:
            Ref: DbUsername
          WS_ENDPOINT_SSM_PARAMETER_NAME:
            Ref: WebSocketEndpointSSMParameterName
          QUERY_SQL_SCRIPT_S3_URI:
            Ref: QueryScriptS3Path
      InlineCode: |
        const AWS = require('aws-sdk');
        exports.handler = async event => {
          console.log(event);
          // $connect events only need the 200 response; MESSAGE events start a Redshift query.
          if (event.requestContext.eventType === "MESSAGE"){
            const redshiftClusterId = process.env.REDSHIFT_ENDPOINT.split('.')[0];
            const redshiftDatabaseName = process.env.REDSHIFT_ENDPOINT.split('/')[1];
            const s3Client = new AWS.S3({apiVersion: '2006-03-01'});
            const queryScriptS3Path = process.env.QUERY_SQL_SCRIPT_S3_URI;
            const apiGateway = new AWS.ApiGatewayManagementApi({
              apiVersion: '2018-11-29',
              endpoint: await getSSMParameter(process.env.WS_ENDPOINT_SSM_PARAMETER_NAME)
            })
            const param = JSON.parse(event.body).tripMonth;
            const topicName = JSON.parse(event.body).topicName;
            let script = await runSQLScriptFromS3(s3Client, queryScriptS3Path);
            // Substitute every {} placeholder in the query script with the requested value.
            script = script.replace(/{}/g, param);
            const statementId = await executeRedshiftSql(script, redshiftClusterId, redshiftDatabaseName, process.env.REDSHIFT_USER);
            await putConnectionRecordDdb (process.env.TABLE_NAME, event.requestContext.connectionId, statementId, topicName);
          }
          const response = {
            statusCode: 200,
            body: "Connected"
          };
          return response;
        };
        const getSSMParameter = async (param) => new AWS.SSM()
          .getParameter({
            Name: param
          })
          .promise()
          .then(response => {
            return response.Parameter.Value;
          })
          .catch((e) => {
            console.log('getSSMParameter failed.');
            console.log(e.stack);
            throw e;
          });
        const runSQLScriptFromS3 = async (s3Client, scriptS3Path) => {
          if (scriptS3Path.length === 0){
            console.error('query script s3 path is empty.');
          }
          // Split "s3://bucket/key/parts" into the bucket name and the object key.
          let path = scriptS3Path.replace("s3://","").split("/");
          const bucket = path[0];
          path.shift();
          const key = path.join('/')
          try {
            const params = {
              Bucket: bucket,
              Key: key
            }
            const data = await s3Client.getObject(params).promise();
            return data.Body.toString('utf-8');
          } catch (e) {
            throw new Error(`Could not retrieve file from S3: ${e.message}`)
          }
        }
        // WithEvent: true makes the Data API publish an EventBridge event when the statement completes.
        const executeRedshiftSql = async (sqlScript, redshiftClusterId, redshiftDatabaseName, redshiftUser) => new AWS.RedshiftData({ region: process.env.AWS_REGION })
          .executeStatement({
            ClusterIdentifier: redshiftClusterId,
            Database: redshiftDatabaseName,
            DbUser: redshiftUser,
            Sql: sqlScript,
            WithEvent: true
          })
          .promise()
          .then(response => response.Id)
          .catch((e) => {
            console.log('executeRedshiftSql failed.');
            console.log(e.stack);
            throw e;
          });
        const putConnectionRecordDdb = async (tableName, connectionId, statementId, topicName) => new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION })
          .put({
            TableName: tableName,
            Item: {
              statementId: statementId,
              connectionId: connectionId,
              topicName: topicName
            }
          })
          .promise()
          .catch((e) => {
            console.log('putConnectionRecordDdb failed.');
            console.log(e.stack);
            throw e;
          });
  SendMessageFunction:
    Type: "AWS::Serverless::Function"
    Properties:
      Description: SendMessageFunction
      Role: !GetAtt 'LambdaIAMRole.Arn'
      Environment:
        Variables:
          TABLE_NAME:
            Ref: DDBTableName
          WS_ENDPOINT_SSM_PARAMETER_NAME:
            Ref: WebSocketEndpointSSMParameterName
      InlineCode: |
        const AWS = require('aws-sdk');
        exports.handler = async event => {
          console.log(event);
          const { TABLE_NAME } = process.env;
          const wssEndpoint = await getSSMParameter(process.env.WS_ENDPOINT_SSM_PARAMETER_NAME);
          console.log(wssEndpoint)
          const statementId = event.data.statementId;
          const response = await getConnectionIdTopicName(TABLE_NAME, statementId);
          const connectionId = response.connectionId;
          const topicName = response.topicName;
          if (event.data.state === 'FAILED'){
            // Send the error payload as-is instead of formatting it as topicName/statementId.
            await postMessage(wssEndpoint, connectionId, '{"columns":["Error"],"records":[["SQL Query Failed"]]}');
          } else{
            await postMessage(wssEndpoint, connectionId, `${topicName}/${statementId}`);
          }
          return { statusCode: 200, body: 'Data sent.' };
        };
        // Posts the given payload string to a single WebSocket connection.
        const postMessage = async (wssEndpoint, connectionId, data) => new AWS.ApiGatewayManagementApi({apiVersion: '2018-11-29',endpoint: wssEndpoint})
          .postToConnection({ ConnectionId: connectionId, Data: data })
          .promise()
          .catch((e) => {
            console.log('postMessage failed.');
            console.log(e.stack);
            throw e;
          });
        const getSSMParameter = async (param) => new AWS.SSM()
          .getParameter({ Name: param })
          .promise()
          .then(response => {
            return response.Parameter.Value;
          })
          .catch((e) => {
            console.log('getSSMParameter failed.');
            console.log(e.stack);
            throw e;
          });
        // Looks up the connection that submitted the statement through the statementId index.
        const getConnectionIdTopicName = async (tableName, statementId) => new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION })
          .query({
            TableName: tableName,
            IndexName: "GSI",
            KeyConditionExpression: "statementId = :s",
            ExpressionAttributeValues: { ":s": statementId }
          })
          .promise()
          .then(response => {
            console.log(response);
            return { connectionId: response.Items[0].connectionId, topicName: response.Items[0].topicName }
          })
          .catch((e) => {
            console.log('getConnectionIdTopicName failed.');
            console.log(e.stack);
            throw e;
          });
  GetResultsFunction:
    Type: "AWS::Serverless::Function"
    Properties:
      Description: GetResultsFunctions
      Role: !GetAtt 'LambdaIAMRole.Arn'
      InlineCode: |
        const AWS = require('aws-sdk');
        exports.handler = async (event) => {
          console.log(event);
          const topicName = event.queryStringParameters.topicName;
          const statementId = event.queryStringParameters.queryId;
          const results = await getResults(statementId)
          console.log(results)
          const response = {
            statusCode: 200,
            headers: {
              "Access-Control-Allow-Headers" : "Content-Type",
              "Access-Control-Allow-Origin": "*",
              "Access-Control-Allow-Methods": "GET,PUT,POST,DELETE"
            },
            body: results
          };
          return response;
        };
        // Flattens the Data API result into {columns: [...], records: [[...], ...]} and returns it as JSON.
        const getResults = async (statementId) => new AWS.RedshiftData({ region: process.env.AWS_REGION })
          .getStatementResult({Id: statementId})
          .promise()
          .then(response => {
            let records = [];
            let output = {};
            let rowValues = [];
            output.columns = response.ColumnMetadata.map(metaData => metaData.label);
            response.Records.forEach( record => {
              rowValues = record.map(item => Object.values(item));
              records.push(rowValues.flat());
            });
            output.records=records;
            return JSON.stringify(output);
          })
          .catch((e) => {
            console.log('getResults failed.');
            console.log(e.stack);
            throw e;
          });
  OnDisconnectFunction:
    Type: "AWS::Serverless::Function"
    Properties:
      Description: OnDisconnectFunction
      Role: !GetAtt 'LambdaIAMRole.Arn'
      Environment:
        Variables:
          TABLE_NAME:
            Ref: DDBTableName
      InlineCode: |
        const AWS = require('aws-sdk');
        const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });
        exports.handler = async event => {
          console.log(event.requestContext)
          // Wait for the deletes to finish before returning the response.
          const data = await getRecords(event);
          await Promise.all(data.Items.map((item) => deleteItem(item)));
          return { statusCode: 200, body: 'Disconnected.' };
        };
        // Fetch only the records of the connection being closed (connectionId is the table's hash key),
        // rather than scanning and deleting every client's records.
        const getRecords = (event) => {
          const params = {
            TableName: process.env.TABLE_NAME,
            KeyConditionExpression: 'connectionId = :c',
            ExpressionAttributeValues: { ':c': event.requestContext.connectionId }
          }
          return new Promise((resolve, reject) =>{
            ddb.query(params, (err, data) =>{
              if (err){
                console.log(err)
                reject(err);
              } else {
                resolve(data);
              }
            })
          });
        }
        const deleteItem = (item) => {
          const params = {
            TableName: process.env.TABLE_NAME,
            Key: {
              topicName: item.topicName,
              connectionId: item.connectionId
            },
          };
          return new Promise((resolve, reject) => {
            ddb.delete(params, (err, data) => {
              if (err) {
                console.log(err)
                reject(err);
              } else {
                resolve();
              }
            });
          });
        }
  InitializationSteps:
    Type: Custom::SetupRedshiftLambdaFunction
    Properties:
      ServiceToken: !GetAtt [SetupRedshiftLambdaFunction, Arn]
Outputs:
  RedshiftDataApiWebSocketEndpoint:
    Description: The WSS Protocol URI to connect to
    Value:
      Fn::Join:
        - ''
        - - wss://
          - Ref: ApplicationWebSocket
          - .execute-api.
          - Ref: AWS::Region
          - .amazonaws.com/
          - Ref: Stage
  RedshiftDataApiRestApiEndpoint:
    Description: The REST API endpoint to connect to
    Value:
      Fn::Join:
        - ''
        - - https://
          - Ref: ApplicationRestAPI
          - .execute-api.
          - Ref: AWS::Region
          - .amazonaws.com/
          - Ref: ApplicationRestApiStage