{ "cells": [ { "cell_type": "markdown", "id": "ce11b1f7-803e-4623-8499-97478594f1e9", "metadata": {}, "source": [ "# Option 1. Train Pipeline (SageMaker Pipelines)" ] }, { "cell_type": "markdown", "id": "67f8f66b-5391-484d-af30-273f6039bb8d", "metadata": {}, "source": [ "- [Overview](#overview)\n", "- [Build the pipeline components](#build-the-pipeline-components)\n", " 1. [Import statements and declare parameters and constants](#import-statements-and-declare-parameters-and-constants)\n", " 2. [Collect and prepare data](#collect-and-prepare-data)\n", " 3. [Define Processing Step](#define-processing-step)\n", " 4. [Define HyperParameter Tuning Step](#define-hyperparameter-tuning-step)\n", " 5. [Define the evaluation script and model evaluation step](#define-the-evaluation-script-and-model-evaluation-step)\n", " 6. [Define a register model step](#define-a-register-model-step)\n", " 7. [Define a condition step to check AUC score](#define-a-condition-step-to-check-auc-score)\n", "- [Build and Trigger the pipeline run](#build-and-trigger-the-pipeline-run)" ] }, { "cell_type": "markdown", "id": "189b06ac-9c5d-49ca-8df6-dc53283494be", "metadata": {}, "source": [ "## Overview" ] }, { "cell_type": "markdown", "id": "2f4cd6d8-b36d-474a-af41-027c26cbff06", "metadata": {}, "source": [ "The following diagram illustrates the high-level architecture of the ML workflow with the different steps to train the model.\n", "\n", "![](images/Train_Pipeline_Pattern.png)\n", "\n", "Train Pipeline consists of the following steps:\n", "\n", "1. Preprocess data to build features required and split data into train, validation, and test datasets.\n", "2. Apply hyperparameter tuning based on the ranges provided with the SageMaker XGBoost framework to give the best model, which is determined based on AUC score.\n", "3. Evaluate the trained model using the test dataset and check if the AUC score is above a predefined threshold.\n", "4. 
If the AUC score is greater than the threshold, register the model in the SageMaker Model Registry." ] }, { "cell_type": "markdown", "id": "ad0c0074-467f-46e9-a57b-091e8db6d171", "metadata": {}, "source": [ "## Build the pipeline components" ] }, { "cell_type": "markdown", "id": "c4d89f34-3526-41df-a964-5f2a7112f48c", "metadata": {}, "source": [ "### Step 1: Import statements and declare parameters and constants" ] }, { "cell_type": "code", "execution_count": 2, "id": "70816378-1c97-4511-931c-b1dbd3d91e4c", "metadata": { "tags": [] }, "outputs": [], "source": [ "import pandas as pd\n", "import sagemaker\n", "from sagemaker.workflow.pipeline_context import PipelineSession\n", "\n", "pipeline_name = \"sagemaker-immersion-train-pipeline\"\n", "sagemaker_session = sagemaker.session.Session()\n", "region = sagemaker_session.boto_region_name\n", "role = sagemaker.get_execution_role()\n", "pipeline_session = PipelineSession()\n", "default_bucket = sagemaker_session.default_bucket()\n", "model_package_group_name = \"ChurnModelPackageGroup\"" ] }, { "cell_type": "code", "execution_count": 3, "id": "7c0d50be-8cef-45e6-9f7c-827c05a98c27", "metadata": { "tags": [] }, "outputs": [], "source": [ "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", " ParameterFloat,\n", ")\n", "\n", "# Declare pipeline parameters so they can be overridden per execution\n", "auc_score_threshold = ParameterFloat(name=\"AucScoreThreshold\", default_value=0.75)\n", "processing_instance_count = ParameterInteger(name=\"ProcessingInstanceCount\", default_value=1)\n", "processing_instance_type = ParameterString(name=\"ProcessingInstanceType\", default_value=\"ml.m5.xlarge\")\n", "training_instance_type = ParameterString(name=\"TrainingInstanceType\", default_value=\"ml.m5.xlarge\")\n", "model_approval_status = ParameterString(name=\"ModelApprovalStatus\", default_value=\"PendingManualApproval\")\n", "# Local file; the SDK uploads it to S3 when the processing step is defined\n", "input_data = \"storedata_total.csv\"\n" ] }, { "cell_type": 
"markdown", "id": "a5dfc86a-7e27-40f3-aa27-0748be17fc6a", "metadata": {}, "source": [ "### Step 2: Collect and prepare data" ] }, { "cell_type": "markdown", "id": "d0f50d77-c9c9-48c1-a960-e88fe27d6b3f", "metadata": {}, "source": [ "To follow along with this lab, you need to download and save the [_sample dataset_](https://www.kaggle.com/uttamp/store-data) into the project directly within the SageMaker Studio environment." ] }, { "cell_type": "code", "execution_count": 4, "id": "8f3fbb77-805e-4c7f-8c0c-a80f596c7178", "metadata": { "tags": [] }, "outputs": [], "source": [ "# convert the store_data file into csv format \n", "store_data = pd.read_excel(\"storedata_total.xlsx\") \n", "store_data.to_csv(\"storedata_total.csv\")" ] }, { "cell_type": "markdown", "id": "fa42ce11-837e-4d3c-bcba-ea37e9e43bed", "metadata": {}, "source": [ "### Step 3: Define Processing Step" ] }, { "cell_type": "code", "execution_count": 4, "id": "e85b6816-3ef0-43ec-a5eb-615a8f804917", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting ML Pipelines scripts/churn_preprocess.py\n" ] } ], "source": [ "%%writefile \"ML Pipelines scripts/churn_preprocess.py\"\n", "\n", "import os\n", "import tempfile\n", "import numpy as np\n", "import pandas as pd\n", "import datetime as dt\n", "if __name__ == \"__main__\":\n", " base_dir = \"/opt/ml/processing\"\n", " #Read Data\n", " df = pd.read_csv(\n", " f\"{base_dir}/input/storedata_total.csv\"\n", " )\n", " # convert created column to datetime\n", " df[\"created\"] = pd.to_datetime(df[\"created\"])\n", " #Convert firstorder and lastorder to datetime datatype\n", " df[\"firstorder\"] = pd.to_datetime(df[\"firstorder\"],errors='coerce')\n", " df[\"lastorder\"] = pd.to_datetime(df[\"lastorder\"],errors='coerce')\n", " #Drop Rows with Null Values\n", " df = df.dropna()\n", " #Create column which gives the days between the last order and the first order\n", " df['first_last_days_diff'] = (df['lastorder'] 
- df['firstorder']).dt.days\n", " # Days between when the customer record was created and the first order\n", " df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days\n", " # Drop the identifier and raw date columns\n", " df.drop(['custid', 'created', 'firstorder', 'lastorder'], axis=1, inplace=True)\n", " # One-hot encode the favday and city columns\n", " df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])\n", " # Move the label column retained to the front and shuffle the rows\n", " y = df.pop(\"retained\")\n", " X_pre = df\n", " y_pre = y.to_numpy().reshape(len(y), 1)\n", " X = np.concatenate((y_pre, X_pre), axis=1)\n", " np.random.shuffle(X)\n", " # Split into train (70%), validation (15%), and test (15%) datasets\n", " train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])\n", " train = pd.DataFrame(train)\n", " test = pd.DataFrame(test)\n", " validation = pd.DataFrame(validation)\n", " # Convert the label column to integer\n", " train[0] = train[0].astype(int)\n", " test[0] = test[0].astype(int)\n", " validation[0] = validation[0].astype(int)\n", " # Save the dataframes as headerless csv files\n", " train.to_csv(f\"{base_dir}/train/train.csv\", header=False, index=False)\n", " validation.to_csv(f\"{base_dir}/validation/validation.csv\", header=False, index=False)\n", " test.to_csv(f\"{base_dir}/test/test.csv\", header=False, index=False)" ] }, { "cell_type": "code", "execution_count": 5, "id": "f53f8523-f2d1-4d75-b18e-7bcb5fd2c1b4", "metadata": { "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline_context.py:261: UserWarning: Running within a PipelineSession, there will be No Wait, No Logs, and No Job being started.\n", " UserWarning,\n" ] } ], "source": [ "# Define Processing Step for Feature Engineering\n", "from 
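The shuffle-and-split logic used in the preprocessing script can be sanity-checked locally on toy data (a minimal sketch; the 100-row matrix is made up):

```python
import numpy as np

# Stand-in for the label-plus-features matrix built in the preprocessing script:
# column 0 plays the role of the label, the rest are features.
X = np.arange(100 * 4, dtype=float).reshape(100, 4)
np.random.shuffle(X)  # shuffle rows before splitting, as the script does

# Same split points as the script: 70% train, 15% validation, 15% test.
train, validation, test = np.split(X, [int(.7 * len(X)), int(.85 * len(X))])
print(len(train), len(validation), len(test))  # 70 15 15
```

`np.split` cuts at the given row indices (here 70 and 85), which is why the two split points yield three contiguous segments.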
sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.workflow.steps import ProcessingStep\n", "\n", "framework_version = \"1.0-1\"\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=framework_version,\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count,\n", " base_job_name=\"sklearn-churn-process\",\n", " role=role,\n", " sagemaker_session=pipeline_session,\n", ")\n", "processor_args = sklearn_processor.run(\n", " inputs=[\n", " ProcessingInput(source=input_data, destination=\"/opt/ml/processing/input\"),\n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\",\n", " destination=f\"s3://{default_bucket}/output/train\"),\n", " ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\",\n", " destination=f\"s3://{default_bucket}/output/validation\"),\n", " ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\",\n", " destination=f\"s3://{default_bucket}/output/test\")\n", " ],\n", " code=\"ML Pipelines scripts/churn_preprocess.py\",\n", ")\n", "step_process = ProcessingStep(name=\"ChurnModelProcess\", step_args=processor_args)" ] }, { "cell_type": "markdown", "id": "966e4ef6-2077-4281-a769-d492e156b7f6", "metadata": {}, "source": [ "### Step 4: Define HyperParameter Tuning Step" ] }, { "cell_type": "code", "execution_count": 6, "id": "a6fe5621-bc56-479f-a4ca-f7636866c0b2", "metadata": { "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (), which is not allowed. The default_value of this Parameter object will be used to override it. 
Please make sure the default_value is valid.\n" ] } ], "source": [ "from sagemaker.estimator import Estimator\n", "from sagemaker.inputs import TrainingInput\n", "from sagemaker.tuner import (\n", " IntegerParameter,\n", " CategoricalParameter,\n", " ContinuousParameter,\n", " HyperparameterTuner,\n", ")\n", "from sagemaker.workflow.steps import TuningStep\n", "\n", "# training step for generating model artifacts\n", "model_path = f\"s3://{default_bucket}/output\"\n", "image_uri = sagemaker.image_uris.retrieve(\n", " framework=\"xgboost\",\n", " region=region,\n", " version=\"1.0-1\",\n", " py_version=\"py3\",\n", " instance_type=training_instance_type,\n", ")\n", "fixed_hyperparameters = {\n", " \"eval_metric\": \"auc\",\n", " \"objective\": \"binary:logistic\",\n", " \"num_round\": \"100\",\n", " \"rate_drop\": \"0.3\",\n", " \"tweedie_variance_power\": \"1.4\",\n", "}\n", "xgb_train = Estimator(\n", " image_uri=image_uri,\n", " instance_type=training_instance_type,\n", " instance_count=1,\n", " hyperparameters=fixed_hyperparameters,\n", " output_path=model_path,\n", " base_job_name=\"churn-train\",\n", " sagemaker_session=pipeline_session,\n", " role=role,\n", ")" ] }, { "cell_type": "code", "execution_count": 7, "id": "9fd27af1-28c8-4af5-a80e-08314ecd9088", "metadata": { "tags": [] }, "outputs": [], "source": [ "hyperparameter_ranges = {\n", " \"eta\": ContinuousParameter(0, 1),\n", " \"min_child_weight\": ContinuousParameter(1, 10),\n", " \"alpha\": ContinuousParameter(0, 2),\n", " \"max_depth\": IntegerParameter(1, 10),\n", "}\n", "objective_metric_name = \"validation:auc\"\n", "\n", "tuner = HyperparameterTuner(\n", " xgb_train,\n", " objective_metric_name,\n", " hyperparameter_ranges,\n", " max_jobs=2,\n", " max_parallel_jobs=2,\n", ")\n", "\n", "hpo_args = tuner.fit(\n", " inputs={\n", " \"train\": TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\"train\"].S3Output.S3Uri,\n", " content_type=\"text/csv\",\n", " ),\n", " \"validation\": 
TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", " \"validation\"\n", " ].S3Output.S3Uri,\n", " content_type=\"text/csv\",\n", " ),\n", " }\n", ")\n", "\n", "step_tuning = TuningStep(\n", " name=\"ChurnHyperParameterTuning\",\n", " step_args=hpo_args,\n", ")" ] }, { "cell_type": "markdown", "id": "99223372-35b0-424f-982a-2d9e4f442373", "metadata": {}, "source": [ "### Step 5: Define the evaluation script and model evaluation step" ] }, { "cell_type": "code", "execution_count": 8, "id": "175e88f4-6894-42ec-9fda-e7ffc9f5ba41", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting ML Pipelines scripts/churn_evaluate.py\n" ] } ], "source": [ "%%writefile \"ML Pipelines scripts/churn_evaluate.py\"\n", "\n", "import json\n", "import pathlib\n", "import pickle\n", "import tarfile\n", "import pandas as pd\n", "import xgboost\n", "from sklearn.metrics import roc_curve, auc\n", "\n", "if __name__ == \"__main__\":\n", " # Read the model tar file produced by the tuning step\n", " model_path = \"/opt/ml/processing/model/model.tar.gz\"\n", " with tarfile.open(model_path) as tar:\n", " tar.extractall(path=\".\")\n", " model = pickle.load(open(\"xgboost-model\", \"rb\"))\n", " # Read the test data used to evaluate the model\n", " test_path = \"/opt/ml/processing/test/test.csv\"\n", " df = pd.read_csv(test_path, header=None)\n", " y_test = df.iloc[:, 0].to_numpy()\n", " df.drop(df.columns[0], axis=1, inplace=True)\n", " X_test = xgboost.DMatrix(df.values)\n", " # Run predictions\n", " predictions = model.predict(X_test)\n", " # Evaluate predictions with the AUC metric\n", " fpr, tpr, thresholds = roc_curve(y_test, predictions)\n", " auc_score = auc(fpr, tpr)\n", " report_dict = {\n", " \"classification_metrics\": {\n", " \"auc_score\": {\n", " \"value\": auc_score,\n", " },\n", " },\n", " }\n", " # Save the evaluation report\n", " output_dir = \"/opt/ml/processing/evaluation\"\n", 
" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)\n", " evaluation_path = f\"{output_dir}/evaluation.json\"\n", " with open(evaluation_path, \"w\") as f:\n", " f.write(json.dumps(report_dict))" ] }, { "cell_type": "code", "execution_count": 9, "id": "7cedbe3a-9127-44d7-835d-4aa57a06b061", "metadata": { "tags": [] }, "outputs": [], "source": [ "# define model evaluation step to evaluate the trained model\n", "from sagemaker.processing import ScriptProcessor\n", "script_eval = ScriptProcessor(\n", " image_uri=image_uri,\n", " command=[\"python3\"],\n", " instance_type=processing_instance_type,\n", " instance_count=1,\n", " base_job_name=\"script-churn-eval\",\n", " role=role,\n", " sagemaker_session=pipeline_session,\n", ")\n", "eval_args = script_eval.run(\n", " inputs=[\n", " ProcessingInput(\n", " source=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=default_bucket,prefix=\"output\"),\n", " destination=\"/opt/ml/processing/model\"\n", " ),\n", " ProcessingInput(\n", " source=step_process.properties.ProcessingOutputConfig.Outputs[\n", " \"test\"\n", " ].S3Output.S3Uri,\n", " destination=\"/opt/ml/processing/test\"\n", " )\n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name=\"evaluation\", source=\"/opt/ml/processing/evaluation\",\\\n", " destination=f\"s3://{default_bucket}/output/evaluation\"),\n", " ],\n", " code=f\"ML Pipelines scripts/churn_evaluate.py\",\n", ")\n", "from sagemaker.workflow.properties import PropertyFile\n", "\n", "evaluation_report = PropertyFile(\n", " name=\"ChurnEvaluationReport\", output_name=\"evaluation\", path=\"evaluation.json\"\n", ")\n", "step_eval = ProcessingStep(\n", " name=\"ChurnEvalModel\",\n", " step_args=eval_args,\n", " property_files=[evaluation_report],\n", ")" ] }, { "cell_type": "markdown", "id": "37520bf7-21fb-44e9-b741-6c195e2803df", "metadata": {}, "source": [ "### Step 6: Define a register model step" ] }, { "cell_type": "code", "execution_count": 14, "id": 
"f797e1ed-af79-491e-aac5-3d15101e8ec7", "metadata": { "tags": [] }, "outputs": [], "source": [ "from sagemaker import Model\n", "from sagemaker.workflow.model_step import ModelStep\n", "\n", "model = Model(\n", " image_uri=image_uri,\n", " model_data=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=default_bucket,prefix=\"output\"),\n", " sagemaker_session=pipeline_session,\n", " role=role,\n", ")\n", "from sagemaker.model_metrics import MetricsSource, ModelMetrics\n", "\n", "model_metrics = ModelMetrics(\n", " model_statistics=MetricsSource(\n", " s3_uri=\"{}/evaluation.json\".format(\n", " step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", " ),\n", " content_type=\"application/json\",\n", " )\n", ")\n", "register_args = model.register(\n", " content_types=[\"text/csv\"],\n", " response_types=[\"text/csv\"],\n", " inference_instances=[\"ml.t2.medium\", \"ml.m5.xlarge\"],\n", " transform_instances=[\"ml.m5.xlarge\"],\n", " model_package_group_name=model_package_group_name,\n", " approval_status=model_approval_status,\n", " model_metrics=model_metrics,\n", ")\n", "step_register = ModelStep(name=\"ChurnRegisterModel\", step_args=register_args)" ] }, { "cell_type": "markdown", "id": "9e90a2b3-6de9-4e57-8b95-dc2e4c16100b", "metadata": {}, "source": [ "### Step 7: Define a condition step to check AUC score" ] }, { "cell_type": "code", "execution_count": 11, "id": "362a1c48-25c4-4c33-bdbe-13779a9efba4", "metadata": { "tags": [] }, "outputs": [], "source": [ "from sagemaker.workflow.conditions import ConditionGreaterThan\n", "from sagemaker.workflow.condition_step import ConditionStep\n", "from sagemaker.workflow.functions import JsonGet\n", "cond_lte = ConditionGreaterThan(\n", " left=JsonGet(\n", " step_name=step_eval.name,\n", " property_file=evaluation_report,\n", " json_path=\"classification_metrics.auc_score.value\",\n", " ),\n", " right=auc_score_threshold,\n", ")\n", "step_cond = ConditionStep(\n", " 
name=\"CheckAUCScoreChurnEvaluation\",\n", " conditions=[cond_lte],\n", " if_steps=[step_register],\n", ")" ] }, { "cell_type": "markdown", "id": "cdafd7f0-907b-4b55-9c43-e67888e26cc6", "metadata": {}, "source": [ "## Build and Trigger the pipeline run" ] }, { "cell_type": "markdown", "id": "d648ad67-80e5-404f-b47e-27df019c6d81", "metadata": {}, "source": [ "After defining all of the component steps, you can assemble them into a Pipelines object. You don’t need to specify the order of pipeline because Pipelines automatically infers the order sequence based on the dependencies between the steps." ] }, { "cell_type": "code", "execution_count": 15, "id": "3123803e-f286-4acc-b016-4620eb2017a7", "metadata": { "tags": [] }, "outputs": [], "source": [ "import json\n", "from sagemaker.workflow.pipeline import Pipeline\n", "\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " processing_instance_count,\n", " processing_instance_type,\n", " training_instance_type,\n", " model_approval_status,\n", " input_data,\n", " auc_score_threshold,\n", " ],\n", " steps=[step_process, step_tuning, step_eval, step_cond],\n", ") \n", "definition = json.loads(pipeline.definition())\n", "print(definition)" ] }, { "cell_type": "code", "execution_count": 16, "id": "ee7f3338-f9d3-49b5-8d70-9bfdb7676770", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Create a new or update existing Pipeline\n", "pipeline.upsert(role_arn=role)\n", "# start Pipeline execution\n", "pipeline.start()" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-2:236514542706:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, 
"nbformat_minor": 5 }