# Build Step Functions state machine
In this notebook, we build the Amazon Forecast pipeline as an AWS Step Functions state machine. We also set up a CloudWatch Events rule so that the state machine runs whenever a file is put into the S3 bucket.

## Create state machine

In [None]:
import boto3

In [None]:
import json

import sagemaker

sagemaker_session = sagemaker.Session()

# Look up the current AWS account ID; it is used to build the resource ARNs below.
sts = boto3.client('sts')
id_info = sts.get_caller_identity()

In [None]:
from sagemaker import get_execution_role

role_sm = get_execution_role()

In [None]:
def_sfn={
  "Comment": "Amazon Forecast example of the Amazon States Language using an AWS Lambda Function",
  "StartAt": "Create-DatasetGroup",
  "States": {
    "Create-DatasetGroup": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:createdatasetgroup",
      "ResultPath":"$",
      "Next": "Create-Dataset"
    },
    "Create-Dataset": {
      "Type": "Task",
      "InputPath":"$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:createdataset",
      "ResultPath":"$",
      "Next": "Import-Data"
    },  
    "Import-Data": {
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:createdatasetimport",
      "ResultPath": "$",
      "Next": "Wait-Import-Data"
    },
    "Wait-Import-Data": {
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatusimport",
      "Retry": [
        {
          "ErrorEquals": ["SatusActive"],
          "IntervalSeconds": 60,
          "MaxAttempts": 30,
          "BackoffRate": 1.0
        }
      ],
      "ResultPath": "$",
      "Next": "Create-Predictor"
    },
    "Create-Predictor": {
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:createpredictor",
      "ResultPath": "$",
      "Next": "Wait-Create-Predictor"
    },
    "Wait-Create-Predictor": {
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatuspredictor",
      "Retry": [
        {
          "ErrorEquals": ["SatusActive"],
          "IntervalSeconds": 60,
          "MaxAttempts": 30,
          "BackoffRate": 1.0
        }
      ],
      "ResultPath": "$",
      "Next": "Create-Forecast"
    },
    "Create-Forecast": {
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:createforecast",
      "ResultPath": "$",
      "Next": "Wait-Create-Forecast"
    },
    "Wait-Create-Forecast": {
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatusforecast",
      "Retry": [
        {
          "ErrorEquals": ["SatusActive"],
          "IntervalSeconds": 60,
          "MaxAttempts": 30,
          "BackoffRate": 1.0
        }
      ],
      "ResultPath": "$",
      "Next": "Export-Forecast"
    },
    "Export-Forecast": {
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:createforecastexportjob",
      "ResultPath": "$",
      "Next": "Wait-Export-Forecast"
    },
    "Wait-Export-Forecast": {
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:lambda:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":function:getstatusforecastexportjob",
      "Retry": [
        {
          "ErrorEquals": ["SatusActive"],
          "IntervalSeconds": 60,
          "MaxAttempts": 30,
          "BackoffRate": 1.0
        }
      ],
      "ResultPath": "$",
      "End": True
    }
  }
}
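
Each `Wait-*` state in the definition polls through its `Retry` block: the status Lambda is presumably expected to raise the error named in `ErrorEquals` while the Forecast resource is still being built, and Step Functions then re-invokes it after `IntervalSeconds`. As a rough sketch of how long such a state can wait before giving up, the helper below sums the retry intervals. `max_retry_wait_seconds` is a hypothetical name, not part of any AWS SDK, and the calculation assumes the n-th retry waits `IntervalSeconds * BackoffRate ** n`.

```python
def max_retry_wait_seconds(retrier):
    """Upper bound on time spent waiting between retries for one Retry entry.

    Assumes the n-th retry (n = 0, 1, ...) waits IntervalSeconds * BackoffRate ** n
    and that at most MaxAttempts retries happen. Lambda run time is not included.
    """
    # Fallback values follow the Amazon States Language defaults.
    interval = retrier.get("IntervalSeconds", 1)
    attempts = retrier.get("MaxAttempts", 3)
    backoff = retrier.get("BackoffRate", 2.0)
    return sum(interval * backoff ** n for n in range(attempts))


# Same values as the Retry blocks in the definition above.
retrier = {
    "ErrorEquals": ["SatusActive"],
    "IntervalSeconds": 60,
    "MaxAttempts": 30,
    "BackoffRate": 1.0,
}
total_wait = max_retry_wait_seconds(retrier)
print(total_wait / 60, "minutes")
```

With these settings, each wait state stops polling after about 30 minutes. If an import or training job is expected to take longer, `MaxAttempts` or `IntervalSeconds` would need to be raised.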

In [None]:
with open('./sfn_definition.json', 'w') as f:
    json.dump(def_sfn, f, indent=2, ensure_ascii=False)
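
Before registering the definition, it can help to check locally that `StartAt` and every `Next` point at a state that actually exists, since a typo in a state name would otherwise only surface when the service validates the definition. The following is a minimal sketch: `find_broken_transitions` is a hypothetical helper, and it covers only the `StartAt`, `Next`, and `End` fields used in this notebook, not the full Amazon States Language.

```python
def find_broken_transitions(definition):
    """Return a list of problems with StartAt/Next targets; empty if none found."""
    states = definition.get("States", {})
    problems = []
    start = definition.get("StartAt")
    if start not in states:
        problems.append(f"StartAt refers to unknown state: {start!r}")
    for name, state in states.items():
        target = state.get("Next")
        if target is not None and target not in states:
            problems.append(f"{name}: Next refers to unknown state: {target!r}")
        if target is None and not state.get("End", False):
            problems.append(f"{name}: has neither Next nor End")
    return problems


# Toy definition with the same shape as def_sfn; the ARNs are placeholders.
example = {
    "StartAt": "Create-DatasetGroup",
    "States": {
        "Create-DatasetGroup": {"Type": "Task", "Resource": "arn:placeholder", "Next": "Create-Dataset"},
        "Create-Dataset": {"Type": "Task", "Resource": "arn:placeholder", "End": True},
    },
}
problems = find_broken_transitions(example)
print(problems)
```

In the notebook itself, `find_broken_transitions(def_sfn)` is expected to return an empty list.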

In [None]:
sfn = boto3.client('stepfunctions')

In [None]:
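# Read the definition back from disk, closing the file before the API call.
with open('./sfn_definition.json') as f:
    definition = f.read()

sfn.create_state_machine(
    name="workshop-timeseries-retail",
    definition=definition,
    roleArn=role_sm
)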

## Create CloudWatch Events Rule

For Step Functions to start when a file is put into S3, you need a CloudTrail trail that records the bucket's S3 API calls and a CloudWatch Events rule that matches those calls. This section creates the CloudWatch Events rule. For background, see:

https://docs.aws.amazon.com/step-functions/latest/dg/tutorial-cloudwatch-events-s3.html

In [None]:
import sagemaker
sagemaker_session = sagemaker.Session()
sts = boto3.client('sts')
id_info = sts.get_caller_identity()

In [None]:
bucket_name = 'workshop-timeseries-retail-' + id_info['Account'] + '-source'

In [None]:
cwe = boto3.client('events')

In [None]:
# Match CloudTrail-recorded uploads (single-part and multipart) to the source bucket.
ep_str = json.dumps({
    "source": ["aws.s3"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["s3.amazonaws.com"],
        "eventName": ["PutObject", "CompleteMultipartUpload"],
        "requestParameters": {"bucketName": [bucket_name]}
    }
})

In [None]:
cwe.put_rule(
    Name='workshop-timeseries-retail',
    EventPattern=ep_str,
    State='ENABLED'
)
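
To get a feel for which events the rule's pattern selects, the sketch below applies a simplified local version of the matching logic to two made-up events. It is only an approximation for exact-value patterns like the one used here: `matches` is a hypothetical helper, the bucket name and object key are placeholders, and the real service supports more operators than this handles.

```python
def matches(pattern, event):
    """Simplified check: every pattern key must be present in the event; a list
    means "any of these exact values", a dict means "recurse into the object"."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


example_bucket = "example-bucket"  # placeholder, not the workshop bucket
pattern = {
    "source": ["aws.s3"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["s3.amazonaws.com"],
        "eventName": ["PutObject", "CompleteMultipartUpload"],
        "requestParameters": {"bucketName": [example_bucket]},
    },
}

# Trimmed-down, made-up events; real CloudTrail events carry many more fields.
upload = {
    "source": "aws.s3",
    "detail-type": "AWS API Call via CloudTrail",
    "detail": {
        "eventSource": "s3.amazonaws.com",
        "eventName": "PutObject",
        "requestParameters": {"bucketName": example_bucket, "key": "train.csv"},
    },
}
# Same event, but for an API call the pattern does not list.
delete = dict(upload, detail=dict(upload["detail"], eventName="DeleteObject"))

print(matches(pattern, upload))
print(matches(pattern, delete))
```

Extra fields in the event, such as `key`, do not prevent a match; only the fields named in the pattern are compared. For an authoritative check, the `events` client also provides `test_event_pattern`, which evaluates a pattern against a complete event on the service side.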

In [None]:
cwe.put_targets(
    Rule='workshop-timeseries-retail',
    Targets=[
        {
            'Id': 'forecast',
            'Arn': "arn:aws:states:" + sagemaker_session.boto_region_name + ":" + id_info['Account'] + ":stateMachine:workshop-timeseries-retail",
            'RoleArn': role_sm
        }
    ]
)

## Next
Now that we have the pipeline set up, let's put the training data into S3 and run the pipeline.
Proceed to 2_preprocess_and_put_dataset.ipynb.