AWSTemplateFormatVersion: 2010-09-09 Description: Wild Rydes ML data processing infrastructure Resources: DataBucket: Type: AWS::S3::Bucket DependsOn: IngestUnicornRawDataFunction Properties: BucketName: !Sub "${AWS::StackName}-databucket-${AWS::AccountId}" Tags: - Key: Workshop Value: Wild Rydes - Key: Module Value: Machine Learning NotificationConfiguration: LambdaConfigurations: - Function: !GetAtt IngestUnicornRawDataFunction.Arn Event: "s3:ObjectCreated:*" Filter: S3Key: Rules: - Name: prefix Value: raw/ - Name: suffix Value: json S3DataBucketInvokeLambdaPermission: Type: AWS::Lambda::Permission Properties: Action: lambda:InvokeFunction FunctionName: !Ref IngestUnicornRawDataFunction Principal: s3.amazonaws.com SourceAccount: !Ref AWS::AccountId SourceArn: !Sub "arn:aws:s3:::${AWS::StackName}-databucket-${AWS::AccountId}" DataProcessingExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: sts:AssumeRole Policies: - PolicyName: "AllowLambdaFunctionality" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "*" Resource: - !Sub "arn:aws:s3:::${AWS::StackName}-databucket-${AWS::AccountId}/*" - !Sub "arn:aws:s3:::${AWS::StackName}-databucket-${AWS::AccountId}" - !GetAtt TransformAndMapDataFunctionDLQ.Arn - !GetAtt IngestUnicornRawDataFunctionDLQ.Arn - !GetAtt IngestedRawDataFanOutQueue.Arn - PolicyName: "AllowLogging" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: ['logs:*'] Resource: 'arn:aws:logs:*:*:*' TransformAndMapDataFunctionDLQ: Type: "AWS::SQS::Queue" Properties: Tags: - Key: Workshop Value: Wild Rydes - Key: Module Value: Machine Learning IngestUnicornRawDataFunctionDLQ: Type: "AWS::SQS::Queue" Properties: Tags: - Key: Workshop Value: Wild Rydes - Key: Module Value: Machine Learning IngestedRawDataFanOutQueue: Type: "AWS::SQS::Queue" Properties: Tags: - Key: Workshop Value: Wild Rydes - Key: Module Value: Machine Learning IngestUnicornRawDataFunction: Type: AWS::Lambda::Function Properties: Code: ZipFile: | import boto3 import json import os # Function Name: # Process Unicorn Data (function #1) # Function Path: # MachineLearning/1_DataProcessing/lambda-functions/1-process-s3-event-fan-out/index.py client = boto3.client('s3') sqs = boto3.client('sqs') queue_url = os.environ['OUTPUT_QUEUE'] def send_to_sqs(json_data): data = json_data if 'travel_data' in json_data: data = json_data['travel_data'] counter = 0 for entry in data: lower_cased = {k.lower(): v for k, v in entry.items()} response = sqs.send_message( QueueUrl=queue_url, DelaySeconds=0, MessageBody=json.dumps(lower_cased) ) counter = counter + 1 print('Sent ', counter, ' data entries') print('all rows sent to sqs') def lambda_handler(event, context): for record in event['Records']: bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] content_object = client.get_object(Bucket=bucket, Key=key) file_content = content_object['Body'].read().decode('utf-8') json_content = json.loads(file_content) send_to_sqs(json_content) Description: Wild Rydes lambda function to parse unicorn JSON data and send downstream to lookup nearest groundstation Handler: index.lambda_handler DeadLetterConfig: TargetArn: !GetAtt IngestUnicornRawDataFunctionDLQ.Arn MemorySize: 256 Environment: Variables: OUTPUT_QUEUE: !Ref IngestedRawDataFanOutQueue Role: !GetAtt DataProcessingExecutionRole.Arn Runtime: python3.7 Timeout: 900 Tags: - Key: Workshop Value: Wild Rydes - Key: Module Value: Machine Learning TransformAndMapDataFunction: Type: AWS::Lambda::Function Properties: Code: ZipFile: | # https://docs.aws.amazon.com/lambda/latest/dg/python-programming-model-handler-types.html # Function Name: # Find Closest Groundstation (function B) # Function Path: # MachineLearning/1_DataProcessing/lambda-functions/2-transform-data-output-to-s3/lambda_function.py from math import cos, asin, sqrt import boto3 import json import os s3 = boto3.client('s3') bucket = os.environ['OUTPUT_BUCKET'] def lambda_handler(event, context): print('## EVENT') print(event) for record in event['Records']: json_event = json.loads(record['body']) json_event = event_format_check(json_event) json_event["groundstation"] = closest(groundstations(), {"latitude": json_event["latitude"], "longitude": json_event["longitude" ]})["id"] print('Closest weatherstation for ', json_event["latitude"], ', ', json_event["longitude"], ' is ', json_event["groundstation"]) # events look like: # {"name": "Shadowfax", "statustime": "2019-05-07 20:11:04.247", "latitude": 41.441963, "longitude": -73.574745, "distance": "29.212845", "healthpoints": "217", "magicpoints": "112", "groundstation": "USC00305799"} send_event_to_s3(json_event, context) return event def event_format_check(event): event["latitude"] = float(event["latitude"]) event["longitude"] = float(event["longitude"]) return event def distance(lat1, lon1, lat2, lon2): p = 0.017453292519943295 a = 0.5 - cos((lat2-lat1)*p)/2 + cos(lat1*p)*cos(lat2*p) * (1-cos((lon2-lon1)*p)) / 2 return 12742 * asin(sqrt(a)) def closest(listOfStations, target): return min(listOfStations, key=lambda p: distance(target['latitude'],target['longitude'],p['latitude'],p['longitude'])) def groundstations(): return [{"id": "US1NYNY0074", "latitude": 40.7969, "longitude": -73.9330, "elevation": 6.1}, {"id": "USW00014732", "latitude": 40.7794, "longitude": -73.8803, "elevation": 3.4}, {"id": "USW00094728", "latitude": 40.7789, "longitude": -73.9692, "elevation": 39.6}, {"id": "USW00094789", "latitude": 40.6386, "longitude": -73.7622, "elevation": 3.4}] def send_event_to_s3(event, context): key = get_key_from_event_json(event) body = get_body_from_event_json(event) upload_file(body, key) def get_key_from_event_json(event): return 'processed/' + event["statustime"] + '.csv' def get_body_from_event_json(event): body = ",".join(event.keys()) + "\n" body = body + ",".join(map(str, event.values())) return body def upload_file(body, key): s3.put_object(Body=body, Bucket=bucket, Key=key) Description: Wild Rydes lambda function to look up nearest weather groundstation based on lat/long Handler: index.lambda_handler DeadLetterConfig: TargetArn: !GetAtt TransformAndMapDataFunctionDLQ.Arn MemorySize: 128 Environment: Variables: OUTPUT_BUCKET: !Ref DataBucket Role: !GetAtt DataProcessingExecutionRole.Arn Runtime: python3.7 Timeout: 3 Tags: - Key: Workshop Value: Wild Rydes - Key: Module Value: Machine Learning LookUpNearestGroundstationEventSource: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 10 Enabled: True EventSourceArn: !GetAtt IngestedRawDataFanOutQueue.Arn FunctionName: !Ref TransformAndMapDataFunction ##### # CloudWatch Dashboard ##### CloudWatchDashboard: Type: "AWS::CloudWatch::Dashboard" Properties: DashboardName: !Sub "Wild_Rydes_Machine_Learning_${AWS::StackName}" DashboardBody: !Sub - | { "widgets": [{ "type": "metric", "x": 0, "y": 0, "width": 12, "height": 12, "properties": { "metrics": [ ["AWS/Lambda", "Invocations", "FunctionName", "${IngestUnicornRawDataFunction}"], ["...", "${TransformAndMapDataFunction}"] ], "view": "timeSeries", "stacked": false, "region": "${AWS::Region}", "stat": "Sum", "period": 60 } }, { "type": "metric", "x": 12, "y": 0, "width": 12, "height": 6, "properties": { "metrics": [ ["AWS/SQS", "NumberOfMessagesSent", "QueueName", "${IngestedRawDataFanOutQueue}"] ], "view": "timeSeries", "stacked": false, "region": "${AWS::Region}", "stat": "Sum", "period": 300 } }, { "type": "metric", "x": 12, "y": 6, "width": 12, "height": 6, "properties": { "metrics": [ ["AWS/SQS", "NumberOfEmptyReceives", "QueueName", "${IngestedRawDataFanOutQueue}"] ], "view": "timeSeries", "stacked": false, "region": "${AWS::Region}", "stat": "Sum", "period": 300 } }] } - { IngestUnicornRawDataFunction: !Ref IngestUnicornRawDataFunction, TransformAndMapDataFunction: !Ref TransformAndMapDataFunction, IngestedRawDataFanOutQueue: !GetAtt IngestedRawDataFanOutQueue.QueueName } Outputs: DataBucketName: Value: !Ref DataBucket