# Building an Automated Pipeline for Amazon Forecast
Build a pipeline that, whenever a file containing training data is placed in S3, automatically runs data import, training, forecasting, and export of the predictions in Amazon Forecast.
We use Step Functions and Lambda.

# Configure SageMaker role
Attach policies for the services you use and set up the trust relationships, as shown in the figures below.

![IAMroles_Permissions](https://user-images.githubusercontent.com/27226946/89102049-7d18e380-d440-11ea-91a6-6ab2c7e63870.png)

![IAMroles_TrustRelationships](https://user-images.githubusercontent.com/27226946/89102054-84d88800-d440-11ea-9199-0583be09aa1c.png)

Under Edit trust relationship, the trust policy should read as follows.

In [None]:
{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Principal": {
 "Service": "sagemaker.amazonaws.com"
 },
 "Action": "sts:AssumeRole"
 },
 {
 "Effect": "Allow",
 "Principal": {
 "Service": "forecast.amazonaws.com"
 },
 "Action": "sts:AssumeRole"
 },
 {
 "Effect": "Allow",
 "Principal": {
 "Service": "lambda.amazonaws.com"
 },
 "Action": "sts:AssumeRole"
 },
 {
 "Effect": "Allow",
 "Principal": {
 "Service": "events.amazonaws.com"
 },
 "Action": "sts:AssumeRole"
 },
 {
 "Effect": "Allow",
 "Principal": {
 "Service": "states.amazonaws.com"
 },
 "Action": "sts:AssumeRole"
 }
 ]
}
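The same trust relationship can also be applied from code. The sketch below is an alternative, not part of the original workflow: it merges the five service principals into one statement (IAM accepts a list under "Service") and writes it with boto3's `iam.update_assume_role_policy`, which replaces the role's existing trust policy as a whole. `PIPELINE_SERVICES`, `build_trust_policy`, `role_name_from_arn` and `apply_trust_policy` are hypothetical helper names.

```python
import json

# Services that must be able to assume the role in this pipeline
# (the same five principals as the trust relationship above).
PIPELINE_SERVICES = [
    "sagemaker.amazonaws.com",
    "forecast.amazonaws.com",
    "lambda.amazonaws.com",
    "events.amazonaws.com",
    "states.amazonaws.com",
]


def build_trust_policy(services):
    """Return a trust policy with a single statement listing every service principal."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": list(services)},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def role_name_from_arn(role_arn):
    """IAM APIs take the role name, which is the last segment of the ARN path."""
    return role_arn.split("/")[-1]


def apply_trust_policy(iam_client, role_arn, services=PIPELINE_SERVICES):
    """Overwrite the role's trust policy (update_assume_role_policy replaces it entirely)."""
    iam_client.update_assume_role_policy(
        RoleName=role_name_from_arn(role_arn),
        PolicyDocument=json.dumps(build_trust_policy(services)),
    )
```

Once `role_sm` has been retrieved in the next cell, `apply_trust_policy(boto3.client('iam'), role_sm)` would apply it, provided the caller is allowed `iam:UpdateAssumeRolePolicy`.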

## Get the SageMaker role

In [1]:
from sagemaker import get_execution_role

role_sm = get_execution_role()

In [2]:
role_sm

'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970'

# 1. Create the Lambda functions
We create one Lambda function for each step of the pipeline. The source files are located in lambdas/.
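Each of the cells below follows the same recipe: zip one directory under lambdas/, then call `create_function` with identical settings. As a sketch, assuming each directory contains a `<function name>.py` file exposing `lambda_handler` (which the Handler values below imply), the zipping can be done in memory with the standard `zipfile` module instead of shelling out to `zip`. `LAMBDA_SOURCES`, `zip_directory` and `create_lambda` are hypothetical names.

```python
import io
import os
import zipfile

# (function name, source directory) pairs taken from the cells below.
LAMBDA_SOURCES = [
    ("datasetimport", "lambdas/createdatasetimport"),
    ("getstatusimport", "lambdas/GetStatusImport"),
    ("predictor", "lambdas/createpredictor"),
    ("getstatuspredictor", "lambdas/GetStatusPredictor"),
    ("forecast", "lambdas/createforecast"),
    ("getstatusforecast", "lambdas/GetStatusForecast"),
    ("forecastexportjob", "lambdas/createforecastexportjob"),
    ("getstatusforecastexportjob", "lambdas/GetStatusForecastExportJob"),
    ("notifyuser", "lambdas/NotifyUser"),
]


def zip_directory(src_dir):
    """Zip every file under src_dir in memory, storing paths relative to src_dir."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(src_dir):
            for name in files:
                if name.endswith(".zip"):
                    continue  # skip archives left over from the shell-based cells
                path = os.path.join(root, name)
                zf.write(path, os.path.relpath(path, src_dir))
    return buf.getvalue()


def create_lambda(lambda_client, name, src_dir, role_arn, runtime="python3.7"):
    """Create one function with the same settings as the notebook cells."""
    return lambda_client.create_function(
        FunctionName=name,
        Runtime=runtime,
        Role=role_arn,
        Handler=f"{name}.lambda_handler",
        Code={"ZipFile": zip_directory(src_dir)},
        Timeout=60 * 15,
        MemorySize=3008,
    )
```

A loop such as `for name, src in LAMBDA_SOURCES: create_lambda(lambda_, name, src, role_sm)` would reproduce the nine cells. The runtime is a parameter because AWS Lambda has since deprecated python3.7, so a newer Python runtime may be required today.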

In [3]:
import boto3

In [4]:
lambda_ = boto3.client('lambda')

In [5]:
!rm -f lambdas/createdatasetimport/datasetimport.zip
!cd lambdas/createdatasetimport; zip -r datasetimport .

zip_file = open("lambdas/createdatasetimport/datasetimport.zip", "rb").read()


lambda_.create_function(
 FunctionName="datasetimport",
 Runtime="python3.7",
 Role=role_sm,
 Handler="datasetimport.lambda_handler",
 Code={"ZipFile": zip_file},
 Timeout=60*15,
 MemorySize=3008
)

 adding: datasetimport.py (deflated 54%)


{'ResponseMetadata': {'RequestId': 'b45fbdeb-4c54-4706-aa18-5184e4ceff20',
 'HTTPStatusCode': 201,
 'HTTPHeaders': {'date': 'Sat, 08 Aug 2020 05:50:40 GMT',
 'content-type': 'application/json',
 'content-length': '893',
 'connection': 'keep-alive',
 'x-amzn-requestid': 'b45fbdeb-4c54-4706-aa18-5184e4ceff20'},
 'RetryAttempts': 0},
 'FunctionName': 'datasetimport',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:datasetimport',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'datasetimport.lambda_handler',
 'CodeSize': 563,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-08T05:50:40.778+0000',
 'CodeSha256': 'PoxpcyDxzFemgk0SYn6f9o5ovhfiFrt0/6ql8O4JS58=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'a72e6c64-145e-44b1-a257-b1dfd80cd91b',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [6]:
!rm -f lambdas/GetStatusImport/getstatusimport.zip
!cd lambdas/GetStatusImport; zip -r getstatusimport .

zip_file = open("lambdas/GetStatusImport/getstatusimport.zip", "rb").read()

lambda_.create_function(
 FunctionName="getstatusimport",
 Runtime="python3.7",
 Role=role_sm,
 Handler="getstatusimport.lambda_handler",
 Code={"ZipFile": zip_file},
 Timeout=60*15,
 MemorySize=3008
)

 adding: getstatusimport.py (deflated 46%)


{'ResponseMetadata': {'RequestId': '4b18b01c-e16e-49c0-b96c-e98fc546d614',
 'HTTPStatusCode': 201,
 'HTTPHeaders': {'date': 'Sat, 08 Aug 2020 05:50:41 GMT',
 'content-type': 'application/json',
 'content-length': '899',
 'connection': 'keep-alive',
 'x-amzn-requestid': '4b18b01c-e16e-49c0-b96c-e98fc546d614'},
 'RetryAttempts': 0},
 'FunctionName': 'getstatusimport',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:getstatusimport',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'getstatusimport.lambda_handler',
 'CodeSize': 387,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-08T05:50:41.497+0000',
 'CodeSha256': '4N7tY/qqjlNGRU9NEkinRSOSPECeFJT+uB15WGceE28=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '9fc8bb6b-fe49-48f6-94e2-76f3edfa7243',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [7]:
!rm -f lambdas/createpredictor/predictor.zip
!cd lambdas/createpredictor; zip -r predictor .

zip_file = open("lambdas/createpredictor/predictor.zip", "rb").read()

lambda_.create_function(
 FunctionName="predictor",
 Runtime="python3.7",
 Role=role_sm,
 Handler="predictor.lambda_handler",
 Code={"ZipFile": zip_file},
 Timeout=60*15,
 MemorySize=3008
)

 adding: predictor.py (deflated 56%)


{'ResponseMetadata': {'RequestId': '8daf3893-6ea8-4af8-ae72-ea3f12cd75a0',
 'HTTPStatusCode': 201,
 'HTTPHeaders': {'date': 'Sat, 08 Aug 2020 05:50:42 GMT',
 'content-type': 'application/json',
 'content-length': '881',
 'connection': 'keep-alive',
 'x-amzn-requestid': '8daf3893-6ea8-4af8-ae72-ea3f12cd75a0'},
 'RetryAttempts': 0},
 'FunctionName': 'predictor',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:predictor',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'predictor.lambda_handler',
 'CodeSize': 645,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-08T05:50:42.562+0000',
 'CodeSha256': 'ip8vS4n9cWDNFEfKspM/g8DXIi/it/l1ZcqgcsXsc2c=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '0b7c6138-0747-4cb2-8d6c-a4da51330ab1',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [8]:
!rm -f lambdas/GetStatusPredictor/getstatuspredictor.zip
!cd lambdas/GetStatusPredictor; zip -r getstatuspredictor .

zip_file = open("lambdas/GetStatusPredictor/getstatuspredictor.zip", "rb").read()

lambda_.create_function(
 FunctionName="getstatuspredictor",
 Runtime="python3.7",
 Role=role_sm,
 Handler="getstatuspredictor.lambda_handler",
 Code={"ZipFile": zip_file},
 Timeout=60*15,
 MemorySize=3008
)

 adding: getstatuspredictor.py (deflated 46%)


{'ResponseMetadata': {'RequestId': '091a4a42-111c-4da6-99b8-c33d17bed07b',
 'HTTPStatusCode': 201,
 'HTTPHeaders': {'date': 'Sat, 08 Aug 2020 05:50:43 GMT',
 'content-type': 'application/json',
 'content-length': '908',
 'connection': 'keep-alive',
 'x-amzn-requestid': '091a4a42-111c-4da6-99b8-c33d17bed07b'},
 'RetryAttempts': 0},
 'FunctionName': 'getstatuspredictor',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:getstatuspredictor',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'getstatuspredictor.lambda_handler',
 'CodeSize': 382,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-08T05:50:43.596+0000',
 'CodeSha256': 'JnAo+fVvxxjk1WGjnqUdNW4ozQR0CmGLeLFJJW4DBLQ=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'c23e3bfc-eeb0-46fd-8d6b-a80bb761be1f',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [9]:
!rm -f lambdas/createforecast/forecast.zip
!cd lambdas/createforecast; zip -r forecast .

zip_file = open("lambdas/createforecast/forecast.zip", "rb").read()

lambda_.create_function(
 FunctionName="forecast",
 Runtime="python3.7",
 Role=role_sm,
 Handler="forecast.lambda_handler",
 Code={"ZipFile": zip_file},
 Timeout=60*15,
 MemorySize=3008
)

 adding: forecast.py (deflated 45%)


{'ResponseMetadata': {'RequestId': '8a38d610-48b8-4378-9140-1799796c6c7f',
 'HTTPStatusCode': 201,
 'HTTPHeaders': {'date': 'Sat, 08 Aug 2020 05:50:44 GMT',
 'content-type': 'application/json',
 'content-length': '878',
 'connection': 'keep-alive',
 'x-amzn-requestid': '8a38d610-48b8-4378-9140-1799796c6c7f'},
 'RetryAttempts': 0},
 'FunctionName': 'forecast',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:forecast',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'forecast.lambda_handler',
 'CodeSize': 374,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-08T05:50:44.543+0000',
 'CodeSha256': 'CvuC6H1Ce7+QbYW8JHYuRJDdhVVizyDfjNeOr1Chb/4=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'd702c495-93df-48d0-9610-44a84a43e45a',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [10]:
!rm -f lambdas/GetStatusForecast/getstatusforecast.zip
!cd lambdas/GetStatusForecast; zip -r getstatusforecast .

zip_file = open("lambdas/GetStatusForecast/getstatusforecast.zip", "rb").read()

lambda_.create_function(
 FunctionName="getstatusforecast",
 Runtime="python3.7",
 Role=role_sm,
 Handler="getstatusforecast.lambda_handler",
 Code={"ZipFile": zip_file},
 Timeout=60*15,
 MemorySize=3008
)

 adding: getstatusforecast.py (deflated 47%)


{'ResponseMetadata': {'RequestId': 'f958c6bc-7c73-404e-8bf9-dbdfd6b7cb1c',
 'HTTPStatusCode': 201,
 'HTTPHeaders': {'date': 'Sat, 08 Aug 2020 05:50:45 GMT',
 'content-type': 'application/json',
 'content-length': '905',
 'connection': 'keep-alive',
 'x-amzn-requestid': 'f958c6bc-7c73-404e-8bf9-dbdfd6b7cb1c'},
 'RetryAttempts': 0},
 'FunctionName': 'getstatusforecast',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:getstatusforecast',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'getstatusforecast.lambda_handler',
 'CodeSize': 374,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-08T05:50:45.439+0000',
 'CodeSha256': '/zOy8Vh2h6IOAxpWKIsdEVmFBzkd4PhdLnO0e40FPyQ=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '1170e79a-9e8e-4f53-ae81-81530437f17c',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [11]:
!rm -f lambdas/createforecastexportjob/forecastexportjob.zip
!cd lambdas/createforecastexportjob; zip -r forecastexportjob .

zip_file = open("lambdas/createforecastexportjob/forecastexportjob.zip", "rb").read()

lambda_.create_function(
 FunctionName="forecastexportjob",
 Runtime="python3.7",
 Role=role_sm,
 Handler="forecastexportjob.lambda_handler",
 Code={"ZipFile": zip_file},
 Timeout=60*15,
 MemorySize=3008
)

 adding: forecastexportjob.py (deflated 53%)


{'ResponseMetadata': {'RequestId': 'b282734e-be76-4071-8773-f402505d7fcf',
 'HTTPStatusCode': 201,
 'HTTPHeaders': {'date': 'Sat, 08 Aug 2020 05:50:46 GMT',
 'content-type': 'application/json',
 'content-length': '905',
 'connection': 'keep-alive',
 'x-amzn-requestid': 'b282734e-be76-4071-8773-f402505d7fcf'},
 'RetryAttempts': 0},
 'FunctionName': 'forecastexportjob',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:forecastexportjob',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'forecastexportjob.lambda_handler',
 'CodeSize': 508,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-08T05:50:46.474+0000',
 'CodeSha256': 'saHaLFRozjXJiN8TkSn0N9Rmo0Vveic+Urj2LzdKWh0=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'b76bf9cd-1c8f-4de3-8136-c65272bc237b',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}

In [12]:
!rm -f lambdas/GetStatusForecastExportJob/getstatusforecastexportjob.zip
!cd lambdas/GetStatusForecastExportJob; zip -r getstatusforecastexportjob .

zip_file = open("lambdas/GetStatusForecastExportJob/getstatusforecastexportjob.zip", "rb").read()

lambda_.create_function(
 FunctionName="getstatusforecastexportjob",
 Runtime="python3.7",
 Role=role_sm,
 Handler="getstatusforecastexportjob.lambda_handler",
 Code={"ZipFile": zip_file},
 Timeout=60*15,
 MemorySize=3008
)

 adding: getstatusforecastexportjob.py (deflated 47%)


{'ResponseMetadata': {'RequestId': 'b2978cbf-0db6-4129-88ec-ef2c1b027da0',
 'HTTPStatusCode': 201,
 'HTTPHeaders': {'date': 'Sat, 08 Aug 2020 05:50:47 GMT',
 'content-type': 'application/json',
 'content-length': '932',
 'connection': 'keep-alive',
 'x-amzn-requestid': 'b2978cbf-0db6-4129-88ec-ef2c1b027da0'},
 'RetryAttempts': 0},
 'FunctionName': 'getstatusforecastexportjob',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:getstatusforecastexportjob',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'getstatusforecastexportjob.lambda_handler',
 'CodeSize': 406,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-08T05:50:47.451+0000',
 'CodeSha256': 'vexcWxFlnRrXANyWYRS+dDmcn6gFvyUX9WC9sApFD4E=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'df40e2c6-2dc6-4948-b7a6-2f81269cb0ca',
 'State': 'Active',
 'LastUpdateSt

In [13]:
!rm -f lambdas/NotifyUser/notifyuser.zip
!cd lambdas/NotifyUser; zip -r notifyuser .

zip_file = open("lambdas/NotifyUser/notifyuser.zip", "rb").read()

lambda_.create_function(
 FunctionName="notifyuser",
 Runtime="python3.7",
 Role=role_sm,
 Handler="notifyuser.lambda_handler",
 Code={"ZipFile": zip_file},
 Timeout=60*15,
 MemorySize=3008
)

 adding: notifyuser.py (deflated 12%)


{'ResponseMetadata': {'RequestId': '1b0ccd82-cc2a-409d-9f0f-389c7c1533ac',
 'HTTPStatusCode': 201,
 'HTTPHeaders': {'date': 'Sat, 08 Aug 2020 05:50:48 GMT',
 'content-type': 'application/json',
 'content-length': '884',
 'connection': 'keep-alive',
 'x-amzn-requestid': '1b0ccd82-cc2a-409d-9f0f-389c7c1533ac'},
 'RetryAttempts': 0},
 'FunctionName': 'notifyuser',
 'FunctionArn': 'arn:aws:lambda:us-east-1:805433377179:function:notifyuser',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::805433377179:role/service-role/AmazonSageMaker-ExecutionRole-20200716T084970',
 'Handler': 'notifyuser.lambda_handler',
 'CodeSize': 263,
 'Description': '',
 'Timeout': 900,
 'MemorySize': 3008,
 'LastModified': '2020-08-08T05:50:48.430+0000',
 'CodeSha256': '1tZH3rulcoyzKrdIvZSNIBVPTAe+/dn78gtSbuknHkI=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '50de254a-e4ab-44a4-bc7a-a259d1d08c79',
 'State': 'Active',
 'LastUpdateStatus': 'Successful'}
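Running any of the cells above a second time fails, because `create_function` raises `ResourceConflictException` when a function with that name already exists. A minimal create-or-update sketch (`deploy_lambda` is a hypothetical helper) falls back to `update_function_code`, which replaces only the code and leaves the configuration unchanged:

```python
def deploy_lambda(lambda_client, name, zip_bytes, role_arn, runtime="python3.7"):
    """Create the function, or upload new code if it already exists."""
    try:
        return lambda_client.create_function(
            FunctionName=name,
            Runtime=runtime,
            Role=role_arn,
            Handler=f"{name}.lambda_handler",
            Code={"ZipFile": zip_bytes},
            Timeout=60 * 15,
            MemorySize=3008,
        )
    except lambda_client.exceptions.ResourceConflictException:
        # The function already exists: replace its code, keep its configuration.
        return lambda_client.update_function_code(FunctionName=name, ZipFile=zip_bytes)
```

For example, `deploy_lambda(lambda_, "datasetimport", zip_file, role_sm)` can be re-run safely after editing the handler source and re-zipping it.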

# 2. Step Functions: Create the state machine
First we write the state machine definition in Amazon States Language, then create the state machine from that definition.
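The definition below repeats one polling pattern for each Forecast step: a Task that starts the job, a Task that reads its status, a Choice on a boolean flag, and a 300-second Wait that loops back to the status Task. The sketch generates that pattern with a function. It omits InputPath and ResultPath because "$" is their default, and it names the export status state GetStatusExport rather than GetStatusForecastExportjob, so its output matches def_sfn in structure but is not identical to it. `lambda_arn`, `polling_step` and `build_definition` are hypothetical names.

```python
def lambda_arn(region, account, function_name):
    return f"arn:aws:lambda:{region}:{account}:function:{function_name}"


def polling_step(region, account, task_fn, status_fn, suffix, flag, next_state,
                 wait_seconds=300):
    """Task -> GetStatus Task -> Choice -> Wait (which loops back to GetStatus)."""
    get_status = f"GetStatus{suffix}"
    check = f"CheckStatus{suffix}"
    sleep = f"SleepCheckStatus{suffix}"
    return {
        task_fn: {"Type": "Task", "Resource": lambda_arn(region, account, task_fn),
                  "Next": get_status},
        get_status: {"Type": "Task", "Resource": lambda_arn(region, account, status_fn),
                     "Next": check},
        check: {"Type": "Choice",
                "Choices": [{"Variable": f"$.{flag}", "BooleanEquals": True,
                             "Next": next_state}],
                "Default": sleep},
        sleep: {"Type": "Wait", "Seconds": wait_seconds, "Next": get_status},
    }


def build_definition(region, account):
    steps = [
        ("datasetimport", "getstatusimport", "Import", "is_active_import", "predictor"),
        ("predictor", "getstatuspredictor", "Predictor", "is_active_predictor", "forecast"),
        ("forecast", "getstatusforecast", "Forecast", "is_active_forecast", "forecastexportjob"),
        ("forecastexportjob", "getstatusforecastexportjob", "Export", "is_active_export",
         "NotifyUser"),
    ]
    states = {}
    for step in steps:
        states.update(polling_step(region, account, *step))
    states["NotifyUser"] = {"Type": "Task", "End": True,
                            "Resource": lambda_arn(region, account, "notifyuser")}
    return {"Comment": "Amazon Forecast pipeline (generated)",
            "StartAt": "datasetimport", "States": states}
```

`json.dumps(build_definition(sagemaker_session.boto_region_name, id_info['Account']))` could then be passed as the definition. Generating the states keeps the four steps consistent when, for example, the wait time changes.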

In [14]:
import sagemaker

In [15]:
sagemaker_session = sagemaker.Session()

In [16]:
sagemaker_session.boto_region_name

'us-east-1'

In [17]:
sts = boto3.client('sts')
id_info = sts.get_caller_identity()

In [18]:
id_info['Account']

'805433377179'

In [19]:
import json

In [20]:
def_sfn={
 "Comment": "Amazon Forecast example of the Amazon States Language using an AWS Lambda Function",
 "StartAt": "datasetimport",
 "States": {
 "datasetimport": {
 "Type": "Task",
 "InputPath":"$",
 "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:datasetimport",
 "ResultPath":"$",
 "Next": "GetStatusImport"
 },
 "GetStatusImport": {
 "Type": "Task",
 "InputPath":"$",
 "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatusimport",
 "ResultPath":"$",
 "Next": "CheckStatusImport"
 },
 "CheckStatusImport": {
 "Type": "Choice",
 "InputPath":"$",
 "Choices": [
 {
 "Variable": "$.is_active_import",
 "BooleanEquals": True,
 "Next": "predictor"
 }
 ],
 "Default": "SleepCheckStatusImport"
 },
 "SleepCheckStatusImport": {
 "Type": "Wait",
 "Seconds": 300,
 "Next": "GetStatusImport"
 },
 "predictor": {
 "Type": "Task",
 "InputPath":"$",
 "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:predictor",
 "ResultPath":"$",
 "Next": "GetStatusPredictor"
 },
 "GetStatusPredictor": {
 "Type": "Task",
 "InputPath":"$",
 "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatuspredictor",
 "ResultPath":"$",
 "Next": "CheckStatusPredictor"
 },
 "CheckStatusPredictor": {
 "Type": "Choice",
 "InputPath":"$",
 "Choices": [
 {
 "Variable": "$.is_active_predictor",
 "BooleanEquals": True,
 "Next": "forecast"
 }
 ],
 "Default": "SleepCheckStatusPredictor"
 },
 "SleepCheckStatusPredictor": {
 "Type": "Wait",
 "Seconds": 300,
 "Next": "GetStatusPredictor"
 },
 "forecast": {
 "Type": "Task",
 "InputPath":"$",
 "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:forecast",
 "ResultPath":"$",
 "Next": "GetStatusForecast"
 },
 "GetStatusForecast": {
 "Type": "Task",
 "InputPath":"$",
 "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatusforecast",
 "ResultPath":"$",
 "Next": "CheckStatusForecast"
 }, 
 "CheckStatusForecast": {
 "Type": "Choice",
 "InputPath":"$",
 "Choices": [
 {
 "Variable": "$.is_active_forecast",
 "BooleanEquals": True,
 "Next": "forecastexportjob"
 }
 ],
 "Default": "SleepCheckStatusForecast"
 },
 "SleepCheckStatusForecast": {
 "Type": "Wait",
 "Seconds": 300,
 "Next": "GetStatusForecast"
 },
 "forecastexportjob": {
 "Type": "Task",
 "InputPath":"$",
 "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:forecastexportjob",
 "ResultPath":"$",
 "Next": "GetStatusForecastExportjob"
 },
 "GetStatusForecastExportjob": {
 "Type": "Task",
 "InputPath":"$",
 "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatusforecastexportjob",
 "ResultPath":"$",
 "Next": "CheckStatusExport"
 }, 
 "CheckStatusExport": {
 "Type": "Choice",
 "InputPath":"$",
 "Choices": [
 {
 "Variable": "$.is_active_export",
 "BooleanEquals": True,
 "Next": "NotifyUser"
 }
 ],
 "Default": "SleepCheckStatusExport"
 },
 "SleepCheckStatusExport": {
 "Type": "Wait",
 "Seconds": 300,
 "Next": "GetStatusForecastExportjob"
 },
 "NotifyUser": {
 "Type": "Task",
 "InputPath":"$",
 "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:notifyuser",
 "ResultPath":"$",
 "End": True
 }
 }
}


In [21]:
with open('./definition.json', 'w') as f:
 json.dump(def_sfn, f, indent=2, ensure_ascii=False)
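A quick local check before calling `create_state_machine` can catch a misspelled state name in Next, Default, or a Choice rule, as well as states that nothing transitions to. The sketch below (`find_dangling_transitions` and `find_unreachable_states` are hypothetical helpers) walks the definition dict:

```python
def find_dangling_transitions(definition):
    """Return (source, target) pairs whose target is not a defined state."""
    states = definition["States"]
    targets = [("StartAt", definition["StartAt"])]
    for name, state in states.items():
        for key in ("Next", "Default"):
            if key in state:
                targets.append((name, state[key]))
        for choice in state.get("Choices", []):
            targets.append((name, choice["Next"]))
    return [(src, dst) for src, dst in targets if dst not in states]


def find_unreachable_states(definition):
    """Return states that cannot be reached from StartAt (depth-first walk)."""
    states = definition["States"]
    seen, stack = set(), [definition["StartAt"]]
    while stack:
        name = stack.pop()
        if name in seen or name not in states:
            continue
        seen.add(name)
        state = states[name]
        stack.extend(choice["Next"] for choice in state.get("Choices", []))
        stack.extend(state[key] for key in ("Next", "Default") if key in state)
    return sorted(set(states) - seen)
```

For def_sfn as written above, both functions should return empty lists.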

In [22]:
import boto3
sfn = boto3.client('stepfunctions')

In [23]:
sfn.create_state_machine(
 name="demo-forecast",
 definition=open("definition.json").read(),
 roleArn=role_sm
)

{'stateMachineArn': 'arn:aws:states:us-east-1:805433377179:stateMachine:demo-forecast',
 'creationDate': datetime.datetime(2020, 8, 8, 5, 50, 56, 173000, tzinfo=tzlocal()),
 'ResponseMetadata': {'RequestId': '78b2c992-e93e-4ef2-9bba-9f53b6a455a3',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amzn-requestid': '78b2c992-e93e-4ef2-9bba-9f53b6a455a3',
 'content-type': 'application/x-amz-json-1.0',
 'content-length': '118'},
 'RetryAttempts': 0}}

# 3. AWS CloudTrail: Create a trail
https://docs.aws.amazon.com/step-functions/latest/dg/tutorial-cloudwatch-events-s3.html

As described in the tutorial above, configure CloudTrail and CloudWatch Events so that placing an object in S3 triggers a Step Functions execution.

https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/cloudtrail.html#CloudTrail.Client.create_trail

In [24]:
! pip freeze | grep boto3

boto3==1.14.16


In [25]:
cloudtrail = boto3.client('cloudtrail')

In [26]:
sts = boto3.client('sts')
id_info = sts.get_caller_identity()
print(id_info['Account'])

805433377179


In [27]:
bucket_name = 'demo-forecast-' + id_info['Account']

In [28]:
bucket_name

'demo-forecast-805433377179'

In [29]:
output_trail_bucket = bucket_name + '-trail'

In [30]:
s3 = boto3.client('s3')
s3.create_bucket(Bucket=output_trail_bucket)

{'ResponseMetadata': {'RequestId': '72B0723FF5865BB6',
 'HostId': 'V7JrH3tzZ1f0uNEsxfZNb3DwtkvjdK8pXC4qKisMxwnuqGKUm8AR71xzezA/VZli9E8gyr0kZOE=',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amz-id-2': 'V7JrH3tzZ1f0uNEsxfZNb3DwtkvjdK8pXC4qKisMxwnuqGKUm8AR71xzezA/VZli9E8gyr0kZOE=',
 'x-amz-request-id': '72B0723FF5865BB6',
 'date': 'Sat, 08 Aug 2020 05:51:02 GMT',
 'location': '/demo-forecast-805433377179-trail',
 'content-length': '0',
 'server': 'AmazonS3'},
 'RetryAttempts': 0},
 'Location': '/demo-forecast-805433377179-trail'}
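The `create_bucket` call above needs no extra arguments because this notebook runs in us-east-1. In other regions S3 expects a `CreateBucketConfiguration` with a `LocationConstraint`. A small sketch (`create_bucket_in_region` is a hypothetical helper) that handles both cases:

```python
def create_bucket_in_region(s3_client, bucket, region):
    """Create a bucket; us-east-1 is the default location and gets no LocationConstraint."""
    if region == "us-east-1":
        return s3_client.create_bucket(Bucket=bucket)
    return s3_client.create_bucket(
        Bucket=bucket,
        CreateBucketConfiguration={"LocationConstraint": region},
    )
```

For example, `create_bucket_in_region(s3, output_trail_bucket, sagemaker_session.boto_region_name)`.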

## Set the bucket policy
Setting a bucket policy with boto3:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-bucket-policies.html

In [31]:
import json

In [32]:
bucket_policy = {
 "Version": "2012-10-17",
 "Statement": [
 {
 "Sid": "AWSCloudTrailAclCheck20150319",
 "Effect": "Allow",
 "Principal": {
 "Service": "cloudtrail.amazonaws.com"
 },
 "Action": "s3:GetBucketAcl",
 "Resource": "arn:aws:s3:::" + bucket_name + "-trail"
 },
 {
 "Sid": "AWSCloudTrailWrite20150319",
 "Effect": "Allow",
 "Principal": {
 "Service": "cloudtrail.amazonaws.com"
 },
 "Action": "s3:PutObject",
 "Resource": "arn:aws:s3:::" + bucket_name + "-trail/AWSLogs/" + id_info['Account'] + "/*",
 "Condition": {
 "StringEquals": {
 "s3:x-amz-acl": "bucket-owner-full-control"
 }
 }
 }
 ]
}

# Convert the policy from JSON dict to string
bucket_policy = json.dumps(bucket_policy)

In [33]:
# Set the new policy
s3 = boto3.client('s3')
s3.put_bucket_policy(Bucket=output_trail_bucket, Policy=bucket_policy)

{'ResponseMetadata': {'RequestId': 'C959349CFB2936F4',
 'HostId': 'vTZwu7SKmG+IrE1gxQDyAW7u96qzSD/mxBl8wK1WuZD8+8Ix10PJR80NfeHEVIJ38RE6h5Dh2ec=',
 'HTTPStatusCode': 204,
 'HTTPHeaders': {'x-amz-id-2': 'vTZwu7SKmG+IrE1gxQDyAW7u96qzSD/mxBl8wK1WuZD8+8Ix10PJR80NfeHEVIJ38RE6h5Dh2ec=',
 'x-amz-request-id': 'C959349CFB2936F4',
 'date': 'Sat, 08 Aug 2020 05:51:06 GMT',
 'server': 'AmazonS3'},
 'RetryAttempts': 0}}

In [34]:
cloudtrail.create_trail(
 Name='forecast-trail',
 S3BucketName=output_trail_bucket,
 EnableLogFileValidation=True
)

{'Name': 'forecast-trail',
 'S3BucketName': 'demo-forecast-805433377179-trail',
 'IncludeGlobalServiceEvents': True,
 'IsMultiRegionTrail': False,
 'TrailARN': 'arn:aws:cloudtrail:us-east-1:805433377179:trail/forecast-trail',
 'LogFileValidationEnabled': True,
 'IsOrganizationTrail': False,
 'ResponseMetadata': {'RequestId': 'db78ab21-5808-40dc-981f-efd1b47aa45e',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amzn-requestid': 'db78ab21-5808-40dc-981f-efd1b47aa45e',
 'content-type': 'application/x-amz-json-1.1',
 'content-length': '272',
 'date': 'Sat, 08 Aug 2020 05:51:05 GMT'},
 'RetryAttempts': 0}}

In [35]:
cloudtrail.put_event_selectors(
 TrailName='forecast-trail',
 EventSelectors=[
 {
 'ReadWriteType': 'All',
 'IncludeManagementEvents': True,
 'DataResources': [
 {
 'Type': 'AWS::S3::Object',
 'Values': [
 f'arn:aws:s3:::{bucket_name}/input',
 ]
 },
 ]
 },
 ]
)

{'TrailARN': 'arn:aws:cloudtrail:us-east-1:805433377179:trail/forecast-trail',
 'EventSelectors': [{'ReadWriteType': 'All',
 'IncludeManagementEvents': True,
 'DataResources': [{'Type': 'AWS::S3::Object',
 'Values': ['arn:aws:s3:::demo-forecast-805433377179/input']}],
 'ExcludeManagementEventSources': []}],
 'ResponseMetadata': {'RequestId': '3fb976be-ca56-4270-8660-c5a36f2efc86',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amzn-requestid': '3fb976be-ca56-4270-8660-c5a36f2efc86',
 'content-type': 'application/x-amz-json-1.1',
 'content-length': '290',
 'date': 'Sat, 08 Aug 2020 05:51:06 GMT'},
 'RetryAttempts': 0}}
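CloudTrail treats each S3 data-resource value as a prefix of the object ARN, so `arn:aws:s3:::<bucket>/input` logs writes under input/ and also under any key that merely starts with "input", such as inputs/. The function below is a simplified local model of that prefix rule, not a CloudTrail API (`selector_covers` is a hypothetical name):

```python
def selector_covers(values, bucket, key):
    """True if an object would be logged by these data-resource values (prefix match)."""
    object_arn = f"arn:aws:s3:::{bucket}/{key}"
    return any(object_arn.startswith(value) for value in values)
```

If only the input/ folder should be logged, a value ending in `input/` is the stricter choice.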

### Enable logging
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudtrail.html#CloudTrail.Client.start_logging

In [36]:
cloudtrail.start_logging(Name='forecast-trail')

{'ResponseMetadata': {'RequestId': '97173bec-db69-44aa-9697-d03ba43c8608',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amzn-requestid': '97173bec-db69-44aa-9697-d03ba43c8608',
 'content-type': 'application/x-amz-json-1.1',
 'content-length': '2',
 'date': 'Sat, 08 Aug 2020 05:51:14 GMT'},
 'RetryAttempts': 0}}
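To confirm the trail is configured as intended, `get_trail_status` reports `IsLogging` and `get_event_selectors` returns the selectors set above. A sketch (`trail_summary` is a hypothetical helper) that condenses both responses:

```python
def trail_summary(cloudtrail_client, trail_name):
    """Return whether the trail is logging and which S3 data resources it covers."""
    status = cloudtrail_client.get_trail_status(Name=trail_name)
    selectors = cloudtrail_client.get_event_selectors(TrailName=trail_name)
    values = [
        value
        for selector in selectors.get("EventSelectors", [])
        for resource in selector.get("DataResources", [])
        for value in resource.get("Values", [])
    ]
    return {"is_logging": status.get("IsLogging", False), "data_resources": values}
```

Calling `trail_summary(cloudtrail, 'forecast-trail')` should show `is_logging` as True and the input prefix of the data bucket.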

# 4. CloudWatch Events: Build a rule
https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/events.html


put_rule(): create a rule
https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/events.html#CloudWatchEvents.Client.put_rule


In [37]:
bucket_name

'demo-forecast-805433377179'

In [38]:
cwe = boto3.client('events')

In [39]:
# Match S3 write API calls on the data bucket, as recorded by CloudTrail
event_pattern = {
 "source": ["aws.s3"],
 "detail-type": ["AWS API Call via CloudTrail"],
 "detail": {"eventSource": ["s3.amazonaws.com"],
  "eventName": ["PutObject", "CompleteMultipartUpload"],
  "requestParameters": {"bucketName": [bucket_name]}},
}
ep_str = json.dumps(event_pattern)

In [40]:
cwe.put_rule(
 Name='demo-forecast',
 EventPattern=ep_str,
 State='ENABLED'
)

{'RuleArn': 'arn:aws:events:us-east-1:805433377179:rule/demo-forecast',
 'ResponseMetadata': {'RequestId': '864dd3e4-805f-457a-93bd-e447e6fa9ed7',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amzn-requestid': '864dd3e4-805f-457a-93bd-e447e6fa9ed7',
 'content-type': 'application/x-amz-json-1.1',
 'content-length': '70',
 'date': 'Sat, 08 Aug 2020 05:51:21 GMT'},
 'RetryAttempts': 0}}
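The rule matches an event only if, at every level of the pattern, the event's field equals one of the listed values. The function below is a simplified local model of that exact-value matching; it ignores EventBridge's other operators (such as prefix or numeric matching) and array-valued event fields, and `matches` is a hypothetical name:

```python
def matches(pattern, event):
    """Simplified exact-value event matching: lists mean 'equals one of', dicts recurse."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True
```

`matches(json.loads(ep_str), event)` can be used to check a trimmed sample CloudTrail event locally; fields in the event that the pattern does not mention are ignored.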

In [41]:
cwe.put_targets(
 Rule='demo-forecast',
 Targets=[
 {
 'Id': 'forecast',
 'Arn': "arn:aws:states:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":stateMachine:demo-forecast",
 'RoleArn': role_sm
 }
 ]
)

{'FailedEntryCount': 0,
 'FailedEntries': [],
 'ResponseMetadata': {'RequestId': '4d6db0e4-4c38-4cdf-85e8-721248701413',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amzn-requestid': '4d6db0e4-4c38-4cdf-85e8-721248701413',
 'content-type': 'application/x-amz-json-1.1',
 'content-length': '41',
 'date': 'Sat, 08 Aug 2020 05:51:21 GMT'},
 'RetryAttempts': 0}}

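# 5. Put an additional file into S3
Uploading a file to S3 starts a Step Functions execution.

If you run all the cells at once, the upload may not trigger the pipeline; wait about 30 seconds before uploading. The file can be uploaded repeatedly, and each upload starts the pipeline again.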

In [42]:
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)

bucket.upload_file('./output/tr_target_add_20091201_20101209.csv', 'input/tr_target_add_20091201_20101209.csv')
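Each upload starts a new execution of demo-forecast. A sketch for following it from the notebook (`latest_execution` and `wait_for_execution` are hypothetical helpers) uses `list_executions`, which returns the most recent execution first, and polls `describe_execution` until the status is no longer RUNNING:

```python
import time


def latest_execution(sfn_client, state_machine_arn):
    """Most recent execution of the state machine, or None if there is none yet."""
    executions = sfn_client.list_executions(
        stateMachineArn=state_machine_arn, maxResults=1
    )["executions"]
    return executions[0] if executions else None


def wait_for_execution(sfn_client, execution_arn, interval=60, sleep=time.sleep):
    """Poll describe_execution until the execution leaves the RUNNING state."""
    while True:
        status = sfn_client.describe_execution(executionArn=execution_arn)["status"]
        if status != "RUNNING":
            return status
        sleep(interval)
```

With the state machine ARN built as in the put_targets cell, `wait_for_execution(sfn, latest_execution(sfn, arn)["executionArn"])` blocks until the pipeline finishes. Because the pipeline itself waits in 300-second steps while Forecast imports data and trains, a long polling interval is reasonable.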

# 6. Next steps
When the Step Functions execution completes, the prediction results are exported to S3. Import them as a data source in QuickSight using the same procedure as before, and visualize them.