{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Integrate Amazon Lookout for Metrics with Amazon Augmented AI (A2I)\n", "\n", "Amazon Lookout for Metrics can help you identify anomalies within your data metrics that you gather on a periodic basis. In this notebook we show how to pass the anomalous results for human review and use the feedback for improving model accuracy. \n", "\n", "We are extending the example usecase for Amazon Lookout for Metrics that was discussed in an [earlier blog](https://aws.amazon.com/blogs/machine-learning/introducing-amazon-lookout-for-metrics-an-anomaly-detection-service-to-proactively-monitor-the-health-of-your-business/) and integrating it with Amazon A2I.\n", "\n", "\n", "## Workflow\n", "\n", "1. Create a detector and configure its detection properties.\n", "2. Create a Metric Set:\n", " 1. Provide the location of your source data and the IAM permissions needed to access it. \n", " 1. Define the metrics that you want to investigate.\n", " 1. Attach the dataset to your Detector.\n", "3. Activate the detector.\n", "4. Pass the detected outliers to a human work team for review.\n", "6. Provide feedback on the outliers to improve predictor model's accuracy.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 1.0. Prerequisites\n", "\n", "1. The code uses Python 3.7. Please use the Python 3 (Data Science) kernel for this notebook.\n", "2. If you need this Notebook to create an IAM role for Lookout For Metrics, SageMaker will need permissions to create the IAM role. You can find the IAM role used by SageMaker Studio in the [SageMaker Studio Control Panel](https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html). \n", "\n", "Note: It is OK to encounter the following error in the output of next cell.\n", "
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.\n", "awscli 1.19.47 requires botocore==1.20.47, but you have botocore 1.20.91 which is incompatible.\n", "awscli 1.19.47 requires s3transfer<0.4.0,>=0.3.0, but you have s3transfer 0.4.2 which is incompatible.\n", "aiobotocore 1.2.2 requires botocore<1.19.53,>=1.19.52, but you have botocore 1.20.91 which is incompatible.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# First, let's get the latest versions of our dependencies\n", "## IGNORE ANY ERRORS ##\n", "!pip install --upgrade pip\n", "!pip install botocore --upgrade\n", "!pip install boto3 --upgrade\n", "!pip install -U botocore" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "import os\n", "import zipfile\n", "import shutil\n", "import datetime\n", "import pprint\n", "import json\n", "import uuid\n", "import re\n", "import texttable as tt\n", "\n", "import boto3\n", "import pandas as pd\n", "import numpy as np\n", "import sagemaker\n", "import botocore\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 1.1. Create S3 bucket\n", "\n", "Create an S3 bucket where we will upload data for Amazon Lookout for Metrics. The default bucket name uses the following format: `--lookoutmetrics-a2i` \n", " \n", "You can change the bucket name in the following cell if needed. \n", "The bucket is created only if it does not already exist.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "region = boto3.Session().region_name\n", "\n", "account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "\n", "bucket_name = account_id + \"-\" + region + \"-lookoutmetrics-a2i\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the bucket of it does not exist\n", "\n", "s3 = boto3.resource('s3')\n", "exists = True\n", "if s3.Bucket(bucket_name).creation_date is None:\n", " exists = False\n", "\n", "if not exists:\n", " try:\n", " if region == 'us-east-1':\n", " s3.create_bucket(Bucket=bucket_name)\n", " else: \n", " s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={ 'LocationConstraint': region })\n", " print('S3 bucket {} created successfully'.format(bucket_name))\n", " except Exception as e:\n", " print('S3 error: ', e)\n", "else: \n", " print(\"S3 Bucket: {} already exists.\".format(bucket_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 1.2. Configure IAM Role\n", "\n", "Create an IAM role that will be assumed by Amazon Lookout for Metrics service and will allow it to communicate with S3. The default name of the IAM role is `L4M_iam_role`. \n", "\n", "You can change the default name in the following cell, if needed. The role is created only if it does not already exist.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The following IAM role will be created if it does not exist.\n", "\n", "role_name = \"L4M_iam_role\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam = boto3.client(\"iam\")\n", "\n", "assume_role_policy_document = {\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": \"lookoutmetrics.amazonaws.com\"\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " }\n", " ]\n", "}\n", "\n", "try:\n", " create_role_response = iam.create_role(\n", " RoleName = role_name,\n", " AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)\n", " )\n", " role_arn = create_role_response[\"Role\"][\"Arn\"]\n", " \n", " print(\"Created %s\" % role_name)\n", " print(\"Attaching policies\")\n", "\n", " iam.attach_role_policy(\n", " RoleName=role_name,\n", " PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess',\n", " )\n", " print(\"Waiting for a minute to allow IAM role policy attachment to propagate\")\n", " time.sleep(60)\n", "\n", "\n", "except iam.exceptions.EntityAlreadyExistsException:\n", " print(\"Role %s already exists\" % role_name )\n", " role_arn = boto3.resource('iam').Role(role_name).arn\n", "\n", "print(role_arn)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 2.0. Generate synthetic data\n", "\n", "We will generate data both for training the detector and for prediction of anomalies. In this example the detector will be used in continuous mode once every hour to detect anomalies. Starting from the current date when this notebook is run, we generate data for a default period of 6 months in the past and 3 days in the future. The period is configurable through constants defined in the next cell.\n", "\n", "The historical data is used for training the model, while the future data is used for predicting anomalies on an ongoing basis.\n", "\n", "* Historical data is created in a csv file called `./data/ecommerce/backtest/input.csv`\n", "* Hourly data files are stored with the naming format as `./data/ecommerce/live/{yyyyMMdd}/{HH:mm}/{yyyyMMdd_HH:mm:ss}.csv`\n", "* Complete data along with the anomaly labels is available in `/data/ecommerce/label.csv`\n", "\n", "The contents of local folder `./data` are replaced based on current date, on every execution of this section.\n", "\n", "*Note: Synthetic data generation may take about 4 minutes.*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# local folders for data\n", "DATASET_NAME = \"ecommerce\"\n", "DIR_PATH = './data'\n", "\n", "#######################################################################\n", "# Set constants for the duration of historical and future periods for data generation.\n", "NUM_MONTHS_HISTORICAL_DATA = 6\n", "NUM_DAYS_FUTURE_DATA = 3\n", "\n", "# Metrics will be received at the top of every hour.\n", "FREQUENCY = \"PT1H\" # one of 'P1D', 'PT1H', 'PT10M' and 'PT5M'\n", "#######################################################################\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 2.1. Create and save data locally\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "import os\n", "import math\n", "import random\n", "import itertools\n", "import datetime\n", "from datetime import date\n", "from dateutil.relativedelta import relativedelta\n", "import shutil\n", "import pandas as pd\n", "\n", "dimensions = { \"platform\" : [ \"pc_web\", \"mobile_web\", \"mobile_app\" ], \"marketplace\" : [ \"us\", \"uk\", \"de\", \"fr\", \"es\", \"it\", \"jp\" ] }\n", "metrics = [ \"views\", \"revenue\" ]\n", "\n", "metric_period = \"1H\"\n", "\n", "daily_peak_size_range = ( 200, 400 )\n", "daily_peak_time = ( 12 * 60, 21 * 60 )\n", "daily_offset_range = ( 100, 200 )\n", "\n", "random_factor_size_range = (2, 10)\n", "\n", "anomaly_size_range = ( 100, 600 )\n", "anomaly_length_range = ( 1, 5 * 60 )\n", "anomaly_possibility = 0.005\n", "#anomaly_possibility = 0.2\n", "\n", "introduce_metric_from_upstream = [\n", " lambda x : max( int(x), 0 ), # sin curve -> views \n", " lambda x : x * 0.3, # views -> revenue\n", "]\n", "\n", "random.seed(1234)\n", "\n", "class DailyPattern:\n", " \n", " def __init__( self ):\n", " self.peak_size = random.uniform( *daily_peak_size_range )\n", " self.peak_time = random.uniform( *daily_peak_time )\n", " self.offset = random.uniform( *daily_offset_range )\n", " \n", " def get( self, t ):\n", " \n", " minutes_in_day = t.hour * 60 + t.minute\n", " \n", " factor1 = math.cos( (( minutes_in_day - self.peak_time ) / ( 24 * 60 )) * 2 * math.pi ) * self.peak_size + self.peak_size + self.offset\n", " \n", " return factor1\n", "\n", "class RandomFactor:\n", " \n", " def __init__( self ):\n", " self.size = random.uniform( *random_factor_size_range )\n", "\n", " def get(self):\n", " return random.uniform( -self.size, self.size )\n", "\n", "\n", "class Anomaly:\n", " \n", " def __init__(self):\n", " self.remaining_time = random.randint( *anomaly_length_range )\n", " self.offset = random.uniform( *anomaly_size_range ) * (random.randint(0,1)*2-1)\n", " #print( self.offset )\n", "\n", " def proceed_time(self):\n", " self.remaining_time -= pd.to_timedelta(metric_period).seconds / 60\n", " return self.remaining_time <= 0\n", "\n", " def get(self):\n", " return self.offset\n", "\n", "class Item:\n", "\n", " def __init__( self, dimension ):\n", " \n", " #print( dimension )\n", " \n", " self.dimension = dimension\n", " \n", " self.daily_pattern = DailyPattern()\n", " self.random_factor = RandomFactor()\n", " self.anomaly = None\n", " \n", " def get( self, t ):\n", " \n", " if random.random() < anomaly_possibility:\n", " self.anomaly = Anomaly()\n", " \n", " value = self.daily_pattern.get(t)\n", " \n", " value += self.random_factor.get()\n", "\n", " is_anomaly = bool(self.anomaly)\n", " if self.anomaly:\n", " value += self.anomaly.get()\n", " if self.anomaly.proceed_time():\n", " self.anomaly = None\n", " \n", " metric_values = []\n", " for i, metric in enumerate(metrics):\n", " value = introduce_metric_from_upstream[i](value)\n", " metric_values.append(value)\n", " \n", " #if (is_anomaly):\n", " #print (metric_values, is_anomaly)\n", " \n", " return metric_values, is_anomaly\n", "\n", "\n", "def synthesize(period):\n", "\n", " # create item list\n", " item_list = []\n", " for dimension_values in itertools.product( *dimensions.values() ):\n", " item = Item( dict( zip( dimensions.keys(), dimension_values ) ) )\n", " item_list.append(item)\n", " \n", " # itereate and prepare data \n", " dimension_values_list = []\n", " for i in range( len(dimensions) ):\n", " dimension_values_list.append([])\n", "\n", " timestamp_list = []\n", "\n", " metric_values_list = []\n", " for i, metric in enumerate(metrics):\n", " metric_values_list.append([])\n", "\n", " labels_list = []\n", " for i, metric in enumerate(metrics):\n", " labels_list.append([])\n", " \n", " t = period[0]\n", " while t 0:\n", " df_anomalies_by_ts = pd.concat(df_anomalies_list)\n", "\n", " # fold multiple metrics into same rows\n", " df_anomalies_by_ts = df_anomalies_by_ts.groupby([\"timestamp\", *dimension_names_set], as_index=False).max() \n", " print(\"Anomalies saved in dataframe.\")\n", "else:\n", " print(\"No anomalies found.\")\n", "\n", "print(\"\\ndf_anomalies_by_ts:\\n\" + str(df_anomalies_by_ts))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pd.set_option('display.max_columns', None)\n", "df_anomalies_by_ts.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 5.2. Export the Results\n", "\n", "Create a CSV file with anomaly results." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "anomalies_filename = ecom_anomaly_detector_arn.split(':')[-1] + \"-anomalies.csv\"\n", "\n", "df_anomalies_by_ts.to_csv(anomalies_filename, index=False )\n", "print(anomalies_filename)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 6.0. Create a private Workteam in SageMaker Console\n", "\n", "The instructions for creating a private workteam can be [found here](https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-create-private-console.html). \n", "\n", "When you create a workteam in the AWS Console, it is automatically integrated with Amazon Cognito for secure authentication. After creating the private workteam add yourself as a member of the workteam.\n", "\n", "Copy the the ARN of the workteam created through the console and update it in the next cell.\n", "\n", "The ARN of the workteam will have the following format:
\n", "*arn:aws:sagemaker:\\:\\:workteam/private-crowd/\\*\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workteam_ARN = '' " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while workteam_ARN == '':\n", " workteam_ARN = input(\"Please enter the ARN of the Work Team:\\n\")\n", " if len(workteam_ARN) > 0:\n", " break\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# Flow definition name - this value is unique per account and region. You can also provide your own value here.\n", "l4m_flowDefinitionName = 'l4m-ecommerce-workflow'\n", "\n", "# Task UI name - this value is unique per account and region. You can also provide your own value here.\n", "l4m_taskUIName = 'l4m-ecommerce-ui'\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 6.1. Create a human task UI \n", "\n", "In the next cell, we define and create a custom task template using HTML that will be presented to the workers. It uses Crowd HTML web components, a web standard that abstracts HTML markup, CSS, and JavaScript functionality into an HTML tag or set of tags. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# We customized the tabular template for our notebook as below\n", "ecom_a2i_template = r\"\"\"\n", "\n", "\n", "\n", "\n", "\n", "
\n", "

Ecommerce Revenue and Views by Platform and Market

\n", "
\n", "\n", "
\n", "

Instructions

\n", "

The following entries were identified as anomalies.
\n", " Please review the views and revenue scores for the platform and market place.
\n", " Check the radio button to confirm whether it was an anomaly.
\n", " Please enter any optional comments for the anomaly.\n", "

\n", "
\n", "
\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " {% for entry in task.input.l4m_ecom_anomaly %}\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " {% endfor %}\n", "
TimestampPlatformMarket PlaceAnomaly metricmetric valueAnomaly?Comment
Anomaly Found\n", "
\n", "
\n", "
\n", "
\n", "
\n", "\n", "\n", "\n", "\n", "\"\"\"\n", "\n", "sagemaker_client = boto3.client('sagemaker')\n", "\n", "# Check if lookout for metrics UI for this example already exists\n", "try:\n", " describe_human_task_ui_response = sagemaker_client.describe_human_task_ui(\n", " HumanTaskUiName=l4m_taskUIName\n", " )\n", " print(\"\\nDescribe human task UI: \")\n", " pprint.pprint(describe_human_task_ui_response, width = 2)\n", " \n", "except:\n", " print(\"Human task UI {} not found\")\n", " describe_human_task_ui_response = {}\n", "\n", "if not describe_human_task_ui_response:\n", " # Create the human task UI\n", " create_human_task_ui_response = sagemaker_client.create_human_task_ui(\n", " HumanTaskUiName=l4m_taskUIName,\n", " UiTemplate={'Content': ecom_a2i_template}) \n", "\n", " print(\"\\nCreate human task ui response: \")\n", " pprint.pprint(create_human_task_ui_response, width = 2)\n", "\n", " l4m_review_ui_arn = create_human_task_ui_response['HumanTaskUiArn']\n", "else:\n", " l4m_review_ui_arn = describe_human_task_ui_response['HumanTaskUiArn'] \n", " \n", "print(\"\\nHuman task UI ARN: {}\".format(l4m_review_ui_arn))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 6.2. Create a Human task Workflow \n", "\n", "We use Amazon Augmented AI's user interface to create a custom task workflow. The new workflow is created only if one does not exist already with the same name. The results of human review are stored in the Amazon S3 bucket created above. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_role_arn = sagemaker.get_execution_role()\n", "\n", "s3_output_path = f's3://' + bucket_name + '/ecommerce' + '/a2i-results'\n", "print(\"S3 output path: {}\".format(s3_output_path))\n", "\n", "# Check if Amazon Lookout For Metrics Workflow exists\n", "try:\n", " describe_flow_definition_response = sagemaker_client.describe_flow_definition(\n", " FlowDefinitionName=l4m_flowDefinitionName\n", " )\n", " ###### print describe_flow_definition_response\n", " #print(\"\\nDescribe flow definition response: \")\n", " #pprint.pprint(describe_flow_definition_response, width=2)\n", " \n", "except:\n", " describe_flow_definition_response = {}\n", " \n", "# Create Amazon Lookout For Metrics Workflow if it does not exist already\n", "\n", "if not describe_flow_definition_response:\n", " create_workflow_definition_response = sagemaker_client.create_flow_definition(\n", " FlowDefinitionName = l4m_flowDefinitionName,\n", " RoleArn=sagemaker_role_arn,\n", " HumanLoopConfig= {\n", " \"WorkteamArn\": workteam_ARN,\n", " \"HumanTaskUiArn\": l4m_review_ui_arn,\n", " \"TaskCount\": 1,\n", " \"TaskDescription\": \"Review the anomalies detected by Amazon Lookout for Metrics\",\n", " \"TaskTitle\": \"Ecommerce Anomalies Review\"\n", " },\n", " OutputConfig={\n", " \"S3OutputPath\" : s3_output_path\n", " }\n", " )\n", " \n", " # Wait until the newly created flow becomes Active\n", " while True:\n", "\n", " response = sagemaker_client.describe_flow_definition(FlowDefinitionName=l4m_flowDefinitionName)\n", " print(response['FlowDefinitionStatus'])\n", " if (response['FlowDefinitionStatus'] == 'Active'):\n", " print(\"Flow Definition is active\")\n", " break\n", " time.sleep(5)\n", "\n", "\n", " flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] \n", "\n", "else:\n", " flowDefinitionArn = describe_flow_definition_response['FlowDefinitionArn'] \n", " \n", "\n", "print(\"Flow definition Arn: {}\".format(flowDefinitionArn))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Anomaly records were saved in the dataframe in a previous cell. \n", "# We had also saved the data in a csv file.\n", "\n", "if len(df_anomalies_by_ts) == 0:\n", " df_anomalies_by_ts = pd.read_csv(anomalies_filename)\n", "\n", "df_anomalies_by_ts.head()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "Create a Python list object containing anomalies to pass into the human review task " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#tseriesid_list = ecom_anomalies_df['tseriesid'].astype(str).to_list()\n", "timestamp_list = df_anomalies_by_ts['timestamp'].astype(str).to_list()\n", "platform_list = df_anomalies_by_ts['platform'].astype(str).to_list()\n", "marketplace_list = df_anomalies_by_ts['marketplace'].astype(str).to_list()\n", "\n", "anomaly_metric_list = df_anomalies_by_ts['anomaly_metric'].astype(str).to_list()\n", "metric_value_list = df_anomalies_by_ts['anomaly_metric_value'].astype(str).to_list()\n", "\n", "for i in range(len(timestamp_list)):\n", " ecom_review_list = [ {'timestamp': timestamp_list[i], \\\n", " 'platform': platform_list[i], \\\n", " 'marketplace': marketplace_list[i], \\\n", " 'metric_name': anomaly_metric_list[i], \\\n", " 'metric_value': metric_value_list[i]} \\\n", " for i in range(len(timestamp_list))\n", "\n", " ]\n", " \n", "ip_content = {\"l4m_ecom_anomaly\": ecom_review_list} # passed into workflow\n", "\n", "ecom_review_list\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 6.3. Start the human review workflow. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "a2i_client = boto3.client('sagemaker-a2i-runtime')\n", "\n", "humanLoopName = str(uuid.uuid4())\n", "\n", "start_human_loop_response = a2i_client.start_human_loop(\n", " HumanLoopName=humanLoopName,\n", " FlowDefinitionArn=flowDefinitionArn,\n", " HumanLoopInput={\n", " \"InputContent\": json.dumps(ip_content)\n", " }\n", " )\n", "\n", "print(\"\\nStart human loop response: \")\n", "\n", "print(\"\\nHuman Loop ARN: {}\".format(start_human_loop_response['HumanLoopArn']))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "Get the workteam name from the workteam ARN that you provided above, to find the URL of the portal for providing feedback on anomalies.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workteamName = workteam_ARN[workteam_ARN.rfind('/') + 1:]\n", "\n", "describe_workteam_response = sagemaker_client.describe_workteam(WorkteamName=workteamName)\n", "\n", "if not describe_workteam_response:\n", " print(\"You need to log into SageMaker console and create a Workteam\")\n", " sys.exit()\n", " \n", "workteam_portal = 'https://' + describe_workteam_response['Workteam']['SubDomain']\n", "\n", "print(\"\\nLog into the work team portal link provided below, and review the anomalies.\\n\")\n", "\n", "print (\"=\" * 60)\n", "print(\"Portal URL\\n{}\".format(workteam_portal))\n", "print ('=' * 60)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 6.4. Complete the review of anomalies\n", "\n", "The URL to the portal was printed out in the last step. Open the URL in a browser and log in with credentials of the human review worker.
\n", "You should have sent an invitation email to yourself for joining the workteam when creating the private workteam in the Amazon A2I console in an earlier step.\n", "\n", "Once you have completed the review, update the value of variable`review_complete` in the next cell to `Y`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "review_complete = 'N'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while review_complete == 'N':\n", " \n", " review_complete = input(\"\\nPlease log into A2I Portal and complete the review.\\nIs the review complete (Y/N)?\")\n", " \n", " if review_complete == 'Y':\n", " break" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "completed_human_loops_s3_output = \"\"\n", "\n", "try:\n", " describe_human_loop_response = a2i_client.describe_human_loop(HumanLoopName=humanLoopName)\n", " print(\"\\nDescribe human loop response: \")\n", " pprint.pprint(describe_human_loop_response, width=2)\n", " \n", " completed_human_loops_s3_output = describe_human_loop_response[\"HumanLoopOutput\"][\"OutputS3Uri\"]\n", " print(\"HumanLoop Status: {}\".format(describe_human_loop_response[\"HumanLoopStatus\"]))\n", "except:\n", " print(\"Error getting human loop\")\n", "\n", "\n", "print(\"\\nOutput in S3 at: \\n{}\".format(describe_human_loop_response[\"HumanLoopOutput\"][\"OutputS3Uri\"]))\n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pp = pprint.PrettyPrinter(indent=4)\n", "json_output = ''\n", "\n", "s3_bucket_name, s3_object_name = completed_human_loops_s3_output.replace(\"s3://\", \"\").split(\"/\", 1)\n", "\n", "print(\"S3 bucket name: {}\".format(s3_bucket_name))\n", "print(\"S3 object name: {}\".format(s3_object_name))\n", "\n", "# Amazon S3 client \n", "s3_client = boto3.client('s3')\n", "\n", "try:\n", " get_object_response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_object_name)\n", " content = get_object_response[\"Body\"].read()\n", " json_output = json.loads(content)\n", " pp.pprint(json_output)\n", " print('\\n')\n", "\n", "except:\n", " print(\"Error getting S3 object: {}\".format(completed_human_loops_s3_output))\n", "\n", "review_result = json_output['humanAnswers'][0]['answerContent']\n", "\n", "#print(review_result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 7.0. Update human review feedback in Amazon Lookout For Metrics\n", "\n", "The detector learns from your feedback and will improve its prediction accuracy in the future." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_anomalies_by_ts.head()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "if not L4M:\n", " L4M = boto3.client( \"lookoutmetrics\")\n", "\n", "col_name_suffix = 1\n", "anomaly_col_name = 'anomaly_found-' + str(col_name_suffix)\n", "\n", "while anomaly_col_name in review_result:\n", " \n", " #tseriesid = review_result['tseriesid-' + str(col_name_suffix)] \n", " review_timestamp = review_result['timestamp-' + str(col_name_suffix)]\n", " review_marketplace = review_result['marketplace-' + str(col_name_suffix)]\n", " review_platform = review_result['platform-' + str(col_name_suffix)]\n", " review_metric_name = review_result['metric_name-' + str(col_name_suffix)]\n", " \n", " print(\"\\nRow num: {}\".format(str(col_name_suffix)))\n", " \n", " is_anomaly = review_result[anomaly_col_name]['on']\n", " print(\"Is Anomaly: {}, Timestamp: {}, Marketplace: {}, Platform: {}\\n\".format(is_anomaly, review_timestamp, review_marketplace, review_platform))\n", " \n", " print(df_anomalies_by_ts.loc[(df_anomalies_by_ts['timestamp'] == review_timestamp) & (df_anomalies_by_ts['marketplace'] == review_marketplace) & (df_anomalies_by_ts['platform'] == review_platform) & (df_anomalies_by_ts['anomaly_metric'] == review_metric_name)])\n", " \n", " # get corresponding time series id and anomaly group id from dataframe df_anomalies_by_ts\n", " row_value = df_anomalies_by_ts.loc[(df_anomalies_by_ts['timestamp'] == review_timestamp) & (df_anomalies_by_ts['marketplace'] == review_marketplace) & (df_anomalies_by_ts['platform'] == review_platform) & (df_anomalies_by_ts['anomaly_metric'] == review_metric_name)]\n", " #print(\"Row :{}\".format(row_value))\n", "\n", " tseriesid = row_value['tseriesid'].tolist()[0]\n", " print(\"tseriesid: {}\".format(tseriesid))\n", " \n", " anomaly_group_id = row_value['anomaly_group_id'].tolist()[0]\n", " print(\"Anomaly group id: {}\".format(anomaly_group_id))\n", " \n", " col_name_suffix += 1\n", " anomaly_col_name = 'anomaly_found-' + str(col_name_suffix)\n", "\n", " try:\n", " put_feedback_response = L4M.put_feedback(\n", " AnomalyDetectorArn=ecom_anomaly_detector_arn,\n", " AnomalyGroupTimeSeriesFeedback={\n", " 'AnomalyGroupId': anomaly_group_id,\n", " 'TimeSeriesId': tseriesid,\n", " 'IsAnomaly': is_anomaly} \n", " )\n", " except botocore.exceptions.ClientError as error:\n", " print(\"Error: {0}\".format(error))\n", " \n", "\n", " \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### 8.0. Clean up resources \n", "\n", "Next we cleanup the resources that were created. This will remove all the resources that were created above, so wait to run this until you are sure you wish to delete everything.\n", "\n", "\n", "**Note that since we created a continuous detector, it will continue to run once every hour and incur charges until it is deleted.**\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check if the detector exists\n", "answer = input(\"Delete resources? (y/n)\")\n", "\n", "if answer in [\"y\", \"Y\", \"yes\", \"YES\"]:\n", " delete_resources = True\n", "else:\n", " delete_resources = False\n", " \n", "if delete_resources:\n", "\n", " print(\"Deleting detector: {}\".format(ecom_anomaly_detector_arn))\n", "\n", " while True:\n", "\n", " try:\n", " response = L4M.describe_anomaly_detector( AnomalyDetectorArn = ecom_anomaly_detector_arn )\n", " \n", " if response[\"Status\"] != \"DELETING\":\n", " L4M.delete_anomaly_detector(AnomalyDetectorArn = ecom_anomaly_detector_arn)\n", " \n", " print(\"status: DELETING\")\n", " time.sleep(5)\n", " continue\n", " \n", " except L4M.exceptions.ResourceNotFoundException:\n", " break\n", " \n", " print(\"Deleted detector: {}\".format(ecom_anomaly_detector_arn))\n", "\n", "\n", " iam = boto3.client(\"iam\")\n", " \n", " try:\n", " iam.detach_role_policy(PolicyArn = \"arn:aws:iam::aws:policy/AmazonS3FullAccess\", RoleName = role_name)\n", " \n", " except botocore.exceptions.ClientError as error:\n", " print(\"Error: {0}\".format(error))\n", " \n", " try:\n", " iam.delete_role(RoleName=role_name)\n", " print(\"Deleted %s\" % role_name)\n", " except botocore.exceptions.ClientError as error:\n", " print(\"Error: {0}\".format(error))\n", " \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# remove the resources\n", "\n", "if delete_resources:\n", " \n", " # Check the status of human loop\n", " describe_human_loop_response = a2i_client.describe_human_loop(\n", " HumanLoopName=humanLoopName\n", " )\n", "\n", " print(\"\\nDescribe human loop response: \")\n", " pprint.pprint(describe_human_loop_response, width=2)\n", "\n", "\n", " # \n", " if describe_human_loop_response['HumanLoopStatus'] == \"InProgress\":\n", " stop_human_loop_response = a2i_client.stop_human_loop(\n", " HumanLoopName=humanLoopName\n", " )\n", "\n", " # Wait until human loop has stopped\n", " while True:\n", " describe_human_loop_response = a2i_client.describe_human_loop(\n", " HumanLoopName=humanLoopName\n", " )\n", " if describe_human_loop_response['HumanLoopStatus'] in [\"Stopped\", \"Failed\", \"Completed\"]:\n", " break\n", " time.sleep(5)\n", " \n", " \n", " # Delete human loop\n", " delete_human_loop_response = a2i_client.delete_human_loop(\n", " HumanLoopName=humanLoopName\n", " )\n", " print(\"\\nDelete human loop response: \")\n", " pprint.pprint(delete_human_loop_response, width=2)\n", " \n", " # Delete work flow.\n", " delete_flow_definition_response = sagemaker_client.delete_flow_definition(\n", " FlowDefinitionName=l4m_flowDefinitionName\n", " )\n", "\n", " print(\"\\nDelete flow definition response: \")\n", " pprint.pprint(delete_flow_definition_response, width=2)\n", "\n", " # Delete human task UI\n", " # Check if Amazon lookout for metrics UI exists\n", " try:\n", " delete_human_task_ui_response = sagemaker_client.delete_human_task_ui(\n", " HumanTaskUiName=l4m_taskUIName\n", " )\n", " print(\"\\nDelete human task UI: \")\n", " pprint.pprint(delete_human_task_ui_response, width = 2)\n", " except:\n", " print(\"Human task UI {} not found\".format(l4m_taskUIName))\n" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-2:236514542706:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }