# SAM template: an AppSync GraphQL API (API key auth) that exposes rows from an
# Amazon Timestream table through two resolvers -- a direct Lambda resolver and
# an APPSYNC_JS pipeline resolver backed by an IAM-signed HTTP data source --
# plus a scheduled Lambda that writes one simulated record every 2 minutes.
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Deploy AppSync example to expose IoT data in Amazon Timestream DB (uksb-1ttmurd0b)

Parameters:
  # Used to build the HTTP data source host name:
  #   https://query-<TimestreamCellEndpoint>.timestream.<region>.<url-suffix>
  TimestreamCellEndpoint:
    Description: timestream cell endpoint name
    Type: String
    Default: cell1
    # "+" (not "*") so that an empty value, which would produce the invalid host
    # "query-.timestream...", is rejected at deploy time.
    AllowedPattern: '^[a-zA-Z0-9_-]+$'
    ConstraintDescription: Must be non-empty and contain only alphanumeric characters, underscores or hyphens.

Resources:
  # ---------------------------------------------------------------------------
  # Timestream storage
  # ---------------------------------------------------------------------------
  iotstreamDatabase:
    Type: AWS::Timestream::Database

  sensoreventsTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref iotstreamDatabase

  # ---------------------------------------------------------------------------
  # AppSync API, schema and API key
  # ---------------------------------------------------------------------------
  iotTimestreamAppSyncApi:
    Type: AWS::AppSync::GraphQLApi
    Properties:
      Name: appsync-timestream-api
      AuthenticationType: API_KEY

  # Both queries return the same IOT shape; they differ only in which resolver
  # (Lambda vs. APPSYNC_JS over HTTP) serves them.
  GraphQLSchema:
    Type: AWS::AppSync::GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      Definition: |
        schema {
          query: Query
        }
        type IOT {
          fleet: String
          fuel_capacity_in_litres: String
          load_capacity_in_tons: String
          make: String
          current_fuel_lvl_in_litres: String
          gps_location_latlong: String
          model: String
          truck_id: String
        }
        type Query {
          getSensorDataUsingLambdaResolver(durationInMinutes: Int): [IOT]
          getSensorDataUsingJsResolver(durationInMinutes: Int): [IOT]
        }

  ApiKey:
    Type: AWS::AppSync::ApiKey
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      Description: Public access

  # ---------------------------------------------------------------------------
  # HTTP data source: AppSync calls the Timestream query endpoint directly,
  # signing requests with the role below.
  # ---------------------------------------------------------------------------
  HttpDataSourceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action: "sts:AssumeRole"
            Effect: Allow
            Principal:
              Service: appsync.amazonaws.com
      Policies:
        - PolicyName: timestream
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Read-only access scoped to the single table created above.
              - Effect: Allow
                Action:
                  - timestream:Select
                Resource: !Sub "arn:${AWS::Partition}:timestream:${AWS::Region}:${AWS::AccountId}:database/${iotstreamDatabase}/table/${sensoreventsTable.Name}"

  HttpDataSource:
    Type: AWS::AppSync::DataSource
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      Type: HTTP
      Name: timestream
      Description: timestream
      HttpConfig:
        AuthorizationConfig:
          AuthorizationType: AWS_IAM
          AwsIamConfig:
            SigningRegion: !Ref AWS::Region
            SigningServiceName: timestream
        Endpoint: !Sub https://query-${TimestreamCellEndpoint}.timestream.${AWS::Region}.${AWS::URLSuffix}
      ServiceRoleArn: !GetAtt HttpDataSourceRole.Arn

  # ---------------------------------------------------------------------------
  # Lambda data source and its direct (UNIT) resolver
  # ---------------------------------------------------------------------------
  LambdaDataSourceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action: "sts:AssumeRole"
            Effect: Allow
            Principal:
              Service: appsync.amazonaws.com
      Policies:
        - PolicyName: lambdaInvokeFunction
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "lambda:InvokeFunction"
                Resource:
                  # The function itself and any qualified (":<suffix>") ARN of it.
                  - !GetAtt DataSourceFunction.Arn
                  - !Sub "${DataSourceFunction.Arn}:*"

  LambdaDataSource:
    Type: AWS::AppSync::DataSource
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      Type: AWS_LAMBDA
      Name: lambdaDatasource
      LambdaConfig:
        LambdaFunctionArn: !GetAtt DataSourceFunction.Arn
      ServiceRoleArn: !GetAtt LambdaDataSourceRole.Arn

  LambdaResolver:
    Type: AWS::AppSync::Resolver
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      FieldName: getSensorDataUsingLambdaResolver
      TypeName: Query
      DataSourceName: !GetAtt LambdaDataSource.Name
      Kind: UNIT

  # ---------------------------------------------------------------------------
  # APPSYNC_JS pipeline function that queries Timestream over the HTTP data source
  # ---------------------------------------------------------------------------
  FunctionConfiguration:
    Type: AWS::AppSync::FunctionConfiguration
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      DataSourceName: !GetAtt HttpDataSource.Name
      Name: timestreamQuery
      FunctionVersion: "2018-05-29"
      Runtime:
        Name: APPSYNC_JS
        RuntimeVersion: 1.0.0
      # The code below goes through !Sub: the database and table placeholders are
      # filled in by CloudFormation at deploy time, while the placeholder written
      # with a leading "!" is emitted literally so it stays a JavaScript
      # template-literal expression evaluated per request.
      Code: !Sub |
        import { util } from '@aws-appsync/utils'

        /**
         * Builds the Timestream Query HTTP request.
         * @param ctx the context object; ctx.args.durationInMinutes selects the look-back window.
         */
        function request(ctx) {
          const { durationInMinutes } = ctx.args;
          const QUERY_1 = `SELECT * FROM "${iotstreamDatabase}"."${sensoreventsTable.Name}" where time > ago(${!durationInMinutes}m)`;
          return {
            method: 'POST',
            params: {
              headers: {
                'content-type': 'application/x-amz-json-1.0',
                'x-amz-target': 'Timestream_20181101.Query',
              },
              body: {
                "ClientToken": util.autoId(),
                "QueryString": QUERY_1
              }
            },
            "resourcePath": '/'
          };
        }

        /**
         * Returns the result
         * @param ctx the context object holds contextual information about the function invocation.
         */
        function response(ctx) {
          // Check the invocation error first: ctx.result is only dereferenced
          // once we know the call itself did not fail.
          if (ctx.error) {
            return util.appendError(ctx.error.message, ctx.error.type, ctx.result);
          }
          if (ctx.result.statusCode !== 200) {
            return util.appendError('none 200 error');
          }
          // Only the rows carried by this single response are returned; a
          // NextToken in the response body is not followed.
          const queryResult = JSON.parse(ctx.result.body)
          return parseQueryResult(queryResult);
        }

        // Converts every row of the query response into a plain object.
        function parseQueryResult(data) {
          const cols = data.ColumnInfo;
          return data.Rows.map((r) => parseDatum(cols, r));
        }

        // Builds one object from a row: column name -> scalar value (or null).
        function parseDatum(cols, row) {
          return row.Data.reduce((obj, datum, i) => {
            // The first "::" in a column name is replaced with "_" for the key.
            const key = cols[i].Name.replace('::', '_');
            const colType = cols[i].Type;
            console.log(key, colType, datum.ScalarValue);

            if (datum.NullValue) {
              // Null columns are keyed by the part of the name before "::".
              const _key = cols[i].Name.split('::')[0];
              obj[_key] = null;
              return obj;
            }

            obj[key] = datum.ScalarValue;
            return obj;
          }, {});
        }

        export { request, response };

  # Pipeline resolver: runs the function above and returns its result unchanged.
  HttpResolver:
    Type: AWS::AppSync::Resolver
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      FieldName: getSensorDataUsingJsResolver
      TypeName: Query
      Code: |
        export function request(){}
        export function response(ctx){return ctx.prev.result}
      Kind: PIPELINE
      Runtime:
        Name: APPSYNC_JS
        RuntimeVersion: 1.0.0
      PipelineConfig:
        Functions:
          - !GetAtt FunctionConfiguration.FunctionId

  # ---------------------------------------------------------------------------
  # Lambda behind getSensorDataUsingLambdaResolver
  # ---------------------------------------------------------------------------
  DataSourceFunction:
    Type: AWS::Serverless::Function
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W89
            reason: VPC not required.
          - id: W92
            reason: Reserved Concurrency not required.
          - id: W11
            reason: Resource level permissions not available for this API.
    Properties:
      # NOTE(review): the inline code requires the "aws-sdk" package without
      # bundling it, so it depends on the runtime providing it -- confirm before
      # changing the Node.js runtime version.
      Runtime: nodejs16.x
      MemorySize: 1024
      Timeout: 2
      Handler: index.handler
      Environment:
        Variables:
          TIMESTREAM_DB_NAME: !Ref iotstreamDatabase
          TIMESTREAM_TABLE_NAME: !GetAtt sensoreventsTable.Name
      Policies:
        - Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Action:
                - timestream:DescribeEndpoints
              Resource: "*"
            - Effect: Allow
              Action:
                - timestream:Select
              Resource: !Sub "arn:${AWS::Partition}:timestream:${AWS::Region}:${AWS::AccountId}:database/${iotstreamDatabase}/table/${sensoreventsTable.Name}"
      InlineCode: |
        "use strict";
        Object.defineProperty(exports, "__esModule", { value: true });
        const AWS = require("aws-sdk");
        const tsdbClient = new AWS.TimestreamQuery();

        /**
         * Queries the table for records newer than `durationInMinutes` minutes
         * and returns them as an array of plain objects.
         * Errors are logged and re-thrown to the caller.
         */
        const getSensorDataUsingLambdaResolver = async (durationInMinutes) => {
          try {
            const QUERY_1 = `SELECT * FROM "${process.env.TIMESTREAM_DB_NAME}"."${process.env.TIMESTREAM_TABLE_NAME}" where time > ago(${durationInMinutes}m)`;
            console.log(QUERY_1);

            // Follow NextToken until the service stops returning one, so that
            // rows delivered on later pages are not dropped.
            let columnInfo = [];
            let queryStatus;
            const rows = [];
            let nextToken = undefined;
            do {
              const page = await tsdbClient.query({
                QueryString: QUERY_1,
                NextToken: nextToken,
              }).promise();
              if (page.ColumnInfo && page.ColumnInfo.length > 0) {
                columnInfo = page.ColumnInfo;
              }
              for (const row of page.Rows || []) {
                rows.push(row);
              }
              queryStatus = page.QueryStatus;
              nextToken = page.NextToken;
            } while (nextToken);

            const result = JSON.parse(parseQueryResult({
              QueryStatus: queryStatus,
              ColumnInfo: columnInfo,
              Rows: rows,
            }));
            console.log('query output', result);
            return result;
          }
          catch (err) {
            console.log(err);
            throw err;
          }
        };

        // Serialises a query response ({QueryStatus, ColumnInfo, Rows}) into the
        // text of a JSON array, one object per row.
        function parseQueryResult(response) {
          console.log("Current query status: " + JSON.stringify(response.QueryStatus));
          const columnInfo = response.ColumnInfo;
          const serialisedRows = response.Rows.map((row) => parseRow(columnInfo, row));
          return "[" + serialisedRows.join(",") + "]";
        }

        // Serialises one row into the text of a JSON object.
        function parseRow(columnInfo, row) {
          const rowOutput = row.Data.map((datum, i) => parseDatum(columnInfo[i], datum));
          return `{${rowOutput.join(", ")}}`;
        }

        // Serialises one column value, dispatching on the column's type.
        function parseDatum(info, datum) {
          if (datum.NullValue != null && datum.NullValue === true) {
            // Null columns are emitted as an empty string under the name up to the first ":".
            return `${JSON.stringify(info.Name.split(':')[0])}:""`;
          }
          const columnType = info.Type;
          // If the column is of TimeSeries Type
          if (columnType.TimeSeriesMeasureValueColumnInfo != null) {
            return parseTimeSeries(info, datum);
          }
          // If the column is of Array Type
          else if (columnType.ArrayColumnInfo != null) {
            const arrayValues = datum.ArrayValue;
            return `${JSON.stringify(info.Name.split(':')[0])}:${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`;
          }
          // If the column is of Row Type
          else if (columnType.RowColumnInfo != null) {
            const rowColumnInfo = info.Type.RowColumnInfo;
            const rowValues = datum.RowValue;
            return parseRow(rowColumnInfo, rowValues);
          }
          // If the column is of Scalar Type
          else {
            return parseScalarType(info, datum);
          }
        }

        // NOTE(review): the "{time=..., value=...}" text produced here is not
        // JSON, so a time-series column would make the JSON.parse in the caller
        // fail -- left unchanged; confirm whether such columns can occur.
        function parseTimeSeries(info, datum) {
          const timeSeriesOutput = [];
          datum.TimeSeriesValue.forEach(function (dataPoint) {
            timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`);
          });
          return `[${timeSeriesOutput.join(", ")}]`;
        }

        // Emits `"<column>":"<value>"`. JSON.stringify escapes quotes and
        // backslashes in the value so the assembled text stays valid JSON.
        function parseScalarType(info, datum) {
          return parseColumnName(info) + JSON.stringify(String(datum.ScalarValue));
        }

        // Emits `"<column>":`, joining the first two "::"-separated parts of the
        // name with "_". Returns "" when the column has no name.
        function parseColumnName(info) {
          // Test for a missing name before calling split() on it.
          if (info.Name == null)
            return "";
          const parts = info.Name.split('::');
          const name = parts[1] != null ? parts[0] + '_' + parts[1] : info.Name;
          return `${JSON.stringify(name)}:`;
        }

        function parseArray(arrayColumnInfo, arrayValues) {
          const arrayOutput = [];
          arrayValues.forEach(function (datum) {
            arrayOutput.push(parseDatum(arrayColumnInfo, datum));
          });
          return `[${arrayOutput.join(", ")}]`;
        }

        // AppSync entry point: dispatches on the GraphQL field name.
        exports.handler = async (event) => {
          switch (event.info.fieldName) {
            case "getSensorDataUsingLambdaResolver":
              return await getSensorDataUsingLambdaResolver(event.arguments.durationInMinutes);
            default:
              return null;
          }
        };

  # ---------------------------------------------------------------------------
  # Simulator: writes one multi-measure record to the table every 2 minutes
  # ---------------------------------------------------------------------------
  ScheduledFunction:
    Type: AWS::Serverless::Function
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W89
            reason: VPC not required.
          - id: W92
            reason: Reserved Concurrency not required.
          - id: W11
            reason: Resource level permissions not available for this API.
    Properties:
      Runtime: python3.9
      MemorySize: 1024
      Timeout: 5
      Handler: index.lambda_handler
      Environment:
        Variables:
          TIMESTREAM_DB_NAME: !Ref iotstreamDatabase
          TIMESTREAM_TABLE_NAME: !GetAtt sensoreventsTable.Name
      Policies:
        - Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Action:
                - timestream:DescribeEndpoints
              Resource: "*"
            - Effect: Allow
              Action:
                - timestream:WriteRecords
              Resource: !Sub "arn:${AWS::Partition}:timestream:${AWS::Region}:${AWS::AccountId}:database/${iotstreamDatabase}/table/${sensoreventsTable.Name}"
      Events:
        ScheduleEvent:
          Type: Schedule
          Properties:
            Schedule: rate(2 minutes)
      InlineCode: |
        #!/usr/bin/python
        import boto3
        from botocore.config import Config
        import json
        #import random
        import time
        import os

        DATABASE_NAME = os.getenv('TIMESTREAM_DB_NAME')
        TABLE_NAME = os.getenv('TIMESTREAM_TABLE_NAME')


        class IngestTimestream:
            """Writes one simulated truck telemetry record to a Timestream table."""

            def __init__(self, database_name, table_name, write_client):
                self.database_name = database_name
                self.table_name = table_name
                self.write_client = write_client

            @staticmethod
            def print_rejected_records_exceptions(err):
                """Print each rejected record's index, reason and existing version (if any)."""
                print("RejectedRecords: ", err)
                for rr in err.response["RejectedRecords"]:
                    print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
                    if "ExistingVersion" in rr:
                        print("Rejected record existing version: ", rr["ExistingVersion"])

            @staticmethod
            def current_milli_time():
                """Return the current epoch time in milliseconds, as a string."""
                return str(int(round(time.time() * 1000)))

            def write_records(self):
                """Write a single MULTI record with fixed sample values.

                Write failures are printed, not raised.
                """
                current_time = IngestTimestream.current_milli_time()
                fuel_in_litres = 10  # random.randint(10,80)
                gps_location_latlong = '55.3618,-3.4433'

                dimensions = [
                    {'Name': 'fleet', 'Value': "acme_fleet"},
                    {'Name': 'fuel_capacity_in_litres', 'Value': str(80)},
                    {'Name': 'load_capacity_in_tons', 'Value': str(20)},
                    {'Name': 'make', 'Value': "volvo"},
                    {'Name': 'model', 'Value': "v2"},
                    {'Name': 'truck_id', 'Value': "truck123"}
                ]

                common_attributes = {
                    'Dimensions': dimensions,
                    'Time': current_time
                }

                # The measure dicts get their own names so the raw values above
                # are not rebound to a different type.
                fuel_level_measure = {
                    'Name': 'current_fuel_lvl_in_litres',
                    'Value': str(fuel_in_litres),
                    'Type': 'BIGINT'
                }

                gps_location_measure = {
                    'Name': 'gps_location_latlong',
                    'Value': str(gps_location_latlong),
                    'Type': 'VARCHAR'
                }

                computational_record = {
                    'MeasureName': 'computational_record',
                    'MeasureValues': [fuel_level_measure, gps_location_measure],
                    'MeasureValueType': 'MULTI'
                }

                records = [computational_record]

                try:
                    result = self.write_client.write_records(DatabaseName=self.database_name, TableName=self.table_name,
                                                             Records=records, CommonAttributes=common_attributes)
                    if result and result['ResponseMetadata']:
                        print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
                except self.write_client.exceptions.RejectedRecordsException as err:
                    IngestTimestream.print_rejected_records_exceptions(err)
                except Exception as err:
                    print("Error:", err)


        def lambda_handler(event, context):
            """Scheduled entry point: write one record, then return a 200 response."""
            session = boto3.Session()
            # No region_name override: the client must use the region the function
            # (and therefore the stack's database and table) is deployed in,
            # rather than a fixed 'eu-west-1'.
            write_client = session.client('timestream-write', config=Config(read_timeout=20, max_pool_connections=5000,
                                                                            retries={'max_attempts': 10}))
            ingestTimestream = IngestTimestream(DATABASE_NAME, TABLE_NAME, write_client)
            ingestTimestream.write_records()

            return {
                'statusCode': 200,
                'body': json.dumps('Hello from simulator!')
            }

Outputs:
  GraphQLAPIURL:
    Value: !GetAtt iotTimestreamAppSyncApi.GraphQLUrl
  GraphQLAPIKey:
    Value: !GetAtt ApiKey.ApiKey
  StackRegion:
    Value: !Ref AWS::Region
  TimestreamDatabaseARN:
    Value: !GetAtt iotstreamDatabase.Arn
  TableARN:
    Value: !GetAtt sensoreventsTable.Arn