{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "# Getting Started\n", "\n", "ML Ops is gaining a lot of popularity. This example showcases a key piece you can use to construct your automation pipeline. As we can see in the following architecture diagram, you will be deploying an AWS Step Funciton Workflow containing AWS Lambda functions that call Amazon S3, Amazon Personalize, and Amazon SNS APIs.\n", "\n", "\n", "This package contains the source code of a Step Functions pipeline that is able to perform \n", "multiple actions within **Amazon Personalize**, including the following:\n", "\n", "- Dataset Group creation\n", "- Datasets creation and import\n", "- Solution creation\n", "- Solution version creation\n", "- Campaign creation\n", "\n", "**Note**: This notebook is an example of a [Custom Dataset Group and associated resources](https://docs.aws.amazon.com/personalize/latest/dg/custom-dataset-groups.html), please refer to the documentation for more information on [Domain Dataset Groups and Recommenders](https://docs.aws.amazon.com/personalize/latest/dg/domain-dataset-groups.html).\n", "\n", "Once the steps are completed, the step functions notifies the users of its completion through the\n", "use of an SNS topic.\n", "\n", "The below diagram describes the architecture of the solution:\n", "\n", "![Architecture Diagram](../../static/imgs/ml_ops_architecture.png)\n", "\n", "The below diagram showcases the StepFunction workflow definition:\n", "\n", "![stepfunction definition](../../static/imgs/step_functions.png)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Uploading data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's get the bucket that our cloudformation deployed. We will be uploading our data to this bucket, plus the configuration file to trigger the automation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket = !aws cloudformation describe-stacks --stack-name id-ml-ops --query \"Stacks[0].Outputs[?OutputKey=='InputBucketName'].OutputValue\" --output text\n", "bucket_name = bucket[0]\n", "print(bucket_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have the bucket name, lets copy over our Media data so we can explore and upload to S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!cp -R /home/ec2-user/SageMaker/amazon-personalize-immersion-day/automation/ml_ops/domain/Media ./example" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import Dependencies\n", "\n", "import boto3\n", "import json\n", "import pandas as pd\n", "import numpy as np\n", "import seaborn as sns\n", "import matplotlib.pyplot as plt\n", "import time\n", "import requests\n", "import csv\n", "import sys\n", "import botocore\n", "import uuid\n", "from collections import defaultdict\n", "import random\n", "import numpy as np\n", "\n", "from packaging import version\n", "from botocore.exceptions import ClientError\n", "from pathlib import Path\n", "\n", "%matplotlib inline\n", "\n", "# Setup Clients\n", "\n", "personalize = boto3.client('personalize')\n", "personalize_runtime = boto3.client('personalize-runtime')\n", "personalize_events = boto3.client('personalize-events')\n", "\n", "# We will upload our training data in these files:\n", "raw_items_filename = \"example/data/Items/items.csv\" # Do Not Change\n", "raw_users_filename = \"example/data/Users/users.csv\" # Do Not Change\n", "raw_interactions_filename = \"example/data/Interactions/interactions.csv\" # Do Not Change\n", "items_filename = \"items.csv\" # Do Not Change\n", "users_filename = \"users.csv\" # Do Not Change\n", "interactions_filename = \"interactions.csv\" # Do Not Change\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_df = pd.read_csv(raw_interactions_filename)\n", "interactions_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There are 2 ways of uploading your datasets to S3:\n", "1. Using the boto3 SDK\n", "1. Using the CLI\n", "\n", "In this example we are going to use the CLI command" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 sync ./example/data s3://$bucket_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Starting the State Machine Execution" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In order to execute the MLOps pipeline we need to provide a parameters file that will tell our state machine which names and configurations we want in our Amazon Personalize deployment.\n", "\n", "Let's create a parameters.json file and define our Amazon Personalize resources we want our MLOps pipeline to deploy" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "params = {\n", " \"datasetGroup\": {\n", " \"name\": \"AP-ML-Ops-1\"\n", " },\n", " \"datasets\": {\n", " \"Interactions\": {\n", " \"name\": \"InteractionsDataset\",\n", " \"schema\": {\n", " \"fields\": [\n", " {\n", " \"name\": \"USER_ID\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"ITEM_ID\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"EVENT_TYPE\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"TIMESTAMP\",\n", " \"type\": \"long\"\n", " }\n", " ],\n", " \"name\": \"Interactions\",\n", " \"namespace\": \"com.amazonaws.personalize.schema\",\n", " \"type\": \"record\",\n", " \"version\": \"1.0\"\n", " }\n", " },\n", " \"Items\": {\n", " \"name\": \"ItemsDataset\",\n", " \"schema\": {\n", " \"fields\": [\n", " {\n", " \"name\": \"ITEM_ID\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"categorical\": True,\n", " \"name\": \"GENRES\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"YEAR\",\n", " \"type\": \"int\"\n", " }\n", " ],\n", " \"name\": \"Items\",\n", " \"namespace\": \"com.amazonaws.personalize.schema\",\n", " \"type\": \"record\",\n", " \"version\": \"1.0\"\n", " }\n", " }\n", " },\n", " \"solutions\": {\n", " \"sims\": {\n", " \"name\": \"na-simsCampaign-1\",\n", " \"recipeArn\": \"arn:aws:personalize:::recipe/aws-sims\"\n", " }\n", " },\n", " \"campaigns\": {\n", " \"simsCampaign\": {\n", " \"minProvisionedTPS\": 1,\n", " \"name\": \"na-simsCampaign-1\"\n", " }\n", " },\n", " \"eventTracker\": {\n", " \"name\": \"AutomationImmersionDayEventTracker-1\"\n", " }\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(json.dumps(params, indent=4, sort_keys=True))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This parameters file will create a dataset group containing a campaign exposing a solution trained with the user-personalization recipe" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Updating and uploading your parameters file to S3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First let's write the file locally" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with open('example/params.json', 'w') as outfile:\n", " json.dump(params, outfile)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can upload this file to S3, we are going to be using the CLI to do so" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp ./example/params.json s3://$bucket_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Validating your MLOps pipeline\n", "\n", "Lets take a look at the stepfunctions execution." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client = boto3.client('stepfunctions')\n", "stateMachineArn = !aws cloudformation describe-stacks --stack-name id-ml-ops --query \"Stacks[0].Outputs[?OutputKey=='DeployStateMachineArn'].OutputValue\" --output text\n", "stateMachineArn= stateMachineArn[0]\n", "stateMachineArn" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "executions_response = client.list_executions(\n", " stateMachineArn=stateMachineArn,\n", " statusFilter='RUNNING',\n", " maxResults=2\n", ")\n", "print(json.dumps(executions_response, indent=4, sort_keys=True, default=str))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This step will take at least 30 minutes to complete. \n", "\n", "You can check the status of the state machine execution in the console by:\n", "\n", "1. Navigate to the [Step Functions console](https://console.aws.amazon.com/states/home). \n", "\n", "\n", "2. Click on the number **1** under the **Running** column\n", "\n", "![stepfunction definition](../../static/imgs/step_functions_console.png)\n", "\n", "3. Click on the **current execution** that is named after the date\n", "\n", "![stepfunction definition](../../static/imgs/step_functions_console_execution.png)\n", "\n", "4. Here you can see which steps are currently executing highlighted in blue\n", "\n", "![stepfunction definition](../../static/imgs/step_functions_in_progress.png)\n", "\n", "\n", "This example step function definition will automatically retry each step by querying the describe service APIs with a backoff rate of 1.5, in each retry a new lambda function is executed looking for a success or a failure of a given step.\n", "\n", "These step functions will take around 20 minutes to finish executing, which includes importing the datasets, trainign a SIMS solution, and deploying a campaing. **Note:** we are only training a SIMS model due to time constrains.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while ( len(client.list_executions(\n", " stateMachineArn=stateMachineArn,\n", " statusFilter='RUNNING',\n", " maxResults=2\n", " )['executions']) > 0):\n", " print ('State Machine is running...')\n", " time.sleep(60)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Let's look at the succeeded execution\n", "\n", "Once your step functions are done executing, you can list the executions and describe them" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "executions_response = client.list_executions(\n", " stateMachineArn=stateMachineArn,\n", " statusFilter='SUCCEEDED',\n", " maxResults=2\n", ")\n", "print(json.dumps(executions_response, indent=4, sort_keys=True, default=str))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can validate your Amazon Personalize deployment by navigating to the [Service Console](https://console.aws.amazon.com/personalize/home) and looking for the dataset group called **AP-ML-Ops-1**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Let's look at the input that was delivered to the State Machine" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we can see below, this is the input from our Parameters file we uploaded to S3. This input json was then passed to lambda functions in the state machine to utilize across Amazon Personalize APIs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "describe_executions_response = client.describe_execution(\n", " executionArn=executions_response['executions'][0]['executionArn']\n", ")\n", "print(json.dumps(json.loads(describe_executions_response['input']), indent=4, sort_keys=True, default=str))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Let's look at the time stamps" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we can see below, this is the input from our Parameters file we uploaded to S3. This input json was then passed to lambda functions in the state machine to utilize across Amazon Personalize APIs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Start Date:\")\n", "print(json.dumps(describe_executions_response['startDate'], indent=4, sort_keys=True, default=str))\n", "print(\"Stop Date:\")\n", "print(json.dumps(describe_executions_response['stopDate'], indent=4, sort_keys=True, default=str))\n", "print(\"Elapsed Time: \")\n", "elapsed_time = describe_executions_response['stopDate'] - describe_executions_response['startDate']\n", "print(elapsed_time)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we see above, the whole process does take a significant ammount of time, but now all the steps are fully automated!\n", "\n", "If you are interested in deploying this example in your environment, visit our [Github Samples Page](https://github.com/aws-samples/amazon-personalize-samples/tree/master/next_steps/operations/ml_ops) to download the latest codebase." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }