{ "cells": [ { "cell_type": "markdown", "id": "30b55228", "metadata": {}, "source": [ "# MLOps: Repeatable - Automate machine learning workflows" ] }, { "cell_type": "markdown", "id": "9f9991b4", "metadata": { "tags": [] }, "source": [ "## Contents\n", "\n", "- [Introduction](#Introduction)\n", "- [Model Build pipeline with SageMaker Pipelines](#Training-pipeline-with-SageMaker-Pipelines)\n", " - [Pipeline inputs](#Pipeline-inputs)\n", " - [SageMaker Processing step](#SageMaker-Processing-step)\n", " - [SageMaker Training step](#SageMaker-Training-step)\n", " - [Model evaluation step](#Model-evaluation-step)\n", " - [Register model in Model Registry step](#Register-model-in-Model-Registry-step)\n", " - [Define & create the training pipeline](#Assemble-the-training-pipeline)\n", " - [Execute the training pipeline](#Execute-the-training-pipeline)\n", "- [Deployment pipeline (real-time endpoint) with SageMaker Pipelines](#Deployment-pipeline-with-SageMaker-Pipelines)\n", " - [Assemble the deployment pipeline](#Assemble-the-deployment-pipeline)\n", " - [Execute the deployment pipeline](#Execute-the-deployment-pipeline)\n", " - [Test the SageMaker endpoint](#Test-the-SageMaker-endpoint)" ] }, { "cell_type": "markdown", "id": "0c955aa1", "metadata": {}, "source": [ "## Introduction" ] }, { "cell_type": "markdown", "id": "a698b834", "metadata": {}, "source": [ "This is our fourth notebook (Lab 2) which will dive deep into automating machine learning workflows to create a more repeatable path to production. \n", "\n", "Here, we will put on the hat of a `ML Engineer` and perform the tasks required to automate the tasks within our machine learning workflows as well as orchestrate the steps. For this, we'll build pipeline steps that include all the previous notebooks components into one singular entity. This pipeline entity accomplishes a repeatable ML workflow with some reliability built in through quality minimal quality gates. \n", "\n", "For this task we will be using Amazon SageMaker Pipelines capabilities to build out an end-to-end machine learning pipeline. \n", "\n", "![Notebook4](images/Notebook-4.png)\n", "\n", "Keep in mind, CI/CD practicies are typically more aligned with the *Reliable* stage so you'll notice we have not yet considered a more robust set of pipelines that considers the lifecycle of each stage (build vs deploy), source/version control, automated triggers, or additional quality gates. \n", "\n", "Let's get started!" ] }, { "cell_type": "markdown", "id": "b77c3c66", "metadata": {}, "source": [ "**Imports**" ] }, { "cell_type": "code", "execution_count": null, "id": "362016c0-d6be-4122-b37c-cbe4a2206788", "metadata": { "tags": [] }, "outputs": [], "source": [ "!pip install -U sagemaker" ] }, { "cell_type": "code", "execution_count": null, "id": "de783141", "metadata": { "tags": [] }, "outputs": [], "source": [ "%store -r" ] }, { "cell_type": "code", "execution_count": null, "id": "5bf9398a", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Processing imports\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor\n", "\n", "# SageMaker Pipeline imports\n", "from sagemaker.workflow.properties import PropertyFile\n", "from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo\n", "from sagemaker.workflow.condition_step import ConditionStep\n", "from sagemaker.workflow.functions import JsonGet\n", "\n", "from sagemaker.workflow.pipeline import Pipeline\n", "from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep, TransformStep\n", "from sagemaker.workflow.model_step import ModelStep\n", "\n", "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "\n", "# Other imports\n", "import json\n", "import time\n", "from time import gmtime, strftime\n", "from sagemaker.sklearn.estimator import SKLearn\n", "from sagemaker.model import Model\n", "from sagemaker.tuner import IntegerParameter, HyperparameterTuner\n", "from sagemaker.inputs import TrainingInput\n", "from sagemaker.lambda_helper import Lambda\n", "from sagemaker.workflow.lambda_step import (\n", " LambdaStep,\n", " LambdaOutput,\n", " LambdaOutputTypeEnum,\n", ")\n", "\n", "# To test the endpoint once it's deployed\n", "from sagemaker.predictor import Predictor\n", "from sagemaker.serializers import CSVSerializer\n", "from sagemaker.deserializers import JSONDeserializer, CSVDeserializer\n", "from sagemaker.workflow.pipeline_context import PipelineSession\n", "import sagemaker\n", "import json\n", "import boto3\n", "from sagemaker.model_metrics import ModelMetrics, MetricsSource\n", "import pandas as pd\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "from helper_library import *\n", "\n", "from sagemaker.workflow.steps import CacheConfig" ] }, { "cell_type": "markdown", "id": "1d4f7796", "metadata": {}, "source": [ "**Session variables**" ] }, { "cell_type": "code", "execution_count": null, "id": "4eb244ac", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Useful SageMaker variables\n", "session = PipelineSession()\n", "bucket = session.default_bucket()\n", "role_arn= sagemaker.get_execution_role()\n", "region = session.boto_region_name\n", "sagemaker_client = boto3.client('sagemaker')\n", "aws_account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "lambda_role = create_lambda_iam_role('LambdaSageMakerExecutionRole')" ] }, { "cell_type": "markdown", "id": "3f885a04", "metadata": {}, "source": [ "## Model Build pipeline with SageMaker Pipelines" ] }, { "cell_type": "markdown", "id": "e5be5471", "metadata": {}, "source": [ "[Amazon SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) provides the ability to create a directed acryclic graph (DAG) containing the pipeline steps need to build and/or deploy machine learning models. Each pipeline, created through the provided Python SDK, is a series of interconnected steps. This same pipeline can also be exported as a JSON pipeline definition. \n", "\n", "The structure of a pipeline's DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step's output are passed as the input to another step. The following image is a pipeline DAG that we'll be creating for our training pipeline:\n", "\n", "![](images/sagemaker-pipelines-dag.png)" ] }, { "cell_type": "markdown", "id": "29cefc9b", "metadata": {}, "source": [ "#### Pipeline Parameters" ] }, { "cell_type": "markdown", "id": "a135d550", "metadata": {}, "source": [ "SageMaker Pipelines supports [pipeline parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html) allowing you to provide runtime parameters for each run of your pipeline. This allows you to change key inputs for each run of your pipeline without changing your pipeline code (ex. raw data on input)\n", "\n", "Here, we'll identify the parameters and set the parameter default. You can also use this feature to make it reusable (you'll be able to override these inputs upon executing the pipeline later in the notebook)." ] }, { "cell_type": "code", "execution_count": null, "id": "22d9ae4d", "metadata": { "tags": [] }, "outputs": [], "source": [ "processing_instance_count = ParameterInteger(\n", " name='ProcessingInstanceCount',\n", " default_value=1\n", ")" ] }, { "cell_type": "markdown", "id": "39dcd96d-a7c1-4a6a-88b5-972eeec17e7b", "metadata": {}, "source": [ "#### Setup Step Caching Configuration\n", "\n", "This configuration can be enabled on pipeline steps to allow SageMaker Pipelines to automatically check if a previous (successful) run of a pipeline step with the same values for specific parameters is found. If it is found, Pipelines propogates the results of that step to the next step without re-running the step saving both time and compute costs." ] }, { "cell_type": "code", "execution_count": null, "id": "5df22e33-f92b-44f6-bdd1-b2e4acb26b72", "metadata": { "tags": [] }, "outputs": [], "source": [ "cache_config = CacheConfig(enable_caching=True, expire_after=\"PT12H\")" ] }, { "cell_type": "markdown", "id": "fb440646", "metadata": {}, "source": [ "#### SageMaker Processing step" ] }, { "cell_type": "markdown", "id": "a961856d", "metadata": {}, "source": [ "This should look very similar to the SageMaker Processing Job you configured in notebook 2. The only new line of code is the `ProcessingStep` line at the bottom of the cell below which allows us to take the Processing Job configuration and include it as a pipeline step. " ] }, { "cell_type": "code", "execution_count": null, "id": "c629b5bf", "metadata": { "tags": [] }, "outputs": [], "source": [ "preprocess_data_processor = SKLearnProcessor(\n", " framework_version='0.23-1',\n", " role=role_arn,\n", " instance_type='ml.m5.xlarge',\n", " instance_count=processing_instance_count,\n", " base_job_name='preprocess-data',\n", " sagemaker_session=session,\n", ")\n", "\n", "preprocess_dataset_step = ProcessingStep(\n", " name='PreprocessData',\n", " code='./pipeline_scripts/preprocessing.py',\n", " processor=preprocess_data_processor,\n", " inputs=[\n", " ProcessingInput(\n", " source=raw_s3,\n", " destination='/opt/ml/processing/input',\n", " s3_data_distribution_type='ShardedByS3Key'\n", " )\n", " ],\n", " outputs=[\n", " ProcessingOutput(\n", " output_name='train',\n", " destination=f'{output_path}/train',\n", " source='/opt/ml/processing/output/train'\n", " ),\n", " ProcessingOutput(\n", " output_name='validation',\n", " destination=f'{output_path}/validation',\n", " source='/opt/ml/processing/output/validation'\n", " ),\n", " ProcessingOutput(\n", " output_name='test',\n", " destination=f'{output_path}/test',\n", " source='/opt/ml/processing/output/test'\n", " )\n", " ],\n", " cache_config=cache_config\n", ")" ] }, { "cell_type": "markdown", "id": "98a13bfa", "metadata": {}, "source": [ "#### SageMaker Training step" ] }, { "cell_type": "markdown", "id": "deaa727f", "metadata": {}, "source": [ "This configuration should also look very similar to the SageMaker Training job you did in notebook 2. The only new line of code is the `TrainingStep` line at the bottom of the cell below to allow us to run the training job as a step in our pipeline." ] }, { "cell_type": "code", "execution_count": null, "id": "e65af28f", "metadata": { "tags": [] }, "outputs": [], "source": [ " # Tuned hyperparameters\n", "hyperparameters = {\n", " 'max_depth': 8, \n", " 'n_jobs': 4, \n", " 'n_estimators': 80}\n", "\n", "train_instance_type = 'ml.c5.xlarge'\n", "\n", "# Metrics to be captured from logs.\n", "metric_definitions = [{'Name': 'r_squared',\n", " 'Regex': 'r-squared: ([0-9\\\\.]+)'},\n", " {'Name': 'mse',\n", " 'Regex': 'MSE: ([0-9\\\\.]+)'}]\n", "\n", "estimator_parameters = {\n", " 'entry_point': './pipeline_scripts/train_deploy_scikitlearn_randomforestregressor.py',\n", " 'framework_version': '0.23-1',\n", " 'py_version': 'py3',\n", " 'instance_type': train_instance_type,\n", " 'instance_count': 1,\n", " 'hyperparameters': hyperparameters,\n", " 'role': role_arn,\n", " 'metric_definitions': metric_definitions,\n", " 'base_job_name': 'randomforestregressor-model',\n", " 'output_path': f's3://{bucket}/{s3_prefix}/'\n", "}\n", "\n", "estimator = SKLearn(**estimator_parameters)\n", "\n", "training_step = TrainingStep(\n", " name='TrainModel',\n", " estimator=estimator,\n", " inputs={\n", " 'train': TrainingInput(\n", " s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[\n", " 'train'\n", " ].S3Output.S3Uri\n", " ),\n", " 'validation': TrainingInput(\n", " s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[\n", " 'validation'\n", " ].S3Output.S3Uri\n", " )\n", " },\n", " cache_config=cache_config\n", ")" ] }, { "cell_type": "markdown", "id": "0fcdd83e", "metadata": {}, "source": [ "#### Model evaluation step" ] }, { "cell_type": "markdown", "id": "02abb56a", "metadata": {}, "source": [ "After the training step in our pipeline, we'll want to then evaluate our model's performance. To do that, we can create a SageMaker Processing Step that will utilize evaluation code (evaluation.py) that we specify to perform evaluation of the model using the test hold-out dataset that is output of the preprocess data step configured above. " ] }, { "cell_type": "code", "execution_count": null, "id": "dd97e600", "metadata": { "tags": [] }, "outputs": [], "source": [ "evaluation_processor = SKLearnProcessor(\n", " framework_version='0.23-1',\n", " role=role_arn,\n", " instance_type='ml.m5.xlarge',\n", " instance_count=processing_instance_count,\n", " base_job_name='evaluation',\n", " sagemaker_session=session,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "782d1a7e", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Specify where we'll store the model evaluation results so\n", "# that other steps can access those results\n", "evaluation_report = PropertyFile(\n", " name='EvaluationReport',\n", " output_name='evaluation',\n", " path='evaluation.json',\n", ")\n", "\n", "evaluation_step = ProcessingStep(\n", " name='EvaluateModel',\n", " processor=evaluation_processor,\n", " inputs=[\n", " ProcessingInput(\n", " source=training_step.properties.ModelArtifacts.S3ModelArtifacts,\n", " destination='/opt/ml/processing/model',\n", " ),\n", " ProcessingInput(\n", " source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,\n", " destination='/opt/ml/processing/test',\n", " ),\n", " ],\n", " outputs=[\n", " ProcessingOutput(\n", " output_name='evaluation', source='/opt/ml/processing/evaluation'\n", " ),\n", " ],\n", " code='./pipeline_scripts/evaluation.py',\n", " property_files=[evaluation_report],\n", ")" ] }, { "cell_type": "markdown", "id": "3bd32643", "metadata": {}, "source": [ "#### Register model in Model Registry step" ] }, { "cell_type": "markdown", "id": "61b43c83", "metadata": {}, "source": [ "Once we've evaluated the model's peformance, we'll want to register the model version in SageMaker Model Registery IF the model is performing well according to the conditional criteria we've identified. For this, key metadata for each pipeline step will be captured in the model registry. \n", "\n", "The conditional step will be setup after this step because the model registration will have a dependency of meeting the objective criteria defined so the step must be defined before we can reference it in the conditional step. " ] }, { "cell_type": "code", "execution_count": null, "id": "3b15553d", "metadata": { "tags": [] }, "outputs": [], "source": [ "model_metrics = ModelMetrics(\n", " model_statistics=MetricsSource(\n", " s3_uri='{}/evaluation.json'.format(\n", " evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output'][\n", " 'S3Uri'\n", " ]\n", " ),\n", " content_type='application/json',\n", " )\n", ")\n", "\n", "model = Model(\n", " image_uri=estimator.training_image_uri(),\n", " model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,\n", " entry_point=estimator.entry_point,\n", " role=role_arn,\n", " sagemaker_session=session\n", ")\n", "\n", "model_registry_args = model.register(\n", " content_types=['text/csv'],\n", " response_types=['application/json'],\n", " inference_instances=['ml.t2.medium', 'ml.m5.xlarge'],\n", " transform_instances=['ml.m5.xlarge'],\n", " model_package_group_name=model_package_group_name,\n", " approval_status='PendingManualApproval',\n", " model_metrics=model_metrics\n", ")\n", "\n", "register_step = ModelStep(\n", " name='RegisterModel',\n", " step_args=model_registry_args\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fb436e40", "metadata": {}, "source": [ "Because we only want to register the model if its performance meets a predefined threshold that we set, we now need to create a Condition Step that says if our model's MSE values is less than 320,000,000 then we'll registery the model." ] }, { "cell_type": "code", "execution_count": null, "id": "b6a45852", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Condition step for evaluating model quality and branching execution\n", "\n", "cond_lte = ConditionLessThanOrEqualTo(\n", " left=JsonGet(\n", " step_name=evaluation_step.name,\n", " property_file=evaluation_report,\n", " json_path='regression_metrics.mse.value',\n", " ),\n", " right=320000000.0,\n", ")\n", "condition_step = ConditionStep(\n", " name='CheckEvaluation',\n", " conditions=[cond_lte],\n", " if_steps=[register_step],\n", " else_steps=[],\n", ")" ] }, { "cell_type": "markdown", "id": "92ae7857", "metadata": {}, "source": [ "#### Define & create the training pipeline" ] }, { "cell_type": "markdown", "id": "89d737ad", "metadata": {}, "source": [ "Next, we need to define the pipeline, `Pipeline()`, and then create the pipeline, `upsert()`. \n", "\n", "When defining the pipeline, the steps do not need to be in the order to be executed. SageMaker will automatically infer order based on dependencies between the steps. " ] }, { "cell_type": "code", "execution_count": null, "id": "6e00006e", "metadata": { "tags": [] }, "outputs": [], "source": [ "# pipeline_name = 'synthetic-housing-training-pipeline-{}'.format(strftime('%d-%H-%M-%S', gmtime()))\n", "pipeline_name = 'synthetic-housing-training-pipeline'\n", "step_list = [preprocess_dataset_step,\n", " training_step,\n", " evaluation_step,\n", " condition_step]\n", "\n", "training_pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " processing_instance_count\n", " ],\n", " steps=step_list\n", ")\n", "\n", "# Note: If an existing pipeline has the same name it will be overwritten.\n", "training_pipeline.upsert(role_arn=role_arn)\n", "\n", "# Viewing the pipeline definition will all the string variables interpolated may help debug pipeline bugs. It is commented out here due to length.\n", "#json.loads(training_pipeline.definition())" ] }, { "cell_type": "markdown", "id": "3e6e5187", "metadata": {}, "source": [ "#### Execute the training pipeline" ] }, { "cell_type": "code", "execution_count": null, "id": "5d3a5325", "metadata": { "tags": [] }, "outputs": [], "source": [ "# This is where you could optionally override parameter defaults \n", "execution = training_pipeline.start()" ] }, { "cell_type": "markdown", "id": "333d7ff1", "metadata": {}, "source": [ "Programmatically check on status of pipeline using describe() " ] }, { "cell_type": "code", "execution_count": null, "id": "4f8bc68a", "metadata": { "tags": [] }, "outputs": [], "source": [ "execution.describe()" ] }, { "cell_type": "code", "execution_count": null, "id": "4737fc46", "metadata": { "tags": [] }, "outputs": [], "source": [ "execution.list_steps()" ] }, { "cell_type": "markdown", "id": "b185a630-e5de-4c76-9940-70de4caeba9b", "metadata": {}, "source": [ "To visualize you pipeline steps as well as monitor and debug pipeline runs, you can utilize Studio's Pipeline interface by selecting **Home** from the left-hand menu then choosing **Pipelines** and navigating to the pipeline execution in process. You'll see a DAG similar to the one below where steps that have completed are green, steps in process are blue, and steps that have not yet run are in grey. You can also click on a step to understand inputs, outputs, logs (for debugging), as well as step metadata. \n", "\n", "\n", "![Pipelines](images/pipelines-view.png)\n" ] }, { "cell_type": "markdown", "id": "7f5e9f32-c623-4e14-a8b6-e47761f26c9f", "metadata": {}, "source": [ "(Optional) If you have time, feel free to start the pipeline again. Because we enabled step caching, you'll notice that SageMaker Pipelines is able to identify that the previous steps ran with the same input as the prior execution and you'll see cache hits identified by Pipelines. This allows you to avoid unnecessarily re-running pipeline steps saving not only time but cost of recomputing unnecessary tasks. \n", "\n", "![Pipelines](images/cache-hit.png)\n", "\n" ] }, { "cell_type": "markdown", "id": "8f84fabe", "metadata": {}, "source": [ "## Deployment pipeline with SageMaker Pipelines" ] }, { "cell_type": "markdown", "id": "b9770d04", "metadata": {}, "source": [ "Now let's create a separate pipeline that will take the model that was registered in Model Registry and deploy it as a SageMaker hosted endpoint. This is include to show how you could use SageMaker Pipelines to deploy a real-time endpoint but you should also consider the advantages of deploying through a CD pipeline that utilizes IaC/CaC for more advanced deployment strategies and rollback capabilities. \n", "\n", "For a real-time endpoint, you'll need to utilize a Lambda step (which is a great option for custom logic); however, for batch use cases there is a native SageMaker Pipeline step for a SageMaker Batch Transform to orchestrate your batch inference pipelines. " ] }, { "cell_type": "markdown", "id": "52a0f831", "metadata": {}, "source": [ "First we'll specify the input parameters to our deployment pipeline so that we can reuse it for other use cases." ] }, { "cell_type": "code", "execution_count": null, "id": "586cb9d0", "metadata": { "tags": [] }, "outputs": [], "source": [ "model_name = ParameterString(\n", " name='ModelName',\n", " default_value='my-awesome-model'\n", ")" ] }, { "cell_type": "markdown", "id": "b15e162f", "metadata": {}, "source": [ "Next, we'll create a Lambda function that will pull the specified model (or latest approved model) from the Model Registry and deploy as a Sagemaker endpoint." ] }, { "cell_type": "code", "execution_count": null, "id": "6a74a3c7", "metadata": { "tags": [] }, "outputs": [], "source": [ "lambda_name = 'sagemaker-pipelines-deploy-model'\n", "\n", "lambda_function = Lambda(\n", " function_name=lambda_name,\n", " execution_role_arn=lambda_role,\n", " script='./pipeline_scripts/lambda_deploy.py',\n", " handler='lambda_deploy.lambda_handler',\n", " timeout=600,\n", " memory_size=3000,\n", ")\n", "\n", "try:\n", " lambda_function_response = lambda_function.create()\n", " lambda_function_arn = lambda_function_response['FunctionArn']\n", " print(f'Lambda function arn: {lambda_function_arn}')\n", "except:\n", " print('Lambda function already exists!')" ] }, { "cell_type": "markdown", "id": "b514b2cd", "metadata": {}, "source": [ "Now we'll create a Lambda step for our pipeline and associate it with the new Lambda function we just created." ] }, { "cell_type": "code", "execution_count": null, "id": "7c6b6de8", "metadata": { "tags": [] }, "outputs": [], "source": [ "# The dictionary retured by the Lambda function is captured by LambdaOutput, each key in the dictionary corresponds to a\n", "# LambdaOutput\n", "\n", "output_param_1 = LambdaOutput(output_name='statusCode', output_type=LambdaOutputTypeEnum.String)\n", "output_param_2 = LambdaOutput(output_name='body', output_type=LambdaOutputTypeEnum.String)\n", "\n", "deploy_lambda_step = LambdaStep(\n", " name='LambdaStepDeploy',\n", " lambda_func=lambda_function,\n", " inputs={\n", " 'region': region,\n", " 'aws_account_id': aws_account_id,\n", " 'model_package_group_name': model_package_group_name,\n", " 'model_name': model_name,\n", " 'instance_count': 1,\n", " 'role_arn': role_arn\n", " },\n", " outputs=[\n", " output_param_1, \n", " output_param_2\n", " ],\n", ")" ] }, { "cell_type": "markdown", "id": "8d029f8c", "metadata": {}, "source": [ "Excellent, now we just need to assemble the pipeline." ] }, { "cell_type": "markdown", "id": "dc014f56", "metadata": {}, "source": [ "#### Define & create the deployment pipeline" ] }, { "cell_type": "code", "execution_count": null, "id": "dcad8242", "metadata": { "tags": [] }, "outputs": [], "source": [ "# pipeline_name = 'synthetic-housing-deployment-pipeline-{}'.format(strftime('%d-%H-%M-%S', gmtime()))\n", "pipeline_name = 'synthetic-housing-deployment-pipeline'\n", "step_list = [deploy_lambda_step]\n", "\n", "deployment_pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " model_name\n", " ],\n", " steps=step_list\n", ")\n", "\n", "# Note: If an existing pipeline has the same name it will be overwritten.\n", "deployment_pipeline.upsert(role_arn=role_arn)\n", "\n", "# Viewing the pipeline definition will all the string variables interpolated may help debug pipeline bugs. It is commented out here due to length.\n", "json.loads(deployment_pipeline.definition())" ] }, { "cell_type": "markdown", "id": "6b589851", "metadata": {}, "source": [ "#### Execute the deployment pipeline" ] }, { "cell_type": "code", "execution_count": null, "id": "c1f99d96", "metadata": { "tags": [] }, "outputs": [], "source": [ "deployed_model_name = 'my-xgboost-model'\n", "execution = deployment_pipeline.start(\n", " parameters = {\n", " 'ModelName': deployed_model_name\n", " }\n", ")" ] }, { "cell_type": "markdown", "id": "ec4558b9", "metadata": {}, "source": [ "Check on status of pipeline programmatically (shown below) or via the Studio Pipelines interface" ] }, { "cell_type": "code", "execution_count": null, "id": "afeb7a29", "metadata": { "tags": [] }, "outputs": [], "source": [ "execution.describe()" ] }, { "cell_type": "code", "execution_count": null, "id": "b7ca47c2", "metadata": { "tags": [] }, "outputs": [], "source": [ "execution.list_steps()" ] }, { "cell_type": "markdown", "id": "ba30241b", "metadata": {}, "source": [ "#### Test the SageMaker endpoint" ] }, { "cell_type": "markdown", "id": "cd9c29c1-15a7-45f5-9587-9af776f1d3aa", "metadata": {}, "source": [ "Let's now send some data to the endpoint and test it is working properly. \n", "\n", "You could optionally do this as part of the pipeline, in a Lambda step, which would be recommended as you mature your MLOps practices.\n", "\n", "For this, we first load our test data from Feature Store" ] }, { "cell_type": "code", "execution_count": null, "id": "5176339b", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Read in test set that was used for batch transform\n", "fs_group = FeatureGroup(name=test_feature_group_name, sagemaker_session=session) \n", "query = fs_group.athena_query()\n", "table = query.table_name\n", "query_string = f'SELECT {features_to_select} FROM \"sagemaker_featurestore\".\"{table}\" ORDER BY record_id'\n", "query_results= 'sagemaker-featurestore'\n", "output_location = f's3://{bucket}/{query_results}/query_results/'\n", "query.run(query_string=query_string, output_location=output_location)\n", "query.wait()\n", "df = query.as_dataframe()\n", "df.head()" ] }, { "cell_type": "markdown", "id": "88096555-6338-4335-96ed-6e51328306db", "metadata": {}, "source": [ "Then we query the endpoint once it is available" ] }, { "cell_type": "code", "execution_count": null, "id": "5df224d2", "metadata": { "tags": [] }, "outputs": [], "source": [ "response_status = 'None'\n", "while response_status != 'InService':\n", " if response_status != 'None':\n", " print(f'Waiting for the endpoint deployment to finish. Current endpoint status: {response_status}')\n", " time.sleep(120) # wait until endpoint is in service\n", " response = sagemaker_client.describe_endpoint(\n", " EndpointName=deployed_model_name+'-endpoint'\n", " )\n", " response_status = response['EndpointStatus']\n", "# Attach to the SageMaker endpoint\n", "predictor = Predictor(endpoint_name=deployed_model_name+'-endpoint',\n", " sagemaker_session=session,\n", " serializer=CSVSerializer(),\n", " deserializer=CSVDeserializer())\n", "\n", "# Get a real-time prediction\n", "predictor.predict(df.drop(columns=[\"price\"]).to_csv(index=False, header=False))[0]" ] }, { "cell_type": "code", "execution_count": null, "id": "334e3bc9-6155-4c28-824f-080fca64bd31", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" }, "toc-autonumbering": true }, "nbformat": 4, "nbformat_minor": 5 }