In [None]:
!pip install -q -U sagemaker

# Using Scikit-Learn Pipelines with Amazon SageMaker

In this notebook, we look at which Amazon SageMaker features can help you bring ML workloads based on Scikit-Learn, and Scikit-Learn Pipelines in particular, to the AWS cloud. The goal is to run preprocessing and training as scheduled pipelines, and to serve real-time predictions from endpoints.

## Step 0 - Generate the dataset

In this section, we generate a random dataset for a classification task with the `sklearn.datasets.make_classification` function from Scikit-Learn. You can of course use your own dataset instead: to run the rest of the notebook with minimal changes, make sure it is stored in S3.

The following cell creates the dataset, writes a few CSV files, and uploads them to S3 at the following paths:

- `s3://{bucket}/{prefix}/source/data.csv`
- `s3://{bucket}/{prefix}/train/train.csv`
- `s3://{bucket}/{prefix}/test/test.csv`

In [None]:
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
import pandas as pd
from sagemaker import Session

session = Session()
bucket = session.default_bucket() # Change to another bucket if running outside of SageMaker
prefix = "sklearn-pipeline/data" # Choose your preferred prefix, but keep it consistent

# Create a random dataset for classification
X, y = make_classification(random_state=42)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=["y"])], axis=1)
data.to_csv("/tmp/data.csv", index=False)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
train = pd.concat([pd.DataFrame(X_train), pd.DataFrame(y_train, columns=["y"])], axis=1)
test = pd.concat([pd.DataFrame(X_test), pd.DataFrame(y_test, columns=["y"])], axis=1)
# Save the files locally
train.to_csv("/tmp/train.csv", index=False)
test.to_csv("/tmp/test.csv", index=False)
# Upload to S3
data_path = session.upload_data(path="/tmp/data.csv", bucket=bucket, key_prefix=f"{prefix}/source")
train_path = session.upload_data(path="/tmp/train.csv", bucket=bucket, key_prefix=f"{prefix}/train")
test_path = session.upload_data(path="/tmp/test.csv", bucket=bucket, key_prefix=f"{prefix}/test")

print(data_path)
print(train_path)
print(test_path)

# Level 1: SKLearn Pipeline in the Training script

One of the core features of Amazon SageMaker is orchestrating resources for your ML workloads. With SageMaker Training jobs, you provide your own script written in your preferred ML framework (here, Scikit-Learn) along with your dataset, and SageMaker automatically provisions the compute and storage needed to train the model, then tears them down when the job finishes.

Scikit-Learn Pipelines let you chain a series of steps into a single object that runs all of them whenever you call it for training (`fit()`) or prediction (`predict()`). The easiest way to use an SKLearn Pipeline with Amazon SageMaker is to treat the pipeline itself as the model and let it handle preprocessing as part of training. When SageMaker trains the model, it executes every step of the pipeline, including any preprocessing steps it contains.

This approach is great when:

- the dataset is not too big
- the person responsible for preprocessing/feature engineering of the data is the same as the one responsible for training the model (AKA: you only have one script)
- you are ok with obtaining only one file containing both model and preprocessing pipeline and do not need two separate ones
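
To see the single-object behaviour of a Pipeline in isolation, here is a small local sketch in plain Scikit-Learn (no SageMaker involved). It checks that calling `predict()` on a fitted pipeline gives the same result as applying the fitted scaler and then the classifier by hand. The `_demo` variable names are hypothetical and chosen so they don't overwrite `X`, `y` or `pipe`, which are used elsewhere in this notebook.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X_demo, y_demo = make_classification(n_samples=200, random_state=0)

# fit() calls fit_transform() on the scaler, then fit() on the classifier
pipe_demo = Pipeline(
    [
        ("scaler", StandardScaler()),
        ("rfc", RandomForestClassifier(n_estimators=10, random_state=0)),
    ]
)
pipe_demo.fit(X_demo, y_demo)

# predict() applies the already-fitted scaler before calling the classifier
X_scaled_demo = pipe_demo.named_steps["scaler"].transform(X_demo)
manual_preds = pipe_demo.named_steps["rfc"].predict(X_scaled_demo)
print(np.array_equal(pipe_demo.predict(X_demo), manual_preds))  # True
```

Because the scaler's statistics are learned during `fit()` and reused in `predict()`, shipping the pipeline as one artifact keeps training-time and inference-time preprocessing consistent.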

Let's now create a script that does exactly this: it defines an SKLearn Pipeline made of a `StandardScaler` and a `RandomForestClassifier`. This is a simple pipeline, but you can make it as complex as you need.

There are a few interesting things to note in the following script:

- hyperparameters are obtained via `ArgumentParser`;
- some arguments default to environment variables that SageMaker sets to tell the script where to read and write data;
- the core of the training logic is in the `if __name__ == "__main__":` block;
- two extra functions, needed by SageMaker at inference time, are provided:
  - `model_fn`, which tells SageMaker how to load the model;
  - `predict_fn`, which overrides SageMaker's default function for generating a prediction;
- apart from the environment-variable defaults and these two functions, nothing in the script is specific to SageMaker: in the next step we will run it locally, independently of any platform or service.

In [None]:
%%writefile sklearn_pipeline.py

from joblib import dump, load
import pandas as pd, numpy as np, os, argparse

from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score

# inference function - tells SageMaker how to load the model
def model_fn(model_dir):
    clf = load(os.path.join(model_dir, "pipeline.joblib"))
    return clf


def predict_fn(input_data, model):
    prediction = model.predict(input_data)
    return np.array(prediction)


# Argument parser
def _parse_args():
    parser = argparse.ArgumentParser()
    # Hyperparameters
    parser.add_argument("--n-estimators", type=int, default=10)
    parser.add_argument("--min-samples-leaf", type=int, default=3)
    # Data, model, and output directories
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--train-file", type=str, default="train.csv")
    parser.add_argument("--test-file", type=str, default="test.csv")
    # Parse the arguments
    return parser.parse_known_args()


# Main Training Loop
if __name__ == "__main__":
    # Process arguments
    args, _ = _parse_args()
    # Load the dataset
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    X_train, y_train = train_df.drop("y", axis=1), train_df.y
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))
    X_test, y_test = test_df.drop("y", axis=1), test_df.y
    # Define the pipeline and train it
    pipe = Pipeline(
        [
            ("scaler", StandardScaler()),
            (
                "rfc",
                RandomForestClassifier(
                    n_estimators=args.n_estimators,
                    min_samples_leaf=args.min_samples_leaf,
                ),
            ),
        ]
    )
    pipe.fit(X_train, y_train)
    # Evaluate the model performances
    print(f"Model Accuracy: {pipe.score(X_test, y_test)}")
    # Store the pipeline
    dump(pipe, os.path.join(args.model_dir, "pipeline.joblib"))
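
The script above relies on the default request handling of the SageMaker Scikit-Learn serving container, which decodes the request payload before calling `predict_fn`. If you need custom parsing, the SageMaker Python SDK documentation for Scikit-Learn describes an optional `input_fn(request_body, request_content_type)` hook that you can define in the same entry-point file. The following is a minimal sketch, assuming you only want to accept headerless CSV; the parsing logic and the choice to reject every other content type are illustrative, not what the default handler does.

```python
import io

import numpy as np


def input_fn(request_body, request_content_type):
    """Parse a headerless CSV request into a 2-D float array (illustrative sketch)."""
    if request_content_type != "text/csv":
        # This sketch rejects anything it does not handle explicitly
        raise ValueError(f"Unsupported content type: {request_content_type}")
    if isinstance(request_body, bytes):
        request_body = request_body.decode("utf-8")
    # ndmin=2 keeps a single-row request 2-D, which is what predict() expects
    return np.loadtxt(io.StringIO(request_body), delimiter=",", ndmin=2)
```

For example, `input_fn("1,2,3\n4,5,6", "text/csv")` returns an array of shape `(2, 3)` that `predict_fn` can pass straight to the pipeline.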

### Local test

Now that the script is written, run it locally to make sure it works, and debug it if necessary.

In [None]:
!python sklearn_pipeline.py --model-dir . --train /tmp/ --test /tmp/

In [None]:
from sklearn_pipeline import model_fn, predict_fn

pipe = model_fn(".")
predict_fn([X[0]], pipe)

As you can see, our script correctly generated a prediction. We can now train and deploy the model on AWS resources with Amazon SageMaker.

### Train and Deploy on SageMaker

In the following cell, we create an object called an **Estimator**. A SageMaker Estimator gathers the information SageMaker needs about the job we want to run:

- the kind of script: here it's a Scikit-Learn script, so we use the `SKLearn` estimator;
- how many instances, and of which type, to use for training;
- which version of the framework to use;
- the metrics that we want the training job to expose;
- the hyperparameters to pass to the training script (remember the `ArgumentParser` from before?).

Once everything is set up, we call the `.fit()` method to start training on our data stored in S3, which we provide via two channels: `train` and `test`.

In [None]:
# We use the Estimator from the SageMaker Python SDK
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn

FRAMEWORK_VERSION = "0.23-1"

# Define the Estimator from SageMaker (Script Mode)
sklearn_estimator = SKLearn(
    entry_point="sklearn_pipeline.py",
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.c5.xlarge",
    framework_version=FRAMEWORK_VERSION,
    base_job_name="rf-scikit",
    metric_definitions=[{"Name": "model_accuracy", "Regex": "Model Accuracy: ([0-9.]+).*$"}],
    hyperparameters={"n-estimators": 100, "min-samples-leaf": 3},
)

# Train the model (~5 minutes)
sklearn_estimator.fit({"train": train_path, "test": test_path})
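
When `fit()` runs, SageMaker starts the entry point inside the training container, passing the `hyperparameters` as command-line arguments and exposing each input channel through an `SM_CHANNEL_<NAME>` environment variable (in the default File input mode, the channel data is downloaded under `/opt/ml/input/data/<name>`). The sketch below imitates that hand-off locally so you can see where the values in `_parse_args()` come from. `simulate_script_mode` and `parse_like_training_script` are hypothetical helpers, and the real training toolkit sets many more variables than shown here.

```python
import argparse


def simulate_script_mode(hyperparameters, channels, model_dir="/opt/ml/model"):
    """Hypothetical approximation of the arguments and environment SageMaker
    gives a Script Mode entry point (the real toolkit sets more variables)."""
    # Hyperparameters become "--name value" command-line arguments
    argv = []
    for name, value in hyperparameters.items():
        argv += [f"--{name}", str(value)]
    # Each channel becomes an SM_CHANNEL_<NAME> variable pointing at its local copy
    env = {"SM_MODEL_DIR": model_dir}
    for channel in channels:
        env[f"SM_CHANNEL_{channel.upper()}"] = f"/opt/ml/input/data/{channel}"
    return argv, env


def parse_like_training_script(argv, env):
    """A subset of the arguments in _parse_args(), reading an explicit env dict."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--n-estimators", type=int, default=10)
    parser.add_argument("--min-samples-leaf", type=int, default=3)
    parser.add_argument("--model-dir", type=str, default=env.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=env.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=env.get("SM_CHANNEL_TEST"))
    args, _ = parser.parse_known_args(argv)
    return args


argv, env = simulate_script_mode({"n-estimators": 100, "min-samples-leaf": 3}, ["train", "test"])
args = parse_like_training_script(argv, env)
print(args.n_estimators, args.train)  # 100 /opt/ml/input/data/train
```

This is also why the script works unchanged locally: when you pass `--train /tmp/` explicitly, the command-line value overrides the environment-variable default.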

Once the model has been trained, we can deploy it directly from the estimator. Again, SageMaker needs to know which type of instance to deploy the model on, and how many of them. The result is a **real-time inference endpoint**: a hosted instance that exposes an HTTPS endpoint, stays up 24/7 until you delete it, and accepts prediction requests. Spinning it up usually takes around 4-5 minutes.

In [None]:
predictor = sklearn_estimator.deploy(initial_instance_count=1, instance_type="ml.c4.large")

The SageMaker Python SDK provides a `.predict()` method that lets you query the endpoint and get a prediction back in a single call.

In [None]:
predictor.predict([X[0]])
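
`predictor.predict()` takes care of serialization for you. If you need to call the endpoint from code that doesn't use the SageMaker Python SDK, one option is the low-level `invoke_endpoint` call of the boto3 `sagemaker-runtime` client. The sketch below assumes that the default Scikit-Learn serving container accepts `text/csv` requests (one row per line, no header) and that you know the endpoint name, for example from `predictor.endpoint_name`. `to_csv_payload` and `invoke_csv` are hypothetical helpers.

```python
import io

import numpy as np


def to_csv_payload(rows):
    """Serialize a 2-D array-like as headerless CSV, one row per line."""
    buffer = io.StringIO()
    # "%.17g" keeps full float64 precision in the text representation
    np.savetxt(buffer, np.atleast_2d(rows), delimiter=",", fmt="%.17g")
    return buffer.getvalue()


def invoke_csv(endpoint_name, rows):
    """Send rows to a SageMaker endpoint and return the raw response body as text."""
    import boto3  # imported here so to_csv_payload() works without boto3

    runtime = boto3.client("sagemaker-runtime")
    response = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="text/csv",
        Body=to_csv_payload(rows),
    )
    return response["Body"].read().decode("utf-8")


print(to_csv_payload([[0.5, -1.25], [2.0, 3.0]]))
```

For example, `invoke_csv(predictor.endpoint_name, [X[0]])` queries the same endpoint as the cell above; run it before deleting the endpoint.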

Once you're done testing, delete the endpoint to avoid incurring additional costs. Let's also remove the S3 data we will not use later.

In [None]:
predictor.delete_endpoint()

In [None]:
!aws s3 rm s3://$bucket/$prefix/train/ --recursive
!aws s3 rm s3://$bucket/$prefix/test/ --recursive

# Level 2: Preprocessing with Processing, Training, Transformation & Inference in one Script

Let's take things a step further. If:

- the dataset is big enough that processing takes some time and can be improved by horizontal scaling;
- the person responsible for preprocessing/feature engineering of the data is NOT the same as the one responsible for training the model (AKA: you prefer to have multiple scripts, one for training and one for feature engineering);
- you want two separate files, one for the preprocessing pipeline and one for the model itself;

then you can separate preprocessing from training by leveraging other features of the SageMaker platform.

You can use **Amazon SageMaker Processing** to run data pre-processing or post-processing, feature engineering, data validation, or model evaluation workloads on Amazon SageMaker. Processing jobs read their input data from Amazon S3 and write their output back to Amazon S3. In this step, you create your preprocessing pipeline, serialize it with `joblib`, and store it in S3 for later use, together with the transformed train and test datasets.

Then, just like before, you can use SageMaker Training jobs to train the model and SageMaker inference to run predictions. At inference time you also need the pipeline from the Processing step: load it as part of your "model" in the `model_fn()` function, so that SageMaker treats preprocessing + model as a single entity when running predictions.
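
Here is a local sketch, in plain Scikit-Learn, of the idea behind this split: the preprocessing pipeline and the model are fitted separately, as they would be in the Processing and Training jobs, and are then chained into one `Pipeline` at inference time, the same way `model_fn()` in `training.py` does. The synthetic data and the `_stage` variable names are illustrative only.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X_stage, y_stage = make_classification(n_samples=200, random_state=1)

# "Processing" stage: fit only the preprocessing pipeline
preproc_stage = Pipeline([("scaler", StandardScaler())])
X_stage_scaled = preproc_stage.fit_transform(X_stage)

# "Training" stage: fit the model on the already-transformed data
model_stage = RandomForestClassifier(n_estimators=10, random_state=1)
model_stage.fit(X_stage_scaled, y_stage)

# "Inference" stage: chain the two fitted objects, as model_fn() does
combined_stage = Pipeline([("preproc", preproc_stage), ("model", model_stage)])
# The combined pipeline accepts raw input and preprocesses it internally
print(np.array_equal(combined_stage.predict(X_stage), model_stage.predict(X_stage_scaled)))  # True
```

Note that `combined_stage` is never fitted as a whole: both steps are already fitted, so it only forwards `predict()` calls through them.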

Let's do things one step at a time. First, let's create the preprocessing script. In this script, we will have our `sklearn.Pipeline` responsible for preprocessing. No training here! At the end of the script, we `dump` the pipeline to be re-used later.

In [None]:
%%writefile preprocessing.py

from joblib import dump, load
import pandas as pd, numpy as np, os, argparse

from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split

# Argument parser
def _parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--filepath", type=str, default="/opt/ml/processing/input/")
    parser.add_argument("--filename", type=str, default="data.csv")
    parser.add_argument("--outputpath", type=str, default="/opt/ml/processing/output/")
    # Parse the arguments
    return parser.parse_known_args()


# Main preprocessing logic
if __name__ == "__main__":
    # Process arguments
    args, _ = _parse_args()
    # Load the dataset
    df = pd.read_csv(os.path.join(args.filepath, args.filename))
    X, y = df.drop("y", axis=1), df.y
    # Define the preprocessing pipeline and fit it
    pipe = Pipeline([("scaler", StandardScaler())])
    transformed = pipe.fit_transform(X)
    # Generate the output files - train and test
    output = pd.concat([pd.DataFrame(transformed), y], axis=1)
    train, test = train_test_split(output, random_state=42)
    train.to_csv(os.path.join(args.outputpath, "train/train.csv"), index=False)
    test.to_csv(os.path.join(args.outputpath, "test/test.csv"), index=False)
    # Store the fitted pipeline
    dump(pipe, os.path.join(args.outputpath, "pipeline/preproc-pipeline.joblib"))

Next, create the script used for both training and inference. Note that `model_fn()` has changed: it now loads two files, the preprocessing pipeline and the trained model, and chains them into a single Pipeline. The model itself could have been a pipeline as well!

In [None]:
%%writefile training.py

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline
from joblib import dump, load
import pandas as pd, numpy as np, os, argparse
from shutil import copy

# inference function - tells SageMaker how to load the model and do the prediction
def model_fn(model_dir):
    # Load the preprocessing pipeline and the model saved by the training job
    preproc = load(os.path.join(model_dir, "preproc.joblib"))
    model = load(os.path.join(model_dir, "model.joblib"))
    # Chain them so that predict() preprocesses raw input before the model
    pipe = Pipeline([("preproc", preproc), ("model", model)])
    return pipe


def predict_fn(input_data, model):
    prediction = model.predict(input_data)
    return np.array(prediction)


# Argument parser
def _parse_args():
    parser = argparse.ArgumentParser()
    # Hyperparameters
    parser.add_argument("--n-estimators", type=int, default=10)
    parser.add_argument("--min-samples-leaf", type=int, default=3)
    # Data, model, and output directories
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--pipeline", type=str, default=os.environ.get("SM_CHANNEL_PIPELINE"))
    parser.add_argument("--train-file", type=str, default="train.csv")
    parser.add_argument("--test-file", type=str, default="test.csv")
    parser.add_argument("--pipeline-file", type=str, default="preproc-pipeline.joblib")
    # Parse the arguments
    return parser.parse_known_args()


# Main Training Loop
if __name__ == "__main__":
    # Process arguments
    args, _ = _parse_args()
    # Load the dataset
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))
    # Separate X and y
    X_train, y_train = train_df.drop("y", axis=1), train_df.y
    X_test, y_test = test_df.drop("y", axis=1), test_df.y
    # Define the model and train it
    model = RandomForestClassifier(
        n_estimators=args.n_estimators,
        min_samples_leaf=args.min_samples_leaf,
        n_jobs=-1,
    )
    model.fit(X_train, y_train)
    # Evaluate the model performances
    print(f"Model Accuracy: {accuracy_score(y_test, model.predict(X_test))}")
    # Store the model and copy the preprocessing pipeline next to it,
    # so that both end up in the same model.tar.gz archive
    dump(model, os.path.join(args.model_dir, "model.joblib"))
    copy(
        os.path.join(args.pipeline, args.pipeline_file),
        os.path.join(args.model_dir, "preproc.joblib"),
    )

### Local Testing

Let's test locally to see if everything works alright.

In [None]:
!mkdir -p ./temp/train ./temp/test ./temp/pipeline

In [None]:
!python preprocessing.py --filepath /tmp/ --filename data.csv --outputpath ./temp/

In [None]:
!python training.py --train ./temp/train --test ./temp/test --pipeline ./temp/pipeline --model-dir ./temp/

In [None]:
from training import model_fn, predict_fn

pipe = model_fn("./temp/")
predict_fn([X[0]], pipe)

Awesome! Now, let's use SageMaker resources instead.

### SageMaker Processing

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role

train_path = f"s3://{bucket}/{prefix}/train"
test_path = f"s3://{bucket}/{prefix}/test"
pipeline_path = f"s3://{bucket}/{prefix}/pipeline"

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=get_execution_role(),
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="sklearn-pipeline-processing",
)

sklearn_processor.run(
    code="preprocessing.py",
    inputs=[
        ProcessingInput(
            source=data_path,
            destination="/opt/ml/processing/input",
            s3_input_mode="File",
            s3_data_distribution_type="ShardedByS3Key",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data",
            source="/opt/ml/processing/output/train",
            destination=train_path,
        ),
        ProcessingOutput(
            output_name="test_data",
            source="/opt/ml/processing/output/test",
            destination=test_path,
        ),
        ProcessingOutput(
            output_name="pipeline",
            source="/opt/ml/processing/output/pipeline",
            destination=pipeline_path,
        ),
    ],
)

In [None]:
!aws s3 ls s3://$bucket/$prefix/ --recursive

### SageMaker Training

In [None]:
# We use the Estimator from the SageMaker Python SDK
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn

FRAMEWORK_VERSION = "0.23-1"

# Define the Estimator from SageMaker (Script Mode)
sklearn_estimator = SKLearn(
    entry_point="training.py",
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.c5.xlarge",
    framework_version=FRAMEWORK_VERSION,
    base_job_name="sklearn-pipeline-training",
    metric_definitions=[{"Name": "model_accuracy", "Regex": "Model Accuracy: ([0-9.]+).*$"}],
    hyperparameters={"n-estimators": 100, "min-samples-leaf": 3},
)

# Train the model (~5 minutes)
sklearn_estimator.fit({"train": train_path, "test": test_path, "pipeline": pipeline_path})

Now that the model has been trained, we can finally check the content of the file stored by SageMaker on S3. We expect to find two files inside the archive:

- `model.joblib` - the trained `RandomForestClassifier`
- `preproc.joblib` - our preprocessing pipeline

In [None]:
training_output = sklearn_estimator.model_data
!aws s3 cp $training_output /tmp

In [None]:
!tar -tf /tmp/model.tar.gz
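
If you want to inspect the trained artifacts outside SageMaker, you can rebuild the same object that `model_fn()` returns from the downloaded archive. A minimal sketch, assuming the archive contains `model.joblib` and `preproc.joblib` as listed above, and that your local Scikit-Learn version is compatible with the one used in the container (`0.23-1` here). `load_model_archive` is a hypothetical helper.

```python
import os
import tarfile
import tempfile

from joblib import load
from sklearn.pipeline import Pipeline


def load_model_archive(archive_path):
    """Extract a SageMaker model.tar.gz and rebuild the preprocessing + model pipeline."""
    target_dir = tempfile.mkdtemp()
    # Only use archives you trust: extraction writes files to disk,
    # and joblib.load unpickles arbitrary Python objects
    with tarfile.open(archive_path, "r:gz") as tar:
        tar.extractall(target_dir)
    preproc = load(os.path.join(target_dir, "preproc.joblib"))
    model = load(os.path.join(target_dir, "model.joblib"))
    # Same composition as model_fn() in training.py
    return Pipeline([("preproc", preproc), ("model", model)])
```

For example, `load_model_archive("/tmp/model.tar.gz").predict([X[0]])` runs the downloaded preprocessing and model locally on the first row of `X`.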

### Batch Inference with SageMaker Batch Transform

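Now that everything works, let's use the model and the preprocessing logic to run predictions on a random 50% sample of the dataset (or on an external validation dataset). Keep in mind that a random sample of the full dataset also contains rows the model was trained on, so for an unbiased estimate of performance you should use data the model has never seen, such as an external validation dataset.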

In [None]:
# Sample random set of X dataset (50%)
sampled = data.sample(int(len(data) / 2))
sampled_x = sampled.drop("y", axis=1)
sampled_y = sampled.y
sampled_x.to_csv("/tmp/sampled_data.csv", index=False, header=False)
sampled_x_path = session.upload_data(
    "/tmp/sampled_data.csv", bucket=bucket, key_prefix=f"{prefix}/sampled"
)
sampled_x_path

In order to do this, we will use yet another feature of Amazon SageMaker called **SageMaker Batch Transform**. With batch transform, you create a batch transform job using a trained model and the dataset, which must be stored in Amazon S3. Amazon SageMaker saves the inferences in an S3 bucket that you specify when you create the batch transform job. Batch transform manages all the resources required to get inferences, including launching instances and deleting them after the batch transform job has completed - just like SageMaker Processing!

Use batch transform when you:

- Want to get inferences for an entire dataset and index them to serve inferences in real time
- Don't need a persistent endpoint that applications (for example, web or mobile apps) can call to get inferences (e.g.: scheduled inferences)
- Don't need the sub-second latency that SageMaker hosted endpoints provide

In [None]:
transformer = sklearn_estimator.transformer(1, "ml.m5.large")
transformer.transform(sampled_x_path, content_type="text/csv")

Let's retrieve the inferences from S3, then check the confusion matrix from the predictions:

In [None]:
!aws s3 cp $transformer.output_path/sampled_data.csv.out /tmp/predictions.out

In [None]:
import numpy as np
import json

with open("/tmp/predictions.out", "r") as r:
    # The batch transform output is a JSON list of labels, e.g. "[0, 1, 1]"
    predictions = np.asarray(json.loads(r.read()), dtype=int)

y_true = sampled_y

pd.crosstab(
    index=y_true.values,
    columns=predictions,
    rownames=["actuals"],
    colnames=["predictions"],
)
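
The confusion matrix can also be summarised as a single accuracy figure: correct predictions sit on the diagonal, so accuracy is the diagonal sum divided by the total count. The sketch below checks this on small made-up labels with `sklearn.metrics.confusion_matrix` and `sklearn.metrics.accuracy_score`; the `_example` arrays are illustrative only.

```python
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix

y_true_example = np.array([0, 1, 0, 0, 1, 1])
y_pred_example = np.array([0, 1, 1, 0, 1, 0])

# Rows are actual classes, columns are predicted classes (like the crosstab above)
cm = confusion_matrix(y_true_example, y_pred_example)
# Correct predictions are on the diagonal
accuracy_from_cm = np.trace(cm) / cm.sum()
print(cm)
print(accuracy_from_cm, accuracy_score(y_true_example, y_pred_example))
```

With the notebook's own variables, `accuracy_score(y_true.values, predictions)` gives the same summary for the batch transform predictions.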

# Level 3: SageMaker Inference Pipelines

A more advanced option is to split preprocessing and inference into two different containers that run sequentially behind the same real-time endpoint. This is called an **Inference Pipeline**, and it is well suited to more complex preprocessing logic. To learn more about Inference Pipelines and try them out yourself, check out this blog post: [Preprocess input data before making predictions using Amazon SageMaker inference pipelines and Scikit-learn](https://aws.amazon.com/it/blogs/machine-learning/preprocess-input-data-before-making-predictions-using-amazon-sagemaker-inference-pipelines-and-scikit-learn/)