{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Orchestrating Jobs with Amazon SageMaker Model Building Pipelines\n", "\n", "Amazon SageMaker Model Building Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Notebook Overview\n", "\n", "This notebook shows how to:\n", "\n", "* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.\n", "\n", "* Define a Processing step that performs cleaning, feature engineering, and splitting the input data into train, validation, and test data sets.\n", "\n", "* Define a Training step that trains a model on the preprocessed train data set.\n", "\n", "* Define a Processing step that evaluates the trained model's performance on the test data set.\n", "\n", "* Define a Register Model step that creates a model package from the estimator and model \n", "artifacts used to train the model.\n", "\n", "* Define a Condition step that evaluates a condition based on output from prior steps and conditionally executes other steps." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use Case and Dataset\n", "\n", "In this tutorial, you will assume the role of a machine learning developer working at a bank. You have been asked to develop a machine learning model to predict whether a customer will enroll in a certificate of deposit (CD). The model will be trained on the marketing dataset that contains information on customer demographics, responses to marketing events, and external factors.\n", "\n", "The data has been labeled for your convenience and a column in the dataset identifies whether the customer is enrolled for a product offered by the bank. A version of this dataset is publicly available from the ML repository curated by the University of California, Irvine. 
This tutorial implements a supervised machine learning model, since the data is labeled. (Unsupervised learning occurs when the datasets are not labeled.)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipeline Instance\n", "\n", "A SageMaker Pipeline instance is composed of three components:\n", " 1. Name\n", " 2. Parameters\n", " 3. Steps" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initial Setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Setup\n", "import boto3\n", "import sagemaker\n", "\n", "region = boto3.Session().region_name\n", "sagemaker_session = sagemaker.session.Session()\n", "role = sagemaker.get_execution_role()\n", "default_bucket = sagemaker_session.default_bucket()\n", "model_package_group_name = \"DirectMarketingPackageGroupName\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upload the data into the default bucket. You can substitute your own data set for the `input_data_uri` as appropriate." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_path = \"bank-additional-full.csv\"\n", "\n", "base_uri = f\"s3://{default_bucket}/marketing\"\n", "input_data_uri = sagemaker.s3.S3Uploader.upload(\n", "    local_path=local_path,\n", "    desired_s3_uri=base_uri,\n", ")\n", "print(input_data_uri)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define Parameters to Parametrize Pipeline Execution\n", "\n", "Define Pipeline parameters that you can use to parametrize the pipeline. 
Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.\n", "\n", "The supported parameter types include:\n", "\n", "* `ParameterString` - represents a `str` Python type\n", "* `ParameterInteger` - represents an `int` Python type\n", "* `ParameterFloat` - represents a `float` Python type\n", "\n", "These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.\n", "\n", "The parameters defined in this workflow include:\n", "\n", "* `processing_instance_type` - The `ml.*` instance type of the processing job.\n", "* `processing_instance_count` - The instance count of the processing job.\n", "* `training_instance_type` - The `ml.*` instance type of the training job.\n", "* `model_approval_status` - What approval status to register the trained model with for CI/CD purposes ( \"PendingManualApproval\" is the default).\n", "* `input_data` - The S3 bucket URI location of the input data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define pipeline parameters\n", "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "\n", "processing_instance_count = ParameterInteger(\n", " name=\"ProcessingInstanceCount\",\n", " default_value=1\n", ")\n", "processing_instance_type = ParameterString(\n", " name=\"ProcessingInstanceType\",\n", " default_value=\"ml.m5.xlarge\"\n", ")\n", "training_instance_type = ParameterString(\n", " name=\"TrainingInstanceType\",\n", " default_value=\"ml.m5.xlarge\"\n", ")\n", "model_approval_status = ParameterString(\n", " name=\"ModelApprovalStatus\",\n", " default_value=\"PendingManualApproval\"\n", ")\n", "input_data = ParameterString(\n", " name=\"InputData\",\n", " default_value=input_data_uri,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define a 
Processing Step for Feature Engineering\n", "\n", "First, develop a preprocessing script that is specified in the Processing step.\n", "\n", "This notebook cell writes a file `marketing/preprocessing.py`, which contains the preprocessing script. You can update the script and rerun this cell to overwrite the file. The preprocessing script uses `pandas` and `numpy` to do the following:\n", "\n", "* It creates new variables: `no_previous_contact`, `not_working`\n", "* It drops columns that are not necessary\n", "* It one-hot encodes categorical variables\n", "* It shuffles the rows and splits them into train (70%), validation (15%), and test (15%) sets\n", "\n", "The Processing step executes the script on the input data. The Training step uses the preprocessed training features and labels to train a model. The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir -p marketing" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile marketing/preprocessing.py\n", "import argparse\n", "import os\n", "\n", "import numpy as np\n", "import pandas as pd\n", "\n", "# Helper method\n", "def process(df):\n", " # Add two new indicators\n", " df[\"no_previous_contact\"] = (df[\"pdays\"] == 999).astype(int)\n", " df[\"not_working\"] = df[\"job\"].isin([\"student\", \"retired\", \"unemployed\"]).astype(int)\n", " columns = list(df.columns)\n", " \n", " toremove = [\"emp.var.rate\", \"cons.price.idx\", \"cons.conf.idx\", \"euribor3m\", \"nr.employed\"]\n", " columns = [x for x in columns if x not in toremove]\n", " \n", " # Keeping only columns that we need\n", " df = df[columns]\n", " \n", " # One-hot encode, then move the label column (y_yes) to the front\n", " df = pd.get_dummies(df)\n", " df = pd.concat([df['y_yes'], df.drop(['y_no', 'y_yes'], axis=1)], axis=1)\n", " # Shuffle the rows\n", " df = df.sample(frac=1).reset_index(drop=True)\n", " return df\n", "\n", "if __name__ == \"__main__\":\n", " parser = argparse.ArgumentParser()\n", " 
parser.add_argument(\"--input-path\", type=str, default=\"/opt/ml/processing\")\n", " args, _ = parser.parse_known_args()\n", " \n", " base_dir = args.input_path\n", "\n", " df = pd.read_csv(\n", " f\"{base_dir}/input/bank-additional-full.csv\",\n", " header=0\n", " )\n", " \n", " # Call the helper method\n", " df = process(df)\n", " \n", " train, validation, test = np.split(df, [int(.7*len(df)), int(.85*len(df))])\n", "\n", " train.to_csv(f\"{base_dir}/train/train.csv\", header=False, index=False)\n", " validation.to_csv(f\"{base_dir}/validation/validation.csv\", header=False, index=False)\n", " test.to_csv(f\"{base_dir}/test/test.csv\", header=False, index=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, create an instance of an `SKLearnProcessor` processor and use that in our `ProcessingStep`.\n", "\n", "You also specify the `framework_version` to use throughout this notebook.\n", "\n", "Note the `processing_instance_type` and `processing_instance_count` parameters used by the processor instance." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.sklearn.processing import SKLearnProcessor\n", "\n", "\n", "framework_version = \"0.23-1\"\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=framework_version,\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count,\n", " base_job_name=\"sklearn-marketing-process\",\n", " role=role,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, use the processor instance to construct a `ProcessingStep`, along with the input and output channels, and the code that will be executed when the pipeline invokes pipeline execution. This is similar to a processor instance's `run` method in the Python SDK.\n", "\n", "Note the `input_data` parameters passed into `ProcessingStep` is the input data used in the step. 
This input data is used by the processor instance when it is run.\n", "\n", "Also, note the `\"train\"`, `\"validation\"`, and `\"test\"` named channels specified in the output configuration for the processing job. Step `properties` can be used in subsequent steps and resolve to their runtime values at execution. Specifically, this usage is called out when you define the training step." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.workflow.steps import ProcessingStep\n", "\n", "\n", "step_process = ProcessingStep(\n", "    name=\"MarketingProcess\",\n", "    processor=sklearn_processor,\n", "    inputs=[\n", "        ProcessingInput(source=input_data, destination=\"/opt/ml/processing/input\"),\n", "    ],\n", "    outputs=[\n", "        ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\"),\n", "        ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\"),\n", "        ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\")\n", "    ],\n", "    code=\"marketing/preprocessing.py\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define a Training Step to Train a Model\n", "\n", "In this section, use Amazon SageMaker's [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) to train on this dataset. Configure an Estimator for the XGBoost algorithm and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to `model_dir` so that it can be hosted later.\n", "\n", "The model path where the models from training will be saved is also specified.\n", "\n", "Note that the `training_instance_type` parameter may be used in multiple places in the pipeline. In this case, the `training_instance_type` is passed into the estimator."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.estimator import Estimator\n", "\n", "model_path = f\"s3://{default_bucket}/MarketingTrain\"\n", "image_uri = sagemaker.image_uris.retrieve(\n", "    framework=\"xgboost\",\n", "    region=region,\n", "    version=\"1.0-1\",\n", "    py_version=\"py3\",\n", "    instance_type=training_instance_type,\n", ")\n", "xgb_train = Estimator(\n", "    image_uri=image_uri,\n", "    instance_type=training_instance_type,\n", "    instance_count=1,\n", "    output_path=model_path,\n", "    role=role,\n", ")\n", "xgb_train.set_hyperparameters(\n", "    objective=\"binary:logistic\",\n", "    num_round=50,\n", "    max_depth=5,\n", "    eta=0.2,\n", "    gamma=4,\n", "    min_child_weight=6,\n", "    subsample=0.7,\n", "    silent=0\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, use the estimator instance to construct a `TrainingStep`, passing the `properties` of the prior `ProcessingStep` as the `TrainingStep` inputs. This is similar to an estimator's `fit` method in the Python SDK.\n", "\n", "Pass in the `S3Uri` of the `\"train\"` and `\"validation\"` output channels to the `TrainingStep`. The `\"test\"` output channel is used later for model evaluation in the pipeline. The `properties` attribute of a Pipeline step matches the object model of the corresponding response of a describe call. These properties can be referenced as placeholder values and are resolved at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.inputs import TrainingInput\n", "from sagemaker.workflow.steps import TrainingStep\n", "\n", "\n", "step_train = TrainingStep(\n", "    name=\"MarketingTrain\",\n", "    estimator=xgb_train,\n", "    inputs={\n", "        \"train\": TrainingInput(\n", "            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", "                \"train\"\n", "            ].S3Output.S3Uri,\n", "            content_type=\"text/csv\"\n", "        ),\n", "        \"validation\": TrainingInput(\n", "            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", "                \"validation\"\n", "            ].S3Output.S3Uri,\n", "            content_type=\"text/csv\"\n", "        )\n", "    },\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define a Model Evaluation Step to Evaluate the Trained Model\n", "\n", "First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.\n", "\n", "After pipeline execution, you can examine the resulting `evaluation.json` for analysis.\n", "\n", "The evaluation script uses `xgboost` to do the following:\n", "\n", "* Load the model.\n", "* Read the test data.\n", "* Issue predictions against the test data.\n", "* Compute classification metrics: accuracy and area under the ROC curve (AUC).\n", "* Save the evaluation report to the evaluation directory."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile marketing/evaluation.py\n", "import json\n", "import os\n", "import pickle\n", "import tarfile\n", "\n", "import pandas as pd\n", "import xgboost\n", "\n", "from sklearn.metrics import accuracy_score, roc_auc_score\n", "\n", "if __name__ == \"__main__\":\n", "    # Unpack the model artifact produced by the training step\n", "    model_path = \"/opt/ml/processing/model/model.tar.gz\"\n", "    with tarfile.open(model_path) as tar:\n", "        tar.extractall(path=\".\")\n", "\n", "    with open(\"xgboost-model\", \"rb\") as model_file:\n", "        model = pickle.load(model_file)\n", "\n", "    print(\"Loading test input data\")\n", "    test_path = \"/opt/ml/processing/test/test.csv\"\n", "    df = pd.read_csv(test_path, header=None)\n", "\n", "    # The first column holds the label; the remaining columns are features\n", "    y_test = df.iloc[:, 0].to_numpy()\n", "    df.drop(df.columns[0], axis=1, inplace=True)\n", "\n", "    X_test = xgboost.DMatrix(df.values)\n", "\n", "    # With the binary:logistic objective, predictions are probabilities\n", "    predictions = model.predict(X_test)\n", "\n", "    print(\"Creating classification evaluation report\")\n", "    acc = accuracy_score(y_test, predictions.round())\n", "    # AUC is computed on the probabilities; rounding them first would\n", "    # collapse the ROC curve to a single threshold\n", "    auc = roc_auc_score(y_test, predictions)\n", "\n", "    # The metrics reported can change based on the model used,\n", "    # but it must be a specific name per\n", "    # (https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html)\n", "    report_dict = {\n", "        \"binary_classification_metrics\": {\n", "            \"accuracy\": {\n", "                \"value\": acc,\n", "                \"standard_deviation\": \"NaN\",\n", "            },\n", "            \"auc\": {\"value\": auc, \"standard_deviation\": \"NaN\"},\n", "        },\n", "    }\n", "\n", "    print(\"Classification report:\\n{}\".format(report_dict))\n", "\n", "    evaluation_output_path = os.path.join(\"/opt/ml/processing/evaluation\", \"evaluation.json\")\n", "    print(\"Saving classification report to {}\".format(evaluation_output_path))\n", "\n", "    with open(evaluation_output_path, \"w\") as f:\n", "        f.write(json.dumps(report_dict))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, create an instance of a `ScriptProcessor` processor and use it in the `ProcessingStep`.\n", "\n", "Note the `processing_instance_type` parameter passed into the processor." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ScriptProcessor\n", "\n", "\n", "script_eval = ScriptProcessor(\n", "    image_uri=image_uri,\n", "    command=[\"python3\"],\n", "    instance_type=processing_instance_type,\n", "    instance_count=1,\n", "    base_job_name=\"script-marketing-eval\",\n", "    role=role,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that is executed when the pipeline runs. This is similar to a processor instance's `run` method in the Python SDK.\n", "\n", "Specifically, the `S3ModelArtifacts` from the `step_train` `properties` and the `S3Uri` of the `\"test\"` output channel of the `step_process` `properties` are passed into the inputs. The `TrainingStep` and `ProcessingStep` `properties` attributes match the object models of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.properties import PropertyFile\n", "\n", "\n", "evaluation_report = PropertyFile(\n", " name=\"EvaluationReport\",\n", " output_name=\"evaluation\",\n", " path=\"evaluation.json\"\n", ")\n", "step_eval = ProcessingStep(\n", " name=\"MarketingEval\",\n", " processor=script_eval,\n", " inputs=[\n", " ProcessingInput(\n", " source=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " destination=\"/opt/ml/processing/model\"\n", " ),\n", " ProcessingInput(\n", " source=step_process.properties.ProcessingOutputConfig.Outputs[\n", " \"test\"\n", " ].S3Output.S3Uri,\n", " destination=\"/opt/ml/processing/test\"\n", " )\n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name=\"evaluation\", source=\"/opt/ml/processing/evaluation\"),\n", " ],\n", " code=\"marketing/evaluation.py\",\n", " property_files=[evaluation_report],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define a Register Model Step to Create a Model Package\n", "\n", "Use the estimator instance specified in the training step to construct an instance of `RegisterModel`. The result of executing `RegisterModel` in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients required for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.\n", "\n", "A model package group is a collection of model packages. A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it. 
Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.\n", "\n", "The construction of `RegisterModel` is similar to an estimator instance's `register` method in the Python SDK.\n", "\n", "Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.\n", "\n", "Note that the specific model package group name provided in this notebook can be used in the model registry and CI/CD work with SageMaker Projects." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model_metrics import MetricsSource, ModelMetrics \n", "from sagemaker.workflow.step_collections import RegisterModel\n", "\n", "\n", "model_metrics = ModelMetrics(\n", " model_statistics=MetricsSource(\n", " s3_uri=\"{}/evaluation.json\".format(\n", " step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", " ),\n", " content_type=\"application/json\"\n", " )\n", ")\n", "step_register = RegisterModel(\n", " name=\"MarketingRegisterModel\",\n", " estimator=xgb_train,\n", " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " content_types=[\"text/csv\"],\n", " response_types=[\"text/csv\"],\n", " inference_instances=[\"ml.t2.medium\", \"ml.m5.xlarge\"],\n", " transform_instances=[\"ml.m5.xlarge\"],\n", " model_package_group_name=model_package_group_name,\n", " approval_status=model_approval_status,\n", " model_metrics=model_metrics,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define a Condition Step to Check Accuracy and Conditionally Register a Model in the Model Registry\n", "\n", "In this step, the model is 
registered only if the accuracy of the model, as determined by the evaluation step `step_eval`, meets or exceeds a specified value. A `ConditionStep` enables pipelines to support conditional execution in the pipeline DAG based on the conditions of the step properties. \n", "\n", "In the following section, you:\n", "\n", "* Define a `ConditionGreaterThanOrEqualTo` on the accuracy value found in the output of the evaluation step, `step_eval`.\n", "* Use the condition in the list of conditions in a `ConditionStep`.\n", "* Pass the `RegisterModel` step collection into the `if_steps` of the `ConditionStep`, which are executed only if the condition evaluates to `True`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo\n", "from sagemaker.workflow.condition_step import (\n", "    ConditionStep,\n", "    JsonGet,\n", ")\n", "\n", "cond_gte = ConditionGreaterThanOrEqualTo(  # You can change the condition here\n", "    left=JsonGet(\n", "        step=step_eval,\n", "        property_file=evaluation_report,\n", "        json_path=\"binary_classification_metrics.accuracy.value\",  # This should follow the structure of your report_dict defined in the evaluation.py file.\n", "    ),\n", "    right=0.8,  # You can change the threshold here\n", ")\n", "\n", "step_cond = ConditionStep(\n", "    name=\"MarketingAccuracyCond\",\n", "    conditions=[cond_gte],\n", "    if_steps=[step_register],\n", "    else_steps=[]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define a Pipeline of Parameters, Steps, and Conditions\n", "\n", "In this section, combine the steps into a Pipeline so it can be executed.\n", "\n", "A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.\n", "\n", "Note:\n", "\n", "* All of the parameters used in the definitions must be present.\n", "* Steps passed into the pipeline do not have to be listed in the order of execution. 
The SageMaker Pipeline service resolves the _data dependency_ DAG to determine the order in which the steps run.\n", "* Steps must be unique across the pipeline step list and all condition step if/else lists." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.pipeline import Pipeline\n", "\n", "\n", "pipeline_name = \"MarketingPipeline\"\n", "pipeline = Pipeline(\n", "    name=pipeline_name,\n", "    parameters=[\n", "        processing_instance_type,\n", "        processing_instance_count,\n", "        training_instance_type,\n", "        model_approval_status,\n", "        input_data,\n", "    ],\n", "    steps=[step_process, step_train, step_eval, step_cond],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### (Optional) Examining the pipeline definition\n", "\n", "The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "definition = json.loads(pipeline.definition())\n", "definition" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Submit the pipeline to SageMaker and start execution\n", "\n", "Submit the pipeline definition to the Pipeline service. The role passed in will be used by the Pipeline service to create all the jobs defined in the steps." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.upsert(role_arn=role)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start the pipeline and accept all of the default parameters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution = pipeline.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pipeline Operations: Examining and Waiting for Pipeline Execution\n", "\n", "Describe the pipeline execution."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Wait for the execution to complete." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.list_steps()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Examining the Evaluation\n", "\n", "Examine the resulting model evaluation after the pipeline completes. Download the resulting `evaluation.json` file from S3 and print the report." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pprint import pprint\n", "\n", "\n", "evaluation_json = sagemaker.s3.S3Downloader.read_file(\"{}/evaluation.json\".format(\n", "    step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", "))\n", "pprint(json.loads(evaluation_json))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Lineage\n", "\n", "Review the lineage of the artifacts generated by the pipeline."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from sagemaker.lineage.visualizer import LineageTableVisualizer\n", "\n", "\n", "viz = LineageTableVisualizer(sagemaker.session.Session())\n", "for execution_step in reversed(execution.list_steps()):\n", "    print(execution_step)\n", "    display(viz.show(pipeline_execution_step=execution_step))\n", "    time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parametrized Executions\n", "\n", "You can run additional executions of the pipeline and specify different pipeline parameters. The `parameters` argument is a dictionary that maps parameter names to the values that override their defaults.\n", "\n", "Based on the performance of the model, you might want to kick off another pipeline execution on a compute-optimized instance type and set the model approval status to \"Approved\" automatically. This means that the model package version generated by the `RegisterModel` step is automatically ready for deployment through CI/CD pipelines, such as with SageMaker Projects."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution = pipeline.start(\n", " parameters=dict(\n", " ProcessingInstanceType=\"ml.c5.xlarge\",\n", " ModelApprovalStatus=\"Approved\",\n", " )\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.wait()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.list_steps()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }