AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: SFW-Module-14

Globals:
  Function:
    CodeUri: functions/temps/
    Runtime: python3.9
    Timeout: 120
    Architectures:
      - arm64

Resources:
  # Resources for the state machine that copies data from the public NOAA bucket
  DistributedMapWorkshopDataset:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      VersioningConfiguration:
        Status: Enabled

  # Resources for the NOAA data state machine
  DistributedMapResultsBucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      VersioningConfiguration:
        Status: Enabled

  ResultsDynamoDBTable:
    Type: AWS::Serverless::SimpleTable
    Properties:
      PrimaryKey:
        Name: pk
        Type: String

  TemperatureStateMachineRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              AWS: !Ref "AWS::AccountId"
              Service:
                - states.amazonaws.com
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/CloudWatchFullAccess
        - arn:aws:iam::aws:policy/AWSXrayFullAccess
      Policies:
        - PolicyName: ReadDataPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:ListBucket
                Resource:
                  - !GetAtt DistributedMapWorkshopDataset.Arn
                  - !Join ["/", [!GetAtt DistributedMapWorkshopDataset.Arn, "*"]]
        - PolicyName: WriteResultsPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource: !Join ["/", [!GetAtt DistributedMapResultsBucket.Arn, "*"]]
        - PolicyName: StartExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - states:StartExecution
                Resource: !Sub "arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:DistributedMap-WeatherAnalysis"
        - PolicyName: InvokeMapperReducerPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                Resource:
                  - !Join [":", [!GetAtt TemperaturesFunction.Arn, "*"]]
                  - !Join [":", [!GetAtt ReducerFunction.Arn, "*"]]

  TemperaturesFunction:
    Type: AWS::Serverless::Function
    Properties:
      InlineCode: |
        # -----------------------------------------------------------
        # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
        # SPDX-License-Identifier: MIT-0
        #
        # Permission is hereby granted, free of charge, to any person obtaining a copy of this
        # software and associated documentation files (the "Software"), to deal in the Software
        # without restriction, including without limitation the rights to use, copy, modify,
        # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
        # permit persons to whom the Software is furnished to do so.
        #
        # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
        # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
        # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
        # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
        # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
        # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
        # -----------------------------------------------------------
        import json
        import os
        import csv
        import io
        from datetime import datetime
        from decimal import Decimal
        from typing import Dict

        import boto3

        S3_CLIENT = boto3.client("s3")


        def lambda_handler(event: dict, context):
            """Handler that finds the weather station with the highest average
            temperature by month.

            Returns a dictionary with "year-month" as the key and a dictionary
            (weather station info) as the value.
            """
            input_bucket_name = os.environ["INPUT_BUCKET_NAME"]
            high_by_month: Dict[str, Dict] = {}
            for item in event["Items"]:
                csv_data = get_file_from_s3(input_bucket_name, item["Key"])
                dict_data = get_csv_dict_from_string(csv_data)
                for row in dict_data:
                    avg_temp = float(row["TEMP"])
                    date = datetime.fromisoformat(row["DATE"])
                    month_str = date.strftime("%Y-%m")
                    monthly_high_record = high_by_month.get(month_str) or {}
                    if not monthly_high_record:
                        row["TEMP"] = avg_temp
                        high_by_month[month_str] = row
                        continue
                    if avg_temp > float(monthly_high_record["TEMP"]):
                        high_by_month[month_str] = row
            return high_by_month


        def reducer_handler(event: dict, context: dict):
            """Reducer function that reads all of the mapped results from S3
            and writes the final results to DynamoDB.

            Args:
                event (dict): The event payload delivered after the distributed
                    map run; it has the following structure:
                    {
                        "MapRunArn": "arn-of-the-map-run",
                        "ResultWriterDetails": {
                            "Bucket": "bucket-name-where-results-are-written",
                            "Key": "results/dee8fb57-3653-3f09-88dd-4f39225d2367/manifest.json",
                        },
                    }
                context (dict): Lambda context
            """
            print(event)
            results_bucket = event["ResultWriterDetails"]["Bucket"]
            manifest = get_file_from_s3(
                results_bucket,
                event["ResultWriterDetails"]["Key"],
            )
            manifest_json = json.loads(manifest)
            high_by_month: Dict[str, Dict] = {}
            for result in manifest_json["ResultFiles"].get("SUCCEEDED", []):
                results = get_file_from_s3(results_bucket, result["Key"])
                for json_result in json.loads(results):
                    monthly_highs: Dict[str, Dict] = json.loads(json_result["Output"])
                    for month_str, row in monthly_highs.items():
                        high_temp = float(row["TEMP"])
                        monthly_high = high_by_month.get(month_str)
                        if not monthly_high:
                            high_by_month[month_str] = row
                            continue
                        if high_temp > float(monthly_high["TEMP"]):
                            high_by_month[month_str] = row
            _write_results_to_ddb(high_by_month)


        def _write_results_to_ddb(high_by_month: Dict[str, Dict]):
            dynamodb = boto3.resource("dynamodb")
            table = dynamodb.Table(os.environ["RESULTS_DYNAMODB_TABLE_NAME"])
            for month_str, row in high_by_month.items():
                row["pk"] = month_str
                row["TEMP"] = round(Decimal(row["TEMP"]), 1)
                table.put_item(Item=row)


        def get_file_from_s3(input_bucket_name: str, key: str) -> str:
            resp = S3_CLIENT.get_object(Bucket=input_bucket_name, Key=key)
            return resp["Body"].read().decode("utf-8")


        def get_csv_dict_from_string(csv_string: str) -> csv.DictReader:
            return csv.DictReader(io.StringIO(csv_string))
      Handler: index.lambda_handler
      MemorySize: 2048
      Environment:
        Variables:
          INPUT_BUCKET_NAME: !Ref DistributedMapWorkshopDataset
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref DistributedMapWorkshopDataset

  ReducerFunction:
    Type: AWS::Serverless::Function
    Properties:
      InlineCode: |
        # -----------------------------------------------------------
        # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
        # SPDX-License-Identifier: MIT-0
        #
        # Permission is hereby granted, free of charge, to any person obtaining a copy of this
        # software and associated documentation files (the "Software"), to deal in the Software
        # without restriction, including without limitation the rights to use, copy, modify,
        # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
        # permit persons to whom the Software is furnished to do so.
        #
        # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
        # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
        # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
        # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
        # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
        # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
        # -----------------------------------------------------------
        import json
        import os
        import csv
        import io
        from datetime import datetime
        from decimal import Decimal
        from typing import Dict

        import boto3

        S3_CLIENT = boto3.client("s3")


        def lambda_handler(event: dict, context):
            """Handler that finds the weather station with the highest average
            temperature by month.

            Returns a dictionary with "year-month" as the key and a dictionary
            (weather station info) as the value.
            """
            input_bucket_name = os.environ["INPUT_BUCKET_NAME"]
            high_by_month: Dict[str, Dict] = {}
            for item in event["Items"]:
                csv_data = get_file_from_s3(input_bucket_name, item["Key"])
                dict_data = get_csv_dict_from_string(csv_data)
                for row in dict_data:
                    avg_temp = float(row["TEMP"])
                    date = datetime.fromisoformat(row["DATE"])
                    month_str = date.strftime("%Y-%m")
                    monthly_high_record = high_by_month.get(month_str) or {}
                    if not monthly_high_record:
                        row["TEMP"] = avg_temp
                        high_by_month[month_str] = row
                        continue
                    if avg_temp > float(monthly_high_record["TEMP"]):
                        high_by_month[month_str] = row
            return high_by_month


        def reducer_handler(event: dict, context: dict):
            """Reducer function that reads all of the mapped results from S3
            and writes the final results to DynamoDB.
            Args:
                event (dict): The event payload delivered after the distributed
                    map run; it has the following structure:
                    {
                        "MapRunArn": "arn-of-the-map-run",
                        "ResultWriterDetails": {
                            "Bucket": "bucket-name-where-results-are-written",
                            "Key": "results/dee8fb57-3653-3f09-88dd-4f39225d2367/manifest.json",
                        },
                    }
                context (dict): Lambda context
            """
            print(event)
            results_bucket = event["ResultWriterDetails"]["Bucket"]
            manifest = get_file_from_s3(
                results_bucket,
                event["ResultWriterDetails"]["Key"],
            )
            manifest_json = json.loads(manifest)
            high_by_month: Dict[str, Dict] = {}
            for result in manifest_json["ResultFiles"].get("SUCCEEDED", []):
                results = get_file_from_s3(results_bucket, result["Key"])
                for json_result in json.loads(results):
                    monthly_highs: Dict[str, Dict] = json.loads(json_result["Output"])
                    for month_str, row in monthly_highs.items():
                        high_temp = float(row["TEMP"])
                        monthly_high = high_by_month.get(month_str)
                        if not monthly_high:
                            high_by_month[month_str] = row
                            continue
                        if high_temp > float(monthly_high["TEMP"]):
                            high_by_month[month_str] = row
            _write_results_to_ddb(high_by_month)


        def _write_results_to_ddb(high_by_month: Dict[str, Dict]):
            dynamodb = boto3.resource("dynamodb")
            table = dynamodb.Table(os.environ["RESULTS_DYNAMODB_TABLE_NAME"])
            for month_str, row in high_by_month.items():
                row["pk"] = month_str
                row["TEMP"] = round(Decimal(row["TEMP"]), 1)
                table.put_item(Item=row)


        def get_file_from_s3(input_bucket_name: str, key: str) -> str:
            resp = S3_CLIENT.get_object(Bucket=input_bucket_name, Key=key)
            return resp["Body"].read().decode("utf-8")


        def get_csv_dict_from_string(csv_string: str) -> csv.DictReader:
            return csv.DictReader(io.StringIO(csv_string))
      Handler: index.reducer_handler
      MemorySize: 2048
      Environment:
        Variables:
          RESULTS_BUCKET_NAME: !Ref DistributedMapResultsBucket
          RESULTS_DYNAMODB_TABLE_NAME: !Ref ResultsDynamoDBTable
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref DistributedMapResultsBucket
        - DynamoDBWritePolicy:
            TableName: !Ref ResultsDynamoDBTable

  FunctionS3Create:
    Type: 'AWS::Serverless::Function'
    Properties:
      InlineCode: |
        /*! Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
         * SPDX-License-Identifier: MIT-0
         */

        // Lambda function used as a CloudFormation custom resource to copy the
        // NOAA dataset into the workshop bucket.
        // Get service client and commands using ES6 syntax (AWS SDK for
        // JavaScript v3, which is bundled with the nodejs18.x runtime).
        const { S3Client, ListObjectsCommand, CopyObjectCommand, DeleteObjectCommand } = require("@aws-sdk/client-s3");
        const response = require("./cfn-response.js");

        // The public NOAA bucket lives in us-east-1; the target bucket is in the stack's region.
        const sourceClient = new S3Client({ region: "us-east-1" });
        const targetClient = new S3Client({ region: process.env.AWS_REGION });

        // Send a response object to the CloudFormation pre-signed URL to signal
        // success or failure of the custom resource operation.
        function send(event, context, responseStatus, responseData, physicalResourceId, noEcho) {
          try {
            const https = require("https");
            const { URL } = require("url");

            const responseBody = {
              Status: responseStatus,
              Reason: "See the details in CloudWatch Log Stream: " + context.logStreamName,
              PhysicalResourceId: physicalResourceId || context.logStreamName,
              StackId: event.StackId,
              RequestId: event.RequestId,
              LogicalResourceId: event.LogicalResourceId,
              NoEcho: noEcho || false,
              Data: responseData,
            };
            console.log("Response body:\n", JSON.stringify(responseBody));

            const parsedUrl = new URL(event.ResponseURL);
            const requestOptions = {
              hostname: parsedUrl.hostname,
              port: 443,
              path: parsedUrl.pathname + parsedUrl.search,
              method: "PUT",
              headers: {
                "content-type": "",
                "content-length": JSON.stringify(responseBody).length,
              },
            };
            console.log("Request options:\n", JSON.stringify(requestOptions));

            // Send the response back to CloudFormation.
            return new Promise((resolve, reject) => {
              const request = https.request(requestOptions, function (res) {
                res.on("data", () => {});
                res.on("end", () => {
                  console.log("Status code: ", res.statusCode);
                  console.log("Status message: ", res.statusMessage);
                  resolve("Success");
                });
              });
              request.on("error", (e) => {
                console.error(e);
                reject("Error");
              });
              request.write(JSON.stringify(responseBody));
              request.end();
            });
          } catch (error) {
            console.error("Error in cfn_response:\n", error);
            return;
          }
        }

        exports.handler = async (event, context) => {
          console.log("Event:\n", JSON.stringify(event));
          let responseData = {};
          let responseStatus = response.FAILED;
          // CloudFormation custom resource request types:
          // https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/crpg-ref-requesttypes.html
          if (event.RequestType == "Delete") {
            // If the CloudFormation stack is deleted:
            try {
              // To allow the CloudFormation process to continue regardless of the
              // delete operation response, set responseStatus to SUCCESS before
              // the delete operation begins.
              responseStatus = response.SUCCESS;
              // Required resource properties to delete the S3 objects. Note that
              // ListObjectsCommand returns at most 1,000 keys per call.
              const bucketParams = { Bucket: process.env.BUCKET_NAME };
              const s3Contents = await targetClient.send(new ListObjectsCommand(bucketParams));
              for (let i = 0; i < (s3Contents["Contents"] || []).length; i += 1) {
                const params = {
                  Bucket: process.env.BUCKET_NAME,
                  Key: s3Contents["Contents"][i]["Key"],
                };
                const data = await targetClient.send(new DeleteObjectCommand(params));
                console.log(data);
              }
            } catch (error) {
              console.error("Error during S3 delete:\n", error);
            }
          } else {
            // If the CloudFormation stack is created or updated:
            try {
              // Required resource properties to list/copy the S3 objects. As above,
              // ListObjectsCommand returns at most the first 1,000 keys of the
              // public noaa-gsod-pds bucket.
              const bucketParams = { Bucket: "noaa-gsod-pds" };
              const s3Copy = await sourceClient.send(new ListObjectsCommand(bucketParams));
              console.log(s3Copy);
              for (let i = 0; i < s3Copy["Contents"].length; i += 1) {
                const params = {
                  Bucket: process.env.BUCKET_NAME,
                  CopySource: `noaa-gsod-pds/${s3Copy["Contents"][i]["Key"]}`,
                  Key: s3Copy["Contents"][i]["Key"],
                };
                const data = await targetClient.send(new CopyObjectCommand(params));
                console.log(data);
              }
              // Response data that is sent back to CloudFormation:
              responseData = { "Response": "Response" };
              responseStatus = response.SUCCESS;
            } catch (error) {
              console.error("Error during S3 upload:\n", error);
            }
          }
          await send(event, context, responseStatus, responseData);
          return;
        };
      Runtime: nodejs18.x
      MemorySize: 1024
      Timeout: 600
      Handler: index.handler
      Environment:
        Variables:
          BUCKET_NAME: !Ref DistributedMapWorkshopDataset
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref DistributedMapWorkshopDataset
        - S3ReadPolicy:
            BucketName: noaa-gsod-pds

  CustomResourceS3Create:
    Type: 'Custom::S3Create'
    Properties:
      ServiceToken: !GetAtt FunctionS3Create.Arn

Outputs:
  DynamoDBTableName:
    Description: DynamoDB table name where the final results are written
    Value: !Ref ResultsDynamoDBTable
  DistributedMapWorkshopDataset:
    Description: Bucket where the NOAA data will be copied, and where the analysis will read
    Value: !Ref DistributedMapWorkshopDataset
  StateMachineResultsBucket:
    Description: Bucket where the distributed map run will write results
    Value: !Ref DistributedMapResultsBucket
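
# ---------------------------------------------------------------------------
# Example usage (an illustrative sketch, not part of the module): the template
# can be deployed with the SAM CLI. The stack name "sfw-module-14" below is an
# assumption; CAPABILITY_IAM is required because the template creates IAM roles.
#
#   sam deploy --template-file template.yaml --stack-name sfw-module-14 \
#     --capabilities CAPABILITY_IAM --resolve-s3
#
# After the distributed map run finishes, the per-month results written by the
# reducer can be read back from the table named in the DynamoDBTableName
# output, e.g.:
#
#   aws dynamodb scan --table-name <value-of-DynamoDBTableName-output>
# ---------------------------------------------------------------------------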