AWSTemplateFormatVersion: "2010-09-09"
Transform: "AWS::Serverless-2016-10-31"
Description: "AWS Data Exchange automated revision download upon publish CloudWatch event (fdp-mlmp-adx)"
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "AWS Data Exchange - Revision Automation"
        Parameters:
          - DatasetID
          - RevisionID
    ParameterLabels:
      DatasetID:
        default: "Which data set to set up automated export for?"
      RevisionID:
        default: "Which revision to export immediately?"
Parameters:
  # Replace with the ID of a data set in a product you are subscribed to
  DatasetID:
    Type: String
    Description: "REQUIRED: the ID of the data set."
    Default: ""
  SubnetId1:
    Type: AWS::EC2::Subnet::Id
    Description: Subnet ID for the Lambda function
  SubnetId2:
    Type: AWS::EC2::Subnet::Id
    Description: Subnet ID for the Lambda function
  SecurityGroupId:
    Type: AWS::EC2::SecurityGroup::Id
    Description: Security group ID for the Lambda function to execute with
  RevisionID:
    Type: String
    Description: "OPTIONAL: ID of an initial revision to download immediately."
    Default: ""
  RoleGetNewRevisionArn:
    Type: String
    Description: "ARN of the IAM role to be assumed by the Lambda function"
    Default: ""
  DataS3Bucket:
    Description: S3 bucket to export the data set revisions to
    Type: String
Resources:
  # Lambda layer providing the AWS Data Exchange SDK
  dataexchangesdk:
    Type: AWS::Serverless::Application
    Properties:
      Location:
        ApplicationId: "arn:aws:serverlessrepo:us-east-1:305705277353:applications/dataexchangesdk"
        SemanticVersion: 1.17.4
  # Lambda function that exports published revisions to S3
  FunctionGetNewRevision:
    Type: "AWS::Lambda::Function"
    Properties:
      VpcConfig:
        SecurityGroupIds:
          - !Ref SecurityGroupId
        SubnetIds:
          - !Ref SubnetId1
          - !Ref SubnetId2
      Layers:
        - !GetAtt dataexchangesdk.Outputs.LayerArn
      Code:
        ZipFile: |
          import os
          os.environ['AWS_DATA_PATH'] = '/opt/'

          import json
          import time

          import boto3
          import cfnresponse

          region = os.environ['AWS_REGION']
          destination_bucket = os.environ['S3_BUCKET']
          if not destination_bucket:
              raise Exception("'S3_BUCKET' environment variable must be defined!")

          def handler(event, context):
              # Let AWS CloudFormation know its request succeeded
              if 'RequestType' in event:
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, "CustomResourcePhysicalID")
                  # Nothing to export when the custom resource is being deleted
                  if event['RequestType'] == 'Delete':
                      return

              dataexchange = boto3.client(service_name='dataexchange', region_name=region)

              # If the request comes from the CloudFormation custom resource, get the ID of the first revision
              if 'ResourceProperties' in event:
                  data_set_id = event['ResourceProperties']['data_set_id']
                  revision_ids = [event['ResourceProperties']['RevisionIds']]
                  print("Initial revision retrieval")
                  print(event)
              else:
                  data_set_id = event['resources'][0]
                  revision_ids = event['detail']['RevisionIds']
                  print("Triggered revision retrieval")
                  print(event)

              # Used to store the IDs of the jobs exporting the assets to S3
              job_ids = set()
              for revision_id in revision_ids:
                  # The RevisionID parameter is optional, so skip empty IDs
                  if not revision_id:
                      continue
                  # Create a job to export the revision to S3
                  export_job = dataexchange.create_job(
                      Type='EXPORT_REVISIONS_TO_S3',
                      Details={
                          'ExportRevisionsToS3': {
                              'DataSetId': data_set_id,
                              'RevisionDestinations': [
                                  {
                                      'RevisionId': revision_id,
                                      'Bucket': destination_bucket
                                  }
                              ]
                          }
                      }
                  )
                  # Start the job and save its ID
                  dataexchange.start_job(JobId=export_job['Id'])
                  job_ids.add(export_job['Id'])

              # Iterate until all remaining jobs have reached a terminal state, or an error is found
              completed_jobs = set()
              while job_ids != completed_jobs:
                  for job_id in job_ids:
                      if job_id in completed_jobs:
                          continue
                      get_job_response = dataexchange.get_job(JobId=job_id)
                      if get_job_response['State'] == 'COMPLETED':
                          print("Job {} completed".format(job_id))
                          completed_jobs.add(job_id)
                      elif get_job_response['State'] == 'ERROR':
                          job_errors = get_job_response['Errors']
                          raise Exception('JobId: {} failed with errors:\n{}'.format(job_id, job_errors))
                      # Sleep to ensure we don't get throttled by the GetJob API
                      time.sleep(0.2)
              return {
                  'statusCode': 200,
                  'body': json.dumps('All jobs completed.')
              }
      Handler: "index.handler"
      Environment:
        Variables:
          S3_BUCKET: !Ref DataS3Bucket
      Role: !Ref RoleGetNewRevisionArn
      Runtime: "python3.8"
      Timeout: 180
  # Invoke the Lambda function once on stack creation to export the first revision
  FirstRevision:
    Type: "Custom::FirstInvoke"
    DeletionPolicy: Retain
    Properties:
      ServiceToken: !GetAtt FunctionGetNewRevision.Arn
      data_set_id: !Ref DatasetID
      RevisionIds: !Ref RevisionID
  # CloudWatch Events rule that triggers the Lambda function whenever a new revision is published to the data set
  NewRevisionEventRule:
    Type: AWS::Events::Rule
    Properties:
      Description: "New Revision Event"
      EventPattern:
        source:
          - "aws.dataexchange"
        detail-type:
          - "Revision Published To Data Set"
        resources:
          - !Ref DatasetID
      State: "ENABLED"
      Targets:
        - Arn: !GetAtt FunctionGetNewRevision.Arn
          Id: "TargetGetNewRevision"
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt FunctionGetNewRevision.Arn
      Action: "lambda:InvokeFunction"
      Principal: events.amazonaws.com
      SourceArn: !GetAtt NewRevisionEventRule.Arn
Outputs:
  S3DataPath:
    Value: !Join ["", ["s3://", !Ref DataS3Bucket, "/adx_free_data_sample/"]]
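
# For reference, a minimal sketch of the event that NewRevisionEventRule delivers to
# FunctionGetNewRevision; the handler above reads only resources[0] and detail.RevisionIds.
# The IDs below are illustrative placeholders, not real values.
#
#   {
#     "source": "aws.dataexchange",
#     "detail-type": "Revision Published To Data Set",
#     "resources": ["<data-set-id>"],
#     "detail": {
#       "RevisionIds": ["<revision-id>"]
#     }
#   }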