{ "cells": [ { "cell_type": "markdown", "id": "5a8c710c", "metadata": {}, "source": [ "# Module 4: Inference Patterns- Inference pipline based feature look up\n", "**This notebook uses the feature groups created in `module 1` and `module 2` and model trained in `module 3` to show how we can look up features from online feature store in realtime from an endpoint**\n", "\n", "\n", "**Note:** Please set kernel to `Python 3 (Data Science)` and select instance to `ml.t3.medium`\n", "\n", "\n", "## Contents\n", "\n", "1. [Background](#Background)\n", "2. [Setup](#Setup)\n", "3. [Loading feature group names](#Loading-feature-group-names)\n", "4. [Prepare a script to look up features from the featurestore](#Prepare-a-script-to-look-up-features-from-the-featurestore)\n", "5. [Load pre-trained xgboost model](#Load-pre-trained-xgboost-model)\n", "6. [Create and deploy an inference pipline](#Create-and-deploy-an-inference-pipline)\n", "7.[Make inference using the inference pipeline](#Make-inference-using-the-inference-pipeline)\n", "8. [Cleanup](#Cleanup)\n" ] }, { "cell_type": "markdown", "id": "04bab1cf", "metadata": {}, "source": [ "## Background\n", "In this notebook, we demonstrate how to retreive features from two online feature groups within an endpoint. First we use the feature set derived in Modules 1 and 2 as well as the model trained in module 3 that was a SageMaker XGBoost algorithm predicting which product the user would add to their baskets.\n", "\n", "Retreiving the already trained model, we will create an inference pipline [Inference pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/inference-pipelines.html). Using inference pipeline you can chain a sequence of 2 to 15 containers and delpoy on the same endpoint- Inference pipeline are great for real time inferences where a sequence of models feed into one another to generate the final prediction, or where pre-processing or post-processing of restuls in real time are requiered. \n", "\n", "In this notebook, we will see how we can use a XGBoost container as the first container within the inference pipeline to look up features from online features stores and feed the retreived features into a second XGboost container for model inference. You will also see how we delpoy these two container onto the same endpoint via using inference pipelineModel.\n", "\n", "Our first XGBoost contianer will get the features from two online features stores (customers and products feature groups created in Module 2) by sending the request body as customer id and product id to retreive their associated features from customer and product feature groups. 
\n", "\n", "Take a few minutes reviewing the following architecture that shows an exmaple of an inference pipeline with multiple container.\n", "\n", "![Inference endpoint lookup](../images/m4_nb3_inference_pattern.png \"Inference Pipeline endpoint feature look up\")\n" ] }, { "cell_type": "markdown", "id": "55128006", "metadata": {}, "source": [ "## Setup" ] }, { "cell_type": "code", "execution_count": null, "id": "970ce734", "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "from sagemaker.serializers import CSVSerializer\n", "from sagemaker.inputs import TrainingInput\n", "from sagemaker.predictor import Predictor\n", "from sagemaker import get_execution_role\n", "import pandas as pd\n", "import numpy as np\n", "import sagemaker\n", "import logging\n", "import json\n", "import os\n", "import sys\n", "sys.path.append('..')\n", "import boto3\n", "\n" ] }, { "cell_type": "markdown", "id": "695b0a5d", "metadata": {}, "source": [ "### Essentials" ] }, { "cell_type": "code", "execution_count": null, "id": "4c6e8279", "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger('__name__')\n", "logger.setLevel(logging.DEBUG)\n", "logger.addHandler(logging.StreamHandler())" ] }, { "cell_type": "code", "execution_count": null, "id": "0fde9200", "metadata": {}, "outputs": [], "source": [ "sagemaker_execution_role = get_execution_role()\n", "logger.info(f'Role = {sagemaker_execution_role}')\n", "session = boto3.Session()\n", "sagemaker_session = sagemaker.Session()\n", "default_bucket = sagemaker_session.default_bucket()\n", "prefix = 'sagemaker-featurestore-workshop'\n", "s3 = session.resource('s3')\n", "sagemaker_client = session.client(service_name=\"sagemaker\")" ] }, { "cell_type": "markdown", "id": "667169ed", "metadata": {}, "source": [ "## Loading feature group names\n", "\n", "We will be loading and using the data we created and ingested in feature groups created in module 1 and 2. Therefore we will restore the feature group name to use." ] }, { "cell_type": "code", "execution_count": null, "id": "0e21e5a0", "metadata": {}, "outputs": [], "source": [ "%store -r customers_feature_group_name\n", "%store -r products_feature_group_name" ] }, { "cell_type": "markdown", "id": "6d034228", "metadata": {}, "source": [ "## Prepare a script to look up features from the featurestore\n", "Within an inference pipeline we can deploy multiple containers in sequence- the first container for example can be pre-precessing container using Sklearn or any other framework of your choice- for the demonstration we will create a xgboost model object that does nothing but looks up the features from feature store. For this, we will prepare a customised inference script to use when creating the model object. 
"cell_type": "code", "execution_count": null, "id": "dd7fc87e", "metadata": {}, "outputs": [], "source": [ "%%writefile custom_library/inference_get_features.py\n", "\n", "import os\n", "import time\n", "import ast\n", "\n", "import boto3\n", "\n", "import helper\n", "\n", "boto_session = boto3.Session()\n", "region = boto_session.region_name\n", "\n", "# The feature list is passed to the script as an environment variable; it is defined by the client.\n", "feature_list = os.environ['feature_list']\n", "feature_list = ast.literal_eval(feature_list)\n", "\n", "\n", "def model_fn(model_dir):\n", "    # This container has no model artifact; it only looks up features.\n", "    print('processing - in model_fn')\n", "    return None\n", "\n", "\n", "def input_fn(request_body, request_content_type):\n", "    \"\"\"\n", "    The SageMaker XGBoost model server receives the request body and the content type,\n", "    and invokes `input_fn`.\n", "    Parse the customer id and product id from the CSV body and return the features\n", "    retrieved from the online feature store.\n", "    \"\"\"\n", "    print(request_content_type)\n", "    if request_content_type == \"text/csv\":\n", "        params = request_body.split(',')\n", "        id_dict = {'customer_id': params[0].strip(), 'product_id': params[1].strip()}\n", "        start = time.time()\n", "        recs = helper.get_latest_featureset_values(id_dict, feature_list)\n", "        end = time.time()\n", "        duration = end - start\n", "        print(\"time to look up features from two feature stores:\", duration)\n", "        records = [e for e in recs.values()]\n", "        return [records]\n", "    else:\n", "        raise ValueError(\"{} not supported by script!\".format(request_content_type))\n", "\n", "\n", "def predict_fn(input_data, model):\n", "    \"\"\"\n", "    The SageMaker XGBoost model server invokes `predict_fn` on the return value of `input_fn`.\n", "    This container performs no prediction; it simply passes the retrieved feature vector\n", "    on to the next container in the pipeline.\n", "    \"\"\"\n", "    return input_data\n" ] }, { "cell_type": "markdown", "id": "a3e17d31", "metadata": {}, "source": [ "### Prepare the feature group names and the list of features to be retrieved from the online feature store; this list is defined by the client and passed to the script as an environment variable" ] }, { "cell_type": "code", "execution_count": null, "id": "0cfca94b", "metadata": {}, "outputs": [], "source": [ "%store -r customers_feature_group_name\n", "%store -r products_feature_group_name\n", "\n", "customers_fg = sagemaker_client.describe_feature_group(\n", "    FeatureGroupName=customers_feature_group_name)\n", "\n", "products_fg = sagemaker_client.describe_feature_group(\n", "    FeatureGroupName=products_feature_group_name)\n", "\n", "\n", "'''Select all features from a feature group using '*', or select a subset from the complete\n", "list of features, which you can retrieve via the following code.\n", "customers_feats='*'\n", "products_feats='*'\n", "\n", "OR\n", "\n", "customers_feats=','.join(i['FeatureName'] for i in customers_fg['FeatureDefinitions'])\n", "products_feats=','.join(i['FeatureName'] for i in products_fg['FeatureDefinitions'])\n", "'''\n", "\n",
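"# Illustration: with '*' selected below, the feature_list string passed to the script takes the form\n", "# \"['<customers_feature_group_name>:*', '<products_feature_group_name>:*']\" (the feature group names\n", "# are placeholders here); the inference script parses it with ast.literal_eval before the lookup.\n", "\n",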
"customers_feats='*'\n", "products_feats='*'\n", "\n", "customer_feats_desc=customers_fg[\"FeatureGroupName\"]+ \":\"+customers_feats\n", "products_feats_desc=products_fg[\"FeatureGroupName\"]+ \":\"+products_feats\n", "\n", "feature_list=str([customer_feats_desc,products_feats_desc])\n", "print(feature_list)\n" ] }, { "cell_type": "markdown", "id": "d0c143db", "metadata": {}, "source": [ "### Create the XGB model object and pass on the customised inference script. Take note of the environemt variables we are defining, in particular feature_list passed on by the client to the script" ] }, { "cell_type": "code", "execution_count": null, "id": "35cef15b", "metadata": {}, "outputs": [], "source": [ "from sagemaker.xgboost.model import XGBoostModel\n", "\n", "\n", "#env={\"feature_list\": feature_list}\n", "\n", "fs_lookup_model = XGBoostModel(\n", " model_data=None,\n", " role=sagemaker_execution_role,\n", " source_dir= './custom_library',\n", " entry_point=\"inference_get_features.py\",\n", " framework_version=\"1.3-1\",\n", " sagemaker_session=sagemaker_session,\n", ")\n", "\n", "fs_lookup_model.env = {\"SAGEMAKER_DEFAULT_INVOCATIONS_ACCEPT\":\"text/csv\", \"feature_list\": feature_list}" ] }, { "cell_type": "markdown", "id": "d8b44444", "metadata": {}, "source": [ "## Load pre-trained xgboost model\n", "As for the model which actually does the inference we load our already trained XGboost model and deploy it as the second container in the sequence of inference pipeline. We do this simply by loading the trained model from module 3 and creating the model object to pass on to the inference pipeline" ] }, { "cell_type": "code", "execution_count": null, "id": "7a5d7ba6", "metadata": {}, "outputs": [], "source": [ "%store -r training_jobName\n", "sagemaker_client = session.client(service_name=\"sagemaker\")\n", "from sagemaker.xgboost.model import XGBoostModel\n", "\n", "training_job_info = sagemaker_client.describe_training_job(\n", " TrainingJobName=training_jobName\n", ")\n", "xgb_model_data = training_job_info[\"ModelArtifacts\"][\"S3ModelArtifacts\"]\n", "print(xgb_model_data)\n", "\n", "container_uri = training_job_info['AlgorithmSpecification']['TrainingImage']" ] }, { "cell_type": "code", "execution_count": null, "id": "73f8c526", "metadata": {}, "outputs": [], "source": [ "from time import gmtime, strftime\n", "from sagemaker.utils import name_from_base\n", "from sagemaker.model import Model\n", "\n", "xgb_model = Model(\n", " image_uri=container_uri,\n", " model_data=xgb_model_data,\n", " role=sagemaker_execution_role,\n", " name=name_from_base(\"fs-workshop-xgboost-model\"),\n", " sagemaker_session=sagemaker_session,\n", ")\n" ] }, { "cell_type": "markdown", "id": "bad8b054", "metadata": {}, "source": [ "## Create and deploy an inference pipline\n", "As shown in the following code, we use Pipleline model and pass on the two models as a sequence to the pipeline and deploy it similar to any other deployment to an endpoint" ] }, { "cell_type": "code", "execution_count": null, "id": "7499fe59", "metadata": {}, "outputs": [], "source": [ "from sagemaker.pipeline import PipelineModel\n", "\n", "instance_type = \"ml.m5.2xlarge\"\n", " \n", "model_name = name_from_base(\"inference-pipeline\")\n", "endpoint_name = name_from_base(\"inference-pipeline-ep\")\n", "\n", "sm_model = PipelineModel(name=model_name, role=sagemaker_execution_role, models=[fs_lookup_model, xgb_model])\n", "\n", "sm_model.deploy(initial_instance_count=1, instance_type=instance_type, endpoint_name=endpoint_name)" ] }, { 
"cell_type": "markdown", "id": "862cf712", "metadata": {}, "source": [ "## Make inference using the inference pipeline" ] }, { "cell_type": "code", "execution_count": null, "id": "cf27154f", "metadata": {}, "outputs": [], "source": [ "from sagemaker.predictor import Predictor\n", "from sagemaker.serializers import CSVSerializer\n", "\n", "cust_id='C50'\n", "prod_id='P2'\n", "test_data= f'{cust_id},{prod_id}'\n", "print(test_data)\n", "\n", "predictor = Predictor(\n", " endpoint_name=endpoint_name,\n", " sagemaker_session=None,\n", " serializer=CSVSerializer(),\n", " Content_Type=\"text/csv\",\n", " Accept=\"text/csv\"\n", ")\n", "print(predictor.predict(test_data))" ] }, { "cell_type": "markdown", "id": "0cdddde7", "metadata": {}, "source": [ "## Cleanup" ] }, { "cell_type": "code", "execution_count": null, "id": "e7b94351", "metadata": {}, "outputs": [], "source": [ "endpoint_name = sm_model.endpoint_name\n", "print(endpoint_name)" ] }, { "cell_type": "code", "execution_count": null, "id": "ced8ed49", "metadata": {}, "outputs": [], "source": [ "response = sagemaker_client.describe_endpoint_config(EndpointConfigName=endpoint_name)\n", "model_name = response['ProductionVariants'][0]['ModelName']\n", "model_name" ] }, { "cell_type": "code", "execution_count": null, "id": "24c6c60a", "metadata": {}, "outputs": [], "source": [ "sagemaker_client.delete_model(ModelName=model_name) \n", "sagemaker_client.delete_endpoint(EndpointName=endpoint_name)\n", "sagemaker_client.delete_endpoint_config(EndpointConfigName=endpoint_name)" ] }, { "cell_type": "code", "execution_count": null, "id": "2a34ec94", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 5 }