{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# SageMaker Pipelines Customer Churn Prediction\n", "\n", "Amazon SageMaker Model Building Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines. It also enables them to deploy custom-build models for inference in real-time with low latency, run offline inferences with Batch Transform, and track lineage of artifacts. They can institute sound operational practices in deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage through a simple interface, adhering to safety and best practice paradigms for ML application development.\n", "\n", "The SageMaker Pipelines service supports a SageMaker Pipeline domain specific language (DSL), which is a declarative JSON specification. This DSL defines a directed acyclic graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Developer Kit (SDK) streamlines the generation of the pipeline DSL using constructs that engineers and scientists are already familiar with." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SageMaker Pipelines\n", "\n", "SageMaker Pipelines supports the following activities, which are demonstrated in this notebook:\n", "\n", "* Pipelines - A DAG of steps and conditions to orchestrate SageMaker jobs and resource creation.\n", "* Processing job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.\n", "* Training job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.\n", "* Conditional execution steps - A step that provides conditional execution of branches in a pipeline.\n", "* Register model steps - A step that creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.\n", "* Create model steps - A step that creates a model for use in transform steps or later publication as an endpoint.\n", "* Parametrized Pipeline executions - Enables variation in pipeline executions according to specified parameters." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Notebook Overview\n", "\n", "This notebook shows how to:\n", "\n", "* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.\n", "* Define a Processing step that extracts data from feature store to create the train, validation and test data sets.\n", "* Define a Training step that trains a model on the preprocessed train data set.\n", "* Define a Processing step that evaluates the trained model's performance on the test dataset.\n", "* Define a Create Model step that creates a model from the model artifacts used in training.\n", "* Define a Conditional step that measures a condition based on output from prior steps and conditionally executes other steps.\n", "* Define a Register Model step that creates a model package from the estimator and model artifacts used to train the model.\n", "* Define a Processing step that delopy a real-time endpoint from the trained model.\n", "* Define and create a Pipeline definition in a DAG, with the defined parameters and steps.\n", "* Start a Pipeline execution and wait for execution to complete.\n", "* Download the model evaluation report from the S3 bucket for examination.\n", "* Clean up all artifacts created in this sample code." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A SageMaker Pipeline\n", "\n", "The pipeline that you create follows a typical machine learning (ML) application pattern of preprocessing, training, evaluation, model creation, model registration, batch transform and endpoint deployment.\n", "\n", "
\n", "
ML workflow with SageMaker Pipeline
\n", "
\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Preparation\n", "\n", "**Required IAM policies**\n", "- IAMFullAccess\n", "- AmazonSQSFullAccess" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1 Serverless services preparation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this session, we create resources including `SQS`, `Lambda Function`, `Step Functions` and `AWS Glue job` for data preprocessing. We use [AWS Cloud Development Kit (AWS CDK)](https://docs.aws.amazon.com/cdk/v1/guide/home.html) to create these resources by following the steps beflow:\n", "\n", "- Step-1, clone this repo to local or [cloud9](https://aws.amazon.com/cloud9/)\n", "- Step-2, [install and set CDK environment](https://docs.aws.amazon.com/cdk/v1/guide/getting_started.html), we use cdk1.170.0 in this repo\n", "- Step-3, run `cd {project_root}/sagemaker-pipeline/cfn`\n", "- Step-4, run `cdk synth`\n", "- Step-5, run `cdk deploy --CfnStack --parameters ExecutionRoleArn={the role name of your current notebook instance}`. you can get the following result once deployment succeeded.\n", "
\n", " \n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Copy the `CfnStack.SqsURL` and will use it in Session-4." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2 Python libraries import" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Let's start by specifying:**\n", "\n", "- The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.\n", "- The IAM role arn used to give training and hosting access to your data. See the documentation for how to create these. Note, if more than one role is required for notebook instances, training, and/or hosting, please replace the boto regexp with a the appropriate full IAM role arn string(s)." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import json\n", "import os\n", "\n", "import boto3\n", "import sagemaker\n", "import sagemaker.session\n", "from sagemaker import get_execution_role\n", "\n", "import sagemaker\n", "from sagemaker.workflow.callback_step import (\n", " CallbackOutput,\n", " CallbackOutputTypeEnum,\n", " CallbackStep,\n", ")\n", "from sagemaker.workflow.lambda_step import (\n", " LambdaOutput,\n", " LambdaOutputTypeEnum,\n", " LambdaStep,\n", ")\n", "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "from sagemaker.workflow.steps import (\n", " ProcessingStep,\n", " Step,\n", " TrainingStep,\n", " CreateModelStep\n", ")\n", "\n", "from sagemaker.workflow.condition_step import ConditionStep\n", "from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo\n", "from sagemaker.workflow.functions import Join, JsonGet\n", "from sagemaker.workflow.pipeline import Pipeline\n", "from sagemaker.workflow.step_collections import RegisterModel\n", "from sagemaker.workflow.properties import PropertyFile\n", "\n", "from sagemaker.inputs import TrainingInput, CreateModelInput\n", "from sagemaker.model_metrics import MetricsSource, ModelMetrics\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import (\n", " ProcessingInput,\n", " ProcessingOutput,\n", " Processor,\n", " ScriptProcessor,\n", ")\n", "\n", "from sagemaker.xgboost.estimator import XGBoost\n", "from sagemaker.model import Model\n", "\n", "import uuid" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "role = get_execution_role()\n", "\n", "region = boto3.Session().region_name\n", "\n", "sagemaker_session = sagemaker.Session()\n", "account = sagemaker_session.boto_session.client(\"sts\").get_caller_identity()[\"Account\"]\n", "\n", "bucket=sagemaker.Session().default_bucket()\n", "prefix = 'sagemaker/DEMO-xgboost-customer-churn-connect'\n", "base_job_prefix = 'Demo-xgboost-churn-connect'" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'sagemaker-us-east-1-822507008821'" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "bucket" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "upload data to s3 input location" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/boto3/compat.py:88: PythonDeprecationWarning: Boto3 will no longer support Python 3.6 starting May 30, 2022. To continue receiving service updates, bug fixes, and security updates please upgrade to Python 3.7 or later. More information can be found here: https://aws.amazon.com/blogs/developer/python-support-policy-updates-for-aws-sdks-and-tools/\n", " warnings.warn(warning, PythonDeprecationWarning)\n" ] } ], "source": [ "s3_client = boto3.client(\"s3\")\n", "s3_client.upload_file('../data/churn_processed.csv', bucket, prefix+'/input/churn_processed.csv')\n", "inputDir = f\"s3://{bucket}/{prefix}/input/\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. Define Parameters to Parametrize Pipeline Execution\n", "\n", "Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.\n", "\n", "The supported parameter types include:\n", "\n", "* `ParameterString` - represents a `str` Python type\n", "* `ParameterInteger` - represents an `int` Python type\n", "* `ParameterFloat` - represents a `float` Python type\n", "\n", "These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.\n", "\n", "The parameters defined in this workflow include:\n", "\n", "* `processing_instance_type` - The `ml.*` instance type of the processing job.\n", "* `processing_instance_count` - The instance count of the processing job.\n", "* `training_instance_type` - The `ml.*` instance type of the training job.\n", "* `train_instance_count` - The instance count of the training job.\n", "* `model_approval_status` - What approval status to register the trained model with for CI/CD purposes ( \"PendingManualApproval\" is the default).\n", "* `model_output` - The S3 bucket URI location of the model output path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**You must have run the previous sequential notebooks to retrieve variables using the StoreMagic command.**" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# parameters for pipeline execution\n", "\n", "processing_instance_count = ParameterInteger(\n", " name=\"ProcessingInstanceCount\", default_value=1\n", ")\n", "processing_instance_type = ParameterString(\n", " name=\"ProcessingInstanceType\", default_value=\"ml.m5.xlarge\"\n", ")\n", "\n", "train_instance_count = ParameterInteger(\n", " name=\"TrainingInstanceCount\",\n", " default_value=1,\n", ")\n", "train_instance_type = ParameterString(\n", " name=\"TrainingInstance\",\n", " default_value=\"ml.m5.xlarge\",\n", ")\n", "\n", "model_approval_status = ParameterString(\n", " name=\"ModelApprovalStatus\",\n", " default_value=\"Approved\",\n", " enum_values=[\n", " \"PendingManualApproval\",\n", " \"Approved\",\n", " ],\n", ")\n", "model_output = ParameterString(\n", " name=\"ModelOutputUrl\",\n", " default_value=f\"s3://{bucket}/{prefix}/model\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. Pipeline Steps\n", "### 3.1 Callback Step\n", "First, we'll configure the callback step.\n", "\n", "The callback step will accept the following inputs:\n", "- S3 location for train/validation/test dataset\n", "- S3 location for the raw data\n", "- arn of the Step Functions for preprocessing data\n", "\n", "The callback step will return the following outputs:\n", "\n", "- S3 location of processed data to be used for model training, including train data, validation data and test data." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "queue_url = \"https://sqs.us-east-1.amazonaws.com/822507008821/CfnStack-pipelinecallbacksglueprep0F0C1313-GKfQjQWTy24d\" # Use CfnStack.SqsURL generated in cdk deployment." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.callback_step import CallbackStep,CallbackOutput,CallbackOutputTypeEnum\n", "\n", "train_uri = CallbackOutput(output_name=\"trainUri\", output_type=CallbackOutputTypeEnum.String)\n", "val_uri = CallbackOutput(output_name=\"valUri\", output_type=CallbackOutputTypeEnum.String)\n", "test_uri = CallbackOutput(output_name=\"testUri\", output_type=CallbackOutputTypeEnum.String)\n", "\n", "step_callback_data = CallbackStep(\n", " name=\"GluePrepCallbackStep\",\n", " sqs_queue_url=queue_url,\n", " inputs={\n", " \"trainUri\": f\"s3://{bucket}/{prefix}/processed/train/train.csv\",\n", " \"valUri\": f\"s3://{bucket}/{prefix}/processed/validation/validation.csv\",\n", " \"testUri\": f\"s3://{bucket}/{prefix}/processed/test/test.csv\",\n", " \"inputDir\": inputDir\n", " },\n", " outputs=[\n", " train_uri,\n", " val_uri,\n", " test_uri\n", " ]\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.2 Define a TrainStep\n", "\n", "#### Define a Training Step to Train a Model\n", "\n", "In this section, use Amazon SageMaker's [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) to train on this dataset. Configure an Estimator for the XGBoost algorithm and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to `model_output` so that it can be hosted later. \n", "\n", "The model path where the models from training will be saved is also specified.\n", "\n", "Note the `training_instance_type` parameter may be used in multiple places in the pipeline. In this case, the `training_instance_type` and `train_instance_count` is passed into the estimator." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "# training step for generating model artifacts\n", "image_uri = sagemaker.image_uris.retrieve(\n", " framework=\"xgboost\",\n", " region=region,\n", " version=\"1.0-1\",\n", " py_version=\"py3\",\n", ")\n", "xgb_train = sagemaker.estimator.Estimator(\n", " image_uri=image_uri,\n", " instance_type=train_instance_type,\n", " instance_count=train_instance_count,\n", " output_path=model_output,\n", " base_job_name=f\"{base_job_prefix}-train\",\n", " sagemaker_session=sagemaker_session,\n", " role=role,\n", ")\n", "\n", "# Set some hyper parameters\n", "# https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost_hyperparameters.html\n", "xgb_train.set_hyperparameters(\n", " max_depth=5,\n", " eta=0.2,\n", " gamma=4,\n", " min_child_weight=6,\n", " subsample=0.8,\n", " silent=0,\n", " objective=\"binary:logistic\",\n", " num_round=100,\n", " eval_metric='auc'\n", ")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, use the estimator instance to construct a `TrainingStep` as well as the `properties` of the prior `ProcessingStep` used as input in the `TrainingStep` inputs and the code that's executed when the pipeline invokes the pipeline execution. This is similar to an estimator's `fit` method in the Python SDK.\n", "\n", "Pass in the `S3Uri` of the `\"train\"` output channel to the `TrainingStep`. Also, use the other `\"validation\"` output channel for model evaluation in the pipeline. The `properties` attribute of a Pipeline step matches the object model of the corresponding response of a describe call. These properties can be referenced as placeholder values and are resolved at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "step_train = TrainingStep(\n", " name=\"TrainModel\",\n", " estimator=xgb_train,\n", " inputs={\n", " \"train\": TrainingInput(\n", " s3_data=train_uri,\n", " content_type=\"text/csv\",\n", " ),\n", " \"validation\": TrainingInput(\n", " s3_data=val_uri,\n", " content_type=\"text/csv\",\n", " ),\n", " },\n", "\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.3 Define a Model Evaluation Step to Evaluate the Trained Model\n", "\n", "First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.\n", "\n", "After pipeline execution, you can examine the resulting `evaluation.json` for analysis.\n", "\n", "The evaluation script uses `xgboost` to do the following:\n", "\n", "* Load the model.\n", "* Read the test data.\n", "* Issue predictions against the test data.\n", "* Build a classification report, including mae, mse, rmse and r2 metrics.\n", "* Save the evaluation report to the evaluation directory." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, create an instance of a `ScriptProcessor` processor and use it in the `ProcessingStep`.\n", "\n", "Note the `processing_instance_type` parameter passed into the processor." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# processing step for evaluation\n", "script_eval = ScriptProcessor(\n", " image_uri=image_uri,\n", " command=[\"python3\"],\n", " instance_type=processing_instance_type,\n", " instance_count=1,\n", " base_job_name=f\"{base_job_prefix}/script-eval\",\n", " sagemaker_session=sagemaker_session,\n", " role=role,\n", ")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that will be executed when the pipeline invokes pipeline execution. This is similar to a processor instance's `run` method in the Python SDK.\n", "\n", "Specifically, the `S3ModelArtifacts` from the `step_train` `properties` and the `S3Uri` of the `\"test\"` output channel of the `step_process` `properties` are passed into the inputs. The `TrainingStep` and `ProcessingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "evaluation_report = PropertyFile(\n", " name=\"EvaluationReport\",\n", " output_name=\"evaluation\",\n", " path=\"evaluation.json\",\n", ")\n", "eval_output = f\"s3://{bucket}/{prefix}/Evaluation/output/\"\n", "step_eval = ProcessingStep(\n", " name=\"EvaluateModel\",\n", " processor=script_eval,\n", " inputs=[\n", " ProcessingInput(\n", " source=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " destination=\"/opt/ml/processing/model\",\n", " ),\n", " ProcessingInput(\n", " source=f\"s3://{bucket}/{prefix}/processed/test/test.csv\",\n", " destination=\"/opt/ml/processing/test\",\n", " ),\n", " ],\n", " outputs=[\n", " ProcessingOutput(\n", " output_name=\"evaluation\", \n", " source=\"/opt/ml/processing/evaluation\",\n", " destination=eval_output\n", " ),\n", " ],\n", " code=\"./code/evaluation.py\",\n", " property_files=[evaluation_report],\n", "\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.4 Define a Create Model Step to Create a Model\n", "\n", "In order to perform batch transformation using the example model, create a SageMaker model. \n", "\n", "Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "model = Model(\n", " image_uri=image_uri,\n", " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " sagemaker_session=sagemaker_session,\n", " role=role,\n", ")\n", "\n", "inputs = CreateModelInput(\n", " instance_type=\"ml.m5.xlarge\",\n", ")\n", "step_create_model = CreateModelStep(\n", " name=\"CustomerChurnCreateModel\",\n", " model=model,\n", " inputs=inputs,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.5 Define a Register Model Step to Create a Model Package\n", "\n", "Use the estimator instance specified in the training step to construct an instance of `RegisterModel`. The result of executing `RegisterModel` in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients required for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.\n", "\n", "A model package group is a collection of model packages. A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.\n", "\n", "The construction of `RegisterModel` is similar to an estimator instance's `register` method in the Python SDK.\n", "\n", "Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.\n", "\n", "Note that the specific model package group name provided in this notebook can be used in the model registry and CI/CD work with SageMaker Projects." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "# register model step that will be conditionally executed\n", "model_metrics = ModelMetrics(\n", " model_statistics=MetricsSource(\n", " s3_uri=eval_output,\n", " content_type=\"application/json\",\n", " )\n", ")\n", "model_package_group_name = \"CustomerChurnModelPackage\"\n", "step_register = RegisterModel(\n", " name=\"RegisterModel\",\n", " estimator=xgb_train,\n", " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " content_types=[\"text/csv\"],\n", " response_types=[\"text/csv\"],\n", " inference_instances=[\"ml.t2.medium\", \"ml.t2.large\", \"ml.m5.large\"],\n", " transform_instances=[\"ml.m5.large\"],\n", " model_package_group_name=model_package_group_name,\n", " approval_status=model_approval_status,\n", " model_metrics=model_metrics,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.6 Deploy a model" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "deploy_model_script_uri = f\"s3://{bucket}/{prefix}/code/deploy_model.py\"\n", "deploy_model_instance_type = \"ml.m4.xlarge\"\n", "endpoint_name = \"demo-customer-churn-xgboost\"\n", "\n", "s3_client.upload_file(\n", " Filename=\"code/deploy_model.py\", Bucket=bucket, Key=f\"{prefix}/code/deploy_model.py\"\n", ")\n", "\n", "deploy_model_processor = SKLearnProcessor(\n", " framework_version=\"0.23-1\",\n", " role=role,\n", " instance_type=\"ml.t3.medium\",\n", " instance_count=1,\n", " base_job_name=\"customer-churn-deploy-model\",\n", " sagemaker_session=sagemaker_session,\n", ")\n", "\n", "deploy_step = ProcessingStep(\n", " name=\"DeployModel\",\n", " processor=deploy_model_processor,\n", " job_arguments=[\n", " \"--model-name\",\n", " step_create_model.properties.ModelName,\n", " \"--region\",\n", " region,\n", " \"--endpoint-instance-type\",\n", " deploy_model_instance_type,\n", " \"--endpoint-name\",\n", " endpoint_name,\n", " ],\n", " code=deploy_model_script_uri,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.7 Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry\n", "\n", "In this step, the model is registered only if the accuracy of the model, as determined by the evaluation step `step_eval`, exceeded a specified value. A `ConditionStep` enables pipelines to support conditional execution in the pipeline DAG based on the conditions of the step properties. \n", "\n", "In the following section, you:\n", "\n", "* Define a `ConditionLessThanOrEqualTo` on the accuracy value found in the output of the evaluation step, `step_eval`.\n", "* Use the condition in the list of conditions in a `ConditionStep`.\n", "* Pass the `CreateModelStep` and `TransformStep` steps, and the `RegisterModel` step collection into the `if_steps` of the `ConditionStep`, which are only executed, if the condition evaluates to `True`." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "# condition step for evaluating model quality and branching execution\n", "cond_lte = ConditionGreaterThanOrEqualTo(\n", " left=JsonGet(\n", " step_name=step_eval.name,\n", " property_file=evaluation_report,\n", " json_path=\"binary_classification_metrics.accuracy.value\",\n", " ),\n", " right=0.95,\n", ")\n", "step_cond = ConditionStep(\n", " name=\"CheckEvaluation\",\n", " conditions=[cond_lte],\n", " if_steps=[step_create_model, step_register, \n", " deploy_step],\n", " else_steps=[],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.8 Define a Pipeline of Parameters, Steps, and Conditions\n", "\n", "In this section, combine the steps into a Pipeline so it can be executed.\n", "\n", "A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.\n", "\n", "Note:\n", "\n", "* All of the parameters used in the definitions must be present.\n", "* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipeline service resolves the _data dependency_ DAG as steps for the execution to complete.\n", "* Steps must be unique to across the pipeline step list and all condition step if/else lists." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "# pipeline instance\n", "pipeline = Pipeline(\n", " name=\"Demo-customer-churn-pipeline\",\n", " parameters=[\n", " processing_instance_count,\n", " processing_instance_type,\n", " train_instance_count,\n", " train_instance_type,\n", " model_approval_status,\n", " model_output\n", " ],\n", " steps=[step_callback_data,\n", " step_train,\n", " step_eval,\n", " step_cond],\n", " sagemaker_session=sagemaker_session,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.9 Submit the pipeline to SageMaker and start execution\n", "\n", "Submit the pipeline definition to the Pipeline service. The role passed in will be used by the Pipeline service to create all the jobs defined in the steps." ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config\n", "No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config\n" ] }, { "data": { "text/plain": [ "{'PipelineArn': 'arn:aws:sagemaker:us-east-1:822507008821:pipeline/demo-customer-churn-pipeline',\n", " 'ResponseMetadata': {'RequestId': '37afb3c6-88d9-4438-b997-faa631211f80',\n", " 'HTTPStatusCode': 200,\n", " 'HTTPHeaders': {'x-amzn-requestid': '37afb3c6-88d9-4438-b997-faa631211f80',\n", " 'content-type': 'application/x-amz-json-1.1',\n", " 'content-length': '96',\n", " 'date': 'Fri, 02 Sep 2022 08:23:27 GMT'},\n", " 'RetryAttempts': 0}}" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pipeline.upsert(role_arn=role)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Pipeline Operations: Examining and Waiting for Pipeline Execution\n", "\n", "Describe the pipeline execution." ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "execution = pipeline.start()" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'PipelineArn': 'arn:aws:sagemaker:us-east-1:822507008821:pipeline/demo-customer-churn-pipeline',\n", " 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:822507008821:pipeline/demo-customer-churn-pipeline/execution/z0q7hrouxfcu',\n", " 'PipelineExecutionDisplayName': 'execution-1662107007900',\n", " 'PipelineExecutionStatus': 'Executing',\n", " 'CreationTime': datetime.datetime(2022, 9, 2, 8, 23, 27, 828000, tzinfo=tzlocal()),\n", " 'LastModifiedTime': datetime.datetime(2022, 9, 2, 8, 23, 27, 828000, tzinfo=tzlocal()),\n", " 'CreatedBy': {},\n", " 'LastModifiedBy': {},\n", " 'ResponseMetadata': {'RequestId': '73da386b-3548-41b6-8047-f9c7e62264d6',\n", " 'HTTPStatusCode': 200,\n", " 'HTTPHeaders': {'x-amzn-requestid': '73da386b-3548-41b6-8047-f9c7e62264d6',\n", " 'content-type': 'application/x-amz-json-1.1',\n", " 'content-length': '421',\n", " 'date': 'Fri, 02 Sep 2022 08:23:28 GMT'},\n", " 'RetryAttempts': 0}}" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "execution.describe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait for the execution to complete.\n", "# execution.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.10 Examining the Evalution\n", "\n", "Examine the resulting model evaluation after the pipeline completes. Download the resulting `evaluation.json` file from S3 and print the report." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pprint import pprint\n", "\n", "\n", "evaluation_json = sagemaker.s3.S3Downloader.read_file(\"{}evaluation.json\".format(\n", " step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", "))\n", "pprint(json.loads(evaluation_json))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Lineage\n", "\n", "Review the lineage of the artifacts generated by the pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from sagemaker.lineage.visualizer import LineageTableVisualizer\n", "\n", "\n", "viz = LineageTableVisualizer(sagemaker.session.Session())\n", "for execution_step in reversed(execution.list_steps()):\n", " print(execution_step)\n", " display(viz.show(pipeline_execution_step=execution_step))\n", " time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. Clean up" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Delete pipeline\n", "pipeline.delete()\n", "\n", "sfn_client = boto3.client('sagemaker')\n", "sfn_client.delete_endpoint(EndpointName=endpoint_name)\n", "\n", "# Delete artefacts\n", "!aws s3 rm s3://$bucket/$prefix" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can go to [CloudFormation console](https://us-east-1.console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks?filteringStatus=active&filteringText=&viewNested=true&hideStacks=false) to delete the stack to remove all serverless resources such sqs, lambda and glue." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# GitHub Resource\n", "This demo is available on GitHub: https://github.com/aws-samples/real-time-churn-prediction-with-amazon-connect-and-amazon-sagemaker" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }