# Operationalize end-to-end Amazon Personalize model deployment process using AWS Step Functions Data Science SDK

1. [Introduction](#Introduction)
2. [Setup](#Setup)
3. [Data-Preparation](#Data-Preparation)
4. [Task-States](#Task-States)
5. [Wait-States](#Wait-States)
6. [Choice-States](#Choice-States)
7. [Workflow](#Workflow)
8. [Generate-Recommendations](#Generate-Recommendations)



## Introduction

This notebook describes using the AWS Step Functions Data Science SDK to create and manage an Amazon Personalize workflow. The Step Functions SDK is an open-source library that allows data scientists to easily create and execute machine learning workflows using AWS Step Functions. For more information on Step Functions and the SDK, see the following.
* [AWS Step Functions](https://aws.amazon.com/step-functions/)
* [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io)

In this notebook, we use the SDK to define steps that create Amazon Personalize resources, link those steps together into workflows, and execute the workflows in AWS Step Functions.

For more information on Amazon Personalize, see the following.

* [Amazon Personalize](https://aws.amazon.com/personalize/)


## Setup

### Import required modules from the SDK

In [None]:
#import sys
#!{sys.executable} -m pip install --upgrade stepfunctions

In [None]:
import boto3
import json
import numpy as np
import pandas as pd
import time

personalize = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')


import stepfunctions
import logging

from stepfunctions.steps import *
from stepfunctions.workflow import Workflow

stepfunctions.set_stream_logger(level=logging.INFO)

workflow_execution_role = "" # paste the ARN of the StepFunctionsWorkflowExecutionRole created in the IAM setup steps below

### Setup S3 location and filename

In [None]:
bucket = "" # replace with the name of your S3 bucket
filename = "" # replace with a name that you want to save the dataset under

### Setup IAM Roles

#### Create an execution role for Step Functions

You need an execution role so that you can create and execute workflows in Step Functions.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/)
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role** select **Step Functions**
4. Choose **Next** until you can enter a **Role name**
5. Enter a name such as `StepFunctionsWorkflowExecutionRole` and then select **Create role**


Attach a policy to the role you created. The following steps attach an inline policy that grants broad access to the services this workflow calls, such as Amazon Personalize and AWS Lambda, using `"Resource": "*"` for simplicity. As a good practice, grant access only to the resources you need.

1. Under the **Permissions** tab, click **Add inline policy**
2. Enter the following in the **JSON** tab

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "personalize:*"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iam:PassRole"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "states:StartExecution",
        "states:DescribeExecution",
        "states:StopExecution"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "events:PutTargets",
        "events:PutRule",
        "events:DescribeRule"
      ],
      "Resource": "*"
    }
  ]
}
```

3. Choose **Review policy** and give the policy a name such as `StepFunctionsWorkflowExecutionPolicy`
4. Choose **Create policy**. You will be redirected to the details page for the role.
5. Copy the **Role ARN** at the top of the **Summary**
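The least-privilege advice above can be applied with a narrower version of the policy, generated by the sketch below. `scoped_workflow_policy` is a hypothetical helper, and the region, account ID, and names are placeholders. It scopes `lambda:InvokeFunction` to named functions and `iam:PassRole` to the Personalize role. It also includes the `states:StartExecution`, `states:DescribeExecution`, and `states:StopExecution` actions and the `StepFunctionsGetEventsForStepFunctionsExecutionRule` EventBridge rule. According to the Step Functions IAM documentation, a role needs these for nested `startExecution.sync` executions like the main workflow below. Review the result against your own workflow before pasting it into the **JSON** tab.

```python
import json

def scoped_workflow_policy(region, account_id, function_names, personalize_role_name):
    """Return an IAM policy dict that narrows the wildcard policy above.

    Hypothetical helper: resource ARNs follow the standard ARN formats,
    but check the action list against your own workflow before using it.
    """
    lambda_arns = [
        f"arn:aws:lambda:{region}:{account_id}:function:{name}"
        for name in function_names
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": ["personalize:*"], "Resource": "*"},
            # Only the Lambda functions the workflows actually call.
            {"Effect": "Allow", "Action": ["lambda:InvokeFunction"], "Resource": lambda_arns},
            # Only the Personalize service role.
            {
                "Effect": "Allow",
                "Action": ["iam:PassRole"],
                "Resource": f"arn:aws:iam::{account_id}:role/{personalize_role_name}",
            },
            # Needed to start and track nested state machine executions.
            {
                "Effect": "Allow",
                "Action": ["states:StartExecution", "states:DescribeExecution", "states:StopExecution"],
                "Resource": "*",
            },
            # Managed rule that Step Functions uses for .sync nested executions.
            {
                "Effect": "Allow",
                "Action": ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
                "Resource": f"arn:aws:events:{region}:{account_id}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule",
            },
        ],
    }

scoped_policy = scoped_workflow_policy(
    region="us-east-1",                       # placeholder region
    account_id="123456789012",                # placeholder account ID
    function_names=["stepfunction-create-schema", "stepfunctioncreatedatagroup"],
    personalize_role_name="PersonalizeRole",  # placeholder role name
)
print(json.dumps(scoped_policy, indent=2))
```

Extend `function_names` with every Lambda function the workflows call.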



In [None]:
# Optional: this step is not referenced by any of the workflows defined below.
lambda_state_role = LambdaStep(
 state_id="create bucket and role",
 parameters={ 
 "FunctionName": "stepfunction_create_personalize_role", #replace with the name of the function you created
 "Payload": { 
 "bucket": bucket
 }
 },
 result_path='$'
 
)

lambda_state_role.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_role.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("CreateRoleTaskFailed")
))

#### Attach Policy to S3 Bucket

In [None]:
s3 = boto3.client("s3")

policy = {
 "Version": "2012-10-17",
 "Id": "PersonalizeS3BucketAccessPolicy",
 "Statement": [
 {
 "Sid": "PersonalizeS3BucketAccessPolicy",
 "Effect": "Allow",
 "Principal": {
 "Service": "personalize.amazonaws.com"
 },
 "Action": [
 "s3:GetObject",
 "s3:ListBucket"
 ],
 "Resource": [
 "arn:aws:s3:::{}".format(bucket),
 "arn:aws:s3:::{}/*".format(bucket)
 
 ]
 }
 ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes "personalize" or "Personalize" 
# if you would like to use a bucket with a different name, please consider creating and attaching a new policy
# that provides read access to your bucket or attaching the AmazonS3ReadOnlyAccess policy to the role
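You can quickly check a bucket name against the managed-policy rule described in the comment above. The sketch assumes, as the comment states, that `AmazonPersonalizeFullAccess` grants S3 access to bucket names containing `personalize` or `Personalize`, and tests a name against those patterns with a case-sensitive match. `bucket_covered_by_managed_policy` is a hypothetical helper. Verify the current policy document in the IAM console before relying on it.

```python
from fnmatch import fnmatchcase

# Assumed from the comment above: the managed policy covers buckets whose
# names contain "personalize" or "Personalize".
MANAGED_POLICY_BUCKET_PATTERNS = ["*personalize*", "*Personalize*"]

def bucket_covered_by_managed_policy(bucket_name):
    """Return True if the bucket name matches one of the assumed patterns."""
    return any(fnmatchcase(bucket_name, pattern) for pattern in MANAGED_POLICY_BUCKET_PATTERNS)

for name in ["my-personalize-data", "personalize-demo-123", "movie-data"]:
    print(name, bucket_covered_by_managed_policy(name))
```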


#### Create Personalize Role


In [None]:
iam = boto3.client("iam")

role_name = "" # replace with a name for the new Amazon Personalize service role


assume_role_policy_document = {
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Principal": {
 "Service": "personalize.amazonaws.com"
 },
 "Action": "sts:AssumeRole"
 }
 ]
}

create_role_response = iam.create_role(
 RoleName = role_name,
 AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)



policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
 RoleName = role_name,
 PolicyArn = policy_arn
)

time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]


## Data-Preparation

### Download, Prepare, and Upload Training Data

In [None]:
!pwd

In [None]:
!wget -N http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -o ml-100k.zip
data = pd.read_csv('./ml-100k/u.data', sep='\t', names=['USER_ID', 'ITEM_ID', 'RATING', 'TIMESTAMP'])
pd.set_option('display.max_rows', 5)
data



In [None]:
data = data[data['RATING'] > 2] # keep only ratings above 2 (i.e., 3 and above)
data2 = data[['USER_ID', 'ITEM_ID', 'TIMESTAMP']] 
data2.to_csv(filename, index=False)

boto3.Session().resource('s3').Bucket(bucket).Object(filename).upload_file(filename)
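The filter `data['RATING'] > 2` keeps ratings 3, 4, and 5 and drops ratings 1 and 2. Only the `USER_ID`, `ITEM_ID`, and `TIMESTAMP` columns are then written to the CSV file. The toy example below uses made-up rows in the `u.data` layout to show the effect of those two steps.

```python
import pandas as pd

# Made-up rows in the same layout as ml-100k/u.data.
toy = pd.DataFrame({
    "USER_ID": [1, 1, 2, 2, 3],
    "ITEM_ID": [10, 11, 10, 12, 13],
    "RATING": [1, 2, 3, 4, 5],
    "TIMESTAMP": [881250949, 881250950, 881250951, 881250952, 881250953],
})

# Same filter as above: strictly greater than 2, so ratings 1 and 2 are dropped,
# then keep only the three interaction columns.
interactions = toy[toy["RATING"] > 2][["USER_ID", "ITEM_ID", "TIMESTAMP"]]
print(interactions)
```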

## Task-States

### Lambda Task state

A `Task` State in Step Functions represents a single unit of work performed by a workflow. Tasks can call Lambda functions and orchestrate other AWS services. See [AWS Service Integrations](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-service-integrations.html) in the *AWS Step Functions Developer Guide*.

The following cells create one [LambdaStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) per Personalize resource (for example, `lambda_state_schema`). Each step is configured to [Retry](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-error-handling.html#error-handling-retrying-after-an-error) once if its Lambda function fails. If the retry also fails, a `Catch` rule moves the workflow to a `Fail` state.
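To make the Retry and Catch options concrete, the sketch below builds by hand roughly the Amazon States Language (ASL) `Task` state that one of these steps corresponds to. It assumes that `LambdaStep` targets the `arn:aws:states:::lambda:invoke` service integration. The exact JSON the SDK generates may differ in field order or extra fields, so treat this as an illustration rather than the SDK's output. `lambda_task_state` is a hypothetical helper.

```python
import json

def lambda_task_state(function_name, payload, fail_state,
                      interval_seconds=5, max_attempts=1, backoff_rate=4.0):
    """Approximate ASL for a Lambda task with one Retry and one Catch rule."""
    return {
        "Type": "Task",
        # Lambda service integration assumed for LambdaStep.
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {"FunctionName": function_name, "Payload": payload},
        "ResultPath": "$",
        "Retry": [{
            "ErrorEquals": ["States.TaskFailed"],
            "IntervalSeconds": interval_seconds,
            "MaxAttempts": max_attempts,
            "BackoffRate": backoff_rate,
        }],
        # After retries are exhausted, route the error to a Fail state.
        "Catch": [{"ErrorEquals": ["States.TaskFailed"], "Next": fail_state}],
    }

schema_state = lambda_task_state(
    "stepfunction-create-schema",
    {"input": "personalize-stepfunction-schema263"},
    "CreateSchemaTaskFailed",
)
print(json.dumps(schema_state, indent=2))
```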

#### Create the Lambda functions

The Lambda task states in this workflow call **Python 3.x** Lambda functions that create or describe Amazon Personalize resources such as schemas, dataset groups, datasets, solutions, solution versions, and campaigns. Create the following functions in the [Lambda console](https://console.aws.amazon.com/lambda/).

1. stepfunction-create-schema
2. stepfunctioncreatedatagroup
3. stepfunctioncreatedataset
4. stepfunction-createdatasetimportjob
5. stepfunction_select-recipe_create-solution
6. stepfunction_create_solution_version
7. stepfunction_getsolution_metric_create_campaign
8. stepfunction_waitforDatasetGroup
9. stepfunction_waitfordatasetimportjob
10. stepfunction_waitforSolutionVersion
11. stepfunction_waitforCampaign
12. stepfunction_getRecommendations

Copy the corresponding Lambda function code from the `./Lambda/` folder in the repository.


#### Create Schema

In [None]:
lambda_state_schema = LambdaStep(
 state_id="create schema",
 parameters={ 
 "FunctionName": "stepfunction-create-schema", #replace with the name of the function you created
 "Payload": { 
 "input": "personalize-stepfunction-schema263"
 }
 },
 result_path='$' 
)

lambda_state_schema.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_schema.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("CreateSchemaTaskFailed")
))
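With `interval_seconds=5`, `max_attempts=1`, and `backoff_rate=4.0`, each step retries at most once, 5 seconds after the first failure. The backoff rate only takes effect from the second retry onward. The helper below works through the Step Functions retry schedule, where the wait before retry *n* is `IntervalSeconds * BackoffRate**(n-1)`. `retry_delays` is a hypothetical name, and the helper ignores any other Retry options.

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Seconds waited before each retry: interval * backoff_rate ** (n - 1)."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]

print(retry_delays(5, 1, 4.0))  # [5.0]  (settings used in this notebook)
print(retry_delays(5, 3, 4.0))  # [5.0, 20.0, 80.0]
```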

#### Create Datasetgroup

In [None]:
lambda_state_datasetgroup = LambdaStep(
 state_id="create dataset Group",
 parameters={ 
 "FunctionName": "stepfunctioncreatedatagroup", #replace with the name of the function you created
 "Payload": { 
 "input": "personalize-stepfunction-dataset-group", 
 "schemaArn.$": '$.Payload.schemaArn'
 }
 },

 result_path='$'
)



lambda_state_datasetgroup.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))


lambda_state_datasetgroup.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("CreateDataSetGroupTaskFailed")
))
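Every `LambdaStep` here uses `result_path='$'`, so the step's output is the whole Lambda invocation result. The function's return value sits under `Payload`, next to fields such as `StatusCode`. The `Wait` state in between passes that document through unchanged, so the dataset group step reads the schema ARN from `$.Payload.schemaArn`. The minimal resolver below sketches this lookup. It handles only dot-separated paths, and the sample output and ARN are hypothetical.

```python
def resolve_path(document, path):
    """Resolve a dot-only JSONPath such as '$.Payload.schemaArn'.

    Minimal sketch: supports only '$' and '$.a.b' forms, not the full
    JSONPath syntax that Step Functions accepts.
    """
    if path == "$":
        return document
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value = document
    for key in path[2:].split("."):
        value = value[key]
    return value

# Hypothetical output of the "create schema" task (ARN is made up).
schema_task_output = {
    "StatusCode": 200,
    "Payload": {"schemaArn": "arn:aws:personalize:us-east-1:123456789012:schema/example"},
}
# The Wait state passes this document through unchanged, so the next
# step can still read the schema ARN from it.
print(resolve_path(schema_task_output, "$.Payload.schemaArn"))
```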

#### Create Dataset

In [None]:
lambda_state_createdataset = LambdaStep(
 state_id="create dataset",
 parameters={ 
 "FunctionName": "stepfunctioncreatedataset", #replace with the name of the function you created
 # the DatasetImport workflow receives these ARNs at the top level of its input
 "Payload": { 
 "schemaArn.$": '$.schemaArn',
 "datasetGroupArn.$": '$.datasetGroupArn'
 }
 },
 result_path = '$'
)

lambda_state_createdataset.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_createdataset.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("CreateDataSetTaskFailed")
))

#### Create Dataset Import Job

In [None]:
lambda_state_datasetimportjob = LambdaStep(
 state_id="create dataset import job",
 parameters={ 
 "FunctionName": "stepfunction-createdatasetimportjob", #replace with the name of the function you created
 "Payload": { 
 "datasetimportjob": "stepfunction-createdatasetimportjob",
 "dataset_arn.$": '$.Payload.dataset_arn',
 "datasetGroupArn.$": '$.Payload.datasetGroupArn',
 "bucket_name": bucket,
 "file_name": filename,
 "role_arn": role_arn
 
 }
 },

 result_path = '$'
)

lambda_state_datasetimportjob.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_datasetimportjob.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("DatasetImportJobTaskFailed")
))
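In the `parameters` above, keys that end in `.$` (such as `dataset_arn.$`) take their value from the state input through a path. The other keys (`bucket_name`, `file_name`, `role_arn`) are passed literally. The Lambda function receives each key without the `.$` suffix. The sketch below mimics that substitution for dot-only paths. `render_parameters`, `lookup`, and all ARNs and names in it are hypothetical.

```python
def lookup(document, path):
    """Resolve a dot-only path such as '$.Payload.dataset_arn'."""
    if path == "$":
        return document
    value = document
    for key in path[2:].split("."):
        value = value[key]
    return value

def render_parameters(template, state_input):
    """Return the effective parameters for a Parameters template."""
    rendered = {}
    for key, value in template.items():
        if key.endswith(".$"):
            rendered[key[:-2]] = lookup(state_input, value)  # value taken from input
        elif isinstance(value, dict):
            rendered[key] = render_parameters(value, state_input)  # nested object
        else:
            rendered[key] = value  # literal value
    return rendered

# Hypothetical input arriving from the "wait for dataset" step.
import_job_input = {"Payload": {
    "dataset_arn": "arn:aws:personalize:us-east-1:123456789012:dataset/example/INTERACTIONS",
    "datasetGroupArn": "arn:aws:personalize:us-east-1:123456789012:dataset-group/example",
}}
import_job_template = {
    "FunctionName": "stepfunction-createdatasetimportjob",
    "Payload": {
        "dataset_arn.$": "$.Payload.dataset_arn",
        "datasetGroupArn.$": "$.Payload.datasetGroupArn",
        "bucket_name": "example-personalize-bucket",
    },
}
print(render_parameters(import_job_template, import_job_input))
```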

#### Select Recipe and Create Solution

In [None]:
lambda_state_select_receipe_create_solution = LambdaStep(
 state_id="select receipe and create solution",
 parameters={ 
 "FunctionName": "stepfunction_select-recipe_create-solution", #replace with the name of the function you created
 "Payload": { 
 "dataset_group_arn.$": '$.datasetGroupArn'
 }
 },
 result_path = '$'
)

lambda_state_select_receipe_create_solution.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_select_receipe_create_solution.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("DatasetReceiptCreateSolutionTaskFailed")
))

#### Create Solution Version

In [None]:
lambda_create_solution_version = LambdaStep(
 state_id="create solution version",
 parameters={ 
 "FunctionName": "stepfunction_create_solution_version", 
 "Payload": { 
 "solution_arn.$": '$.Payload.solution_arn' 
 }
 },
 result_path = '$'
)

lambda_create_solution_version.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_create_solution_version.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("CreateSolutionVersionTaskFailed")
))

#### Create Campaign

In [None]:
lambda_create_campaign = LambdaStep(
 state_id="create campaign",
 parameters={ 
 "FunctionName": "stepfunction_getsolution_metric_create_campaign", 
 "Payload": { 
 "solution_version_arn.$": '$.solution_version_arn'
 }
 },
 result_path = '$'
)

lambda_create_campaign.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_create_campaign.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("CreateCampaignTaskFailed")
))

## Wait-States

A `Wait` state in Step Functions pauses the execution for a specified amount of time before moving to the next state. See [Wait](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Wait) in the AWS Step Functions Data Science SDK documentation.
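In Amazon States Language, a fixed-duration `Wait` state is a small JSON object with a `Seconds` field and a `Next` transition. It passes its input through unchanged, which is why the step after each wait can still read the previous task's `Payload`. The hypothetical helper `wait_state_asl` below builds such a fragment by hand.

```python
import json

def wait_state_asl(seconds, next_state):
    """ASL fragment for a fixed-duration Wait state."""
    return {"Type": "Wait", "Seconds": seconds, "Next": next_state}

print(json.dumps(wait_state_asl(30, "check campaign status"), indent=2))
```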

#### Wait for Schema to be ready

In [None]:
wait_state_schema = Wait(
 state_id="Wait for create schema - 5 secs",
 seconds=5
)

#### Wait for Datasetgroup to be ready

In [None]:
wait_state_datasetgroup = Wait(
 state_id="Wait for datasetgroup - 30 secs",
 seconds=30
)

#### Wait for Dataset to be ready

In [None]:
wait_state_dataset = Wait(
 state_id="wait for dataset - 30 secs",
 seconds=30
)

#### Wait for Dataset Import Job to be ACTIVE

In [None]:
wait_state_datasetimportjob = Wait(
 state_id="Wait for datasetimportjob - 30 secs",
 seconds=30
)

#### Wait for Recipe and Solution to be ready

In [None]:
wait_state_receipe = Wait(
 state_id="Wait for receipe - 30 secs",
 seconds=30
)

#### Wait for Solution Version to be ACTIVE

In [None]:
wait_state_solutionversion = Wait(
 state_id="Wait for solution version - 60 secs",
 seconds=60
)

#### Wait for Campaign to be ACTIVE

In [None]:
wait_state_campaign = Wait(
 state_id="Wait for Campaign - 30 secs",
 seconds=30
)



### Check status of the lambda task and take action accordingly

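Each status-check step below calls a Lambda function that reports the current status of a Personalize resource. If a step fails, its `Catch` rule moves the workflow to a `Fail` state. See [Fail](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation.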
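The `Catch` rules in these steps list only `States.TaskFailed`. The simplified matcher below sketches how Step Functions routes an error to a catcher, based on our reading of the error-handling documentation. Rules are checked in order, `States.ALL` matches any error, and `States.TaskFailed` matches any error except `States.Timeout`. `error_matches` and `route_error` are hypothetical helpers, and the error names are examples.

```python
def error_matches(error_name, error_equals):
    """Simplified check of one ErrorEquals list against an error name."""
    for pattern in error_equals:
        if pattern == "States.ALL":
            return True
        if pattern == "States.TaskFailed" and error_name != "States.Timeout":
            return True
        if pattern == error_name:
            return True
    return False

def route_error(error_name, catchers):
    """Return the Next state of the first matching Catch rule, else None."""
    for rule in catchers:
        if error_matches(error_name, rule["ErrorEquals"]):
            return rule["Next"]
    return None  # no catcher matched: the execution fails

status_catchers = [{"ErrorEquals": ["States.TaskFailed"], "Next": "CampaignStatusTaskFailed"}]
print(route_error("States.TaskFailed", status_catchers))  # CampaignStatusTaskFailed
print(route_error("States.Timeout", status_catchers))     # None
```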

### Check dataset group status

In [None]:
lambda_state_datasetgroupstatus = LambdaStep(
 state_id="check dataset Group status",
 parameters={ 
 "FunctionName": "stepfunction_waitforDatasetGroup", #replace with the name of the function you created
 "Payload": { 
 "input.$": '$.Payload.datasetGroupArn',
 "schemaArn.$": '$.Payload.schemaArn'
 }
 },
 result_path = '$'
)

lambda_state_datasetgroupstatus.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_datasetgroupstatus.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("DatasetGroupStatusTaskFailed")
))

### Check dataset import job status

In [None]:
lambda_state_datasetimportjob_status = LambdaStep(
 state_id="check dataset import job status",
 parameters={ 
 "FunctionName": "stepfunction_waitfordatasetimportjob", #replace with the name of the function you created
 "Payload": { 
 "dataset_import_job_arn.$": '$.Payload.dataset_import_job_arn',
 "datasetGroupArn.$": '$.Payload.datasetGroupArn'
 }
 },
 result_path = '$'
)

lambda_state_datasetimportjob_status.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_datasetimportjob_status.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("DatasetImportJobStatusTaskFailed")
))

### Check solution version status

In [None]:

solutionversion_succeed_state = Succeed(
 state_id="The Solution Version ready?"
)

In [None]:
lambda_state_solutionversion_status = LambdaStep(
 state_id="check solution version status",
 parameters={ 
 "FunctionName": "stepfunction_waitforSolutionVersion", #replace with the name of the function you created
 "Payload": { 
 "solution_version_arn.$": '$.Payload.solution_version_arn' 
 }
 },
 result_path = '$'
)

lambda_state_solutionversion_status.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_solutionversion_status.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("SolutionVersionStatusTaskFailed")
))

### Check campaign status

In [None]:
lambda_state_campaign_status = LambdaStep(
 state_id="check campaign status",
 parameters={ 
 "FunctionName": "stepfunction_waitforCampaign", #replace with the name of the function you created
 "Payload": { 
 "campaign_arn.$": '$.Payload.campaign_arn' 
 }
 },
 result_path = '$'
)

lambda_state_campaign_status.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_campaign_status.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("CampaignStatusTaskFailed")
))
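The notebook does not show the code for the status-check functions. The workflow, however, constrains their output: the Choice state reads `Payload.status`, and the next polling iteration reads `$.Payload.campaign_arn`. So a function such as `stepfunction_waitforCampaign` must return at least those two fields. Below is a hypothetical sketch of such a handler, not the repository's code. It calls the boto3 Personalize client's `describe_campaign` and accepts an injected client, so it can be exercised locally with the `FakePersonalize` stand-in.

```python
def lambda_handler(event, context, personalize=None):
    """Hypothetical status check for stepfunction_waitforCampaign.

    Returns 'status' for the Choice state and 'campaign_arn' so that the
    next polling iteration can pass the ARN back in.
    """
    if personalize is None:
        import boto3  # only needed when running inside Lambda
        personalize = boto3.client("personalize")
    campaign_arn = event["campaign_arn"]
    response = personalize.describe_campaign(campaignArn=campaign_arn)
    return {"status": response["campaign"]["status"], "campaign_arn": campaign_arn}


class FakePersonalize:
    """Stand-in client for local testing; always reports a fixed status."""

    def __init__(self, status):
        self.status = status

    def describe_campaign(self, campaignArn):
        return {"campaign": {"campaignArn": campaignArn, "status": self.status}}


status_result = lambda_handler(
    {"campaign_arn": "arn:aws:personalize:us-east-1:123456789012:campaign/example"},
    None,
    personalize=FakePersonalize("CREATE IN_PROGRESS"),
)
print(status_result)
```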

## Choice-States

Choice states inspect the `status` field returned by each status-check Lambda function and branch accordingly. `ACTIVE` leads to a `Succeed` state, `CREATE PENDING` and `CREATE IN_PROGRESS` loop back to the corresponding `Wait` state, and any other status falls through to a `Fail` state. See *Choice Rules* in the [AWS Step Functions Data Science SDK documentation](https://aws-step-functions-data-science-sdk.readthedocs.io).

After creating the Choice states, we chain the steps together into workflow paths, then define and visualize each workflow.

In [None]:
create_campaign_choice_state = Choice(
 state_id="Is the Campaign ready?"
)

In [None]:
create_campaign_choice_state.add_choice(
 rule=ChoiceRule.StringEquals(variable=lambda_state_campaign_status.output()['Payload']['status'], value='ACTIVE'),
 next_step=Succeed("CampaignCreatedSuccessfully") 
)
create_campaign_choice_state.add_choice(
 ChoiceRule.StringEquals(variable=lambda_state_campaign_status.output()['Payload']['status'], value='CREATE PENDING'),
 next_step=wait_state_campaign
)
create_campaign_choice_state.add_choice(
 ChoiceRule.StringEquals(variable=lambda_state_campaign_status.output()['Payload']['status'], value='CREATE IN_PROGRESS'),
 next_step=wait_state_campaign
)

create_campaign_choice_state.default_choice(next_step=Fail("CreateCampaignFailed"))
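Together with the `Wait` and status-check steps, these rules form a polling loop. The sketch below mirrors the campaign rules in plain Python and feeds made-up status sequences through them. `campaign_next_state` and `simulate_polling` are hypothetical helpers. Because each comparison is an exact `StringEquals`, any status the rules don't list, including a differently spelled one, falls through to the default `Fail` state.

```python
WAIT_STATE = "Wait for Campaign - 30 secs"
POLL_AGAIN = {"CREATE PENDING", "CREATE IN_PROGRESS"}

def campaign_next_state(status):
    """Mirror the campaign Choice rules for one observed status."""
    if status == "ACTIVE":
        return "CampaignCreatedSuccessfully"
    if status in POLL_AGAIN:
        return WAIT_STATE
    return "CreateCampaignFailed"  # default_choice

def simulate_polling(statuses):
    """Return the states visited for a sequence of status-check results."""
    visited = []
    for status in statuses:
        state = campaign_next_state(status)
        visited.append(state)
        if state != WAIT_STATE:
            break  # Succeed and Fail states end the execution
    return visited

print(simulate_polling(["CREATE PENDING", "CREATE IN_PROGRESS", "ACTIVE"]))
print(simulate_polling(["CREATE IN_PROGRESS", "CREATE FAILED"]))
```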


In [None]:
solutionversion_choice_state = Choice(
 state_id="Is the Solution Version ready?"
)

In [None]:
solutionversion_choice_state.add_choice(
 rule=ChoiceRule.StringEquals(variable=lambda_state_solutionversion_status.output()['Payload']['status'], value='ACTIVE'),
 next_step=solutionversion_succeed_state 
)
solutionversion_choice_state.add_choice(
 ChoiceRule.StringEquals(variable=lambda_state_solutionversion_status.output()['Payload']['status'], value='CREATE PENDING'),
 next_step=wait_state_solutionversion
)
solutionversion_choice_state.add_choice(
 ChoiceRule.StringEquals(variable=lambda_state_solutionversion_status.output()['Payload']['status'], value='CREATE IN_PROGRESS'),
 next_step=wait_state_solutionversion
)

solutionversion_choice_state.default_choice(next_step=Fail("create_solution_version_failed"))


In [None]:
datasetimportjob_succeed_state = Succeed(
 state_id="The Dataset Import Job is ready"
)

In [None]:
datasetimportjob_choice_state = Choice(
 state_id="Is the DataSet Import Job ready?"
)

In [None]:
datasetimportjob_choice_state.add_choice(
 rule=ChoiceRule.StringEquals(variable=lambda_state_datasetimportjob_status.output()['Payload']['status'], value='ACTIVE'),
 next_step=datasetimportjob_succeed_state 
)
datasetimportjob_choice_state.add_choice(
 ChoiceRule.StringEquals(variable=lambda_state_datasetimportjob_status.output()['Payload']['status'], value='CREATE PENDING'),
 next_step=wait_state_datasetimportjob
)
datasetimportjob_choice_state.add_choice(
 ChoiceRule.StringEquals(variable=lambda_state_datasetimportjob_status.output()['Payload']['status'], value='CREATE IN_PROGRESS'),
 next_step=wait_state_datasetimportjob
)


datasetimportjob_choice_state.default_choice(next_step=Fail("dataset_import_job_failed"))


In [None]:
datasetgroupstatus_choice_state = Choice(
 state_id="Is the DataSetGroup ready?"
)

## Workflow

### Define Workflow

In the following cells, you chain the steps defined above into workflows, then create, visualize, and execute them.

Steps relate to states in AWS Step Functions. For more information, see [States](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-states.html) in the *AWS Step Functions Developer Guide*. For more information on the AWS Step Functions Data Science SDK APIs, see: https://aws-step-functions-data-science-sdk.readthedocs.io. 
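A sequential `Chain` corresponds to ASL states whose `Next` fields point at the following state, with the last state ending the execution. The hypothetical helper `chain_states` below sketches that linking on plain dictionaries, with Task parameters omitted. One design difference is worth knowing. This sketch copies each state, whereas the SDK's `Chain`, as we understand it, sets the transition on the step objects themselves. Adding the same step object to a second chain can therefore change its `Next` in the first.

```python
import json

TERMINAL_TYPES = {"Choice", "Succeed", "Fail"}

def chain_states(named_states):
    """Link (name, state) pairs in order, similar to a sequential Chain.

    Each state gets "Next" pointing at the following one; the last state
    gets "End": True unless its type already ends or branches the flow.
    Assumes Choice/Succeed/Fail states appear only in the last position.
    """
    states = {}
    for i, (name, state) in enumerate(named_states):
        linked = dict(state)  # shallow copy so the caller's dict is untouched
        if i + 1 < len(named_states):
            linked["Next"] = named_states[i + 1][0]
        elif linked["Type"] not in TERMINAL_TYPES:
            linked["End"] = True
        states[name] = linked
    return {"StartAt": named_states[0][0], "States": states}

sketch_definition = chain_states([
    ("create schema", {"Type": "Task", "Resource": "arn:aws:states:::lambda:invoke"}),
    ("Wait for create schema - 5 secs", {"Type": "Wait", "Seconds": 5}),
    ("create dataset Group", {"Type": "Task", "Resource": "arn:aws:states:::lambda:invoke"}),
])
print(json.dumps(sketch_definition, indent=2))
```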




### Dataset workflow

In [None]:
Dataset_workflow_definition=Chain([lambda_state_schema,
 wait_state_schema,
 lambda_state_datasetgroup,
 wait_state_datasetgroup,
 lambda_state_datasetgroupstatus
 ])

In [None]:
Dataset_workflow = Workflow(
 name="Dataset-workflow",
 definition=Dataset_workflow_definition,
 role=workflow_execution_role
)

In [None]:
Dataset_workflow.render_graph()

In [None]:
DatasetWorkflowArn = Dataset_workflow.create()

### DatasetImportWorkflow

In [None]:
DatasetImport_workflow_definition=Chain([lambda_state_createdataset,
 wait_state_dataset,
 lambda_state_datasetimportjob,
 wait_state_datasetimportjob,
 lambda_state_datasetimportjob_status,
 datasetimportjob_choice_state
 ])

In [None]:
DatasetImport_workflow = Workflow(
 name="DatasetImport-workflow",
 definition=DatasetImport_workflow_definition,
 role=workflow_execution_role
)

In [None]:
DatasetImport_workflow.render_graph()

In [None]:
DatasetImportflowArn = DatasetImport_workflow.create()

### Recipe and Solution workflow

In [None]:
Create_receipe_sol_workflow_definition=Chain([lambda_state_select_receipe_create_solution,
 wait_state_receipe,
 lambda_create_solution_version,
 wait_state_solutionversion,
 lambda_state_solutionversion_status,
 solutionversion_choice_state
 ])

In [None]:
Create_receipe_sol_workflow = Workflow(
 name="Create_receipe_sol-workflow",
 definition=Create_receipe_sol_workflow_definition,
 role=workflow_execution_role
)

In [None]:
Create_receipe_sol_workflow.render_graph()

In [None]:
CreateReceipeArn = Create_receipe_sol_workflow.create()

### Create Campaign workflow

In [None]:
Create_Campaign_workflow_definition=Chain([lambda_create_campaign,
 wait_state_campaign,
 lambda_state_campaign_status,
 create_campaign_choice_state
 ])

In [None]:
Campaign_workflow = Workflow(
 name="Campaign-workflow",
 definition=Create_Campaign_workflow_definition,
 role=workflow_execution_role
)

In [None]:
Campaign_workflow.render_graph()

In [None]:
CreateCampaignArn = Campaign_workflow.create()

### Main workflow

In [None]:
call_dataset_workflow_state = Task(
 state_id="DataSetWorkflow",
 resource="arn:aws:states:::states:startExecution.sync:2",
 parameters={
 "Input": "true",
 "StateMachineArn": DatasetWorkflowArn
 }
)

In [None]:
call_datasetImport_workflow_state = Task(
 state_id="DataSetImportWorkflow",
 resource="arn:aws:states:::states:startExecution.sync:2",
 parameters={
 "Input":{
 "schemaArn.$": "$.Output.Payload.schemaArn",
 "datasetGroupArn.$": "$.Output.Payload.datasetGroupArn"
 },
 "StateMachineArn": DatasetImportflowArn,
 }
)
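The `$.Output.Payload...` paths work because the `startExecution.sync:2` integration returns the child execution's output in `Output` as a parsed JSON object. With the plain `startExecution.sync` integration, `Output` is a JSON-encoded string, and these paths would not resolve. The sketch below shows how the next workflow's input is assembled. It uses hypothetical, trimmed task results and a dot-only path resolver, `resolve_output_path`, as a simplified stand-in for JSONPath.

```python
import json

def resolve_output_path(document, path):
    """Resolve a dot-only path such as '$.Output.Payload.schemaArn'."""
    if path == "$":
        return document
    value = document
    for key in path[2:].split("."):
        value = value[key]
    return value

# Hypothetical, trimmed result of the DataSetWorkflow task (ARNs are made up).
child_output = {"Payload": {
    "schemaArn": "arn:aws:personalize:us-east-1:123456789012:schema/example",
    "datasetGroupArn": "arn:aws:personalize:us-east-1:123456789012:dataset-group/example",
}}
dataset_workflow_result = {"Status": "SUCCEEDED", "Output": child_output}

# Input passed to the DatasetImport workflow, as in the "Input" block above.
child_input = {
    "schemaArn": resolve_output_path(dataset_workflow_result, "$.Output.Payload.schemaArn"),
    "datasetGroupArn": resolve_output_path(dataset_workflow_result, "$.Output.Payload.datasetGroupArn"),
}
print(json.dumps(child_input, indent=2))

# With plain ".sync", Output would be a JSON string, so the same path fails.
legacy_result = {"Status": "SUCCEEDED", "Output": json.dumps(child_output)}
try:
    resolve_output_path(legacy_result, "$.Output.Payload.schemaArn")
except TypeError as err:
    print("string Output cannot be navigated:", err)
```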

In [None]:
call_receipe_solution_workflow_state = Task(
 state_id="ReceipeSolutionWorkflow",
 resource="arn:aws:states:::states:startExecution.sync:2",
 parameters={
 "Input":{
 "datasetGroupArn.$": "$.Output.Payload.datasetGroupArn"

 },
 "StateMachineArn": CreateReceipeArn
 }
)

In [None]:
call_campaign_solution_workflow_state = Task(
 state_id="CampaignWorkflow",
 resource="arn:aws:states:::states:startExecution.sync:2",
 parameters={
 "Input":{
 "solution_version_arn.$": "$.Output.Payload.solution_version_arn"

 },
 "StateMachineArn": CreateCampaignArn
 }
)

In [None]:
Main_workflow_definition=Chain([call_dataset_workflow_state,
 call_datasetImport_workflow_state,
 call_receipe_solution_workflow_state,
 call_campaign_solution_workflow_state
 ])

In [None]:
Main_workflow = Workflow(
 name="Main-workflow",
 definition=Main_workflow_definition,
 role=workflow_execution_role
)

In [None]:
Main_workflow.render_graph()

### Create and execute the workflow

In the next cells, we create the main workflow in AWS Step Functions with [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create) and execute it with [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute).

In [None]:
Main_workflow.create()

In [None]:
Main_workflow_execution = Main_workflow.execute()


### Review the workflow progress

Review the workflow progress with [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress).

Review the execution history by calling [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution.

In [None]:
Main_workflow_execution.render_progress()

In [None]:
Main_workflow_execution.list_events(html=True)

## Generate-Recommendations

### Now that we have a successful campaign, let's generate recommendations for the campaign

#### Select a User and an Item

In [None]:
# u.item has no header row, so header=None keeps the first movie (ITEM_ID 1)
items = pd.read_csv('./ml-100k/u.item', sep='|', usecols=[0,1], header=None, encoding='latin-1')
items.columns = ['ITEM_ID', 'TITLE']


user_id, item_id, rating, timestamp = data.sample().values[0]

user_id = int(user_id)
item_id = int(item_id)


item_title = items.loc[items['ITEM_ID'] == item_id].values[0][-1]
print("USER: {}".format(user_id))
print("ITEM: {}".format(item_title))
print("ITEM ID: {}".format(item_id))


In [None]:
wait_recommendations = Wait(
 state_id="Wait for recommendations - 10 secs",
 seconds=10
)

#### Lambda Task

In [None]:
campaign_arn = "" # replace with the ARN of the campaign created by the workflow

lambda_state_get_recommendations = LambdaStep(
 state_id="get recommendations",
 parameters={ 
 "FunctionName": "stepfunction_getRecommendations", 
 "Payload": { 
 "campaign_arn": campaign_arn, 
 "user_id": user_id, 
 "item_id": item_id 
 }
 },
 result_path = '$'
)

lambda_state_get_recommendations.add_retry(Retry(
 error_equals=["States.TaskFailed"],
 interval_seconds=5,
 max_attempts=1,
 backoff_rate=4.0
))

lambda_state_get_recommendations.add_catch(Catch(
 error_equals=["States.TaskFailed"],
 next_step=Fail("GetRecommendationTaskFailed")
))

#### Create a Succeed State

In [None]:
workflow_complete = Succeed("WorkflowComplete")

In [None]:
recommendation_path = Chain([
 lambda_state_get_recommendations,
 wait_recommendations,
 workflow_complete
])

### Define, Create, Render, and Execute Recommendation Workflow

In the next cells, we will create a workflow in AWS Step Functions with [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create) and execute it with [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute).

In [None]:
recommendation_workflow = Workflow(
 name="Recommendation_Workflow4",
 definition=recommendation_path,
 role=workflow_execution_role
)



In [None]:
recommendation_workflow.render_graph()

In [None]:
recommendation_workflow.create()

In [None]:
recommendation_workflow_execution = recommendation_workflow.execute()

### Review progress

Review workflow progress with [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress).

Review execution history by calling [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution.

In [None]:
recommendation_workflow_execution.render_progress()

In [None]:
recommendation_workflow_execution.list_events(html=True)


### Get Recommendations

In [None]:
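# wait=True blocks until the execution finishes before reading its output
item_list = recommendation_workflow_execution.get_output(wait=True)['Payload']['item_list']

print("Recommendations:")
for item in item_list:
    # np.int was removed from NumPy; the built-in int converts the string ID
    item_title = items.loc[items['ITEM_ID'] == int(item['itemId'])].values[0][-1]
    print(item_title)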
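The loop above filters the `items` DataFrame once per recommended item. A minimal alternative builds an ID-to-title dictionary once and converts each `itemId` (a string in Personalize responses) to `int` before the lookup. The toy tables below are made-up placeholders, not MovieLens data, and the fallback marker for unknown IDs is an illustrative choice.

```python
import pandas as pd

# Made-up stand-ins for the items table and the Lambda's item_list output.
toy_items = pd.DataFrame({"ITEM_ID": [1, 2, 3], "TITLE": ["Movie A", "Movie B", "Movie C"]})
toy_item_list = [{"itemId": "3"}, {"itemId": "1"}, {"itemId": "99"}]

# Build the lookup once; tolist() yields plain Python ints and strings.
toy_titles = dict(zip(toy_items["ITEM_ID"].tolist(), toy_items["TITLE"].tolist()))

# Convert each string ID to int and fall back to a marker for unknown IDs.
recommended_titles = [toy_titles.get(int(item["itemId"]), "<unknown item>") for item in toy_item_list]
print(recommended_titles)
```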


## Clean up Amazon Personalize resources

Make sure to clean up the Amazon Personalize resources and the state machines created in this notebook. Sign in to the Amazon Personalize console and delete resources such as campaigns, solutions, datasets, and dataset groups. The cells below delete the state machines.
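If you prefer to script the Personalize cleanup, the sketch below deletes resources in dependency order: campaigns, then solutions, datasets, dataset groups, and schemas. It uses the boto3 Personalize client's `delete_campaign`, `delete_solution`, `delete_dataset`, `delete_dataset_group`, and `delete_schema` calls. `delete_personalize_resources` and its `wait_until_deleted` callback are hypothetical. Personalize deletions are asynchronous, so in practice supply a callback that polls the matching `describe_*` call before the next deletion starts. The `RecordingClient` stand-in only records calls, so the ordering can be checked without AWS.

```python
def delete_personalize_resources(client, arns, wait_until_deleted=lambda kind, arn: None):
    """Delete Personalize resources in dependency order.

    arns maps a kind ("campaign", "solution", "dataset", "dataset_group",
    "schema") to a list of ARNs. wait_until_deleted(kind, arn) is called
    after each delete so the caller can block until it has finished.
    """
    steps = [
        ("campaign", client.delete_campaign, "campaignArn"),
        ("solution", client.delete_solution, "solutionArn"),
        ("dataset", client.delete_dataset, "datasetArn"),
        ("dataset_group", client.delete_dataset_group, "datasetGroupArn"),
        ("schema", client.delete_schema, "schemaArn"),
    ]
    for kind, delete, arn_param in steps:
        for arn in arns.get(kind, []):
            delete(**{arn_param: arn})
            wait_until_deleted(kind, arn)


class RecordingClient:
    """Stand-in that records every method call instead of calling AWS."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def record(**kwargs):
            self.calls.append((name, kwargs))
        return record


recording_client = RecordingClient()
delete_personalize_resources(recording_client, {
    "dataset_group": ["arn:aws:personalize:us-east-1:123456789012:dataset-group/example"],
    "campaign": ["arn:aws:personalize:us-east-1:123456789012:campaign/example"],
})
print([name for name, _ in recording_client.calls])  # ['delete_campaign', 'delete_dataset_group']
```

Against a real account you would pass `boto3.client("personalize")` as `client`, along with a polling callback.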

## Clean up State Machine resources

In [None]:
Campaign_workflow.delete()

recommendation_workflow.delete()

Main_workflow.delete()

Create_receipe_sol_workflow.delete()

DatasetImport_workflow.delete()

Dataset_workflow.delete()
