{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Module 4: Inference Patterns - Endpoint-Based Feature Lookup\n",
    "**This notebook uses the feature groups created in `Module 1` and `Module 2` and the model trained in `Module 3` to show how an endpoint can look up features from the online feature store in real time.**\n",
    "\n",
    "**Note:** Please set the kernel to `Python 3 (Data Science)` and select the `ml.t3.medium` instance.\n",
    "\n",
    "\n",
    "## Contents\n",
    "\n",
    "1. [Background](#Background)\n",
    "2. [Setup](#Setup)\n",
    "3. [Load feature group names](#Load-feature-group-names)\n",
    "4. [Load the trained model to use for inference](#Load-the-trained-model-to-use-for-inference)\n",
    "5. [Deploy the model](#Deploy-the-model)\n",
    "6. [Make inferences using the deployed model](#Make-inferences-using-the-deployed-model)\n",
    "7. [Cleanup](#Cleanup)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Background\n",
    "In this notebook, we demonstrate how to retrieve features from two online feature groups within an endpoint.\n",
    "\n",
    "We use the feature sets derived and ingested into the online feature stores in Modules 1 and 2, as well as the model trained in Module 3: a SageMaker XGBoost binary classifier that predicts whether a given product will be added by a customer to their basket.\n",
    "\n",
    "We deploy the already trained model from its model artifact onto a SageMaker endpoint for real-time inference. The endpoint retrieves features from two online feature groups (the customers and products feature groups created in Module 2). The request body carries only a customer ID and a product ID; the endpoint uses them to fetch the associated features from the low-latency online feature stores and passes those features to the model for real-time inference.\n",
    "\n",
    "To achieve this, we create a custom inference script and specify the feature groups as well as the features to retrieve. We utilise a custom library (`helper.py`) that facilitates returning the results from the feature store via a few helper functions. The returned result is then fed into the model for inference.\n",
    "\n",
    "Take a few minutes to review the following architecture:\n",
    "\n",
    "![Inference endpoint lookup](../images/m4_nb2_inference_pattern.png \"Inference endpoint feature look up\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.serializers import CSVSerializer\n",
    "from sagemaker.predictor import Predictor\n",
    "from sagemaker import get_execution_role\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "import sagemaker\n",
    "import logging\n",
    "import json\n",
    "import os\n",
    "import sys\n",
    "sys.path.append('..')\n",
    "# If required, upgrade boto3:\n",
    "# !pip install boto3 --upgrade\n",
    "import boto3\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Essentials\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "logger = logging.getLogger(__name__)\n",
    "logger.setLevel(logging.DEBUG)\n",
    "logger.addHandler(logging.StreamHandler())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sagemaker_execution_role = get_execution_role()\n",
    "logger.info(f'Role = {sagemaker_execution_role}')\n",
    "session = boto3.Session()\n",
    "sagemaker_session = sagemaker.Session()\n",
    "sagemaker_client = session.client(service_name=\"sagemaker\")\n",
    "default_bucket = sagemaker_session.default_bucket()\n",
    "prefix = 'sagemaker-featurestore-workshop'\n",
    "s3 = session.resource('s3')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load feature group names"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r customers_feature_group_name\n",
    "%store -r products_feature_group_name"
   ]
  },
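  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before deploying anything, it can be useful to confirm that the online store is serving records for these feature groups. The cell below is a minimal sketch of the single-record lookup our endpoint will later perform, using the Boto3 `sagemaker-featurestore-runtime` client's `get_record` API. The record identifiers `'C50'` and `'P2'` are the sample IDs used for inference later in this notebook, and are assumed to have been ingested in Module 2."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sanity check: fetch one record from each online feature group\n",
    "featurestore_runtime = session.client(service_name='sagemaker-featurestore-runtime')\n",
    "\n",
    "for fg_name, record_id in [(customers_feature_group_name, 'C50'),\n",
    "                           (products_feature_group_name, 'P2')]:\n",
    "    response = featurestore_runtime.get_record(\n",
    "        FeatureGroupName=fg_name,\n",
    "        RecordIdentifierValueAsString=record_id)\n",
    "    # 'Record' is absent from the response if the identifier is not in the online store\n",
    "    record = response.get('Record', [])\n",
    "    print(f'{fg_name} [{record_id}]: {len(record)} features returned')"
   ]
  },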
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load the trained model to use for inference\n",
    "We will use the model artifacts trained in Module 3 for making inferences, and customise the model's inference behaviour as described below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r training_jobName\n",
    "\n",
    "from sagemaker.xgboost.model import XGBoostModel\n",
    "\n",
    "training_job_info = sagemaker_client.describe_training_job(\n",
    "    TrainingJobName=training_jobName\n",
    ")\n",
    "xgb_model_data = training_job_info[\"ModelArtifacts\"][\"S3ModelArtifacts\"]\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Deploy the model\n",
    "We deploy our model to a real-time endpoint and customise it by passing an inference script along with our helper Python library.\n",
    "\n",
    "First we prepare an `inference_entry.py` entry script. Pay attention to the following details in the script:\n",
    "\n",
    "1. We define a list that pairs each feature group name with feature names. It allows you to select specific features by name, or all features by specifying `*` (see the parsing sketch after this list). For example:\n",
    "\n",
    "To get selected features from the customers and products feature stores:\n",
    "\n",
    "**`feature_list=['fscw-customers-10-18-00-12: age_60-69, age_70-plus',\n",
    " 'fscw-products-10-18-00-12: category_packaged_cheese']`**\n",
    "\n",
    "To get all features from the customers and products feature stores:\n",
    "\n",
    "**`feature_list=['fscw-products-10-18-00-12:*',\n",
    " 'fscw-customers-10-18-00-12:*']`**\n",
    "\n",
    "2. Our model was trained on all the features from the two feature groups, so we extract all of them.\n",
    "\n",
    "3. We define a custom `input_fn`. The request body consists of a customer ID and a product ID; `input_fn` calls the feature store client to look up the corresponding feature values, using the helper library.\n",
    "\n",
    "4. You can check how long the feature lookups from the feature stores take in the endpoint's CloudWatch logs.\n"
   ]
  },
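  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the `'<feature-group-name>: <features>'` convention concrete, here is a small, self-contained sketch of how such entries can be parsed. This is purely illustrative; the actual parsing lives inside the workshop's `helper.py`, and `parse_feature_list` is a hypothetical helper, not part of the library."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Illustrative only: split each 'fg_name: f1, f2' (or 'fg_name:*') entry\n",
    "# into a feature group name and the list of requested feature names.\n",
    "def parse_feature_list(feature_list):\n",
    "    parsed = {}\n",
    "    for entry in feature_list:\n",
    "        fg_name, _, feats = entry.partition(':')\n",
    "        feats = feats.strip()\n",
    "        # '*' means 'all features in the group'\n",
    "        parsed[fg_name.strip()] = None if feats == '*' else [f.strip() for f in feats.split(',')]\n",
    "    return parsed\n",
    "\n",
    "parse_feature_list(['fscw-customers-10-18-00-12: age_60-69, age_70-plus',\n",
    "                    'fscw-products-10-18-00-12:*'])"
   ]
  },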
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile custom_library/inference_entry.py\n",
    "\n",
    "import ast\n",
    "import os\n",
    "import pickle as pkl\n",
    "import time\n",
    "\n",
    "import helper\n",
    "import sagemaker_xgboost_container.encoder as xgb_encoders\n",
    "\n",
    "# The feature list is passed to the script as an environment variable;\n",
    "# it is defined by the client at deployment time.\n",
    "feature_list = os.environ['feature_list']\n",
    "feature_list = ast.literal_eval(feature_list)\n",
    "\n",
    "\n",
    "def model_fn(model_dir):\n",
    "    \"\"\"\n",
    "    Deserialize and return the fitted model.\n",
    "    \"\"\"\n",
    "    model_file = \"xgboost-model\"\n",
    "    booster = pkl.load(open(os.path.join(model_dir, model_file), \"rb\"))\n",
    "    return booster\n",
    "\n",
    "\n",
    "def input_fn(request_body, request_content_type):\n",
    "    \"\"\"\n",
    "    The SageMaker XGBoost model server receives the request data body and the content type,\n",
    "    and invokes the `input_fn`.\n",
    "    Return a DMatrix (an object that can be passed to predict_fn).\n",
    "    \"\"\"\n",
    "    print(request_content_type)\n",
    "    if request_content_type == \"text/csv\":\n",
    "        params = request_body.split(',')\n",
    "        id_dict = {'customer_id': params[0].strip(), 'product_id': params[1].strip()}\n",
    "\n",
    "        start = time.time()\n",
    "        records = helper.get_latest_featureset_values(id_dict, feature_list, verbose=True)\n",
    "        duration = time.time() - start\n",
    "        print(\"time to lookup features from two feature stores:\", duration)\n",
    "\n",
    "        records = \",\".join([str(e) for e in records.values()])\n",
    "        return xgb_encoders.csv_to_dmatrix(records)\n",
    "    else:\n",
    "        raise ValueError(\"{} not supported by script!\".format(request_content_type))\n"
   ]
  },
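  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The entry script delegates the actual lookups to `get_latest_featureset_values` in the workshop's `helper.py`. For readers who want a mental model of that call, the cell below is a rough, hypothetical sketch of the kind of fan-out it performs: one `get_record` call per feature group, with the requested feature values merged into a single dictionary. It is an assumption for illustration only, not the actual implementation, and the pairing of each ID with its feature group by position is likewise assumed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "\n",
    "# Hypothetical sketch of a multi-feature-group lookup (NOT helper.py itself)\n",
    "fs_runtime = boto3.client('sagemaker-featurestore-runtime')\n",
    "\n",
    "def sketch_get_latest_featureset_values(id_dict, feature_list):\n",
    "    values = {}\n",
    "    # Assumption: entries in feature_list line up with the ids in id_dict by order\n",
    "    for record_id, entry in zip(id_dict.values(), feature_list):\n",
    "        fg_name, _, feats = entry.partition(':')\n",
    "        kwargs = {'FeatureGroupName': fg_name.strip(),\n",
    "                  'RecordIdentifierValueAsString': record_id}\n",
    "        if feats.strip() != '*':\n",
    "            # Restrict the lookup to the named features\n",
    "            kwargs['FeatureNames'] = [f.strip() for f in feats.split(',')]\n",
    "        response = fs_runtime.get_record(**kwargs)\n",
    "        for feature in response.get('Record', []):\n",
    "            values[feature['FeatureName']] = feature['ValueAsString']\n",
    "    return values"
   ]
  },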
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Prepare the feature group names and the list of features to retrieve from the online feature store; these are defined by the client and passed to the script as an environment variable"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r customers_feature_group_name\n",
    "%store -r products_feature_group_name\n",
    "\n",
    "customers_fg = sagemaker_client.describe_feature_group(\n",
    "    FeatureGroupName=customers_feature_group_name)\n",
    "\n",
    "products_fg = sagemaker_client.describe_feature_group(\n",
    "    FeatureGroupName=products_feature_group_name)\n",
    "\n",
    "'''\n",
    "Select all features from a feature group using '*', OR select a subset from\n",
    "the complete list of features, which you can obtain via the following code:\n",
    "\n",
    "customers_feats = ','.join(i['FeatureName'] for i in customers_fg['FeatureDefinitions'])\n",
    "products_feats = ','.join(i['FeatureName'] for i in products_fg['FeatureDefinitions'])\n",
    "'''\n",
    "\n",
    "customers_feats = '*'\n",
    "products_feats = '*'\n",
    "\n",
    "customer_feats_desc = customers_fg[\"FeatureGroupName\"] + \":\" + customers_feats\n",
    "products_feats_desc = products_fg[\"FeatureGroupName\"] + \":\" + products_feats\n",
    "\n",
    "feature_list = str([customer_feats_desc, products_feats_desc])\n",
    "print(feature_list)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.xgboost.model import XGBoostModel\n",
    "from sagemaker.serializers import CSVSerializer\n",
    "\n",
    "instance_type = \"ml.m4.xlarge\"\n",
    "\n",
    "# The feature list reaches the inference script via the container environment\n",
    "env = {\"feature_list\": feature_list}\n",
    "\n",
    "xgboost_inference_feature = XGBoostModel(\n",
    "    model_data=xgb_model_data,\n",
    "    role=sagemaker_execution_role,\n",
    "    source_dir='./custom_library',\n",
    "    entry_point=\"inference_entry.py\",\n",
    "    framework_version=\"1.2-2\",\n",
    "    env=env,\n",
    ")\n",
    "\n",
    "predictor_feature = xgboost_inference_feature.deploy(\n",
    "    initial_instance_count=1, instance_type=instance_type)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Make inferences using the deployed model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "predictor_feature.serializer = CSVSerializer()\n",
    "\n",
    "cust_id = 'C50'\n",
    "prod_id = 'P2'\n",
    "test_data = f'{cust_id}, {prod_id}'\n",
    "print(test_data)\n",
    "\n",
    "print(predictor_feature.predict(test_data))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### *Optional* - Use the `input_fn` from the inference script to evaluate how fast features are returned from the two feature stores used by your inference endpoint"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import ast\n",
    "from custom_library import helper\n",
    "\n",
    "feature_list = ast.literal_eval(feature_list)\n",
    "\n",
    "\n",
    "def input_fn(request_data, request_content_type):\n",
    "    if request_content_type == \"text/csv\":\n",
    "        params = request_data.split(',')\n",
    "        id_dict = {'customer_id': params[0].strip(), 'product_id': params[1].strip()}\n",
    "        start = time.time()\n",
    "        records = helper.get_latest_featureset_values(id_dict, feature_list)\n",
    "        duration = time.time() - start\n",
    "        print(\"time to lookup features from two feature stores:\", duration)\n",
    "        records = \",\".join([str(e) for e in records.values()])\n",
    "        return records, duration\n",
    "    else:\n",
    "        raise ValueError(\"{} not supported by script!\".format(request_content_type))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cust_id = 'C45'\n",
    "prod_id = 'P26'\n",
    "payload = f'{cust_id},{prod_id}'\n",
    "print(payload)\n",
    "\n",
    "input_fn(payload, \"text/csv\")"
   ]
  },
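  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A single call can be noisy, so the cell below is a small optional extension that repeats the lookup a few times and reports the mean and median latency. It reuses the `input_fn` and `payload` defined above; the iteration count of 10 is an arbitrary choice, and this is a quick in-notebook measurement, not a load test."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import statistics\n",
    "\n",
    "# Repeat the lookup to smooth out single-call noise\n",
    "durations = []\n",
    "for _ in range(10):\n",
    "    _, duration = input_fn(payload, 'text/csv')\n",
    "    durations.append(duration)\n",
    "\n",
    "print(f'mean lookup latency:   {statistics.mean(durations):.4f} s')\n",
    "print(f'median lookup latency: {statistics.median(durations):.4f} s')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Cleanup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},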
"outputs": [], "source": [ "endpoint_name = predictor_feature.endpoint_name\n", "print(endpoint_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sagemaker_client.describe_endpoint_config(EndpointConfigName=endpoint_name)\n", "model_name = response['ProductionVariants'][0]['ModelName']\n", "model_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_client.delete_model(ModelName=model_name) \n", "sagemaker_client.delete_endpoint(EndpointName=endpoint_name)\n", "sagemaker_client.delete_endpoint_config(EndpointConfigName=endpoint_name)" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-2:429704687514:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" }, "toc-autonumbering": false, "toc-showcode": false, "toc-showmarkdowntxt": true, "toc-showtags": false }, "nbformat": 4, "nbformat_minor": 4 }