{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Module 7: Feature Monitoring\n", "**This notebook leverages AWS Glue DataBrew to implement Feature Monitoring of Offline Feature Groups.**\n", "\n", "**Note:** Please set kernel to `Python 3 (Data Science)` and select instance to `ml.t3.medium`\n", "\n", "\n", "---\n", "\n", "## Contents\n", "\n", "1. [Overview](##Overview)\n", "2. [Feature Group Monitoring Preparation](#Feature-Group-Monitoring-Preparation)\n", "3. [Run Feature Group Monitoring using DataBrew Profile Job](#Run-Feature-Group-Monitoring-using-DataBrew-Profile-Job)\n", "4. [Visualisation of Feature Group statistics](#Visualisation-of-Feature-Group-statistics)\n", "5. [Clean up](#Clean-Up)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Overview\n", "\n", "In previous module (`Module-1 Introduction to SageMaker Feature Store`), we demonstrated how to create multiple features groups inside a Amazon SageMaker Feature Store and ingest data into it.\n", "\n", "In this notebook, we will illustrate how to implement feature monitoring using AWS Glue DataBrew to create feature statistics.\n", "\n", "[AWS Glue DataBrew](https://aws.amazon.com/glue/features/databrew/) is a visual data preparation tool that helps you clean and normalize data without writing code. DataBrew also allows customers to specify which data quality statistics to auto-generate for datasets when running a profile job. This allows users to customize data profile statistics such as determining duplicate values, correlations, and outliers based on the nature and size of their datasets, and create a custom data profile overview with only the statistics that meet their needs.\n", "\n", "We will cover the following aspects:\n", "\n", "* Preparation step to Feature Monitoring\n", "* Executing Feature Monitoring using a DataBrew Profile job to generate statistics for a feature group\n", "* Extract statistics from profile job execution and persist to S3\n", "* Additional visualisation of the feature group statistics\n", "\n", "![Feature Monitoring Architecture](../images/feature_monitoring_architecture.png \"Feature Monitoring Architecture\")\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before continuing with this module, you need to have run `Module-1 Introduction to SageMaker Feature Store`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Imports" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%capture\n", "!pip install pyathena" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.feature_store.feature_group import FeatureGroup\n", "from sagemaker import get_execution_role\n", "import sagemaker\n", "import logging\n", "import boto3\n", "import pandas as pd\n", "import time\n", "import re\n", "import os\n", "import sys\n", "from IPython.display import display, Markdown\n", "from time import gmtime, strftime\n", "\n", "#module containing utility functions for this notebook\n", "import feature_monitoring_utils" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger('__name__')\n", "logger.setLevel(logging.DEBUG)\n", "logger.addHandler(logging.StreamHandler())\n", "logger.info(f'Using SageMaker version: {sagemaker.__version__}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Retrieve Feature Group" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook, we will be using and monitoring the orders feature group created in Module 1 of the workshop. In case you would like to use a different feature group, please uncomment and replace **ORDERS_FEATURE_GROUP_NAME** with the name of your Orders Feature Group. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Need to retrieve FG names from previous FSCW notebooks\n", "# Retrieve FG names\n", "%store -r orders_feature_group_name\n", "\n", "#orders_feature_group_name = \"\"\n", "\n", "# Set up the results bucket location\n", "results_bucket=sagemaker.Session().default_bucket() # You might change this for a different s3 bucket\n", "results_key='aws-databrew-results/Offline-FS'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Update SageMaker Studio execution role \n", "If you are running this notebook in Amazon SageMaker Studio, the IAM role assumed by your Studio user needs permission to perform Glue and DataBrew operations. To provide this permission to the role, do the following.\n", "\n", "1. Open the [Amazon SageMaker console](https://console.aws.amazon.com/sagemaker/).\n", "2. Select Amazon SageMaker Studio and choose your user name.\n", "3. Under **User summary**, copy just the name part of the execution role ARN \n", "5. Go to the [IAM console](https://console.aws.amazon.com/iam) and click on **Roles**. \n", "6. Find the role associated with your SageMaker Studio user\n", "7. Under the Permissions tab, click **Attach policies** and add the following: **AWSGlueServiceRole**, **AWSGlueDataBrewFullAccessPolicy**, **AmazonAthenaFullAccess**\n", "8. Under Trust relationships, click **Edit trust relationship** and add the following JSON,\n", "```\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": \"sagemaker.amazonaws.com\"\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " },\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": \"glue.amazonaws.com\"\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " },\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": \"databrew.amazonaws.com\"\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " }\n", " ]\n", "}\n", "```\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Feature Group Monitoring Preparation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This function will execute a few steps to prepare the data before running DataBrew profiling job and generate statistics.\n", "\n", "* Create Snapshot of a Feature Group from Offline Feature Store (before creating using DataBrew profiling capability, we need to create a snapshot of the Feature from the Offline Feature Store which only keeps the latest version of a feature and removes records marked as to be deleted)\n", "* Crawl the Snapshot table (the newly created table needs to be crawled to be accessible by DataBrew. The following function will create an AWS Glue Crawler, start the crawler)\n", "* Create AWS Glue DataBrew dataset\n", "* Create AWS Glue DataBrew Profile Job" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response_brew_prep = feature_monitoring_utils.feature_monitoring_prep(\n", " orders_feature_group_name, \n", " results_bucket, \n", " results_key,\n", " verbose = False\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run Feature Group Monitoring using DataBrew Profile Job" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This section will run an AWS Glue DataBrew Profile (defined in the previous section)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Launch DataBrew Profile Job" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Call the main profile execution function\n", "response_brew_job = feature_monitoring_utils.feature_monitoring_run(\n", " orders_feature_group_name,\n", " verbose=False\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Retrieve Link to DataBrew Profile Job Visualizations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Users can access the statistics for a particular job run using the AWS DataBrew Console UI. The code below retrieves the URL link.\n", "![Feature Group Tag](../images/databrew_visualization.png \"DataBrew Visualization\")\n", "\n", "This link is also added as tag to the Feature Group (as per the picture below).\n", "\n", "![Feature Group Tag](../images/feature_group_tags.png \"Feature Group Tag\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Display the Report S3 location\n", "databrew_profile_console_url = response_brew_job[2]\n", "brew_results_s3 = response_brew_job[4]\n", "print(\"Report is available at the following S3 location:\\n\" + brew_results_s3 + \"\\n\")\n", "\n", "# Display the DataBrew link\n", "print(\"Please click on the link below to access visulizations in Glue DataBrew console:\")\n", "databrew_link = f'[DataBrew Profile Job Visualizations]({databrew_profile_console_url})'\n", "display(Markdown(databrew_link))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Ingest more rows into the feature group to simulate feature drift\n", "\n", "In order to see feature drift we simulate some data changes in the orders FG. The CSV file used for the ingestion was previously generated in 1st module.\n", "\n", "If the correspondent CSV file is not present at *../data/transformed/* folder, you will have to generate it by launching this notebook *m1_nb0_prepare_datasets.ipynb* from 1st module.\n", "\n", "If you want to simulate feature drift for another Feature Group you will need to customize this part and generate some changes to your data before launching the visualization section." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Provide the path to input CVS file\n", "csv_path = '.././data/transformed/orders.csv'\n", "orders_count = int(os.popen(f'wc -l {csv_path}').readline().split()[0]) - 1\n", "nbrows = 0\n", "nb_runs = 2 # Number of reports we want to generate\n", "nb_sample = int(orders_count*0.25/nb_runs)\n", "\n", "# Ingest rows and execute profile job\n", "for x in range(nb_runs):\n", " nbrows = nbrows + nb_sample\n", " # Ingest rows into FG from CSV file\n", " feature_monitoring_utils.ingest_rows_fg(orders_feature_group_name, csv_path, nbrows=nbrows)\n", " \n", " # Call the main profile execution function\n", " resp_job = feature_monitoring_utils.feature_monitoring_run(orders_feature_group_name, verbose=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualisation of Feature Group statistics" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Overview\n", "\n", "The main goal of this notebook is to create visualizations of features and trends over time based on AWS Glue DataBrew quality statistics generated reports.\n", "\n", "### Main steps\n", "\n", "- Consolidate historic monitor reports, generates a data frame and save CSV files in S3\n", "- Visualization of features statistics over time\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Creates a Pandas dataframe from Json reports and write CSV files to S3\n", "\n", "These CVS files can be further used by other visualisation or BI tools" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "start_date = '20210701' # input('Insert Date as a form YYYYmmdd')\n", "\n", "# Creates a Pandas dataframe from Json reports and write CSV files to S3\n", "response_consolid = feature_monitoring_utils.consolidate_monitor_reports(\n", " orders_feature_group_name, \n", " results_bucket,\n", " results_key,\n", " start_date\n", ")\n", "\n", "analysis_data = response_consolid[0]\n", "consolidated_s3_file = response_consolid[2]\n", "print(consolidated_s3_file)\n", "analysis_data.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Visualize the evolution in time for a particular feature" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**purchase_amount's entropy drift over time**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_name = 'purchase_amount'\n", "analysis_data_purchase_amount = feature_monitoring_utils.plot_profiling_data(\n", " analysis_data, \n", " feature_name, \n", " ['entropy'], \n", " kind='line'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**See two statistics (entropy and standardDeviation) for purchase_amount feature**\n", "\n", "**Separate plots**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "analysis_data_purchase_amount = feature_monitoring_utils.plot_profiling_data(\n", " analysis_data, \n", " feature_name, \n", " ['entropy','standardDeviation'],\n", " multiple_plots=True\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Same plot with different scales for each statistic**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "analysis_data_purchase_amount = feature_monitoring_utils.plot_profiling_data(\n", " analysis_data, \n", " feature_name, \n", " ['entropy','standardDeviation']\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**purchase_amount's entropy drift over time using a Bar Plot**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**This section ensures you delete all AWS resources that was created as part of this workshop**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Clean up resources - crawler, DataBrew dataset, and profile job - as well as local analysis folder\n", "analysis_folder_name=response_consolid[4]\n", "\n", "response = feature_monitoring_utils.feature_monitoring_cleanup(\n", " orders_feature_group_name, \n", " analysis_folder_name\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:eu-west-1:470317259841:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }