{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Automate detection of abnormal equipment behavior and review predictions with human in the loop using Amazon Lookout for Equipment and Amazon A2I\n", "\n", "In this notebook we will show you how you can setup Amazon Lookout for Equipment to train an abnormal behavior detection model using a wind turbine dataset for predictive maintenance and setup up a human in the loop workflow to review the predictions using Amazon A2I, augment the dataset and retrain the model.\n", "\n", "To get started with Amazon Lookout for Equipment, we will create a dataset, ingest data, train a model and run inference by setting up a scheduler. After going through these steps we will show you how you can quickly setup human review process using Amazon A2I and retrain your model with augmented or human reviewed datasets. we will walk you through the following steps:\n", "1.\tCreating a dataset in Amazon Lookout for Equipment\n", "2.\tIngesting data into the Amazon Lookout for Equipment dataset\n", "3.\tTraining a model in Amazon Lookout for Equipment\n", "4.\tRunning diagnostics on the trained model\n", "5.\tCreating an inference scheduler in Amazon Lookout for Equipment to send a simulated stream of real-time requests.\n", "6.\tSetting up an Amazon A2I private human loop and reviewing the predictions from Amazon Lookout for Equipment.\n", "7.\tRetraining your Amazon Lookout for Equipment model based on augmented datasets from Amazon A2I.\n", "\n", "**Note:** \n", "1. Before you get started, make sure you have downloaded the open source wind turbine dataset from Engie and saved it in a designated S3 path. If you haven't done this, please go through `1_data_preparation.ipynb` notebook. \n", "\n", "2. To run this notebook we provide you with pre-generated labels for our dataset example, so you can directly run this notebook after you run the `1_data_preparation.ipynb` notebook. \n", "\n", "3. In general, the open source wind turbine dataset doesn't come with known date ranges when the turbine behaved abnormaly and this is also a known and common issue for many of our customers. Please, also go through `2_discover_anomaly_labels.ipynb` notebook to generate labels." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites\n", "\n", "1. Please ensure that the IAM role attached to your SageMaker notebook instance has the permissions to use Lookout for Equipment. You can enable this by adding the Lookout for Equipment policy to the IAM role attached to your notebook instance. To access this go to SageMaker Console --> Notebook Instances --> Click on your notebook name --> Scroll down to IAM and click on the IAM Role here\n", "\n", "2. 
"2. Next, ensure that \"lookoutequipment.amazonaws.com\" is listed as a trusted service in the trust policy of the IAM role attached to your notebook instance, as below:\n", "\n", "```\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": [\n", " \"sagemaker.amazonaws.com\",\n", " \"lookoutequipment.amazonaws.com\",\n", " \"s3.amazonaws.com\"\n", " ]\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " }\n", " ]\n", "}", "```\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup environment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sh\n", "pip -q install --upgrade pip\n", "pip -q install --upgrade awscli boto3 sagemaker smart_open\n", "pip -q install tqdm" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import datetime\n", "import os\n", "import pandas as pd\n", "import pprint\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "from sagemaker.s3 import S3Uploader, S3Downloader\n", "import s3fs\n", "import sys\n", "import time\n", "import uuid\n", "import warnings\n", "\n", "# Helper functions for managing Lookout for Equipment API calls:\n", "sys.path.append('../../getting_started/utils')\n", "import lookout_equipment_utils as lookout" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Uncomment the lines below to display all rows and columns of a dataframe (this can be resource intensive):\n", "#import pandas as pd\n", "#pd.set_option('display.max_rows', None)\n", "#pd.set_option('display.max_columns', None)\n", "#pd.set_option('display.width', None)\n", "#pd.set_option('display.max_colwidth', -1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "REGION_NAME = 'us-east-1'\n", "BUCKET = ''  # TODO: set this to the bucket you used in 1_data_preparation.ipynb\n", "PREFIX = 'wind-turbine'\n", "\n", "ROLE_ARN = sagemaker.get_execution_role()\n", "\n", "TURBINE_ID = 'R80711'\n", "TRAIN_DATA = f's3://{BUCKET}/{PREFIX}/training_data/{TURBINE_ID}'\n", "LABEL_DATA = f's3://{BUCKET}/{PREFIX}/labelled_data'\n", "\n", "DATASET_NAME = 'wind-turbine-dataset'\n", "MODEL_NAME = 'wind-turbine-model'\n", "\n", "# Local working folders used by the inference and A2I steps later in this\n", "# notebook (the exact paths are an assumption; adjust them to your layout):\n", "INFER_DATA_A2I = '../data/wind-turbine/final/inference-a2i'\n", "A2I_LABEL_DATA = '../data/wind-turbine/final/labelled-data-a2i'\n", "os.makedirs(os.path.join(INFER_DATA_A2I, 'input'), exist_ok=True)\n", "os.makedirs(os.path.join(INFER_DATA_A2I, 'output'), exist_ok=True)\n", "os.makedirs(A2I_LABEL_DATA, exist_ok=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load and view data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.read_csv(f'{TRAIN_DATA}/telemetry.csv', index_col='Timestamp')\n", "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.shape" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "labels = pd.read_csv('labels/labels.csv', header=None)\n", "labels.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "labels.shape" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the Dataset Component Map" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DATASET_COMPONENT_FIELDS_MAP = dict()\n", "DATASET_COMPONENT_FIELDS_MAP[TURBINE_ID] = df.reset_index().columns.to_list()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create L4E Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "lookout_dataset = lookout.LookoutEquipmentDataset(\n", " 
dataset_name=DATASET_NAME,\n", " component_fields_map=DATASET_COMPONENT_FIELDS_MAP,\n", " region_name=REGION_NAME,\n", " access_role_arn=ROLE_ARN\n", ")\n", "\n", "pp = pprint.PrettyPrinter(depth=5)\n", "pp.pprint(eval(lookout_dataset.dataset_schema))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lookout_dataset.create()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Ingest data into L4E dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = lookout_dataset.ingest_data(BUCKET, f'{PREFIX}/training_data/')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get the ingestion job ID and status:\n", "data_ingestion_job_id = response['JobId']\n", "data_ingestion_status = response['Status']\n", "\n", "# Wait until ingestion completes:\n", "print(\"=====Polling Data Ingestion Status=====\\n\")\n", "lookout_client = lookout.get_client(region_name=REGION_NAME)\n", "print(str(pd.to_datetime(datetime.datetime.now()))[:19], \"| \", data_ingestion_status)\n", "\n", "while data_ingestion_status == 'IN_PROGRESS':\n", " time.sleep(60)\n", " describe_data_ingestion_job_response = lookout_client.describe_data_ingestion_job(JobId=data_ingestion_job_id)\n", " data_ingestion_status = describe_data_ingestion_job_response['Status']\n", " print(str(pd.to_datetime(datetime.datetime.now()))[:19], \"| \", data_ingestion_status)\n", " \n", "print(\"\\n=====End of Polling Data Ingestion Status=====\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "describe_data_ingestion_job_response" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train L4E Model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Split train and test data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_ratio = 0.8\n", "train_split = int(len(df.index)*train_ratio)\n", "\n", "def change_date_format(datetime):\n", " return pd.to_datetime(datetime).strftime(\"%Y-%m-%d %H:%M:%S\")\n", "\n", "training_start = pd.to_datetime(df.index[0])\n", "training_end = pd.to_datetime(df.index[train_split])\n", "evaluation_start = pd.to_datetime(df.index[train_split+1])\n", "evaluation_end = pd.to_datetime(df.index[-1])\n", "\n", "print(f'Training period: from {training_start} to {training_end}')\n", "print(f'Evaluation period: from {evaluation_start} to {evaluation_end}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare labels\n", "For this notebook example we are using the existing labels available in our dataset. 
If you would like to know how to create your own labels for your dataset please refer to 2_discover_anomaly_labels.ipynb" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_labels = pd.read_csv('labels/labels.csv', header=None, parse_dates=True)\n", "df_labels[0] = [pd.to_datetime(x).strftime(\"%Y-%m-%dT%H:%M:%S.%f\") for x in df_labels[0]]\n", "df_labels[1] = [pd.to_datetime(x).strftime(\"%Y-%m-%dT%H:%M:%S.%f\") for x in df_labels[1]]\n", "df_labels" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_labels.to_csv('labels/labels_reviewed.csv', header=None, index=None)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp labels/labels_reviewed.csv s3://$BUCKET/$PREFIX/labelled_data/labels.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setup Training Config" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Prepare the model parameters:\n", "lookout_model = lookout.LookoutEquipmentModel(model_name=MODEL_NAME,\n", " dataset_name=DATASET_NAME,\n", " region_name=REGION_NAME)\n", "\n", "# Set the training / evaluation split date:\n", "lookout_model.set_time_periods(evaluation_start,\n", " evaluation_end,\n", " training_start,\n", " training_end)\n", "\n", "# Set the label data location:\n", "lookout_model.set_label_data(bucket=BUCKET, \n", " prefix=PREFIX+'/labelled_data/',\n", " access_role_arn=ROLE_ARN)\n", "\n", "# This sets up the rate the service will resample the data before \n", "# training:\n", "lookout_model.set_target_sampling_rate(sampling_rate='PT10M')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Actually create the model and train it:\n", "lookout_model.train()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### the step below will make this notebook poll for 2.5 hours" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Run this only if you want this notebook to wait here till the training is complete\n", "lookout_model.poll_model_training()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get diagnostics for the trained model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "MODEL_NAME" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lookout_client = lookout.get_client(region_name=REGION_NAME)\n", "describe_model_response = lookout_client.describe_model(ModelName=MODEL_NAME)\n", "list(describe_model_response.keys())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "describe_model_response['Status']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "LookoutDiagnostics = lookout.LookoutEquipmentAnalysis(model_name=MODEL_NAME, tags_df=df, region_name=REGION_NAME)\n", "LookoutDiagnostics.set_time_periods(evaluation_start, evaluation_end, training_start, training_end)\n", "predicted_ranges = LookoutDiagnostics.get_predictions()\n", "labels_fname = os.path.join(LABEL_DATA, 'labels.csv')\n", "labeled_ranges = LookoutDiagnostics.get_labels(labels_fname)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "labeled_ranges" ] }, 
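{ "cell_type": "markdown", "metadata": {}, "source": [ "As an optional visual check, we can draw the known label ranges and the model's predicted ranges on a shared timeline. This is a sketch that assumes matplotlib is available in this kernel:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "# Known anomaly ranges in green, model predictions in red:\n", "fig, ax = plt.subplots(figsize=(15, 2))\n", "for _, row in labeled_ranges.iterrows():\n", "    ax.axvspan(pd.to_datetime(row['start']), pd.to_datetime(row['end']), color='green', alpha=0.3)\n", "for _, row in predicted_ranges.iterrows():\n", "    ax.axvspan(pd.to_datetime(row['start']), pd.to_datetime(row['end']), color='red', alpha=0.3)\n", "ax.set_yticks([])\n", "ax.set_title('Known anomaly ranges (green) vs. model predictions (red)')\n", "plt.show()" ] }, 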
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Model diagnostics with feature contribution (% that the feature contributed to the anomaly that was detected) toward anomaly patterns" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "list_d = []\n", "for rec in predicted_ranges['diagnostics']:\n", " list_d.append(pd.DataFrame.from_dict(rec).set_index('name'))\n", "diagnostics_df_ = pd.concat(list_d, axis=1).T.reset_index(drop=True)\n", "diagnostics_df = pd.concat([predicted_ranges[['start','end']],diagnostics_df_], axis=1)\n", "diagnostics_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Display Anomaly Events" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def build_labels_df(df, predicted_ranges, labeled_ranges):\n", " labels_df = pd.DataFrame(index=pd.to_datetime(df.index))\n", " labels_df['true'] = 0\n", " labels_df['predicted'] = 0\n", " \n", " mask = labels_df.index >= evaluation_start\n", " labels_df = labels_df.loc[mask, :]\n", " \n", " for row in labeled_ranges.iterrows():\n", " s = pd.to_datetime(row[1]['start'])\n", " e = pd.to_datetime(row[1]['end'])\n", " labels_df.loc[s:e,'true'] = 1\n", " \n", " for row in predicted_ranges.iterrows():\n", " s = pd.to_datetime(row[1]['start'])\n", " e = pd.to_datetime(row[1]['end'])\n", " labels_df.loc[s:e,'predicted'] = 1\n", " \n", " return labels_df\n", "\n", "labels_df = build_labels_df(df, predicted_ranges, labeled_ranges)\n", "labels_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "c_ = []\n", "for row in labeled_ranges.iterrows():\n", " s = pd.to_datetime(row[1]['start'])\n", " e = pd.to_datetime(row[1]['end'])\n", " a = labels_df.loc[s:e,:].index\n", " b = labels_df.loc[labels_df.sum(axis=1) == 2].index\n", " c = set(a).intersection(set(b))\n", " if c:\n", " c_.append(1)\n", "\n", "print('Total abnormal events detected: ', len(c_))\n", "print('Total abnormal events in the evaluation period: ', len(labeled_ranges.loc[labeled_ranges['start']>=evaluation_start,:]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "PREDICTIONS_FNAME = 'predictions.csv'\n", "labels_df.to_csv(f's3://{BUCKET}/{PREFIX}/labelled_data/{PREDICTIONS_FNAME}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run inference on the L4E model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the inference scheduler\n", "The CreateInferenceScheduler API creates a scheduler **and** starts it: this means that this starts costing you right away. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ROLE_ARN = sagemaker.get_execution_role()\n", "\n", "# Name of the inference scheduler you want to create\n", "INFERENCE_SCHEDULER_NAME = 'wind-turbine-inference-scheduler'\n", "\n", "# Name of the model on which you want to create this inference scheduler\n", "MODEL_NAME_FOR_CREATING_INFERENCE_SCHEDULER = MODEL_NAME\n", "\n", "# Mandatory parameters:\n", "INFERENCE_DATA_SOURCE_BUCKET = BUCKET\n", "INFERENCE_DATA_SOURCE_PREFIX = f'{PREFIX}/inference-a2i/input/'\n", "INFERENCE_DATA_OUTPUT_BUCKET = BUCKET\n", "INFERENCE_DATA_OUTPUT_PREFIX = f'{PREFIX}/inference-a2i/output/'\n", "ROLE_ARN_FOR_INFERENCE = ROLE_ARN\n", "DATA_UPLOAD_FREQUENCY = 'PT10M'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DATA_DELAY_OFFSET_IN_MINUTES = None\n", "INPUT_TIMEZONE_OFFSET = '+00:00'\n", "COMPONENT_TIMESTAMP_DELIMITER = '_'\n", "TIMESTAMP_FORMAT = 'yyyyMMddHHmmss'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "scheduler = lookout.LookoutEquipmentScheduler(\n", " scheduler_name=INFERENCE_SCHEDULER_NAME,\n", " model_name=MODEL_NAME_FOR_CREATING_INFERENCE_SCHEDULER,\n", " region_name=REGION_NAME\n", ")\n", "\n", "scheduler_params = {\n", " 'input_bucket': INFERENCE_DATA_SOURCE_BUCKET,\n", " 'input_prefix': INFERENCE_DATA_SOURCE_PREFIX,\n", " 'output_bucket': INFERENCE_DATA_OUTPUT_BUCKET,\n", " 'output_prefix': INFERENCE_DATA_OUTPUT_PREFIX,\n", " 'role_arn': ROLE_ARN_FOR_INFERENCE,\n", " 'upload_frequency': DATA_UPLOAD_FREQUENCY,\n", " 'delay_offset': DATA_DELAY_OFFSET_IN_MINUTES,\n", " 'timezone_offset': INPUT_TIMEZONE_OFFSET,\n", " 'component_delimiter': COMPONENT_TIMESTAMP_DELIMITER,\n", " 'timestamp_format': TIMESTAMP_FORMAT\n", "}\n", "\n", "scheduler.set_parameters(**scheduler_params)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare the inference data\n", "---\n", "Let's prepare and send some data to the S3 input location our scheduler will monitor:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Let's load all our original signals (the turbine file uploaded by the\n", "# data preparation notebook):\n", "all_tags_fname = f'{TRAIN_DATA}/{TURBINE_ID}.csv'\n", "all_tags_df = pd.read_csv(all_tags_fname)\n", "all_tags_df['Timestamp'] = pd.to_datetime(all_tags_df['Timestamp'])\n", "all_tags_df = all_tags_df.set_index('Timestamp')\n", "all_tags_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "all_tags_df.index.max()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### To build our sample inference dataset, we will extract the last few minutes of the evaluation period of the original time series:\n", "Specifically, we will create 3 CSV files for our turbine, 10 minutes apart. 
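The scheduler identifies the component and the timestamp from each file name using the COMPONENT_TIMESTAMP_DELIMITER and TIMESTAMP_FORMAT configured above, so every input file must follow that naming pattern. For example (a sketch built from the settings above):\n\n```python\n# e.g. 'R80711_20210405203000.csv' for component R80711 at 2021-04-05 20:30:00\nfname = f\"{TURBINE_ID}_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.csv\"\n```\n\n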
These are all stored in S3 in the inference-a2i input folder." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# How many sequences do we want to extract:\n", "num_sequences = 3\n", "\n", "# The scheduling frequency in minutes: this **MUST** match the\n", "# resampling rate used to train the model:\n", "frequency = 10\n", "# Getting a better range for more data points\n", "duration = 40\n", "\n", "# Loops through each sequence:\n", "start = all_tags_df.index.max() + datetime.timedelta(minutes=-duration * (num_sequences))\n", "j = 0\n", "for i in range(num_sequences):\n", " print(\"num seq i: \" + str(i))\n", " end = start + datetime.timedelta(minutes=+duration)\n", " \n", " # Rounding time down to the previous multiple of `frequency` minutes:\n", " tm = datetime.datetime.now()\n", " print(tm)\n", " tm = tm - datetime.timedelta(\n", " minutes=tm.minute % frequency,\n", " seconds=tm.second,\n", " microseconds=tm.microsecond\n", " )\n", " tm = tm + datetime.timedelta(minutes=+frequency * (i))\n", " current_timestamp = (tm).strftime(format='%Y%m%d%H%M%S')\n", "\n", "\n", " # For each sequence, we need to loop through all components:\n", " print(f'Extracting data from {start} to {end}:')\n", " new_index = None\n", " \n", " # Extracting the dataframe for this component and this particular time range:\n", " signals = list(df.columns)\n", " signals_df = all_tags_df.loc[start:end, signals]\n", " \n", " # We need to reset the index to match the time \n", " # at which the scheduler will run inference:\n", " if new_index is None:\n", " new_index = pd.date_range(\n", " start=tm,\n", " periods=signals_df.shape[0], \n", " freq='2min'\n", " )\n", " signals_df.index = new_index\n", " signals_df.index.name = 'Timestamp'\n", " signals_df = signals_df.reset_index()\n", " signals_df['Timestamp'] = pd.to_datetime(signals_df['Timestamp'], errors='coerce')\n", " # IMPORTANT STEP - we are populating a new data frame here to be used in the A2I display UI for reference\n", " if j == 0:\n", " sig_full_df = signals_df\n", " j = 1\n", " else:\n", " sig_full_df = pd.concat([sig_full_df,signals_df], ignore_index=True)\n", " # Export this file in CSV format:\n", " component_fname = os.path.join(INFER_DATA_A2I, 'input', f'{TURBINE_ID}_{current_timestamp}.csv')\n", " print(\"creating inference input files: \" + component_fname)\n", " signals_df.to_csv(component_fname, index=None)\n", " \n", " start = start + datetime.timedelta(minutes=+duration)\n", "\n", "# Upload the whole folder to S3, in the input location:\n", "INFERENCE_INPUT = os.path.join(INFER_DATA_A2I, 'input')\n", "!aws s3 cp --recursive --quiet $INFERENCE_INPUT s3://$BUCKET/$PREFIX/inference-a2i/input\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sig_full_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Now that we've prepared the data, create the scheduler by running:\n", "create_scheduler_response = scheduler.create()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get inference results\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### List inference executions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Let's now wait 5-15 minutes to give the scheduler time to run its first inferences.** Once the wait is over, we can use the ListInferenceExecutions API for our current inference scheduler. 
The only mandatory parameter is the scheduler name.\n", "\n", "You can also choose a time period for which you want to query inference executions for. If you don't specify it, then all executions for an inference scheduler will be listed. If you want to specify the time range, you can do this:\n", "\n", "```python\n", "START_TIME_FOR_INFERENCE_EXECUTIONS = datetime.datetime(2010,1,3,0,0,0)\n", "END_TIME_FOR_INFERENCE_EXECUTIONS = datetime.datetime(2010,1,5,0,0,0)\n", "```\n", "\n", "Which means the executions after `2010-01-03 00:00:00` and before `2010-01-05 00:00:00` will be listed.\n", "\n", "You can also choose to query for executions in particular status, the allowed status are `IN_PROGRESS`, `SUCCESS` and `FAILED`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "START_TIME_FOR_INFERENCE_EXECUTIONS = None\n", "END_TIME_FOR_INFERENCE_EXECUTIONS = None\n", "EXECUTION_STATUS = None\n", "\n", "execution_summaries = []\n", "\n", "while len(execution_summaries) == 0:\n", " execution_summaries = scheduler.list_inference_executions(\n", " start_time=START_TIME_FOR_INFERENCE_EXECUTIONS,\n", " end_time=END_TIME_FOR_INFERENCE_EXECUTIONS,\n", " execution_status=EXECUTION_STATUS\n", " )\n", " if len(execution_summaries) == 0:\n", " print('WAITING FOR THE FIRST INFERENCE EXECUTION')\n", " time.sleep(60)\n", " \n", " else:\n", " print('FIRST INFERENCE EXECUTED\\n')\n", " break\n", " \n", "execution_summaries" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get actual prediction results" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After each successful inference, a CSV file is created in the output location of your bucket. Each inference creates a new folder with a single `results.csv` file in it. 
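Despite the `.csv` extension, each file holds a JSON record; here is a hypothetical example of its shape, with the field names the parsing below assumes:\n\n```python\n# data = json.load(file) would then return something like:\n# {'timestamp': '2021-04-05T20:30:00.000000', 'prediction': 1}\n```\n\n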
Let's read these files and display their content here:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# If not installed at the beginning of the notebook, run this\n", "#!pip install smart_open" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "from smart_open import smart_open\n", "\n", "results_df = []\n", "found_results = False\n", "for execution_summary in execution_summaries:\n", " print(\"Checking inference for \" + str(execution_summary['ScheduledStartTime']) + \" with status \" + execution_summary['Status'])\n", " if execution_summary['Status'] == 'SUCCESS':\n", " found_results = True\n", " bucket = execution_summary['CustomerResultObject']['Bucket']\n", " key = execution_summary['CustomerResultObject']['Key']\n", " fname = f's3://{bucket}/{key}'\n", " with smart_open(fname,'r') as file:\n", " data = json.load(file)\n", " results_df.append(pd.DataFrame([data]))\n", "\n", "# Assemble the individual results into a single DataFrame:\n", "if found_results:\n", " results_df = pd.concat(results_df, axis='index')\n", " results_df.columns = ['Timestamp', 'Predictions']\n", " results_df['Timestamp'] = pd.to_datetime(results_df['Timestamp'],errors='coerce')\n", " results_df = results_df.set_index('Timestamp')\n", "else:\n", " results_df.append('No successful inference results yet, please try again..')\n", "\n", "results_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results_df.to_csv(os.path.join(INFER_DATA_A2I, 'output', 'results.csv'))\n", "results_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Stop Inference Scheduler\n", "Let's make sure to stop the inference scheduler, as we won't require it for the rest of the steps below. As part of your production solution, though, the inference scheduler should keep running so that real-time inferences for your equipment continue." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "scheduler.stop(wait=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# If we don't need this scheduler anymore:\n", "scheduler.delete()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# A2I activities start here\n", "Now that the inference has executed, let's set up a UI to review the inference results and update them, so we can send them back to L4E for retraining the model. Follow the steps below." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Initialize handlers" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "timestamp = time.strftime(\"%Y-%m-%d-%H-%M-%S\", time.gmtime())\n", "# Amazon SageMaker client\n", "sagemaker_client = boto3.client('sagemaker')\n", "\n", "# Amazon Augmented AI (A2I) client\n", "a2i = boto3.client('sagemaker-a2i-runtime')\n", "\n", "# Amazon S3 client \n", "s3 = boto3.client('s3')\n", "\n", "# Flow definition name - this value is unique per account and region. You can also provide your own value here.\n", "flowDefinitionName = 'fd-l4e-' + timestamp\n", "\n", "# Task UI name - this value is unique per account and region. 
You can also provide your own value here.\n", "taskUIName = 'ui-l4e-' + timestamp\n", "\n", "# Flow definition outputs will be stored under this bucket:\n", "a2ibucket = BUCKET\n", "OUTPUT_PATH = f's3://{a2ibucket}/{PREFIX}/a2i-results'\n", "\n", "role = get_execution_role()\n", "print(\"RoleArn: {}\".format(role))\n", "\n", "# TODO: fill in the ARN of your A2I private workteam:\n", "WORKTEAM_ARN = ''" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the human task UI\n", "Create a human task UI resource, giving a UI template in liquid HTML. You can download this template and customize it. This template will be rendered to the human workers whenever a human loop is required. For over 70 pre-built UIs, check: https://github.com/aws-samples/amazon-a2i-sample-task-uis. But first, let's declare some variables that we need during the next set of steps." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We customized the tabular template for our notebook as below\n", "template = r\"\"\"\n", "
\n", "

Instructions

\n", "

Please review the equipment sensor inference inputs, and make corrections to anomaly predictions from the Lookout for Equipment Model.

\n", " \n", "
\n", "

Equipment Sensor Readings

\n", "
\n", " \n", "
\n", "
\n", "
\n", "

Select the correct equipment status below

\n", "

0 means the equipment is fine. 1 means the equipment is faulty or is in the process of wearing down

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " {% for pair in task.input.anomaly %}\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " {% endfor %}\n", "
StartEndPredicted AnomalyCorrected StartCorrected EndCorrected StatusComments
\n", "

\n", " \n", "

\n", "
\n", "

\n", " \n", "

\n", "
\n", "

\n", " \n", " 1-Faulty
\n", " 0-Good\n", "
\n", "

\n", "
\n", "

\n", " \n", "

\n", "
\n", "
\n", "
\n", "
\n", "\n", "\n", "\"\"\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_task_ui():\n", " '''\n", " Creates a Human Task UI resource.\n", " Returns:\n", " struct: HumanTaskUiArn\n", " '''\n", " response = sagemaker_client.create_human_task_ui(\n", " HumanTaskUiName=taskUIName,\n", " UiTemplate={'Content': template})\n", " return response" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create task UI\n", "humanTaskUiResponse = create_task_ui()\n", "humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']\n", "print(humanTaskUiArn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "role = get_execution_role()\n", "print(\"RoleArn: {}\".format(role))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_workflow_definition_response = sagemaker_client.create_flow_definition(\n", " FlowDefinitionName= flowDefinitionName,\n", " RoleArn=role,\n", " HumanLoopConfig= {\n", " \"WorkteamArn\": WORKTEAM_ARN,\n", " \"HumanTaskUiArn\": humanTaskUiArn,\n", " \"TaskCount\": 1,\n", " \"TaskDescription\": \"Review the contents and select correct values as indicated\",\n", " \"TaskTitle\": \"Equipment Condition Review\"\n", " },\n", " OutputConfig={\n", " \"S3OutputPath\" : OUTPUT_PATH\n", " }\n", " )\n", "flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for x in range(60):\n", " describeFlowDefinitionResponse = sagemaker_client.describe_flow_definition(FlowDefinitionName=flowDefinitionName)\n", " print(describeFlowDefinitionResponse['FlowDefinitionStatus'])\n", " if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):\n", " print(\"Flow Definition is active\")\n", " break\n", " time.sleep(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Sending predictions to Amazon A2I human loops" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "a2i_sig_full_df = sig_full_df.reset_index()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "NUM_TO_REVIEW = 5 # number of line items to review\n", "dftimestamp = a2i_sig_full_df['Timestamp'].astype(str).to_list()\n", "dfsig001 = a2i_sig_full_df['Q_avg'].astype(str).to_list()\n", "dfsig002 = a2i_sig_full_df['Ws1_avg'].astype(str).to_list()\n", "dfsig003 = a2i_sig_full_df['Ot_avg'].astype(str).to_list()\n", "dfsig004 = a2i_sig_full_df['Nf_avg'].astype(str).to_list()\n", "dfsig046 = a2i_sig_full_df['Ba_avg'].astype(str).to_list()\n", "sig_list = [{'timestamp': dftimestamp[x], 'reactive_power': dfsig001[x], 'wind_speed_1': dfsig002[x], 'outdoor_temp': dfsig003[x], 'grid_frequency': dfsig004[x], 'pitch_angle': dfsig046[x]} for x in range(NUM_TO_REVIEW)]\n", "sig_list" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "old_results_df = results_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# To be executed only for the first time for after an inference call\n", "results_df.reset_index(inplace=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results_df['StartTimestamp'] = results_df['Timestamp'] - 
datetime.timedelta(minutes=frequency*12)\n", "results_df['EndTimestamp'] = results_df['Timestamp'] + datetime.timedelta(minutes=frequency*12)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#results_df = results_df.drop(['index'], axis=1)\n", "results_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dfstartts = results_df['StartTimestamp'].astype(str).to_list()\n", "dfendts = results_df['EndTimestamp'].astype(str).to_list()\n", "dfano = results_df['Predictions'].to_list()\n", "ano_list = [{'startts': dfstartts[x], 'endts': dfendts[x], 'ano': dfano[x]} for x in range(len(results_df))]\n", "ano_list" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ip_content = {\"signal\": sig_list,\n", " 'anomaly': ano_list\n", " }" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Start the human review" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "humanLoopName = str(uuid.uuid4())\n", "\n", "start_loop_response = a2i.start_human_loop(\n", " HumanLoopName=humanLoopName,\n", " FlowDefinitionArn=flowDefinitionArn,\n", " HumanLoopInput={\n", " \"InputContent\": json.dumps(ip_content)\n", " }\n", " )\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "completed_human_loops = []\n", "resp = a2i.describe_human_loop(HumanLoopName=humanLoopName)\n", "print(f'HumanLoop Name: {humanLoopName}')\n", "print(f'HumanLoop Status: {resp[\"HumanLoopStatus\"]}')\n", "print(f'HumanLoop Output Destination: {resp[\"HumanLoopOutput\"]}')\n", "print('\\n')\n", " \n", " \n", "if resp[\"HumanLoopStatus\"] == \"Completed\":\n", " completed_human_loops.append(resp)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# login link to navigate to the private workforce portal" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workteamName = WORKTEAM_ARN[WORKTEAM_ARN.rfind('/') + 1:]\n", "print(\"Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!\")\n", "print('https://' + sagemaker_client.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "completed_human_loops = []\n", "resp = a2i.describe_human_loop(HumanLoopName=humanLoopName)\n", "print(f'HumanLoop Name: {humanLoopName}')\n", "print(f'HumanLoop Status: {resp[\"HumanLoopStatus\"]}')\n", "print(f'HumanLoop Output Destination: {resp[\"HumanLoopOutput\"]}')\n", "print('\\n')\n", " \n", " \n", "if resp[\"HumanLoopStatus\"] == \"Completed\":\n", " completed_human_loops.append(resp)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Evaluating the results\n", "\n", "When the labeling work is complete, your results should be available in the S3 output path specified in the human review workflow definition. \n", "The human answers are returned and saved in the JSON file." 
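, "\n", "For each reviewed row, the `answerContent` object carries keys such as `faulty-0`, `TrueStart0`, and `TrueEnd0` (numbered by row index), which the parsing cells below rely on. A single worker answer looks roughly like this hypothetical example:\n", "\n", "```python\n", "answer_content = {\n", "    'faulty-0': {'on': True},             # row 0 was marked 1-Faulty\n", "    'TrueStart0': '2021-04-05 20:30:00',  # corrected start entered by the reviewer\n", "    'TrueEnd0': '2021-04-05 21:10:00',    # corrected end\n", "}\n", "```"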
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import re\n", "import pprint\n", "\n", "pp = pprint.PrettyPrinter(indent=4)\n", "json_output = ''\n", "for resp in completed_human_loops:\n", " splitted_string = re.split('s3://' + a2ibucket + '/', resp['HumanLoopOutput']['OutputS3Uri'])\n", " print(splitted_string[1])\n", " output_bucket_key = splitted_string[1]\n", " response = s3.get_object(Bucket=a2ibucket, Key=output_bucket_key)\n", " content = response[\"Body\"].read()\n", " json_output = json.loads(content)\n", " pp.pprint(json_output)\n", " print('\\n')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Retrain L4E based on A2I correction\n", "Now we'll take the A2I output, preprocess it and send it back to L4E for retraining our model based on the user corrections" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "labels_df = pd.read_csv(os.path.join(LABEL_DATA, 'labels.csv'), header=None)\n", "labels_df[0] = pd.to_datetime(labels_df[0])\n", "labels_df[1] = pd.to_datetime(labels_df[1])\n", "labels_df.columns = ['start', 'end']\n", "labels_df.tail()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "a2i_lbl_df = pd.DataFrame()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Update Labels with new date ranges" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "faulty = False\n", "a2i_lbl_df = labels_df\n", "x = json_output['humanAnswers'][0]\n", "row_df = pd.DataFrame(columns=['rownr'])\n", "tslist = {}\n", "\n", "# Let's first check if the users mark equipment as faulty and if so get those row numbers into a dataframe \n", "for i in json_output['humanAnswers']:\n", " print(\"checking equipment review...\")\n", " x = i['answerContent']\n", " for idx, key in enumerate(x):\n", " if \"faulty\" in key:\n", " if str(x.get(key)).split(':')[1].lstrip().strip('}') == \"True\": # faulty equipment selected\n", " faulty = True\n", " row_df.loc[len(row_df.index)] = [key.split('-')[1]] \n", " print(\"found faulty equipment in row: \" + key.split('-')[1])\n", "\n", "\n", "# Now we will get the date ranges for the faulty choices \n", "for idx,k in row_df.iterrows():\n", " x = json_output['humanAnswers'][0]\n", " strchk = \"TrueStart\"+k['rownr']\n", " endchk = \"TrueEnd\"+k['rownr']\n", " for i in x['answerContent']:\n", " if i == strchk:\n", " tslist[i] = x['answerContent'].get(i)\n", " if i == endchk:\n", " tslist[i] = x['answerContent'].get(i)\n", "\n", " \n", "# And finally let's add it to our new a2i labels dataset\n", "for idx,k in row_df.iterrows():\n", " x = json_output['humanAnswers'][0]\n", " strchk = \"TrueStart\"+k['rownr']\n", " endchk = \"TrueEnd\"+k['rownr']\n", " a2i_lbl_df.loc[len(a2i_lbl_df.index)] = [tslist[strchk], tslist[endchk]]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Dont execute steps below if no new label was added" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Updated Labels after A2I results are included\n", "a2i_lbl_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "a2i_label_src_fname = os.path.join(A2I_LABEL_DATA, 'labels.csv')\n", "a2i_lbl_df.to_csv(a2i_label_src_fname, header=None, index=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Uploading label dataset to 
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "a2i_label_src_fname = os.path.join(A2I_LABEL_DATA, 'labels.csv')\n", "a2i_lbl_df.to_csv(a2i_label_src_fname, header=None, index=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Uploading the label dataset to S3:\n", "a2i_label_s3_dest_path = f's3://{BUCKET}/{PREFIX}/augmented-labelled-data/labels.csv'\n", "!aws s3 cp $a2i_label_src_fname $a2i_label_s3_dest_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Update the training dataset with new measurements\n", "We will now update our original training dataset with the new measurement range, based on what we got back from A2I." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "turbine_id = TURBINE_ID\n", "file = '../data/wind-turbine/final/training-data/'+turbine_id+'/'+turbine_id+'.csv'\n", "newdf = pd.read_csv(file, index_col='Timestamp')\n", "newdf.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "newdf.shape" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sig_full_df = sig_full_df.set_index('Timestamp')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sig_full_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sig_full_df.shape" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Re-index the simulated measurements on a fixed, recent timestamp at the\n", "# 10-minute sampling rate so they extend the historical dataset:\n", "tm = pd.to_datetime('2021-04-05 20:30:00')\n", "print(tm)\n", "new_index = pd.date_range(\n", " start=tm,\n", " periods=sig_full_df.shape[0], \n", " freq='10min'\n", " )\n", "sig_full_df.index = new_index\n", "sig_full_df.index.name = 'Timestamp'\n", "sig_full_df = sig_full_df.reset_index()\n", "sig_full_df['Timestamp'] = pd.to_datetime(sig_full_df['Timestamp'], errors='coerce')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sig_full_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Append the new measurements that we simulated before running our inference\n", "# to the original training data. We should update the training data only if\n", "# the A2I reviews tagged faulty equipment.\n", "newdf = newdf.reset_index()\n", "newdf = pd.concat([newdf,sig_full_df])\n", "newdf.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "newdf.tail()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "newdf = newdf.set_index('Timestamp')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note:** As we can see above, 15 rows were appended to the end of the training dataset. Now let's create a CSV file and copy the data to the training channel in S3." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Write the augmented dataset locally, then copy it to the training channel in S3:\n", "TRAIN_DATA_AUGMENTED = '../data/wind-turbine/final/training-data/augmented'\n", "os.makedirs(TRAIN_DATA_AUGMENTED, exist_ok=True)\n", "newdf.to_csv(os.path.join(TRAIN_DATA_AUGMENTED, turbine_id+'.csv'))\n", "!aws s3 sync $TRAIN_DATA_AUGMENTED s3://$BUCKET/$PREFIX/training_data/augmented" ] }, 
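{ "cell_type": "markdown", "metadata": {}, "source": [ "A quick optional check that the augmented file landed in the S3 training channel:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 ls s3://$BUCKET/$PREFIX/training_data/augmented/" ] }, 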
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Update the component map for the augmented dataset. You should not see any\n", "# changes to the dataset structure because of the A2I updates, but we rebuild\n", "# the map from the augmented CSV files just in case:\n", "DATASET_COMPONENT_FIELDS_MAP = dict()\n", "subsystem = turbine_id\n", "subsystem_tags = ['Timestamp']\n", "for root, _, files in os.walk(TRAIN_DATA_AUGMENTED):\n", " for file in files:\n", " print(\"file: \" + file)\n", " fname = os.path.join(root, file)\n", " current_subsystem_df = pd.read_csv(fname, nrows=1)\n", " subsystem_tags = subsystem_tags + current_subsystem_df.columns.tolist()[1:]\n", "\n", "DATASET_COMPONENT_FIELDS_MAP.update({subsystem: subsystem_tags})" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DATASET_COMPONENT_FIELDS_MAP" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the augmented dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ROLE_ARN = sagemaker.get_execution_role()\n", "# REGION_NAME = boto3.session.Session().region_name\n", "\n", "DATASET_NAME = 'wind-turbine-dataset-augmented'\n", "MODEL_NAME = 'wind-turbine-model-augmented'\n", "\n", "\n", "lookout_dataset = lookout.LookoutEquipmentDataset(\n", " dataset_name=DATASET_NAME,\n", " component_fields_map=DATASET_COMPONENT_FIELDS_MAP,\n", " region_name=REGION_NAME,\n", " access_role_arn=ROLE_ARN\n", ")\n", "\n", "pp = pprint.PrettyPrinter(depth=5)\n", "pp.pprint(eval(lookout_dataset.dataset_schema))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lookout_dataset.create()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Ingest augmented data into L4E" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = lookout_dataset.ingest_data(BUCKET, f'{PREFIX}/training_data/augmented/')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get the ingestion job ID and status:\n", "data_ingestion_job_id = response['JobId']\n", "data_ingestion_status = response['Status']\n", "\n", "# Wait until ingestion completes:\n", "print(\"=====Polling Data Ingestion Status=====\\n\")\n", "lookout_client = lookout.get_client(region_name=REGION_NAME)\n", "print(str(pd.to_datetime(datetime.datetime.now()))[:19], \"| \", data_ingestion_status)\n", "\n", "while data_ingestion_status == 'IN_PROGRESS':\n", " time.sleep(60)\n", " describe_data_ingestion_job_response = lookout_client.describe_data_ingestion_job(JobId=data_ingestion_job_id)\n", " data_ingestion_status = describe_data_ingestion_job_response['Status']\n", " print(str(pd.to_datetime(datetime.datetime.now()))[:19], \"| \", data_ingestion_status)\n", " \n", "print(\"\\n=====End of Polling Data Ingestion Status=====\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "describe_data_ingestion_job_response" ] }, 
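{ "cell_type": "markdown", "metadata": {}, "source": [ "As an optional sanity check before retraining, confirm the augmented dataset is ready (a sketch using the boto3 client created above; the status should no longer be an in-progress ingestion state):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Describe the augmented dataset and check its status:\n", "lookout_client.describe_dataset(DatasetName=DATASET_NAME)['Status']" ] }, 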
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Update the time ranges for training and evaluation of the augmented dataset\n", "In the case of a continuous interval of data sampling and training, there will not be any gaps (or only minimal gaps) in the time ranges between the previous training run and the current augmented training run. However, in our wind turbine example we are looking at a dataset that was last recorded in 2018. As a result, we select the training and evaluation periods as shown below. In an operational setting, choose a time period that leaves you a back-test window for evaluation while keeping adequate data available for training. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "newdf.index = pd.to_datetime(newdf.index)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Loading time ranges: for the augmented training we keep the original 80/20\n", "# split point, so the newly appended data points fall at the end of the\n", "# evaluation period (evaluation_end is the last new timestamp):\n", "\n", "train_ratio = 0.8\n", "train_split = int(len(df.index)*train_ratio)\n", "\n", "training_start = pd.to_datetime(newdf.index[0])\n", "training_end = pd.to_datetime(newdf.index[train_split])\n", "evaluation_start = pd.to_datetime(newdf.index[train_split+1])\n", "evaluation_end = pd.to_datetime(newdf.index.max())\n", "\n", "print(f'Training period: from {training_start} to {training_end}')\n", "print(f'Evaluation period: from {evaluation_start} to {evaluation_end}')\n", "\n", "print('Dataset used:', DATASET_NAME)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "REGION_NAME" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Finally, retrain L4E based on the augmented dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Prepare the model parameters:\n", "lookout_model = lookout.LookoutEquipmentModel(model_name=MODEL_NAME,\n", " dataset_name=DATASET_NAME,\n", " region_name=REGION_NAME)\n", "\n", "# Set the training / evaluation split date:\n", "lookout_model.set_time_periods(evaluation_start,\n", " evaluation_end,\n", " training_start,\n", " training_end)\n", "\n", "# Set the label data location:\n", "lookout_model.set_label_data(bucket=BUCKET, \n", " prefix=f'{PREFIX}/augmented-labelled-data/',\n", " access_role_arn=ROLE_ARN)\n", "\n", "# This sets up the rate the service will resample the data before \n", "# training:\n", "lookout_model.set_target_sampling_rate(sampling_rate='PT10M')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Actually create the model and train it:\n", "lookout_model.train()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lookout_model.poll_model_training()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## End of notebook\n" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }