AWSTemplateFormatVersion: 2010-09-09

Transform: AWS::Serverless-2016-10-31

Description: Deploy AppSync example to expose IoT data in Amazon Timestream DB (uksb-1ttmurd0b)

Parameters:
  TimestreamCellEndpoint:
    Description: timestream cell endpoint name
    Type: String
    Default: cell1
    AllowedPattern: ^[a-zA-Z0-9_-]*$
    ConstraintDescription: Must contain only alphanumeric characters.

Resources:
  iotstreamDatabase:
    Type: AWS::Timestream::Database

  sensoreventsTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref iotstreamDatabase

  iotTimestreamAppSyncApi:
    Type: AWS::AppSync::GraphQLApi
    Properties:
      Name: appsync-timestream-api
      AuthenticationType: API_KEY

  GraphQLSchema:
    Type: AWS::AppSync::GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      Definition: |
        schema {
          query: Query
        }

        type IOT {
          fleet: String
          fuel_capacity_in_litres: String
          load_capacity_in_tons: String
          make: String
          current_fuel_lvl_in_litres: String
          gps_location_latlong: String
          model: String
          truck_id: String
        }

        type Query {
          getSensorDataUsingLambdaResolver(durationInMinutes: Int): [IOT]
          getSensorDataUsingJsResolver(durationInMinutes: Int): [IOT]
        }

  ApiKey:
    Type: AWS::AppSync::ApiKey
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      Description: Public access

  HttpDataSourceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Action: "sts:AssumeRole"
            Effect: Allow
            Principal:
              Service: appsync.amazonaws.com
      Policies:
        - PolicyName: timestream
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - timestream:Select
                Resource: !Sub "arn:${AWS::Partition}:timestream:${AWS::Region}:${AWS::AccountId}:database/${iotstreamDatabase}/table/${sensoreventsTable.Name}"

  HttpDataSource:
    Type: AWS::AppSync::DataSource
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      Type: HTTP
      Name: timestream
      Description: timestream
      HttpConfig:
        AuthorizationConfig:
          AuthorizationType: AWS_IAM
          AwsIamConfig:
            SigningRegion: !Ref AWS::Region
            SigningServiceName: timestream
        Endpoint: !Sub https://query-${TimestreamCellEndpoint}.timestream.${AWS::Region}.${AWS::URLSuffix}
      ServiceRoleArn: !GetAtt HttpDataSourceRole.Arn

  LambdaDataSourceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Action: "sts:AssumeRole"
            Effect: Allow
            Principal:
              Service: appsync.amazonaws.com
      Policies:
        - PolicyName: lambdaInvokeFunction
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: "lambda:InvokeFunction"
                Resource:
                  - !GetAtt DataSourceFunction.Arn
                  - !Sub "${DataSourceFunction.Arn}:*"

  LambdaDataSource:
    Type: AWS::AppSync::DataSource
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      Type: AWS_LAMBDA
      Name: lambdaDatasource
      LambdaConfig:
        LambdaFunctionArn: !GetAtt DataSourceFunction.Arn
      ServiceRoleArn: !GetAtt LambdaDataSourceRole.Arn

  LambdaResolver:
    Type: AWS::AppSync::Resolver
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      FieldName: getSensorDataUsingLambdaResolver
      TypeName: Query
      DataSourceName: !GetAtt LambdaDataSource.Name
      Kind: UNIT

  FunctionConfiguration:
    Type: AWS::AppSync::FunctionConfiguration
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      DataSourceName: !GetAtt HttpDataSource.Name
      Name: timestreamQuery
      FunctionVersion: 2018-05-29
      Runtime:
        Name: APPSYNC_JS
        RuntimeVersion: 1.0.0
      Code: !Sub |
        import { util } from '@aws-appsync/utils'

        function request(ctx) {
            const { durationInMinutes } = ctx.args;
            const QUERY_1 = `SELECT * FROM "${iotstreamDatabase}"."${sensoreventsTable.Name}" where time > ago(${!durationInMinutes}m)`;

            return {
                method: 'POST',
                params: {
                    headers: {
                        'content-type': 'application/x-amz-json-1.0',
                        'x-amz-target': 'Timestream_20181101.Query',
                    },
                    body: {
                        "ClientToken": util.autoId(),
                        "QueryString": QUERY_1
                    }
                },
                "resourcePath": '/'
            };
        }

        /**
        * Returns the result
        * @param ctx the context object holds contextual information about the function invocation.
        */
        function response(ctx) {
            if (ctx.result.statusCode !== 200) {
                return util.appendError('none 200 error');
            } else if (ctx.error) {
                return util.appendError(ctx.error.message, ctx.error.type, ctx.result);
            }

            const queryResult = JSON.parse(ctx.result.body)
            return parseQueryResult(queryResult);
        }

        function parseQueryResult(data) {
            const cols = data.ColumnInfo;
            return data.Rows.map((r) => parseDatum(cols, r));
        }
        function parseDatum(cols, row) {
            return row.Data.reduce((obj, datum, i) => {
                const key = cols[i].Name.replace('::', '_');
                const colType = cols[i].Type;
                console.log(key, colType, datum.ScalarValue);
                if (datum.NullValue) {
                    const _key = cols[i].Name.split('::')[0];
                    obj[_key] = null;
                    return obj;
                }
                obj[key] = datum.ScalarValue;
                return obj;
            }, {});
        } export { request, response };

  HttpResolver:
    Type: AWS::AppSync::Resolver
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt iotTimestreamAppSyncApi.ApiId
      FieldName: getSensorDataUsingJsResolver
      TypeName: Query
      Code: |
        export function request(){}
        export function response(ctx){return ctx.prev.result}
      Kind: PIPELINE
      Runtime:
        Name: APPSYNC_JS
        RuntimeVersion: 1.0.0
      PipelineConfig:
        Functions:
          - !GetAtt FunctionConfiguration.FunctionId

  DataSourceFunction:
    Type: AWS::Serverless::Function
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W89
            reason: VPC not required.
          - id: W92
            reason: Reserved Concurrency not required.
          - id: W11
            reason: Resource level permissions not available for this API.
    Properties:
      Runtime: nodejs16.x
      MemorySize: 1024
      Timeout: 2
      Handler: index.handler
      Environment:
        Variables:
          TIMESTREAM_DB_NAME: !Ref iotstreamDatabase
          TIMESTREAM_TABLE_NAME: !GetAtt sensoreventsTable.Name
      Policies:
        - Version: 2012-10-17
          Statement:
            - Effect: Allow
              Action:
                - timestream:DescribeEndpoints
              Resource: "*"
            - Effect: Allow
              Action:
                - timestream:Select
              Resource: !Sub "arn:${AWS::Partition}:timestream:${AWS::Region}:${AWS::AccountId}:database/${iotstreamDatabase}/table/${sensoreventsTable.Name}"
      InlineCode: |
        "use strict";

        Object.defineProperty(exports, "__esModule", { value: true });

        const AWS = require("aws-sdk");
        const tsdbClient = new AWS.TimestreamQuery();
        const getSensorDataUsingLambdaResolver = async (durationInMinutes) => {
            try {
                let response;
                const QUERY_1 = `SELECT * FROM "${process.env.TIMESTREAM_DB_NAME}"."${process.env.TIMESTREAM_TABLE_NAME}" where time > ago(${durationInMinutes}m)`;
                console.log(QUERY_1);
                var nextToken = undefined;
                const params = {
                    QueryString: QUERY_1,
                    NextToken: nextToken,
                };
                response = await tsdbClient.query(params).promise();
                let result = JSON.parse(parseQueryResult(response));
                console.log('query output', result);
                return result;
            }
            catch (err) {
                console.log(err);
                throw err;
            }
        };
        function parseQueryResult(response) {
            var res = "";
            const queryStatus = response.QueryStatus;
            console.log("Current query status: " + JSON.stringify(queryStatus));
            const columnInfo = response.ColumnInfo;
            const rows = response.Rows;
            res = "[";
            var i = 0;
            rows.forEach(function (row) {
                if (i != 0)
                    res += ",";
                i = i + 1;
                res += parseRow(columnInfo, row);
            });
            return res + "]";
        }
        function parseRow(columnInfo, row) {
            const data = row.Data;
            const rowOutput = [];
            var i;
            for (i = 0; i < data.length; i++) {
                let info = columnInfo[i];
                let datum = data[i];
                rowOutput.push(parseDatum(info, datum));
            }
            return `{${rowOutput.join(", ")}}`;
        }
        function parseDatum(info, datum) {
            if (datum.NullValue != null && datum.NullValue === true) {
                return `\"${info.Name.split(':')[0]}\":\"\"`;
            }
            const columnType = info.Type;
            // If the column is of TimeSeries Type
            if (columnType.TimeSeriesMeasureValueColumnInfo != null) {
                return parseTimeSeries(info, datum);
            }
            // If the column is of Array Type
            else if (columnType.ArrayColumnInfo != null) {
                const arrayValues = datum.ArrayValue;
                return `\"${info.Name.split(':')[0]}\":${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`;
            }
            // If the column is of Row Type
            else if (columnType.RowColumnInfo != null) {
                const rowColumnInfo = info.Type.RowColumnInfo;
                const rowValues = datum.RowValue;
                return parseRow(rowColumnInfo, rowValues);
            }
            // If the column is of Scalar Type
            else {
                return parseScalarType(info, datum);
            }
        }
        function parseTimeSeries(info, datum) {
            const timeSeriesOutput = [];
            datum.TimeSeriesValue.forEach(function (dataPoint) {
                timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`);
            });
            return `[${timeSeriesOutput.join(", ")}]`;
        }
        function parseScalarType(info, datum) {
            return parseColumnName(info) + "\"" + datum.ScalarValue + "\"";
        }
        function parseColumnName(info) {
            if (info.Name.split('::')[1] != null)
                return info.Name == null ? "" : `\"${info.Name.split('::')[0] + '_' + info.Name.split('::')[1]}\":`;
            else
                return info.Name == null ? "" : `\"${info.Name}\":`;
        }
        function parseArray(arrayColumnInfo, arrayValues) {
            const arrayOutput = [];
            arrayValues.forEach(function (datum) {
                arrayOutput.push(parseDatum(arrayColumnInfo, datum));
            });
            return `[${arrayOutput.join(", ")}]`;
        }

        exports.handler = async (event) => {
            switch (event.info.fieldName) {
                case "getSensorDataUsingLambdaResolver":
                    return await (0, getSensorDataUsingLambdaResolver)(event.arguments.durationInMinutes);
                default:
                    return null;
            }
        };

  ScheduledFunction:
    Type: AWS::Serverless::Function
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W89
            reason: VPC not required.
          - id: W92
            reason: Reserved Concurrency not required.
          - id: W11
            reason: Resource level permissions not available for this API.
    Properties:
      Runtime: python3.9
      MemorySize: 1024
      Timeout: 5
      Handler: index.lambda_handler
      Environment:
        Variables:
          TIMESTREAM_DB_NAME: !Ref iotstreamDatabase
          TIMESTREAM_TABLE_NAME: !GetAtt sensoreventsTable.Name
      Policies:
        - Version: 2012-10-17
          Statement:
            - Effect: Allow
              Action:
                - timestream:DescribeEndpoints
              Resource: "*"
            - Effect: Allow
              Action:
                - timestream:WriteRecords
              Resource: !Sub "arn:${AWS::Partition}:timestream:${AWS::Region}:${AWS::AccountId}:database/${iotstreamDatabase}/table/${sensoreventsTable.Name}"
      Events:
        ScheduleEvent:
          Type: Schedule
          Properties:
            Schedule: rate(2 minutes)
      InlineCode: |
        #!/usr/bin/python

        import boto3
        from botocore.config import Config
        import json
        #import random
        import time
        import os


        DATABASE_NAME = os.getenv('TIMESTREAM_DB_NAME')
        TABLE_NAME = os.getenv('TIMESTREAM_TABLE_NAME')


        class IngestTimestream:
            def __init__(self, database_name, table_name, write_client):
                self.database_name = database_name
                self.table_name = table_name
                self.write_client = write_client
                

            @staticmethod
            def print_rejected_records_exceptions(err):
                print("RejectedRecords: ", err)
                for rr in err.response["RejectedRecords"]:
                    print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
                    if "ExistingVersion" in rr:
                        print("Rejected record existing version: ", rr["ExistingVersion"])
            
            @staticmethod
            def current_milli_time():
                return str(int(round(time.time() * 1000)))
                
            def write_records(self):
                
                current_time = IngestTimestream.current_milli_time()
                
                fuel_in_litres= 10 #random.randint(10,80)
                gps_location_latlong='55.3618,-3.4433'
                
                dimensions = [
                    {'Name': 'fleet', 'Value': "acme_fleet"},
                    {'Name': 'fuel_capacity_in_litres', 'Value': str(80)},
                    {'Name': 'load_capacity_in_tons', 'Value': str(20)},
                    {'Name': 'make', 'Value': "volvo"},
                    {'Name': 'model', 'Value': "v2"},
                    {'Name': 'truck_id', 'Value': "truck123"}
                ]

                common_attributes = {
                    'Dimensions': dimensions,
                    'Time': current_time
                }

                current_fuel_lvl_in_litres = {
                    'Name': 'current_fuel_lvl_in_litres',
                    'Value': str(fuel_in_litres),
                    'Type': 'BIGINT'
                }

                gps_location_latlong = {
                    'Name': 'gps_location_latlong',
                    'Value': str(gps_location_latlong),
                    'Type': 'VARCHAR'
                }

                computational_record = {
                    'MeasureName': 'computational_record',
                    'MeasureValues': [current_fuel_lvl_in_litres,gps_location_latlong],
                    'MeasureValueType': 'MULTI'
                }
                
                

                records = [computational_record]

                try:
                    result = self.write_client.write_records(DatabaseName=self.database_name, TableName=self.table_name, Records=records, CommonAttributes=common_attributes)
                    if result and result['ResponseMetadata']:
                        print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
                except self.write_client.exceptions.RejectedRecordsException as err:
                    IngestTimestream.print_rejected_records_exceptions(err)
                except Exception as err:
                    print("Error:", err)



        def lambda_handler(event, context):

            session = boto3.Session();
            write_client = session.client('timestream-write',
                                          config=Config(region_name='eu-west-1', read_timeout=20, max_pool_connections=5000,retries={'max_attempts': 10}))

            
            ingestTimestream=IngestTimestream(DATABASE_NAME, TABLE_NAME, write_client)
            ingestTimestream.write_records()
            
            return {
                'statusCode': 200,
                'body': json.dumps('Hello from simulator!')
            }

Outputs:
  GraphQLAPIURL:
    Value: !GetAtt iotTimestreamAppSyncApi.GraphQLUrl

  GraphQLAPIKey:
    Value: !GetAtt ApiKey.ApiKey

  StackRegion:
    Value: !Ref AWS::Region

  TimestreamDatabaseARN:
    Value: !GetAtt iotstreamDatabase.Arn

  TableARN:
    Value: !GetAtt sensoreventsTable.Arn