{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install -q -U sagemaker" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Using Scikit-Learn Pipelines with Amazon SageMaker\n", "\n", "In this notebook, we look at which Amazon SageMaker features can help you bring ML workloads based on Scikit-Learn, and in particular Scikit-Learn Pipelines, to the AWS cloud, in order to create scheduled preprocessing and training pipelines and to host endpoints that generate predictions in real time." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 0 - Generate the dataset\n", "\n", "In this session, we generate a random dataset for a classification job with the `sklearn.datasets.make_classification` function from the SKLearn library. You can of course use your own dataset: to use the rest of the notebook with minimal changes, make sure your dataset is stored in S3.\n", "\n", "The following cell creates a dataset, generates a few CSV files, and stores them in S3 at the following paths:\n", "\n", "> `s3://{bucket}/{prefix}/source/data.csv`\n", "> `s3://{bucket}/{prefix}/train/train.csv`\n", "> `s3://{bucket}/{prefix}/test/test.csv`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.datasets import make_classification\n", "from sklearn.model_selection import train_test_split\n", "import pandas as pd\n", "from sagemaker import Session\n", "\n", "session = Session()\n", "bucket = session.default_bucket() # Change to another bucket if running outside of SageMaker\n", "prefix = \"sklearn-pipeline/data\" # Choose your preferred prefix, but keep it consistent\n", "\n", "# Create a random dataset for classification\n", "X, y = make_classification(random_state=42)\n", "data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=[\"y\"])], axis=1)\n", "data.to_csv(\"/tmp/data.csv\", 
index=False)\n", "X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)\n", "train = pd.concat([pd.DataFrame(X_train), pd.DataFrame(y_train, columns=[\"y\"])], axis=1)\n", "test = pd.concat([pd.DataFrame(X_test), pd.DataFrame(y_test, columns=[\"y\"])], axis=1)\n", "# Save locally the files\n", "train.to_csv(\"/tmp/train.csv\", index=False)\n", "test.to_csv(\"/tmp/test.csv\", index=False)\n", "# Upload to S3\n", "data_path = session.upload_data(path=\"/tmp/data.csv\", bucket=bucket, key_prefix=f\"{prefix}/source\")\n", "train_path = session.upload_data(path=\"/tmp/train.csv\", bucket=bucket, key_prefix=f\"{prefix}/train\")\n", "test_path = session.upload_data(path=\"/tmp/test.csv\", bucket=bucket, key_prefix=f\"{prefix}/test\")\n", "\n", "print(data_path)\n", "print(train_path)\n", "print(test_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Level 1: SKLearn Pipeline in the Training script\n", "\n", "One of the basic features of Amazon SageMaker is to orchestrate resources for your ML workloads. In particular, with SageMaker Training jobs you can provide your own script from your preferred ML framework (here, SKLearn) and your dataset, and SageMaker will automatically handle the spin-up, storage and spin-down of the resources needed to train the model.\n", "\n", "Thanks to Scikit-Learn Pipelines, it is possible to create a series of steps that are executed as a single object whenever it is called for training (`fit()`) or prediction (`predict()`). The easiest way that you can use an SKLearn Pipeline with Amazon SageMaker, is to consider the pipeline itself as a model, and let it automatically handle the preprocessing during the training step. 
This means that, when SageMaker trains the model, it will execute all the steps of the pipeline, including any preprocessing that's in them.\n", "\n", "This approach is great when:\n", "\n", "- the dataset is not too big\n", "- the person responsible for preprocessing/feature engineering of the data is the same as the one responsible for training the model (AKA: you only have one script)\n", "- you are ok with obtaining only one file containing both model and preprocessing pipeline and do not need two separate ones\n", "\n", "Let's then create a script that does exactly this: it defines an SKLearn Pipeline, made of a `StandardScaler` and a `RandomForestClassifier`. This is a pretty simple pipeline, but you can make it as complicated as you prefer.\n", "\n", "There are a few interesting things to note in the following script:\n", "\n", "- hyperparameters are obtained via `ArgumentParser`;\n", "- some arguments default to environment variables: these are set by SageMaker to tell the script where to read/write data;\n", "- the training logic lives in the `if __name__ == \"__main__\":` block;\n", "- two extra functions - needed by SageMaker - are provided here:\n", " - `model_fn` which serves the purpose of telling SageMaker how to load the model;\n", " - `predict_fn` to override SageMaker's default function for generating a prediction;\n", "- apart from the default env vars and the two functions, nothing is custom to SageMaker: in fact in the next step we will see how to locally run this script, regardless of which platform or service we are using." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile sklearn_pipeline.py\n", "\n", "from joblib import dump, load\n", "import pandas as pd, numpy as np, os, argparse\n", "\n", "from sklearn.ensemble import RandomForestClassifier\n", "from sklearn.preprocessing import StandardScaler\n", "from sklearn.pipeline import Pipeline\n", "from sklearn.metrics import accuracy_score\n", "\n", "# inference function - tells SageMaker how to load the model\n", "def model_fn(model_dir):\n", " clf = load(os.path.join(model_dir, \"pipeline.joblib\"))\n", " return clf\n", "\n", "\n", "def predict_fn(input_data, model):\n", " prediction = model.predict(input_data)\n", " return np.array(prediction)\n", "\n", "\n", "# Argument parser\n", "def _parse_args():\n", " parser = argparse.ArgumentParser()\n", " # Hyperparameters\n", " parser.add_argument(\"--n-estimators\", type=int, default=10)\n", " parser.add_argument(\"--min-samples-leaf\", type=int, default=3)\n", " # Data, model, and output directories\n", " parser.add_argument(\"--model-dir\", type=str, default=os.environ.get(\"SM_MODEL_DIR\"))\n", " parser.add_argument(\"--train\", type=str, default=os.environ.get(\"SM_CHANNEL_TRAIN\"))\n", " parser.add_argument(\"--test\", type=str, default=os.environ.get(\"SM_CHANNEL_TEST\"))\n", " parser.add_argument(\"--train-file\", type=str, default=\"train.csv\")\n", " parser.add_argument(\"--test-file\", type=str, default=\"test.csv\")\n", " # Parse the arguments\n", " return parser.parse_known_args()\n", "\n", "\n", "# Main Training Loop\n", "if __name__ == \"__main__\":\n", " # Process arguments\n", " args, _ = _parse_args()\n", " # Load the dataset\n", " train_df = pd.read_csv(os.path.join(args.train, args.train_file))\n", " X_train, y_train = train_df.drop(\"y\", axis=1), train_df.y\n", " test_df = pd.read_csv(os.path.join(args.test, args.test_file))\n", " X_test, y_test = test_df.drop(\"y\", axis=1), test_df.y\n", " # Define the 
pipeline and train it\n", " pipe = Pipeline(\n", " [\n", " (\"scaler\", StandardScaler()),\n", " (\n", " \"rfc\",\n", " RandomForestClassifier(\n", " n_estimators=args.n_estimators,\n", " min_samples_leaf=args.min_samples_leaf,\n", " ),\n", " ),\n", " ]\n", " )\n", " pipe.fit(X_train, y_train)\n", " # Evaluate the model performance\n", " print(f\"Model Accuracy: {pipe.score(X_test, y_test)}\")\n", " # Store the pipeline\n", " dump(pipe, os.path.join(args.model_dir, \"pipeline.joblib\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Local test\n", "\n", "Once the script has been written, you can run it locally to make sure it works, and debug it if necessary." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!python sklearn_pipeline.py --model-dir . --train /tmp/ --test /tmp/" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn_pipeline import model_fn, predict_fn\n", "\n", "pipe = model_fn(\".\")\n", "predict_fn([X[0]], pipe)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As you can see, a prediction has been correctly generated by our script. We can now train and deploy the model on AWS resources thanks to Amazon SageMaker." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train and Deploy on SageMaker\n", "\n", "In the following cell, we will create an object called **Estimator**. 
A SageMaker Estimator is a way for SageMaker to gather some information about the kind of job that we want to run:\n", "\n", "- the framework: in this case it's a scikit-learn script, so we'll use the `SKLearn` estimator;\n", "- how many instances, and of which type, we want to use for training;\n", "- which version of the framework to use;\n", "- the metrics that we want our training to expose;\n", "- the hyperparameters to be used by the training job (remember the `ArgumentParser` from before?)\n", "\n", "Once everything is set up, we can call the `.fit()` method to start training our model on our data stored in S3: we'll provide it via two channels, `train` and `test`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We use the Estimator from the SageMaker Python SDK\n", "from sagemaker import get_execution_role\n", "from sagemaker.sklearn.estimator import SKLearn\n", "\n", "FRAMEWORK_VERSION = \"0.23-1\"\n", "\n", "# Define the Estimator from SageMaker (Script Mode)\n", "sklearn_estimator = SKLearn(\n", " entry_point=\"sklearn_pipeline.py\",\n", " role=get_execution_role(),\n", " instance_count=1,\n", " instance_type=\"ml.c5.xlarge\",\n", " framework_version=FRAMEWORK_VERSION,\n", " base_job_name=\"rf-scikit\",\n", " metric_definitions=[{\"Name\": \"model_accuracy\", \"Regex\": \"Model Accuracy: ([0-9.]+).*$\"}],\n", " hyperparameters={\"n-estimators\": 100, \"min-samples-leaf\": 3},\n", ")\n", "\n", "# Train the model (~5 minutes)\n", "sklearn_estimator.fit({\"train\": train_path, \"test\": test_path})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the model has been trained, we can very easily deploy it from this estimator. Again, SageMaker needs to know which instances you want your model to be deployed on, and how many of them. This is called a **real-time inference endpoint**, since it's an instance that exposes a DNS endpoint, it's up 24/7 (unless manually stopped) and accepts HTTPS requests. 
Spinning it up usually requires around 4-5 minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor = sklearn_estimator.deploy(1, \"ml.c4.large\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The SageMaker Python SDK provides a convenient `.predict()` method that lets you query the endpoint and obtain a prediction in response." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.predict([X[0]])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once you're done testing, you can delete the endpoint so that you do not incur additional costs; let's also clean up some resources in S3 that we will not use later." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.delete_endpoint()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 rm s3://$bucket/$prefix/train/ --recursive\n", "!aws s3 rm s3://$bucket/$prefix/test/ --recursive" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Level 2: Preprocessing with Processing, Training, Transformation & Inference in one Script\n", "\n", "Let's take things a step further. 
If:\n", "\n", "- the dataset is big enough that processing takes some time and can be improved by horizontal scaling;\n", "- the person responsible for preprocessing/feature engineering of the data is NOT the same as the one responsible for training the model (AKA: you prefer to have multiple scripts, one for training and one for feature engineering);\n", "- you want two separate files, one for the preprocessing pipeline and one for the model itself;\n", "\n", "then you can instead separate preprocessing from training by leveraging other features of the SageMaker platform.\n", "\n", "You can use **Amazon SageMaker Processing** to run steps for data pre-processing or post-processing, feature engineering, data validation, or model evaluation workloads on Amazon SageMaker. Processing jobs accept data from Amazon S3 as input and store data into Amazon S3 as output. In this step, you create your preprocessing pipeline, fit it, serialize it, and store it in S3 to be used later, together with the transformed training dataset.\n", "\n", "Then, just like before, you can use SageMaker Training jobs to train the model and SageMaker inference to run predictions. In the inference phase, of course, you need the pipeline from the Processing step: you must load it as part of your \"model\" in the `model_fn()` function, so that SageMaker can consider preprocessing + model as a single entity when running predictions." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's do things one step at a time. First, let's create the preprocessing script. In this script, we will have our `sklearn.pipeline.Pipeline` responsible for preprocessing. No training here! At the end of the script, we `dump` the pipeline to be re-used later." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile preprocessing.py\n", "\n", "from joblib import dump, load\n", "import pandas as pd, numpy as np, os, argparse\n", "\n", "from sklearn.preprocessing import StandardScaler\n", "from sklearn.pipeline import Pipeline\n", "from sklearn.model_selection import train_test_split\n", "\n", "# Argument parser\n", "def _parse_args():\n", " parser = argparse.ArgumentParser()\n", " parser.add_argument(\"--filepath\", type=str, default=\"/opt/ml/processing/input/\")\n", " parser.add_argument(\"--filename\", type=str, default=\"data.csv\")\n", " parser.add_argument(\"--outputpath\", type=str, default=\"/opt/ml/processing/output/\")\n", " # Parse the arguments\n", " return parser.parse_known_args()\n", "\n", "\n", "# Main Training Loop\n", "if __name__ == \"__main__\":\n", " # Process arguments\n", " args, _ = _parse_args()\n", " # Load the dataset\n", " df = pd.read_csv(os.path.join(args.filepath, args.filename))\n", " X, y = df.drop(\"y\", axis=1), df.y\n", " # Define the pipeline and train it\n", " pipe = Pipeline([(\"scaler\", StandardScaler())])\n", " transformed = pipe.fit_transform(X)\n", " # Generate the output files - train and test\n", " output = pd.concat([pd.DataFrame(transformed), y], axis=1)\n", " train, test = train_test_split(output, random_state=42)\n", " train.to_csv(os.path.join(args.outputpath, \"train/train.csv\"), index=False)\n", " test.to_csv(os.path.join(args.outputpath, \"test/test.csv\"), index=False)\n", " # Store the pipeline\n", " dump(pipe, os.path.join(args.outputpath, \"pipeline/preproc-pipeline.joblib\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then, create the script that will be used for both training and inference. Note: the `model_fn()` function from before has changed, since it has to load another file, which is the preprocessing pipeline, along with the actual model itself. 
Note: here, the model could have been a pipeline as well!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile training.py\n", "\n", "from sklearn.ensemble import RandomForestClassifier\n", "from sklearn.metrics import accuracy_score\n", "from sklearn.pipeline import Pipeline\n", "from joblib import dump, load\n", "import pandas as pd, numpy as np, os, argparse\n", "from shutil import copy\n", "\n", "# inference function - tells SageMaker how to load the model and do the prediction\n", "def model_fn(model_dir):\n", " preproc = load(os.path.join(model_dir, \"preproc.joblib\"))\n", " model = load(os.path.join(model_dir, \"model.joblib\"))\n", " pipe = Pipeline([(\"preproc\", preproc), (\"model\", model)])\n", " return pipe\n", "\n", "\n", "def predict_fn(input_data, model):\n", " prediction = model.predict(input_data)\n", " return np.array(prediction)\n", "\n", "\n", "# Argument parser\n", "def _parse_args():\n", " parser = argparse.ArgumentParser()\n", " # Hyperparameters\n", " parser.add_argument(\"--n-estimators\", type=int, default=10)\n", " parser.add_argument(\"--min-samples-leaf\", type=int, default=3)\n", " # Data, model, and output directories\n", " parser.add_argument(\"--model-dir\", type=str, default=os.environ.get(\"SM_MODEL_DIR\"))\n", " parser.add_argument(\"--train\", type=str, default=os.environ.get(\"SM_CHANNEL_TRAIN\"))\n", " parser.add_argument(\"--test\", type=str, default=os.environ.get(\"SM_CHANNEL_TEST\"))\n", " parser.add_argument(\"--pipeline\", type=str, default=os.environ.get(\"SM_CHANNEL_PIPELINE\"))\n", " parser.add_argument(\"--train-file\", type=str, default=\"train.csv\")\n", " parser.add_argument(\"--test-file\", type=str, default=\"test.csv\")\n", " parser.add_argument(\"--pipeline-file\", type=str, default=\"preproc-pipeline.joblib\")\n", " # Parse the arguments\n", " return parser.parse_known_args()\n", "\n", "\n", "# Main Training Loop\n", "if __name__ == \"__main__\":\n", 
" # Process arguments\n", " args, _ = _parse_args()\n", " # Load the dataset\n", " train_df = pd.read_csv(os.path.join(args.train, args.train_file))\n", " test_df = pd.read_csv(os.path.join(args.test, args.test_file))\n", " # Separate X and y\n", " X_train, y_train = train_df.drop(\"y\", axis=1), train_df.y\n", " X_test, y_test = test_df.drop(\"y\", axis=1), test_df.y\n", " # Define the model and train it\n", " model = RandomForestClassifier(\n", " n_estimators=args.n_estimators,\n", " min_samples_leaf=args.min_samples_leaf,\n", " n_jobs=-1,\n", " )\n", " model.fit(X_train, y_train)\n", " # Evaluate the model performances\n", " print(f\"Model Accuracy: {accuracy_score(y_test, model.predict(X_test))}\")\n", " dump(model, os.path.join(args.model_dir, \"model.joblib\"))\n", " copy(\n", " os.path.join(args.pipeline, args.pipeline_file),\n", " os.path.join(args.model_dir, \"preproc.joblib\"),\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Local Testing\n", "\n", "Let's test locally to see if everything works alright." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir ./temp/ ./temp/train/ ./temp/test/ ./temp/pipeline/" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!python preprocessing.py --filepath /tmp/ --filename data.csv --outputpath ./temp/" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!python training.py --train ./temp/train --test ./temp/test --pipeline ./temp/pipeline --model-dir ./temp/" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from training import model_fn, predict_fn\n", "\n", "pipe = model_fn(\"./temp/\")\n", "predict_fn([X[0]], pipe)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Awesome! Now, let's use SageMaker resources instead." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### SageMaker Processing" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker import get_execution_role\n", "\n", "train_path = f\"s3://{bucket}/{prefix}/train\"\n", "test_path = f\"s3://{bucket}/{prefix}/test\"\n", "pipeline_path = f\"s3://{bucket}/{prefix}/pipeline\"\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=\"0.23-1\",\n", " role=get_execution_role(),\n", " instance_type=\"ml.m5.large\",\n", " instance_count=1,\n", " base_job_name=\"sklearn-pipeline-processing\",\n", ")\n", "\n", "sklearn_processor.run(\n", " code=\"preprocessing.py\",\n", " inputs=[\n", " ProcessingInput(\n", " source=data_path,\n", " destination=\"/opt/ml/processing/input\",\n", " s3_input_mode=\"File\",\n", " s3_data_distribution_type=\"ShardedByS3Key\",\n", " )\n", " ],\n", " outputs=[\n", " ProcessingOutput(\n", " output_name=\"train_data\",\n", " source=\"/opt/ml/processing/output/train\",\n", " destination=train_path,\n", " ),\n", " ProcessingOutput(\n", " output_name=\"test_data\",\n", " source=\"/opt/ml/processing/output/test\",\n", " destination=test_path,\n", " ),\n", " ProcessingOutput(\n", " output_name=\"pipeline\",\n", " source=\"/opt/ml/processing/output/pipeline\",\n", " destination=pipeline_path,\n", " ),\n", " ],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 ls s3://$bucket/$prefix/ --recursive" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### SageMaker Training" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We use the Estimator from the SageMaker Python SDK\n", "from sagemaker import get_execution_role\n", "from sagemaker.sklearn.estimator import SKLearn\n", 
"\n", "FRAMEWORK_VERSION = \"0.23-1\"\n", "\n", "# Define the Estimator from SageMaker (Script Mode)\n", "sklearn_estimator = SKLearn(\n", " entry_point=\"training.py\",\n", " role=get_execution_role(),\n", " instance_count=1,\n", " instance_type=\"ml.c5.xlarge\",\n", " framework_version=FRAMEWORK_VERSION,\n", " base_job_name=\"sklearn-pipeline-training\",\n", " metric_definitions=[{\"Name\": \"model_accuracy\", \"Regex\": \"Model Accuracy: ([0-9.]+).*$\"}],\n", " hyperparameters={\"n-estimators\": 100, \"min-samples-leaf\": 3},\n", ")\n", "\n", "# Train the model (~5 minutes)\n", "sklearn_estimator.fit({\"train\": train_path, \"test\": test_path, \"pipeline\": pipeline_path})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that the model has been trained, we can finally check the content of the file stored by SageMaker on S3. We expect to find two files inside the archive:\n", "\n", "- `model.joblib` - the actual RFC model\n", "- `preproc.joblib` - our preprocessing pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_output = sklearn_estimator.model_data\n", "!aws s3 cp $training_output /tmp" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!tar -tf /tmp/model.tar.gz" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Batch Inference with SageMaker Batch Transform\n", "\n", "Now that everything worked, let's use the model and the preprocessing logic to run a prediction against 50% of the dataset (or an external validation dataset). This is good to evaluate performances of the model with a dataset that the model has never seen before. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sample random set of X dataset (50%)\n", "sampled = data.sample(int(len(data) / 2))\n", "sampled_x = sampled.drop(\"y\", axis=1)\n", "sampled_y = sampled.y\n", "sampled_x.to_csv(\"/tmp/sampled_data.csv\", index=False, header=False)\n", "sampled_x_path = session.upload_data(\n", " \"/tmp/sampled_data.csv\", bucket=bucket, key_prefix=f\"{prefix}/sampled\"\n", ")\n", "sampled_x_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In order to do this, we will use yet another feature of Amazon SageMaker called **SageMaker Batch Transform**. With batch transform, you create a batch transform job using a trained model and the dataset, which must be stored in Amazon S3. Amazon SageMaker saves the inferences in an S3 bucket that you specify when you create the batch transform job. Batch transform manages all the resources required to get inferences, including launching instances and deleting them after the batch transform job has completed - just like SageMaker Processing!\n", "\n", "Use batch transform when you:\n", "\n", "- Want to get inferences for an entire dataset and index them to serve inferences in real time\n", "- Don't need a persistent endpoint that applications (for example, web or mobile apps) can call to get inferences (e.g.: scheduled inferences)\n", "- Don't need the sub-second latency that SageMaker hosted endpoints provide" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "transformer = sklearn_estimator.transformer(1, \"ml.m5.large\")\n", "transformer.transform(sampled_x_path, content_type=\"text/csv\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's retrieve the inferences from S3, then check the confusion matrix from the predictions:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp 
$transformer.output_path/sampled_data.csv.out /tmp/predictions.out" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import json\n", "\n", "with open(\"/tmp/predictions.out\", \"r\") as r:\n", " a = r.read()[1:-1].split(\", \")\n", " predictions = [int(numeric_string) for numeric_string in a]\n", " predictions = np.asarray(predictions)\n", "\n", "y_true = sampled_y\n", "\n", "pd.crosstab(\n", " index=y_true.values,\n", " columns=predictions,\n", " rownames=[\"actuals\"],\n", " colnames=[\"predictions\"],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Level 3: SageMaker Inference Pipelines\n", "\n", "A more advanced usage of SageMaker resources is to delegate preprocessing and inference to two different containers running sequentially on the same real-time endpoint. This is called an Inference Pipeline. It's really powerful and well suited to more advanced logic. If you want to know more about Inference Pipelines and test them out for yourself, check out this blog post: [Preprocess input data before making predictions using Amazon SageMaker inference pipelines and Scikit-learn](https://aws.amazon.com/it/blogs/machine-learning/preprocess-input-data-before-making-predictions-using-amazon-sagemaker-inference-pipelines-and-scikit-learn/)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:eu-west-1:470317259841:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }