--- AWSTemplateFormatVersion: 2010-09-09 Description: Leverage ML predictions over Amazon DynamoDB data using Athena ML and Athena Federation SDK. **WARNING** This template creates SageMaker Inference and related resources. You will be billed for the AWS resources used if you create a stack from this template. Parameters: DynamoDBAthenaConnectorFunctionName: Type: String Default: dynamodb-athena-connector-udf Transform: AWS::Serverless-2016-10-31 Resources: S3Bucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub 'athena-dynamodb-ml-${AWS::AccountId}-${AWS::Region}' DynamoDBTableDocklessVehicles: Type: AWS::DynamoDB::Table Properties: TableName: DynamoDBTableDocklessVehicles AttributeDefinitions: - AttributeName: TripID AttributeType: S KeySchema: - AttributeName: TripID KeyType: HASH ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 10 DDBLambdaLoaderLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: /aws/lambda/load_dynamo_db_lambda RetentionInDays: 7 DDBLambdaLoader: DependsOn: DDBLambdaLoaderLogGroup Type: AWS::Lambda::Function Properties: FunctionName: load_dynamo_db_lambda Code: ZipFile: | import json import boto3 import os import csv import codecs import sys import traceback import cfnresponse from datetime import datetime s3 = boto3.resource('s3') dynamodb = boto3.resource('dynamodb') location_client = boto3.client('location') bucket = os.environ['S3_CSV_BUCKET'] key = os.environ['S3_CSV_BUCKET_KEY'] tableName = os.environ['DYNAMODB_TABLE_NAME'] stop_count = os.environ['END_BATCH_NUM'] # location_index_name = os.environ['LocationIndexName'] def lambda_handler(event, context): try: print('In Lambda.......................................') if event['RequestType'] == 'Create': print('In Create....') obj = s3.Object(bucket, key).get()['Body'] table = dynamodb.Table(tableName) i = 0 for row in csv.DictReader(codecs.getreader('utf-8')(obj)): start_date_time = row['StartDate']+ " " + row['StartTime'] end_date_time = row['EndDate']+ " " + row['EndTime'] try: row['start_epoch'] = int(datetime.strptime(start_date_time, '%Y-%m-%d %H:%M').strftime('%s')) row['end_epoch'] = int(datetime.strptime(end_date_time, '%Y-%m-%d %H:%M').strftime('%s')) with table.batch_writer() as batch1: batch1.put_item(Item=row) print(f'Item {i} inserted in db') except Exception as e: traceback.print_exc() error_stack = traceback.format_exc() print(error_stack) print(f'ERROR on record ^^^^^^^^^ {i} ^^^^^^^^^^^^^^^^^^^^^^') i+=1 cfnresponse.send(event, context, cfnresponse.SUCCESS, None,None ) elif event['RequestType'] == 'Update': print('In Update....') cfnresponse.send(event, context, cfnresponse.SUCCESS, None,None ) elif event['RequestType'] == 'Delete': print('In Delete....') cfnresponse.send(event, context, cfnresponse.SUCCESS, None,None ) else: print('No RequestType Status found....') cfnresponse.send(event, context, cfnresponse.SUCCESS, None,None ) except Exception as e: traceback.print_exc() error_stack = traceback.format_exc() print(error_stack) cfnresponse.send(event, context, cfnresponse.FAILED, None, None) Handler: index.lambda_handler Environment: Variables: DYNAMODB_TABLE_NAME: !Ref 'DynamoDBTableDocklessVehicles' S3_CSV_BUCKET: aws-bigdata-blog S3_CSV_BUCKET_KEY: artifacts/bdb-1462-athena-dynamodb-ml/data/dockless-vehicles-3_2019-09-07.csv REGION: !Sub 'AWS::REGION' START_BATCH_NUM: 0, END_BATCH_NUM: 1500 Role: !GetAtt 'DDBLambdaLoaderRole.Arn' Runtime: python3.8 Timeout: 300 DDBLambdaLoaderRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - s3:GetObject Resource: - arn:aws:s3:::aws-bigdata-blog/artifacts/bdb-1462-athena-dynamodb-ml/* - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: !Sub 'arn:${AWS::Partition}:logs:*:*:*' - Effect: Allow Action: - dynamodb:DescribeTable - dynamodb:Query - dynamodb:Scan - dynamodb:GetItem - dynamodb:PutItem - dynamodb:UpdateItem - dynamodb:DeleteItem - dynamodb:BatchWriteItem - dynamodb:BatchDeleteItem Resource: !Sub 'arn:${AWS::Partition}:dynamodb:*:*:*' - Effect: Allow Action: - geo:CreatePlaceIndex - geo:DeletePlaceIndex Resource: '*' CRDataLoader: Type: Custom::DDBLambdaLoader Properties: ServiceToken: !GetAtt 'DDBLambdaLoader.Arn' DependsOn: - DynamoDBTableDocklessVehicles - DDBLambdaLoaderRole V2EngineWorkGroup: Type: AWS::Athena::WorkGroup Properties: Name: V2EngineWorkGroup RecursiveDeleteOption: true WorkGroupConfiguration: EnforceWorkGroupConfiguration: true EngineVersion: SelectedEngineVersion: Athena engine version 2 PublishCloudWatchMetricsEnabled: true ResultConfiguration: OutputLocation: !Sub 's3://${S3Bucket}/athena_output' CakgGlueDatabase: Type: AWS::Glue::Database Properties: CatalogId: !Ref 'AWS::AccountId' DatabaseInput: Name: !Sub 'athena-ml-db-${AWS::AccountId}' Description: AWS Glue Database for Dockless Vehicles AthenaMLQuery1: DependsOn: - AthenaDynamoDBConnector - SageMakerEndpoint Type: AWS::Athena::NamedQuery Properties: Database: !Ref 'CakgGlueDatabase' Description: 'Q1: Query to Load Neighborhoods Data from Shape Files.' Name: 'Q1: Neighborhoods' WorkGroup: !Sub '${V2EngineWorkGroup}' QueryString: | CREATE EXTERNAL TABLE `louisville_ky_neighborhoods`( `objectid` int, `nh_code` int, `nh_name` string, `shapearea` double, `shapelen` double, `bb_west` double, `bb_south` double, `bb_east` double, `bb_north` double, `shape` string, `cog_longitude` double, `cog_latitude` double) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://aws-bigdata-blog/artifacts/bdb-1462-athena-dynamodb-ml/louisville_ky_neighborhoods/' TBLPROPERTIES ( 'has_encrypted_data'='false', 'transient_lastDdlTime'='1620312624') AthenaMLQuery2: DependsOn: - AthenaDynamoDBConnector Type: AWS::Athena::NamedQuery Properties: Database: !Ref 'CakgGlueDatabase' Description: 'Q2: Query to predict scooter demand for the next hour by Neighbourhood / Geo - coordinates' Name: 'Q2: DynamoDBAthenaMLScooterPredict' WorkGroup: !Sub '${V2EngineWorkGroup}' QueryString: !Sub - | ------------------------- Q2: Query to predict scooter demand for the next hour by Neighbourhood / Geo - coordinates -------------------- -- -- Define function to represent the SageMaker model and endpoint for the prediction USING EXTERNAL FUNCTION predict_demand( location_id BIGINT, hr BIGINT , dow BIGINT, n_pickup_1 BIGINT, n_pickup_2 BIGINT, n_pickup_3 BIGINT, n_pickup_4 BIGINT, n_dropoff_1 BIGINT, n_dropoff_2 BIGINT, n_dropoff_3 BIGINT, n_dropoff_4 BIGINT ) RETURNS DOUBLE SAGEMAKER '${SageMakerEndpoint}' -- query raw trip data from DynamoDB, we only need the past five hours (i.e. 5 hour time window ending with the time we set as current time) WITH trips_raw AS ( SELECT * , from_unixtime(start_epoch) AS t_start , from_unixtime(end_epoch) AS t_end FROM "lambda:${AthenaDynamoDBConnectorFunction}"."${Database}"."${DynamoDBTableDocklessVehicles}" dls WHERE start_epoch BETWEEN to_unixtime(TIMESTAMP '2019-09-07 15:00' - interval '5' hour) AND to_unixtime(TIMESTAMP '2019-09-07 15:00') OR end_epoch BETWEEN to_unixtime(TIMESTAMP '2019-09-07 15:00' - interval '5' hour) AND to_unixtime(TIMESTAMP '2019-09-07 15:00') ), -- prepare individual trip records trips AS ( SELECT tr.* , nb1.nh_code AS start_nbid , nb2.nh_code AS end_nbid , floor( ( tr.start_epoch - to_unixtime(TIMESTAMP '2019-09-07 15:00' - interval '5' hour) )/3600 ) AS t_hour_start , floor( ( tr.end_epoch - to_unixtime(TIMESTAMP '2019-09-07 15:00' - interval '5' hour) )/3600 ) AS t_hour_end FROM trips_raw tr JOIN "AwsDataCatalog"."${Database}"."louisville_ky_neighborhoods" nb1 ON ST_Within(ST_POINT(CAST(tr.startlongitude AS DOUBLE), CAST(tr.startlatitude AS DOUBLE)), ST_GeometryFromText(nb1.shape)) JOIN "AwsDataCatalog"."${Database}"."louisville_ky_neighborhoods" nb2 ON ST_Within(ST_POINT(CAST(tr.endlongitude AS DOUBLE), CAST(tr.endlatitude AS DOUBLE)), ST_GeometryFromText(nb2.shape)) ), -- aggregating trips over start time and start neighborhood start_count AS ( SELECT start_nbid AS nbid, COUNT(start_nbid) AS n_total_start , SUM(CASE WHEN t_hour_start=1 THEN 1 ELSE 0 END) AS n1_start , SUM(CASE WHEN t_hour_start=2 THEN 1 ELSE 0 END) AS n2_start , SUM(CASE WHEN t_hour_start=3 THEN 1 ELSE 0 END) AS n3_start , SUM(CASE WHEN t_hour_start=4 THEN 1 ELSE 0 END) AS n4_start FROM trips WHERE start_nbid BETWEEN 1 AND 98 GROUP BY start_nbid ), -- aggregating trips over end time and end neighborhood end_count AS ( SELECT end_nbid AS nbid, COUNT(end_nbid) AS n_total_end , SUM(CASE WHEN t_hour_end=1 THEN 1 ELSE 0 END) AS n1_end , SUM(CASE WHEN t_hour_end=2 THEN 1 ELSE 0 END) AS n2_end , SUM(CASE WHEN t_hour_end=3 THEN 1 ELSE 0 END) AS n3_end , SUM(CASE WHEN t_hour_end=4 THEN 1 ELSE 0 END) AS n4_end FROM trips WHERE end_nbid BETWEEN 1 AND 98 GROUP BY end_nbid ), -- call the predictive model to get the demand forecast for the next hour predictions AS ( SELECT sc.nbid , predict_demand( CAST(sc.nbid AS BIGINT), hour(TIMESTAMP '2019-09-07 15:00'), day_of_week(TIMESTAMP '2019-09-07 15:00'), sc.n1_start, sc.n2_start, sc.n3_start, sc.n4_start, ec.n1_end, ec.n2_end, ec.n3_end, ec.n4_end ) AS n_demand FROM start_count sc JOIN end_count ec ON sc.nbid=ec.nbid ) -- finally join the predicted values with the neighborhoods' meta data SELECT nh.nh_code AS nbid, nh.nh_name AS neighborhood, nh.cog_longitude AS longitude, nh.cog_latitude AS latitude , COALESCE( round(predictions.n_demand), 0 ) AS demand FROM "AwsDataCatalog"."${Database}"."louisville_ky_neighborhoods" nh LEFT JOIN predictions ON nh.nh_code=predictions.nbid ORDER BY demand desc - AthenaDynamoDBConnectorFunction: !Ref 'DynamoDBAthenaConnectorFunctionName' DynamoDBTableDocklessVehicles: !Ref 'DynamoDBTableDocklessVehicles' SageMakerEndpoint: !GetAtt 'SageMakerEndpoint.EndpointName' Database: !Ref 'CakgGlueDatabase' DDBAthenaConnectorLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub '/aws/lambda/${DynamoDBAthenaConnectorFunctionName}' RetentionInDays: 7 AthenaDynamoDBConnector: DependsOn: DDBAthenaConnectorLogGroup Type: AWS::Serverless::Application Properties: Location: ApplicationId: arn:aws:serverlessrepo:us-east-1:292517598671:applications/AthenaDynamoDBConnector SemanticVersion: 2021.18.1 Parameters: AthenaCatalogName: !Ref 'DynamoDBAthenaConnectorFunctionName' SpillBucket: !Ref 'S3Bucket' QuickSightPolicy: Type: AWS::IAM::ManagedPolicy Properties: PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - glue:GetTable - athena:GetQueryResultsStream - athena:GetDataCatalog - athena:StartQueryExecution - athena:GetQueryExecution - athena:ListDataCatalogs - athena:GetQueryExecution - athena:StartQueryExecution - athena:GetDataCatalog - athena:ListDataCatalogs - athena:ListDatabases - athena:ListTableMetadata - athena:GetQueryResultsStream - athena:ListWorkGroups - athena:StopQueryExecution - glue:GetDatabases - glue:GetTables - glue:GetTable - s3:GetObject - s3:GetBucketLocation - s3:GetObject - s3:PutObject - s3:HeadBucket - s3:ListBucket - s3:GetBucketLocation - s3:ListBuckets - s3:ListAllMyBuckets - sagemaker:InvokeEndpoint - lambda:InvokeFunction - dynamodb:DescribeTable - dynamodb:Query - dynamodb:Scan - dynamodb:GetItem - dynamodb:PutItem - dynamodb:UpdateItem - dynamodb:DeleteItem - dynamodb:BatchWriteItem - dynamodb:BatchDeleteItem Resource: '*' SageMakerModelRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - sagemaker.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - cloudwatch:PutMetricData - logs:CreateLogStream - logs:PutLogEvents - logs:CreateLogGroup - logs:DescribeLogStreams - ecr:GetAuthorizationToken Resource: '*' - Effect: Allow Action: - s3:GetObject Resource: arn:aws:s3:::aws-bigdata-blog/artifacts/bdb-1462-athena-dynamodb-ml/model/model.tar.gz - Effect: Allow Action: - ecr:BatchCheckLayerAvailability - ecr:GetDownloadUrlForLayer - ecr:BatchGetImage Resource: '*' SageMakerModel: DependsOn: SageMakerEndpointLogGroup Type: AWS::SageMaker::Model Properties: ExecutionRoleArn: !GetAtt 'SageMakerModelRole.Arn' PrimaryContainer: Image: 683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.2-1 Mode: SingleModel ModelDataUrl: s3://aws-bigdata-blog/artifacts/bdb-1462-athena-dynamodb-ml/model/model.tar.gz SageMakerEndpointConfig: Type: AWS::SageMaker::EndpointConfig Properties: ProductionVariants: - InitialInstanceCount: 1 InitialVariantWeight: 1 InstanceType: ml.t2.medium ModelName: !GetAtt 'SageMakerModel.ModelName' VariantName: !GetAtt 'SageMakerModel.ModelName' SageMakerEndpoint: DependsOn: SageMakerEndpointLogGroup Type: AWS::SageMaker::Endpoint Properties: EndpointName: Sg-athena-ml-dynamodb-model-endpoint EndpointConfigName: !GetAtt 'SageMakerEndpointConfig.EndpointConfigName' SageMakerEndpointLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: /aws/sagemaker/Endpoints/Sg-athena-ml-dynamodb-model-endpoint RetentionInDays: 7 Outputs: S3Bucket: Description: S3 bucket Value: !Ref 'S3Bucket' QuickSightPolicy: Description: QuickSight Policy ARN Value: !Ref 'QuickSightPolicy'