# Integrate Amazon Lookout for Metrics with Amazon Augmented AI (A2I)

Amazon Lookout for Metrics can help you identify anomalies within your data metrics that you gather on a periodic basis. In this notebook we show how to pass the anomalous results for human review and use the feedback for improving model accuracy.  

We are extending the example use case for Amazon Lookout for Metrics that was discussed in an [earlier blog](https://aws.amazon.com/blogs/machine-learning/introducing-amazon-lookout-for-metrics-an-anomaly-detection-service-to-proactively-monitor-the-health-of-your-business/) and integrating it with Amazon A2I.


## Workflow

1. Create a Detector and configure its detection properties.
2. Create a Metric Set:
    1. Provide the location of your source data and the IAM permissions needed to access it. 
    1. Define the Metrics that you want to investigate.
    1. Attach the dataset to your Detector.
3. Activate the Detector.
4. Pass the detected outliers to a human work team for review.
5. Provide feedback on the outliers to improve the detector's accuracy.


---
### 1.0. Prerequisites

1. The code uses Python 3.7. Please use the Python 3 (Data Science) kernel for this notebook.
2. If you need this notebook to create an IAM role for Lookout for Metrics, SageMaker will need permissions to create the IAM role. You can find the IAM role used by SageMaker Studio in the [SageMaker Studio Control Panel](https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html). 

Note: It is OK to encounter the following error in the output of the next cell.
<br/><span style="color:tomato">ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
awscli 1.19.47 requires botocore==1.20.47, but you have botocore 1.20.91 which is incompatible.
awscli 1.19.47 requires s3transfer<0.4.0,>=0.3.0, but you have s3transfer 0.4.2 which is incompatible.
aiobotocore 1.2.2 requires botocore<1.19.53,>=1.19.52, but you have botocore 1.20.91 which is incompatible.</span>


In [None]:
# First, let's get the latest installations of our dependencies
## IGNORE ANY ERRORS ##
!pip install --upgrade pip
!pip install --upgrade botocore
!pip install --upgrade boto3

In [None]:
import time
import os
import zipfile
import shutil
import datetime
import pprint
import json
import uuid
import re
import texttable as tt

import boto3
import pandas as pd
import numpy as np
import sagemaker
import botocore


---
### 1.1. Create S3 bucket

Create an S3 bucket where we will upload data for Amazon Lookout for Metrics. The bucket is created only if it does not already exist.
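
The region check in the next cell is needed because `us-east-1` is the one region where `create_bucket` must be called without a `LocationConstraint`. A minimal sketch of that rule as a standalone helper follows; `bucket_kwargs` and the sample bucket name are hypothetical and not part of boto3 or this notebook.

```python
def bucket_kwargs(bucket_name, region):
    """Build keyword arguments for s3.create_bucket in the given region."""
    kwargs = {"Bucket": bucket_name}
    # us-east-1 is the default location and rejects an explicit constraint.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


# Illustrative values only; no AWS call is made here.
print(bucket_kwargs("123456789012-us-west-2-lookoutmetrics-lab", "us-west-2"))
```

The returned dictionary could be passed on as `s3.create_bucket(**bucket_kwargs(bucket_name, region))`.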


In [None]:
region = boto3.Session().region_name

account_id = boto3.client('sts').get_caller_identity().get('Account')

bucket_name = account_id + "-" + region + "-lookoutmetrics-lab"


# Use SageMaker's default S3 bucket, where training output will be stored. 
#bucket_name = session.default_bucket()  # Custom bucket name can be used.
#print ("S3 Bucket Name: " + bucket_name)

# Create the bucket if it does not exist
s3 = boto3.resource('s3')
exists = True
if s3.Bucket(bucket_name).creation_date is None:
    exists = False

if not exists:
    try:
        if region == 'us-east-1':
            s3.create_bucket(Bucket=bucket_name)
        else:
            s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region})
        print('S3 bucket {} created successfully'.format(bucket_name))
    except Exception as e:
        print('S3 error: ', e)
else: 
    print("S3 Bucket: {} already exists.".format(bucket_name))



---
### 1.2. Configure IAM Role

Create an IAM role that will be assumed by Amazon Lookout for Metrics service and will allow it to communicate with S3.
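
The cell below attaches the managed `AmazonS3FullAccess` policy, which covers every bucket in the account. As an alternative, here is a sketch of a policy document scoped to a single bucket. The helper name `bucket_scoped_policy` is hypothetical, and the action list is an assumption that you may need to adjust for your detector.

```python
import json


def bucket_scoped_policy(bucket_name):
    """Return an IAM policy document limited to one bucket (hypothetical helper)."""
    bucket_arn = "arn:aws:s3:::" + bucket_name
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing applies to the bucket itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": bucket_arn,
            },
            {
                # Object reads and writes apply to the keys inside the bucket.
                # Assumption: these two actions are enough for this example.
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": bucket_arn + "/*",
            },
        ],
    }


policy_json = json.dumps(bucket_scoped_policy("example-bucket"), indent=2)
print(policy_json)
```

A document like this could be attached as an inline policy with `iam.put_role_policy(RoleName=role_name, PolicyName="l4m-s3-access", PolicyDocument=policy_json)`, where the policy name is an arbitrary example.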


In [None]:
iam = boto3.client("iam")

role_name = "L4M_iam_role"

assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lookoutmetrics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

try:
    create_role_response = iam.create_role(
        RoleName = role_name,
        AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
    )
    role_arn = create_role_response["Role"]["Arn"]
    
    print("Created %s" % role_name)
    print("Attaching policies")

    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess',
    )
    print("Waiting for a minute to allow IAM role policy attachment to propagate")
    time.sleep(60)


except iam.exceptions.EntityAlreadyExistsException:
    print("Role %s already existed" % role_name )
    role_arn = boto3.resource('iam').Role(role_name).arn


#iam.attach_role_policy(
#    RoleName = role_name,
#    PolicyArn = "arn:aws:iam::aws:policy/AmazonSNSFullAccess"
#)

print(role_arn)


---
### 2.0. Generate synthetic data

We will generate data both for training the detector and for prediction of anomalies. In this example the detector will be used in continuous mode to detect anomalies. Starting from the current date, we generate data for a default period of 6 months in the past and 3 days in the future. The period is configurable through constants defined in the next cell.

The historical data is used for training the model, while the future data will be used for predicting anomalies on an ongoing basis.

* Historical data will be created as a csv file called "./data/ecommerce/backtest/input.csv"
* Hourly data files will be stored in the folder "./data/ecommerce/live/{yyyyMMdd}/{HHmm}/{yyyyMMdd_HHmmss}.csv"
* Complete data along with the anomaly labels is available in "./data/ecommerce/label.csv"

The data in local folders is replaced based on current date, on every execution of this section.

*Note: Synthetic data generation may take about 4 minutes.*
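
To make the live folder layout concrete, the sketch below builds the path of one hourly file using the same `strftime` patterns as the generator code in section 2.1. `live_file_path` is a hypothetical helper, and the timestamp is an arbitrary example.

```python
import datetime


def live_file_path(timestamp, root="./data/ecommerce/live"):
    """Return the path of the hourly CSV file for the given timestamp."""
    folder = timestamp.strftime("%Y%m%d/%H%M")       # e.g. 20210601/1300
    name = timestamp.strftime("%Y%m%d_%H%M%S.csv")   # e.g. 20210601_130000.csv
    return "/".join([root, folder, name])


print(live_file_path(datetime.datetime(2021, 6, 1, 13, 0, 0)))
# ./data/ecommerce/live/20210601/1300/20210601_130000.csv
```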

In [None]:
# local folders for data
DATASET_NAME = "ecommerce"
DIR_PATH = './data'

#######################################################################
# Set constants for the duration of historical and future periods for data generation.
NUM_MONTHS_HISTORICAL_DATA = 6
NUM_DAYS_FUTURE_DATA = 3

# Metrics will be received at the top of every hour.
FREQUENCY = "PT1H" # one of 'P1D', 'PT1H', 'PT10M' and 'PT5M'
#######################################################################



---
### 2.1. Generate data


In [None]:
%%time

import os
import math
import random
import itertools
import datetime
from datetime import date
from dateutil.relativedelta import relativedelta
import shutil
import pandas as pd

dimensions = { "platform" : [ "pc_web", "mobile_web", "mobile_app" ], "marketplace" : [ "us", "uk", "de", "fr", "es", "it", "jp" ] }
metrics = [ "views", "revenue" ]

metric_period = "1H"

daily_peak_size_range = ( 200, 400 )
daily_peak_time = ( 12 * 60, 21 * 60 )
daily_offset_range = ( 100, 200 )

random_factor_size_range = (2, 10)

anomaly_size_range = ( 100, 600 )
anomaly_length_range = ( 1, 5 * 60 )
anomaly_possibility = 0.005
#anomaly_possibility = 0.2

introduce_metric_from_upstream = [
    lambda x : max( int(x), 0 ),    # sin curve -> views 
    lambda x : x * 0.3,             # views -> revenue
]

random.seed(1234)

class DailyPattern:
    
    def __init__( self ):
        self.peak_size = random.uniform( *daily_peak_size_range )
        self.peak_time = random.uniform( *daily_peak_time )
        self.offset = random.uniform( *daily_offset_range )
    
    def get( self, t ):
        
        minutes_in_day = t.hour * 60 + t.minute
        
        factor1 = math.cos( (( minutes_in_day - self.peak_time ) / ( 24 * 60 )) * 2 * math.pi ) * self.peak_size + self.peak_size + self.offset
        
        return factor1

class RandomFactor:
    
    def __init__( self ):
        self.size = random.uniform( *random_factor_size_range )

    def get(self):
        return random.uniform( -self.size, self.size )


class Anomaly:
    
    def __init__(self):
        self.remaining_time = random.randint( *anomaly_length_range )
        self.offset = random.uniform( *anomaly_size_range ) * (random.randint(0,1)*2-1)
        #print( self.offset )

    def proceed_time(self):
        self.remaining_time -= pd.to_timedelta(metric_period).seconds / 60
        return self.remaining_time <= 0

    def get(self):
        return self.offset

class Item:

    def __init__( self, dimension ):
        
        #print( dimension )
        
        self.dimension = dimension
        
        self.daily_pattern = DailyPattern()
        self.random_factor = RandomFactor()
        self.anomaly = None
    
    def get( self, t ):
    
        if random.random() < anomaly_possibility:
            self.anomaly = Anomaly()
        
        value = self.daily_pattern.get(t)
        
        value += self.random_factor.get()

        is_anomaly = bool(self.anomaly)
        if self.anomaly:
            value += self.anomaly.get()
            if self.anomaly.proceed_time():
                self.anomaly = None
        
        metric_values = []
        for i, metric in enumerate(metrics):
            value = introduce_metric_from_upstream[i](value)
            metric_values.append(value)
        
        #if (is_anomaly):
            #print (metric_values, is_anomaly)
        
        return metric_values, is_anomaly


def synthesize(period):

    # create item list
    item_list = []
    for dimension_values in itertools.product( *dimensions.values() ):
        item = Item( dict( zip( dimensions.keys(), dimension_values ) ) )
        item_list.append(item)
    
    # iterate and prepare data
    dimension_values_list = []
    for i in range( len(dimensions) ):
        dimension_values_list.append([])

    timestamp_list = []

    metric_values_list = []
    for i, metric in enumerate(metrics):
        metric_values_list.append([])

    labels_list = []
    for i, metric in enumerate(metrics):
        labels_list.append([])
    
    t = period[0]
    while t<period[1]:
        
        #print(t)

        for item in item_list:
            
            for i, d in enumerate(item.dimension.values()):
                #print(i,d)
                dimension_values_list[i].append(d)
            
            timestamp_list.append(t)
            
            metric_values, is_anomaly = item.get(t)
            for i, metric_value in enumerate(metric_values):
                metric_values_list[i].append(metric_value)
                labels_list[i].append( int(is_anomaly) )

        t += pd.to_timedelta(metric_period)
        
    # convert to DataFrame
    data = {}
    for dimension_name, dimension_values in zip( dimensions.keys(), dimension_values_list ):
        data[dimension_name] = dimension_values
    data["timestamp"] = timestamp_list
    for metric_name, metric_values in zip( metrics, metric_values_list ):
        data[metric_name] = metric_values
    for metric_name, labels in zip( metrics, labels_list ):
        data[metric_name + "_label"] = labels    
    df = pd.DataFrame(data)
    return df


def splot_into_intervals( df, output_dirname ):

    #print(df.head())
    df["views"] *= random.uniform(0.1, 2.0)
    df["views"] = df["views"].apply(lambda x: int(x))

    df["revenue"] *= random.uniform(0.1, 2.0)
    df["revenue"] = df["revenue"].apply(lambda x: round(x, 2))
    #print(df.head())
    
    for timestamp, df_single_timestamp in df.groupby("timestamp"):        
        dirname = os.path.join( output_dirname, timestamp.strftime( "%Y%m%d/%H%M" ) )
        filename = os.path.join( dirname, timestamp.strftime("%Y%m%d_%H%M%S.csv") )

        if not os.path.exists(dirname):
            os.makedirs( dirname )
        
        df_single_timestamp.to_csv( filename, index=False, date_format="%Y-%m-%d %H:%M:%S" )


def generate_data(period, data_type):
    
    df_full = synthesize(period)
    
    # Create new ones:
    if not os.path.exists("./data/{}/{}".format(DATASET_NAME,data_type)):
        os.makedirs("./data/{}/{}".format(DATASET_NAME,data_type))

    df_full.to_csv( "./data/%s/label.csv" % DATASET_NAME, index=False )
    label_column_names = [ metric_name + "_label" for metric_name in metrics ]
    df_input = df_full.drop( columns = label_column_names )
    
    if (data_type == "backtest"):
        df_input.to_csv( "./data/{}/backtest/input.csv".format(DATASET_NAME), index=False )
    else:
        splot_into_intervals( df_input, "./data/{}/live".format(DATASET_NAME))


# Get rid of old files (there is nothing to delete on the first run):
shutil.rmtree(DIR_PATH, ignore_errors=True)

start = date.today() + relativedelta(months=-NUM_MONTHS_HISTORICAL_DATA)
end = date.today()
period = (datetime.datetime(start.year, start.month, start.day), datetime.datetime(end.year, end.month, end.day))

generate_data(period, "backtest")

start = date.today()
end = date.today() + relativedelta(days=+NUM_DAYS_FUTURE_DATA)
period = (datetime.datetime(start.year, start.month, start.day), datetime.datetime(end.year, end.month, end.day))

generate_data(period, "live")

In [None]:

backtest_df = pd.read_csv('data/ecommerce/backtest/input.csv')
backtest_df.head()

---
### 2.2. Save data to S3 bucket

Save the data into the S3 bucket created earlier.

*Will take about 1 min 30 sec.*

In [None]:
%%time
!aws s3 sync {DIR_PATH}/{DATASET_NAME}/ s3://{bucket_name}/{DATASET_NAME}/ --quiet --delete


---
### 3.0. Create Lookout for Metrics Detector

The `Detector` is a machine learning model that detects outliers in the metrics. The detector is automatically trained with the machine learning algorithm that best fits your data and use case. You can optionally provide your historical data for training, if you have any. Otherwise, get started with real-time data, and Amazon Lookout for Metrics will learn on-the-go. For this example we provide 6 months of historical data.

You specify the Amazon S3 location that Amazon Lookout for Metrics should continuously monitor for new data, and your detector analyzes your data and returns information about the outliers that it detected. When you create a `Detector`, you also specify a `detecting domain` and an `anomaly detection frequency`. 

The `anomaly detection frequency` specifies how frequently the detector should wake up, look for new data, run its analysis, and alert you to any interesting findings. For this example, the detector will look for anomalies at the top of every hour.
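
The frequency codes used in this notebook (`P1D`, `PT1H`, `PT10M`, `PT5M`) are ISO 8601 durations. The sketch below maps them to `timedelta` values and computes the next interval boundary after a given time. It only illustrates what "top of every hour" means; the service's actual scheduling is not modelled here, and `next_run_after` is a hypothetical helper.

```python
import datetime

# Interval length for each frequency code used in this notebook.
FREQUENCY_TO_INTERVAL = {
    "P1D": datetime.timedelta(days=1),
    "PT1H": datetime.timedelta(hours=1),
    "PT10M": datetime.timedelta(minutes=10),
    "PT5M": datetime.timedelta(minutes=5),
}


def next_run_after(now, frequency):
    """Return the first interval boundary strictly after `now`."""
    interval = FREQUENCY_TO_INTERVAL[frequency]
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # Number of whole intervals already completed since midnight.
    completed = (now - midnight) // interval
    return midnight + (completed + 1) * interval


print(next_run_after(datetime.datetime(2021, 6, 1, 13, 20), "PT1H"))
# 2021-06-01 14:00:00
```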

In [None]:
L4M = boto3.client( "lookoutmetrics")

ecom_anomaly_detector_name = "ecommerce-continuous-detector"
ecom_anomaly_detector_arn = ''

detector_list =[]

# Check if the eCommerce detector already exists 
list_anomaly_detectors_response = L4M.list_anomaly_detectors(MaxResults=100)

anomaly_detectors_list = list_anomaly_detectors_response['AnomalyDetectorSummaryList']

#print("Anomaly detectors list:\n {}".format(anomaly_detectors_list))

for anomaly_detector in anomaly_detectors_list:
    
    #print('anomaly_detector: {}'.format(anomaly_detector))

    if anomaly_detector['AnomalyDetectorName'] == ecom_anomaly_detector_name:
        # the detector for ecommerce example exists. Get its ARN
        ecom_anomaly_detector_arn = anomaly_detector['AnomalyDetectorArn']
        break

if len(ecom_anomaly_detector_arn) == 0:
    # Detector for ecommerce example does not exist. Create the anomaly detector.
    create_anomaly_detector_response = L4M.create_anomaly_detector( 
        AnomalyDetectorName = ecom_anomaly_detector_name,
        AnomalyDetectorDescription = "Anomaly detection on a sample ecommerce dataset.",
        AnomalyDetectorConfig = {
            "AnomalyDetectorFrequency" : FREQUENCY,   
        },
    )

    ecom_anomaly_detector_arn = create_anomaly_detector_response["AnomalyDetectorArn"]
    
print('\nAnomaly Detector ARN:\n{}'.format(ecom_anomaly_detector_arn))

---
### 3.1. Define Metrics

### Measures and Dimensions

`Measures` are variables or key performance indicators on which customers want to detect outliers and `Dimensions` are meta-data that represent categorical information about the measures. 

In this E-commerce example, views and revenue are our measures, and platform and marketplace are our dimensions. Customers may want to monitor their data for anomalies in number of views or revenue for every platform, marketplace, and combination of both. You can designate up to five measures and five dimensions per dataset.

### Metrics 

After you create a detector and map your measures and dimensions, Amazon Lookout for Metrics analyzes each combination of these measures and dimensions. For the above example, we have 7 unique values (us, jp, de, etc.) for marketplace and 3 unique values (mobile web, mobile app, pc web) for platform, for a total of 21 unique combinations. Each unique combination of a measure with the dimension values (e.g. us/mobile app/revenue) is a time series `metric`. In this case, we have 21 dimension combinations and 2 measures for a total of 42 time-series `metrics`. 

Amazon Lookout for Metrics detects anomalies at the most granular level so you are able to pin-point any unexpected behavior in your data.
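
The counts above can be checked with a few lines of Python. The dimension values are the ones used by the data generator in section 2.1; the variable names are chosen for this sketch only so that they do not overwrite the notebook's own `dimensions` and `metrics`.

```python
import itertools

example_dimensions = {
    "platform": ["pc_web", "mobile_web", "mobile_app"],
    "marketplace": ["us", "uk", "de", "fr", "es", "it", "jp"],
}
example_measures = ["views", "revenue"]

# Every platform/marketplace pair is one dimension combination.
dimension_combinations = list(itertools.product(*example_dimensions.values()))

# Every (combination, measure) pair is one time-series metric.
time_series = [
    combo + (measure,)
    for combo in dimension_combinations
    for measure in example_measures
]

print(len(dimension_combinations))  # 21
print(len(time_series))             # 42
print(time_series[0])               # ('pc_web', 'us', 'views')
```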

### Datasets

Measures, dimensions and metrics map to `datasets`, which also contain the Amazon S3 locations of your source data, an IAM role that has both read and write permissions to those Amazon S3 locations, and the rate at which data should be ingested from the source location (the upload frequency and data ingestion delay).


In [None]:
# Check if the metric set for this example exists

ecom_metric_set_name = "ecommerce-metric-set"
ecom_metric_set_arn = ''

metric_sets_list =[]


list_metric_sets_response = L4M.list_metric_sets(
        AnomalyDetectorArn=ecom_anomaly_detector_arn,
        MaxResults=100)

metric_sets_list = list_metric_sets_response['MetricSetSummaryList']
#print("Metric sets list:\n {}".format(metric_sets_list))

for metric_set in metric_sets_list:
    
    #print('metric_set: {}'.format(metric_set))

    if metric_set['MetricSetName'] == ecom_metric_set_name:
        
        ecom_metric_set_arn = metric_set['MetricSetArn']
        print("\nMetric Set ARN:\n{}".format(ecom_metric_set_arn))
        
        break
        


In [None]:
# If metric set for this example is not found, create it.
if len(ecom_metric_set_arn) == 0:
    s3_path_format = 's3://'+ bucket_name + '/ecommerce/live/{{yyyyMMdd}}/{{HHmm}}'
    s3_historical_path_prefix = 's3://'+ bucket_name + '/ecommerce/backtest/input.csv'


    params = {
        "AnomalyDetectorArn": ecom_anomaly_detector_arn,
        "MetricSetName" : ecom_metric_set_name,
        "MetricList" : [
            {
                "MetricName" : "views",
                "AggregationFunction" : "SUM",
            },
            {
                "MetricName" : "revenue",
                "AggregationFunction" : "SUM",
            },
        ],

        "DimensionList" : [ "platform", "marketplace" ],

        "TimestampColumn" : {
            "ColumnName" : "timestamp",
            "ColumnFormat" : "yyyy-MM-dd HH:mm:ss",
        },

        #"Delay" : 120, # seconds the detector will wait before attempting to read latest data per current time and detection frequency below
        "MetricSetFrequency" : FREQUENCY,

        "MetricSource" : {
            "S3SourceConfig": {
                "RoleArn" : role_arn,
                "HistoricalDataPathList": [
                    s3_historical_path_prefix,
                ],
                "TemplatedPathList": [
                    s3_path_format,
                ],

                "FileFormatDescriptor" : {
                    "CsvFormatDescriptor" : {
                        "FileCompression" : "NONE",
                        "Charset" : "UTF-8",
                        "ContainsHeader" : True,
                        "Delimiter" : ",",
                        "QuoteSymbol" : '"'
                    },
                }
            }
        },
    }

    create_metric_set_response = L4M.create_metric_set( ** params )
    ecom_metric_set_arn = create_metric_set_response["MetricSetArn"]

print("\nMetric Set ARN: {}".format(ecom_metric_set_arn))



---
### 4.0. Activate the Detector

During activation the model is trained with historical data that was generated above and stored in the "./data/ecommerce/backtest" folder.

*The activation process can take about 20 minutes.* 
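
Because activation can take a while, a polling loop benefits from a timeout and an explicit failure check. The following is a sketch of such a helper, not the notebook's own code: `wait_for_status` is a hypothetical name, the default timeout is arbitrary, and the status source is injected so the example runs without AWS.

```python
import time


def wait_for_status(get_status, target="ACTIVE", failed=("FAILED",),
                    poll_seconds=10, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until it returns `target`, a failed status, or times out."""
    waited = 0
    while True:
        status = get_status()
        if status == target:
            return status
        if status in failed:
            raise RuntimeError("Detector entered status {}".format(status))
        if waited >= timeout_seconds:
            raise TimeoutError("Gave up after {} seconds in status {}".format(waited, status))
        sleep(poll_seconds)
        waited += poll_seconds


# Stand-in status sequence so the sketch runs offline and without delays.
fake_statuses = iter(["ACTIVATING", "LEARNING", "ACTIVE"])
final_status = wait_for_status(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)  # ACTIVE
```

In the notebook, `get_status` could be `lambda: L4M.describe_anomaly_detector(AnomalyDetectorArn=ecom_anomaly_detector_arn)["Status"]`.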

In [None]:
%%time

# get detector details
describe_anomaly_detector_response =L4M.describe_anomaly_detector(
    AnomalyDetectorArn=ecom_anomaly_detector_arn)

#print("\nEcommerce Detector details: ")
#pprint.pprint(describe_anomaly_detector_response, width = 2)

ecom_detector_status = describe_anomaly_detector_response['Status']

if ecom_detector_status in ["INACTIVE", "ACTIVATING", "LEARNING"]:
    
    # Activate the detector
    if ecom_detector_status == "INACTIVE":
        L4M.activate_anomaly_detector(AnomalyDetectorArn = ecom_anomaly_detector_arn)
    
        print("\nActivating ecommerce example Detector.")
    
    # Check status every 10 seconds until the detector is ACTIVE
    while ecom_detector_status in ["ACTIVATING", "INACTIVE", "LEARNING"]:
        response = L4M.describe_anomaly_detector( AnomalyDetectorArn = ecom_anomaly_detector_arn )
        ecom_detector_status = response["Status"]
        if ecom_detector_status == "ACTIVE":
            break
        print("Detector status: {}".format(ecom_detector_status))
        time.sleep(10)
    
elif ecom_detector_status == "ACTIVE":
    print("\nEcommerce example detector is Active")
else:
    print("\nEcommerce Detector Status: {}".format(ecom_detector_status))
    print("\nDelete old detector in console and rerun this notebook.")


---
### 5.0. Fetch Anomalies

We have created a continuous detector that will operate on live data. It expects to receive input data every hour. We already generated some data into the future and you can find it in the "./data/ecommerce/live" folder. 

**Note:** 
*You may have to wait for the detector to run at the top of the hour to detect anomalies. So, if no anomalies are found when executing the next cell, you may want to come back later and run it again.*
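
The next cell pages through results with `NextToken`. The same pattern can be written once as a small generator and reused for any list call that follows this convention. The sketch below is a hypothetical helper (`paginate`) exercised against a stand-in function, so it runs without AWS access.

```python
def paginate(fetch_page, list_key, **params):
    """Yield the items from every page of a NextToken-style list call."""
    next_token = None
    while True:
        page_params = dict(params)
        if next_token:
            page_params["NextToken"] = next_token
        response = fetch_page(**page_params)
        for item in response.get(list_key, []):
            yield item
        next_token = response.get("NextToken")
        if not next_token:
            break


def fake_list_call(NextToken=None, **_ignored):
    """Stand-in for a list API that returns two pages of made-up results."""
    pages = {
        None: {"AnomalyGroupSummaryList": [{"AnomalyGroupId": "a"}], "NextToken": "t1"},
        "t1": {"AnomalyGroupSummaryList": [{"AnomalyGroupId": "b"}]},
    }
    return pages[NextToken]


groups = list(paginate(fake_list_call, "AnomalyGroupSummaryList", MaxResults=100))
print([group["AnomalyGroupId"] for group in groups])  # ['a', 'b']
```

With the real client, the call could look like `paginate(L4M.list_anomaly_group_summaries, "AnomalyGroupSummaryList", AnomalyDetectorArn=ecom_anomaly_detector_arn, SensitivityThreshold=50, MaxResults=100)`.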


In [None]:
#ecom_anomaly_detector_arn = 'arn:aws:lookoutmetrics:us-west-2:095351214964:AnomalyDetector:my-detector'
#ecom_anomaly_detector_name = "my-detector"

############################
anomaly_score_cutoff = 50
############################

anomaly_groups = []
next_token = None

while True:    
    params = {
        "AnomalyDetectorArn" : ecom_anomaly_detector_arn,
        "SensitivityThreshold" : anomaly_score_cutoff,
        "MaxResults" : 100,
    }

    if next_token:
        params["NextToken"] = next_token

    response = L4M.list_anomaly_group_summaries(**params )
    
    print("Anomaly group summaries:\n {}".format(response))

    page_groups = response["AnomalyGroupSummaryList"]
    anomaly_groups += page_groups
    
    # Print only the groups returned on this page so earlier pages are not repeated
    for entry in page_groups:
        print('\nAnomaly group id: {}, Anomaly group score: {}'.format(entry['AnomalyGroupId'], entry['AnomalyGroupScore']))
        
    if "NextToken" in response:
        next_token = response["NextToken"]
        continue

    break

if len(anomaly_groups) == 0:
    print("\nAnomalies not found")


---
### 5.1. Build a Pandas Dataframe with anomaly details to pass to human workflow 

Iterate over the anomaly results and build a dataframe. For each measure in an anomaly group, we fetch the list of anomalous metrics. For each anomalous dimension and its value, we get an associated system-generated time series ID. After the human review of anomalies, we need the time series ID of each metric to register the human feedback with Amazon Lookout for Metrics and improve future detection results.
We save the time series ID along with the timestamp and metric details in a new dataframe.
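
To show why the time series ID is kept, here is a sketch of how reviewed rows could later be turned into arguments for the Lookout for Metrics `put_feedback` call. The helper name `build_feedback_requests`, the `is_anomaly` field, and the sample rows are assumptions for illustration; the column names `anomaly_group_id` and `tseriesid` match the dataframe built in the next cell.

```python
def build_feedback_requests(detector_arn, reviewed_rows):
    """Turn reviewed rows into keyword arguments for put_feedback calls."""
    feedback = []
    seen = set()
    for row in reviewed_rows:
        key = (row["anomaly_group_id"], row["tseriesid"])
        if key in seen:
            # Keep the first verdict for each group / time series pair.
            continue
        seen.add(key)
        feedback.append({
            "AnomalyDetectorArn": detector_arn,
            "AnomalyGroupTimeSeriesFeedback": {
                "AnomalyGroupId": row["anomaly_group_id"],
                "TimeSeriesId": row["tseriesid"],
                "IsAnomaly": bool(row["is_anomaly"]),
            },
        })
    return feedback


# Made-up review results, for illustration only.
reviewed = [
    {"anomaly_group_id": "group-1", "tseriesid": "ts-1", "is_anomaly": True},
    {"anomaly_group_id": "group-1", "tseriesid": "ts-1", "is_anomaly": True},
    {"anomaly_group_id": "group-1", "tseriesid": "ts-2", "is_anomaly": False},
]
feedback_requests = build_feedback_requests("arn:aws:lookoutmetrics:example", reviewed)
print(len(feedback_requests))  # 2
```

Each entry could then be sent with `L4M.put_feedback(**request)`.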


In [None]:

def datetime_from_string(s):
    try:
        dt = datetime.datetime.fromisoformat(s.split("[")[0])
    except ValueError:
        dt = datetime.datetime.strptime(s.split("[")[0], "%Y-%m-%dT%H:%MZ")
    
    return dt

def get_input_file_name(dt):
    # Build the path of the local hourly CSV file that holds the data for timestamp dt
    time_tuple = dt.timetuple()
    print("t: {}".format(time_tuple))
    year = str(time_tuple.tm_year).zfill(4)
    mon = str(time_tuple.tm_mon).zfill(2)
    day = str(time_tuple.tm_mday).zfill(2)
    hour = str(time_tuple.tm_hour).zfill(2)
    minute = str(time_tuple.tm_min).zfill(2)
    sec = str(time_tuple.tm_sec).zfill(2)
    file_name = '{6}/{7}/live/{0}{1}{2}/{3}{4}/{0}{1}{2}_{3}{4}{5}.csv'.format(year, mon, day, hour, minute, sec, DIR_PATH, DATASET_NAME)
        
    return file_name
    
    
# Since our metrics period is 1 hour
frequency_timedelta = datetime.timedelta(hours=1)

df_anomalies_list = []
dimension_names_set = set()

time_series_list = []
df_anomaly_file = pd.DataFrame()

for anomaly_group in anomaly_groups:

    print("\n\n")
    pprint.pprint(anomaly_group, width = 2)
    
    start_time = datetime_from_string( anomaly_group["StartTime"] )
    end_time = datetime_from_string( anomaly_group["EndTime"] )
    
    anomaly_group_id = anomaly_group["AnomalyGroupId"]
    anomaly_group_score = anomaly_group["AnomalyGroupScore"]
    primary_metric_name = anomaly_group["PrimaryMetricName"]
    
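    # Start each anomaly group with an empty list so time series from earlier groups are not reprocessed
    time_series_list = []
    next_token = None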

    while True:    

        params = {
            "AnomalyDetectorArn" : ecom_anomaly_detector_arn,
            "AnomalyGroupId" : anomaly_group_id,
            "MetricName" : primary_metric_name,
            "MaxResults" : 100,
        }

        if next_token:
            params["NextToken"] = next_token

        anomaly_group_time_series_response = L4M.list_anomaly_group_time_series( **params )

        time_series_list += anomaly_group_time_series_response["TimeSeriesList"]
        
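        # Check the response of this call, not the response left over from the previous cell
        if "NextToken" in anomaly_group_time_series_response:
            next_token = anomaly_group_time_series_response["NextToken"]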
            continue

        break
    
    #print("\nAnomaly group time series: ")
    #pprint.pprint(anomaly_group_time_series_response, width = 2)

    #print("\nTime Series list: {}".format(time_series_list))

    for time_series in time_series_list:
        # time_series holds a list of dimensions and metric values along with the associated time series id
        data = {}
        dimension_dict = {}
        dim_names_set = set()
        
        data['tseriesid'] = time_series['TimeSeriesId']
        
        for dim_in_tseries in time_series["DimensionList"]:
            #dim_names_set.add(dim_in_tseries["DimensionName"])
            dimension_dict[dim_in_tseries["DimensionName"]] = dim_in_tseries["DimensionValue"]
            #dimension_dict['metric_name'] = primary_metric_name          
            
            #data[dimension["DimensionName"]] = [ dimension["DimensionValue"]]
            dimension_names_set.add(dim_in_tseries["DimensionName"])
            #data[primary_metric_name + "_group_score"] = [anomaly_group_score]
        
        print("data: {}".format(data))
        
        t = start_time
        
        while t <= end_time:
            
            file_name = get_input_file_name(t)
            print("File name: {}".format(file_name))
            
            df_from_csv = pd.read_csv(file_name) 
            #print("df_from_csv: {}".format(df_from_csv))
            
            df_temp = df_from_csv
            # drop rows where the dimension does not match
            for dim_name in dimension_dict:
                dim_value = dimension_dict[dim_name]
                print("dim_name: " + dim_name + ", dim value: " + dim_value)
                df_temp = df_temp[df_temp[dim_name] == dim_value] #drop rows
                
            # Work on a copy so the new columns are not written to a slice of df_from_csv
            df_temp = df_temp.copy()
            df_temp['tseriesid'] = time_series['TimeSeriesId']
            df_temp['anomaly_group_id'] = anomaly_group_id
            df_temp['anomaly_metric'] = primary_metric_name
            df_temp['anomaly_metric_value'] = df_temp[primary_metric_name]
            
            #print("df_temp: {}".format(df_temp.to_string()))
            
            df_anomalies_list.append(df_temp)
            t += frequency_timedelta


if len(df_anomalies_list) > 0:
    df_anomalies_by_ts = pd.concat(df_anomalies_list)

    # fold multiple metrics into same rows
    df_anomalies_by_ts = df_anomalies_by_ts.groupby(["timestamp", *dimension_names_set], as_index=False).max() 
    print("Anomalies saved in dataframe.")
else:
    # Keep the name defined so the cells below do not fail with a NameError
    df_anomalies_by_ts = pd.DataFrame()
    print("No anomalies found.")


print("\ndf_anomalies_by_ts:\n" + str(df_anomalies_by_ts))

In [None]:
pd.set_option('display.max_columns', None)
df_anomalies_by_ts.head()

---
### 5.2. Export the Results

Create a CSV file with anomaly results.

In [None]:
filename = ecom_anomaly_detector_arn.split(':')[-1] + "_anomalies.csv"
df_anomalies_by_ts.to_csv(filename, index=False )

---
### 6.0. Create a private Workteam in SageMaker Console

We create the work team through the AWS Console because the console automatically integrates the work team with Amazon Cognito for secure authentication. 

Copy the ARN of the work team created through the console and assign it to `workteam_ARN` in the next cell. If the variable is left empty, the cell after it prompts you for the ARN.


In [None]:
workteam_ARN = 'arn:aws:sagemaker:us-west-2:095351214964:workteam/private-crowd/l4m-reviewers'

In [None]:
while workteam_ARN == '':
    workteam_ARN = input("Please enter the ARN of the Work Team:\n")
    if len(workteam_ARN) > 0:
        break

# WORKTEAM_ARN = arn:aws:sagemaker:us-west-2:095351214964:workteam/private-crowd/l4m-reviewers


In [None]:

# timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())

# Flow definition name - this value is unique per account and region. You can also provide your own value here.
l4m_flowDefinitionName = 'l4m-ecommerce-workflow'

# Task UI name - this value is unique per account and region. You can also provide your own value here.
l4m_taskUIName = 'l4m-ecommerce-ui'



---
### 6.1. Create a human task UI 

Create a custom task template using HTML that will be presented to the workers. It uses Crowd HTML web components, a web standard that abstracts HTML markup, CSS, and JavaScript functionality into an HTML tag or set of tags. 
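
The template below loops over `task.input.l4m_ecom_anomaly` and reads the fields `timestamp`, `platform`, `marketplace`, `metric_name`, and `metric_value` from each entry. The following sketch shows one way the matching input could be assembled from the anomaly rows. `build_task_input` is a hypothetical helper and the sample record is made up; the source column names come from the dataframe built in section 5.1.

```python
import json


def build_task_input(records, limit=None):
    """Map anomaly rows to the structure the task template iterates over."""
    entries = []
    for record in records[:limit]:
        entries.append({
            "timestamp": str(record["timestamp"]),
            "platform": record["platform"],
            "marketplace": record["marketplace"],
            "metric_name": record["anomaly_metric"],
            "metric_value": record["anomaly_metric_value"],
        })
    return {"l4m_ecom_anomaly": entries}


# Made-up row, for illustration only.
sample_records = [
    {
        "timestamp": "2021-06-01 13:00:00",
        "platform": "mobile_app",
        "marketplace": "us",
        "anomaly_metric": "revenue",
        "anomaly_metric_value": 123.45,
    },
]
task_input_json = json.dumps(build_task_input(sample_records))
print(task_input_json)
```

The rows could come from `df_anomalies_by_ts.to_dict("records")`, with numeric values converted to plain Python types if `json.dumps` rejects them. The resulting JSON string is the kind of value that the A2I runtime's `start_human_loop` call accepts as `HumanLoopInput={"InputContent": ...}`.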



In [None]:

# We customized the tabular template for our notebook as below
ecom_a2i_template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.5.0/Chart.min.js"></script>

<crowd-form>

    <div>
        <h1>Ecommerce Revenue and Views by Platform and Market</h1>
    </div>

    <div style="margin-left: 40px">
        <h2>Instructions</h2>
        <p>The following entries were identified as anomalies.<br/> 
            Please review the views and revenue scores for the platform and market place.<br/>
            Check the radio button to confirm whether it was an anomaly.<br/>
            Please enter any optional comments for the anomaly.
        </p>
        <br>
    </div>
    <div>
    <table>
    <tr>
        <th></th>
        <th>Timestamp</th>
        <th>Platform</th>
        <th>Market Place</th>
        <th>Anomaly metric</th>
        <th>metric value</th>
        <th>Anomaly?</th>
        <th>Comment</th>
    </tr>
    
    {% for entry in task.input.l4m_ecom_anomaly %}

        <tr>
            <td><crowd-text-area name="sno-{{ forloop.index }}" value="{{ forloop.index }}"></crowd-text-area></td>
            <td><crowd-text-area name="timestamp-{{ forloop.index }}" value="{{ entry.timestamp }}"></crowd-text-area></td>
            <td><crowd-text-area name="platform-{{ forloop.index }}" value="{{ entry.platform }}"></crowd-text-area></td>
            <td><crowd-text-area name="marketplace-{{ forloop.index }}" value="{{ entry.marketplace }}"></crowd-text-area></td>     
            <td><crowd-text-area name="metric_name-{{ forloop.index }}" value="{{ entry.metric_name }}"></crowd-text-area></td>     
            <td><crowd-text-area name="metric_value-{{ forloop.index }}" value="{{ entry.metric_value }}"></crowd-text-area></td>     
            <td><crowd-checkbox name="anomaly_found-{{ forloop.index }}">Anomaly Found</crowd-checkbox></td>
            <td>
                <div><crowd-input name="comment-{{ forloop.index }}" placeholder="Enter optional comment"></crowd-input></div>
            </td>
        </tr>
      {% endfor %}
    </table>
    <br>
    </div>
</crowd-form>

<style>
  greenbg {
    background: #feee23;
    display: block;
  }

  table {
    border-spacing: 0; 
  }

  th {
    background-color: #8888ee;
    color: #f3f3f3;
    font-weight: 700;
  }

  th, td {
      border: 1px solid blue;
  }

  td {
    padding-left: 10px ;
    padding-right: 10px ;
  }

</style>

<script>
window.chartColors = {
  red: 'rgb(255, 99, 132)',
  orange: 'rgb(255, 159, 64)',
  yellow: 'rgb(255, 205, 86)',
  green: 'rgb(75, 192, 192)',
  blue: 'rgb(54, 162, 235)',
  purple: 'rgb(153, 102, 255)',
  grey: 'rgb(231,233,237)'
};

var signal = "{{task.input.l4m_ecom_anomaly | to_json}}";
var timestamp = [];
var platform = [];
var marketplace = [];
var metric_name = [];
var metric_value = [];

  
</script>
"""

sagemaker_client = boto3.client('sagemaker')

# Check if lookout for metrics UI for this example already exists
try:
    describe_human_task_ui_response = sagemaker_client.describe_human_task_ui(
        HumanTaskUiName=l4m_taskUIName
    )
    #print("\nDescribe human task UI: ")
    #pprint.pprint(describe_human_task_ui_response, width = 2)
    
except:
    print("Human task UI {} not found".format(l4m_taskUIName))
    describe_human_task_ui_response = {}

if not describe_human_task_ui_response:
    # Create the human task UI
    create_human_task_ui_response = sagemaker_client.create_human_task_ui(
        HumanTaskUiName=l4m_taskUIName,
        UiTemplate={'Content': ecom_a2i_template}) 

    print("\nCreate human task ui response: ")
    pprint.pprint(create_human_task_ui_response, width = 2)

    l4m_review_ui_arn = create_human_task_ui_response['HumanTaskUiArn']
else:
    l4m_review_ui_arn = describe_human_task_ui_response['HumanTaskUiArn']    
    
print("\nHuman task UI ARN: {}".format(l4m_review_ui_arn))


---
### 6.2. Create a Human task Workflow 

We use the Amazon Augmented AI API to create a human review workflow (flow definition) that ties together the work team, the task UI created above, and the output location. A new flow definition is created only if one with the same name does not already exist. The results of the human review are stored in an Amazon S3 bucket, where the client application can read them. 


In [None]:
sagemaker_role_arn = sagemaker.get_execution_role()

s3_output_path = f's3://{bucket_name}/ecommerce/a2i-results'
print("S3 output path: {}".format(s3_output_path))

# Check if Amazon Lookout For Metrics Workflow exists
try:
    describe_flow_definition_response = sagemaker_client.describe_flow_definition(
        FlowDefinitionName=l4m_flowDefinitionName
    )
    ###### print describe_flow_definition_response
    #print("\nDescribe flow definition response: ")
    #pprint.pprint(describe_flow_definition_response, width=2)
    
except:
    describe_flow_definition_response = {}
    
# Create Amazon Lookout For Metrics Workflow if it does not exist already

if not describe_flow_definition_response:
    create_workflow_definition_response = sagemaker_client.create_flow_definition(
        FlowDefinitionName = l4m_flowDefinitionName,
        RoleArn=sagemaker_role_arn,
        HumanLoopConfig= {
            "WorkteamArn": workteam_ARN,
            "HumanTaskUiArn": l4m_review_ui_arn,
            "TaskCount": 1,
            "TaskDescription": "Review the anomalies detected by Amazon Lookout for Metrics",
            "TaskTitle": "Ecommerce Anomalies Review"
        },
        OutputConfig={
            "S3OutputPath" : s3_output_path
        }
    )
    
    # Wait until the newly created flow becomes Active
    while True:

        response = sagemaker_client.describe_flow_definition(FlowDefinitionName=l4m_flowDefinitionName)
        print(response['FlowDefinitionStatus'])
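        if response['FlowDefinitionStatus'] == 'Active':
            print("Flow Definition is active")
            break
        # Stop waiting if creation failed, instead of polling forever
        if response['FlowDefinitionStatus'] == 'Failed':
            raise RuntimeError("Flow definition creation failed: {}".format(response.get('FailureReason')))
        time.sleep(5)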


    flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] 

else:
    flowDefinitionArn = describe_flow_definition_response['FlowDefinitionArn'] 
        

print("Flow definition Arn: {}".format(flowDefinitionArn))
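
The wait loop in the cell above polls until the flow definition reports `Active`. A bounded variant of the same idea is sketched below. It is a minimal sketch, not part of the original notebook: `wait_for_status` and its parameters are hypothetical names, and the status source is injected as a callable so that the example runs without AWS credentials.

```python
import time


def wait_for_status(describe, target="Active", failed=("Failed",),
                    timeout_s=300, poll_s=5, sleep=time.sleep):
    """Poll describe() until it returns `target`.

    Raises RuntimeError if a status listed in `failed` is seen, and
    TimeoutError if `target` is not reached within roughly `timeout_s` seconds.
    """
    waited = 0
    while True:
        status = describe()
        if status == target:
            return status
        if status in failed:
            raise RuntimeError(f"Resource entered terminal state: {status}")
        if waited >= timeout_s:
            raise TimeoutError(f"Still {status!r} after {timeout_s}s")
        sleep(poll_s)
        waited += poll_s


# Stand-in for the status returned by describe_flow_definition (made-up sequence)
_fake_statuses = iter(["Initializing", "Initializing", "Active"])
result = wait_for_status(lambda: next(_fake_statuses), sleep=lambda s: None)
print(result)
```

With the notebook's client, `describe` could be `lambda: sagemaker_client.describe_flow_definition(FlowDefinitionName=l4m_flowDefinitionName)['FlowDefinitionStatus']`.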

In [None]:
# !aws s3 sync {data_dirname}/ecommerce/ s3://{bucket_name}/ecommerce/ --quiet

#!aws s3 cp ecommerce-continuous-detector_anomalies.csv s3://{bucket_name}/ecommerce/

# Anomaly records were saved in the dataframe in a previous cell. Otherwise read from csv file saved earlier.
if len(df_anomalies_by_ts) == 0:
    df_anomalies_by_ts = pd.read_csv("./ecommerce-continuous-detector_anomalies.csv")

df_anomalies_by_ts.head()


---
Create a Python list of anomalies to pass into the human review task 

In [None]:
#tseriesid_list = ecom_anomalies_df['tseriesid'].astype(str).to_list()
timestamp_list = df_anomalies_by_ts['timestamp'].astype(str).to_list()
platform_list = df_anomalies_by_ts['platform'].astype(str).to_list()
marketplace_list = df_anomalies_by_ts['marketplace'].astype(str).to_list()

anomaly_metric_list = df_anomalies_by_ts['anomaly_metric'].astype(str).to_list()
metric_value_list = df_anomalies_by_ts['anomaly_metric_value'].astype(str).to_list()

# Build one dictionary per anomaly; the keys match the fields used in the task template
ecom_review_list = [{'timestamp': timestamp_list[i],
                     'platform': platform_list[i],
                     'marketplace': marketplace_list[i],
                     'metric_name': anomaly_metric_list[i],
                     'metric_value': metric_value_list[i]}
                    for i in range(len(timestamp_list))]
        
ip_content = {"l4m_ecom_anomaly": ecom_review_list} # passed into workflow

ecom_review_list
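
The payload passed to the workflow is a list with one dictionary per anomaly, wrapped under the key that the template iterates over (`l4m_ecom_anomaly`). The sketch below shows that shape with plain Python lists. The helper `build_review_records` is a hypothetical name, and the sample values are made up for illustration; they are not results from the detector.

```python
import json


def build_review_records(columns):
    """Turn {'field': [v1, v2, ...]} column lists into a list of per-row dicts."""
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError("All columns must have the same length")
    names = list(columns)
    return [dict(zip(names, row)) for row in zip(*columns.values())]


# Made-up sample values, for illustration only
sample_columns = {
    "timestamp": ["2021-06-01 10:00:00", "2021-06-01 11:00:00"],
    "platform": ["pc_web", "mobile_app"],
    "marketplace": ["us", "uk"],
    "metric_name": ["revenue", "views"],
    "metric_value": ["123.4", "56.0"],
}

records = build_review_records(sample_columns)

# The task template loops over task.input.l4m_ecom_anomaly
payload = json.dumps({"l4m_ecom_anomaly": records})
print(payload)
```

Validating that all columns have the same length up front avoids silently dropping rows, which `zip` would otherwise do when one list is shorter.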


---
### 6.3. Start the human review loop. 

In [None]:
a2i_client = boto3.client('sagemaker-a2i-runtime')

humanLoopName = str(uuid.uuid4())

start_human_loop_response = a2i_client.start_human_loop(
            HumanLoopName=humanLoopName,
            FlowDefinitionArn=flowDefinitionArn,
            HumanLoopInput={
                "InputContent": json.dumps(ip_content)
            }
        )

print("\nStart human loop response: ")
#pprint.pprint(start_human_loop_response, width=2)

print("\nHuman Loop ARN: {}".format(start_human_loop_response['HumanLoopArn']))

---
By this point you should have created a work team in the console and provided its ARN as input to an earlier cell. We extract the work team name from the ARN and use it to look up the URL of the portal where workers provide feedback on the anomalies.


In [None]:
workteamName = workteam_ARN[workteam_ARN.rfind('/') + 1:]

describe_workteam_response = sagemaker_client.describe_workteam(WorkteamName=workteamName)

if not describe_workteam_response:
    print("You need to log into SageMaker console and create a Workteam")
    sys.exit()
 
#print("\nDescribe work team response: ")
#pprint.pprint(describe_workteam_response, width=2)

workteam_portal = 'https://' + describe_workteam_response['Workteam']['SubDomain']


print("\nLog into the work team portal link provided below, and review the anomalies.\n")

print ("=" * 60)
print("Portal URL\n{}".format(workteam_portal))
print ('=' * 60)

#print('https://' + sagemaker_client.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])
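
The cell above takes everything after the last `/` in the ARN as the work team name. A slightly more defensive version is sketched below. It assumes the ARN has the form `arn:<partition>:sagemaker:<region>:<account>:workteam/<workforce>/<name>`; the function name and the sample ARN are hypothetical.

```python
def workteam_name_from_arn(arn):
    """Return the work team name, i.e. the last path segment of a workteam ARN."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[2] != "sagemaker" or not parts[5].startswith("workteam/"):
        raise ValueError(f"Not a SageMaker workteam ARN: {arn!r}")
    name = parts[5].rsplit("/", 1)[-1]
    if not name:
        raise ValueError(f"Workteam ARN has no team name: {arn!r}")
    return name


# Made-up ARN, for illustration only
sample_arn = "arn:aws:sagemaker:us-east-1:123456789012:workteam/private-crowd/my-review-team"
team_name = workteam_name_from_arn(sample_arn)
print(team_name)
```

Failing early on a malformed ARN gives a clearer message than the error that `describe_workteam` would return for an unexpected name.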

---
### 6.4. Complete the review

The URL to the portal was printed out in the last step. Open the URL in a browser and log in with the credentials of the human review worker. <br/>
You should have sent an invitation email to a worker for joining the workteam when creating the work team in the Amazon A2I console.

In [None]:
review_complete = 'N'

In [None]:
# Keep asking until the reviewer confirms; accept 'y' or 'Y'
while review_complete != 'Y':
    
    review_complete = input("\nPlease log into A2I Portal and complete the review.\nIs the review complete (Y/N)?").strip().upper()

In [None]:
completed_human_loops_s3_output = ""

try:
    describe_human_loop_response = a2i_client.describe_human_loop(HumanLoopName=humanLoopName)
    print("\nDescribe human loop response: ")
    pprint.pprint(describe_human_loop_response, width=2)
    
    print("HumanLoop Status: {}".format(describe_human_loop_response["HumanLoopStatus"]))
    # HumanLoopOutput is present only after the human loop has completed
    completed_human_loops_s3_output = describe_human_loop_response["HumanLoopOutput"]["OutputS3Uri"]
except Exception as e:
    print("Error getting human loop: {}".format(e))


#print("\nHumanLoop Name: {}".format(humanLoopName))
print("\nOutput in S3 at: \n{}".format(completed_human_loops_s3_output))

   

In [None]:
pp = pprint.PrettyPrinter(indent=4)
json_output = ''

s3_bucket_name, s3_object_name = completed_human_loops_s3_output.replace("s3://", "").split("/", 1)

print("S3 bucket name: {}".format(s3_bucket_name))
print("S3 object name: {}".format(s3_object_name))

# Amazon S3 client 
s3_client = boto3.client('s3')

try:
    get_object_response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_object_name)
    content = get_object_response["Body"].read()
    json_output = json.loads(content)
    pp.pprint(json_output)
    print('\n')

except:
    print("Error getting S3 object: {}".format(completed_human_loops_s3_output))

#print("\nS3 get object response: ")
#pprint.pprint(get_object_response, width=2)


review_result = json_output['humanAnswers'][0]['answerContent']

#print(review_result)
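
The cell above splits the output URI into a bucket and an object key with `replace` and `split`. If the human loop has not completed, the URI is still an empty string and the unpacking fails with an unhelpful message. The sketch below shows one way to make that failure explicit. `split_s3_uri` is a hypothetical helper and the sample URI is made up.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key' into (bucket, key); raise ValueError if either is missing.

    Note: urlparse treats '?' and '#' as query and fragment markers, so this
    sketch assumes the object key contains neither character.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc or not parsed.path.strip("/"):
        raise ValueError(f"Not a complete S3 object URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Made-up URI, for illustration only
bucket, key = split_s3_uri("s3://my-bucket/ecommerce/a2i-results/output.json")
print(bucket)
print(key)
```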

---
### 7.0. Update human review feedback in Amazon Lookout For Metrics

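For each reviewed entry, we look up the corresponding time series and anomaly group and send the reviewer's verdict to Amazon Lookout for Metrics with `put_feedback`. The detector uses this feedback to improve the accuracy of future anomaly detection.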

In [None]:

if not L4M:
    L4M = boto3.client( "lookoutmetrics")

col_name_suffix = 1
anomaly_col_name = 'anomaly_found-' + str(col_name_suffix)

while anomaly_col_name in review_result:
    
    #tseriesid = review_result['tseriesid-' + str(col_name_suffix)]    
    
    print("\n{}".format(str(col_name_suffix)))
    
    is_anomaly = review_result[anomaly_col_name]['on']
    print("Is Anomaly: {}".format(is_anomaly))

    # Read back the fields that identify the reviewed row (field names come from the task template)
    timestamp = review_result['timestamp-' + str(col_name_suffix)]
    marketplace = review_result['marketplace-' + str(col_name_suffix)]
    platform = review_result['platform-' + str(col_name_suffix)]
    
    # get corresponding time series id and anomaly group id from dataframe df_anomalies_by_ts
    row_value = df_anomalies_by_ts.loc[(df_anomalies_by_ts['timestamp'].astype(str) == timestamp)
                                       & (df_anomalies_by_ts['marketplace'].astype(str) == marketplace)
                                       & (df_anomalies_by_ts['platform'].astype(str) == platform)]
    #print("Row :{}".format(row_value))

    tseriesid = row_value['tseriesid'].tolist()[0]
    print("tseriesid: {}".format(tseriesid))
    
    anomaly_group_id = row_value['anomaly_group_id'].tolist()[0]
    print("Anomaly group id: {}".format(anomaly_group_id))

    # Send the reviewer's verdict for this time series back to the detector
    put_feedback_response = L4M.put_feedback(
            AnomalyDetectorArn=ecom_anomaly_detector_arn,
            AnomalyGroupTimeSeriesFeedback={
                'AnomalyGroupId': anomaly_group_id,
                'TimeSeriesId': tseriesid,
                'IsAnomaly': is_anomaly}
    )
    #print('\nPut Feedback Response: {}'.format(put_feedback_response))

    # Move on to the next reviewed row
    col_name_suffix += 1
    anomaly_col_name = 'anomaly_found-' + str(col_name_suffix)
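
The review result is a flat dictionary whose keys carry the row index as a suffix (`timestamp-1`, `platform-1`, `anomaly_found-1`, and so on), because the task template names each field with `forloop.index`. The sketch below regroups such a dictionary into one record per reviewed row. It assumes, as the cell above does, that a checkbox answer has the form `{'on': True}`. The helper name and the sample answer are hypothetical.

```python
def rows_from_answer(answer_content, flag_field="anomaly_found"):
    """Regroup flat '<field>-<n>' answers into a list of per-row dicts."""
    rows = []
    index = 1
    while f"{flag_field}-{index}" in answer_content:
        suffix = f"-{index}"
        # Collect every field that belongs to row `index` and strip the suffix
        row = {key[:-len(suffix)]: value
               for key, value in answer_content.items()
               if key.endswith(suffix)}
        flag = row[flag_field]
        # Assumption: checkbox answers look like {'on': True}
        row[flag_field] = bool(flag.get("on")) if isinstance(flag, dict) else bool(flag)
        rows.append(row)
        index += 1
    return rows


# Made-up answer content, for illustration only
sample_answer = {
    "timestamp-1": "2021-06-01 10:00:00", "platform-1": "pc_web",
    "marketplace-1": "us", "anomaly_found-1": {"on": True}, "comment-1": "",
    "timestamp-2": "2021-06-01 11:00:00", "platform-2": "mobile_app",
    "marketplace-2": "uk", "anomaly_found-2": {"on": False}, "comment-2": "expected dip",
}

rows = rows_from_answer(sample_answer)
for row in rows:
    print(row["timestamp"], row["platform"], row["marketplace"], row["anomaly_found"])
```

Working with per-row records keeps the lookup against `df_anomalies_by_ts` and the feedback call free of string concatenation on the key names.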

---
### 8.0. Clean up resources 

You can now clean up the resources that were created. This deletes all of them, so run these cells only when you are sure you want to delete everything.


**Note that since we created a continuous detector, it will continue to run once every hour until it is deleted.**




In [None]:
# Ask for confirmation before deleting anything
answer = input("Delete resources? (y/n)")

if answer in ["y", "Y", "yes", "YES"]:
    delete_resources = True
else:
    delete_resources = False
    
if delete_resources:
    L4M.delete_anomaly_detector( AnomalyDetectorArn = ecom_anomaly_detector_arn )
    while True:
        try:
            response = L4M.describe_anomaly_detector( AnomalyDetectorArn = ecom_anomaly_detector_arn )
            if response["Status"] == "DELETING":
                print("status: DELETING")
                time.sleep(5)
                continue
            break
        except L4M.exceptions.ResourceNotFoundException:
            break

    iam = boto3.client("iam")
    iam.detach_role_policy( PolicyArn = "arn:aws:iam::aws:policy/AmazonS3FullAccess", RoleName = role_name )
    iam.detach_role_policy( PolicyArn = "arn:aws:iam::aws:policy/AmazonSNSFullAccess", RoleName = role_name )
    iam.delete_role(RoleName=role_name)
    print("Deleted %s" % role_name)



In [None]:
# remove the resources

if delete_resources:
    
    # Check the status of human loop
    describe_human_loop_response = a2i_client.describe_human_loop(
        HumanLoopName=humanLoopName
    )

    print("\nDescribe human loop response: ")
    pprint.pprint(describe_human_loop_response, width=2)


    # Stop the human loop if it is still in progress
    if describe_human_loop_response['HumanLoopStatus'] ==  "InProgress":
        stop_human_loop_response = a2i_client.stop_human_loop(
            HumanLoopName=humanLoopName
        )

        # Wait until human loop has stopped
        while True:
            describe_human_loop_response = a2i_client.describe_human_loop(
                HumanLoopName=humanLoopName
            )
            if describe_human_loop_response['HumanLoopStatus'] in ["Stopped", "Failed", "Completed"]:
                break
            time.sleep(5)
    
    
    # Delete human loop (also when it had already completed or stopped)
    delete_human_loop_response = a2i_client.delete_human_loop(
        HumanLoopName=humanLoopName
    )
    print("\nDelete human loop response: ")
    pprint.pprint(delete_human_loop_response, width=2)
    
    # Delete work flow.
    delete_flow_definition_response = sagemaker_client.delete_flow_definition(
         FlowDefinitionName=l4m_flowDefinitionName
    )

    print("\nDelete flow definition response: ")
    pprint.pprint(delete_flow_definition_response, width=2)

    # Delete human task UI
    # Check if Amazon lookout for metrics UI exists
    try:
        delete_human_task_ui_response = sagemaker_client.delete_human_task_ui(
            HumanTaskUiName=l4m_taskUIName
        )
        print("\nDelete human task UI: ")
        pprint.pprint(delete_human_task_ui_response, width = 2)
    except:
        print("Human task UI {} not found".format(l4m_taskUIName))
