# Train a Scikit-Learn model in SageMaker and track with MLFlow

## Intro

The main objective of this notebook is to show how to integrate Amazon SageMaker with MLFlow, and MLFlow with SageMaker Experiments.

## Pre-Requisites

To run this notebook successfully, you must first have prepared the infrastructure using CDK, which sets up the MLFlow server in an isolated VPC. When running this example in the SageMaker Notebook instance provisioned via CDK, you need two things. First, you need access to the URI of the MLFlow server we will use for tracking purposes; in our case, this is the `HTTP API Gateway` endpoint that exposes our MLFlow server, reachable via a `PrivateLink`. Second, you need a SageMaker execution role with permission to read the secret in `AWS Secrets Manager` from which we retrieve the username and password for the MLFlow server.

This notebook runs on SageMaker Studio using the `Base Python 2.0` image on a `Python 3` kernel.

## The Machine Learning Problem

In this example, we will solve a regression problem which aims to answer the question: "what is the expected price of a house in the California area?". The target variable is the house value for California districts, expressed in hundreds of thousands of dollars ($100,000).

## Install required and/or update libraries

At the time of writing, we have used version 2 of the `sagemaker` SDK. The MLFlow SDK version matches our MLFlow server version, i.e., `2.3.1`.

In [None]:
!pip install -q --upgrade pip
!pip install sagemaker sagemaker-experiments scikit-learn==1.0.1 mlflow==2.3.1 boto3

Let's start by specifying:

- The S3 bucket and prefix that you want to use for training and model data.  This should be within the same region as the notebook instance, training, and hosting.
- The IAM role ARN used to give training and hosting access to your data. See the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/using-identity-based-policies.html) for more details on creating these.  Note that if you need a role that is not associated with the current notebook instance, or more than one role for training and/or hosting, you should replace `sagemaker.get_execution_role()` with the appropriate full IAM role ARN string(s).
- The tracking URI where the MLFlow server runs
- The experiment name as the logical entity to keep our tests grouped and organized.

In [None]:
import os
import pandas as pd
import json
import random
import boto3

## SageMaker and SKlearn libraries
import sagemaker
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.tuner import IntegerParameter, HyperparameterTuner

## SKLearn libraries
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

## MLFlow libraries
import mlflow
from mlflow.tracking.client import MlflowClient
import mlflow.sagemaker

cloudformation_client = boto3.client('cloudformation')

sess = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sess.default_bucket()
region = sess.boto_region_name
account = role.split("::")[1].split(":")[0]
tracking_uri = cloudformation_client.describe_stacks(StackName='HttpGatewayStack')['Stacks'][0]['Outputs'][0]['OutputValue']

mlflow_secret_name = "mlflow-server-credentials"
experiment_name = 'DEMO-sagemaker-mlflow'
model_name = 'california-housing-model'

print('SageMaker role: {}'.format(role.split("/")[-1]))
print('bucket: {}'.format(bucket))
print('Account: {}'.format(account))
print("Using AWS Region: {}".format(region))
print("MLflow server URI: {}".format(tracking_uri))
print("MLFLOW_SECRET_NAME: {}".format(mlflow_secret_name))
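
The cell above reads the tracking URI from `Outputs[0]`, which relies on the order in which CloudFormation returns the stack outputs. A slightly more defensive sketch looks the value up by its output key instead. The key name `MlflowApiUrl` and the sample values below are hypothetical; check the actual output key of your `HttpGatewayStack`.

```python
def find_stack_output(outputs, output_key):
    """Return the OutputValue whose OutputKey matches, or raise KeyError."""
    for output in outputs:
        if output.get("OutputKey") == output_key:
            return output["OutputValue"]
    raise KeyError(f"No stack output named {output_key!r}")


# Stand-in for describe_stacks(...)['Stacks'][0]['Outputs']; values are made up.
example_outputs = [
    {"OutputKey": "SomethingElse", "OutputValue": "ignored"},
    {"OutputKey": "MlflowApiUrl", "OutputValue": "https://example.invalid/"},
]

print(find_stack_output(example_outputs, "MlflowApiUrl"))
```

In the notebook, `example_outputs` would be replaced by the `Outputs` list returned by `cloudformation_client.describe_stacks`.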

## Data Preparation
We load the dataset from sklearn, then split the data into training and testing datasets, allocating 75% of the data to the training dataset and the remaining 25% to the testing dataset.

The variable `target` is what we intend to estimate. It represents the value of a house, expressed in hundreds of thousands of dollars ($100,000).

In [None]:
# we use the California housing dataset 
data = fetch_california_housing()

X_train, X_test, y_train, y_test = train_test_split(data.data, data.target, test_size=0.25, random_state=42)

trainX = pd.DataFrame(X_train, columns=data.feature_names)
trainX['target'] = y_train

testX = pd.DataFrame(X_test, columns=data.feature_names)
testX['target'] = y_test

Finally, we save a copy of the data locally, as well as in S3. The data stored in S3 will be used by SageMaker to train and test the model.

In [None]:
# save the data locally
trainX.to_csv('california_train.csv', index=False)
testX.to_csv('california_test.csv', index=False)

# save the data to S3.
train_path = sess.upload_data(path='california_train.csv', bucket=bucket, key_prefix='sagemaker/sklearncontainer')
test_path = sess.upload_data(path='california_test.csv', bucket=bucket, key_prefix='sagemaker/sklearncontainer')

### Setup SageMaker Experiments

SageMaker Experiments is an AWS service for tracking machine learning Experiments. The SageMaker Experiments Python SDK is a high-level interface to this service that helps you track Experiment information using Python.

Conceptually, `SageMaker Experiments` is built on the following entities:

* Experiment: A collection of related Trials. Add Trials to an Experiment that you wish to compare together.
* Trial: A description of a multi-step machine learning workflow. Each step in the workflow is described by a TrialComponent.
* TrialComponent: A description of a single step in a machine learning workflow.
* Tracker: A Python context-manager for logging information about a single TrialComponent.

When you run jobs (both training and processing ones) in the SageMaker managed infrastructure, SageMaker automatically creates a <i>TrialComponent</i>. By default, a <i>TrialComponent</i> includes job metadata and lineage information about the input and output data, model artifacts and metrics (for training jobs). Within your training script, this data can be further enriched.

We want to show how you can easily enable a two-way interaction between MLflow and SageMaker Experiments.

Let us first create an `Experiment` and a `Trial`. These two entities are used to keep your experimentation organized.

In [None]:
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

import time

try:
    my_experiment = Experiment.load(experiment_name=experiment_name)
    print("existing experiment loaded")
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        my_experiment = Experiment.create(
            experiment_name = experiment_name,
            description = "MLFlow and SageMaker integration"
        )
        print("new experiment created")
    else:
        print(f"Unexpected {ex=}, {type(ex)=}")
        print("Don't go forward!")
        raise

trial_name = "trial-v1"

try:
    my_first_trial = Trial.load(trial_name=trial_name)
    print("existing trial loaded")
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        my_first_trial = Trial.create(
            experiment_name=experiment_name,
            trial_name=trial_name,
        )
        print("new trial created")
    else:
        print(f"Unexpected {ex=}, {type(ex)=}")
        print("Don't go forward!")
        raise

create_date = time.strftime("%Y-%m-%d-%H-%M-%S")

experiment_config = {
    "ExperimentName": experiment_name,
    "TrialName": trial_name,
}
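
The two `try`/`except` blocks above follow the same load-or-create pattern. As an optional refactoring sketch (not part of the `smexperiments` SDK), a small helper could capture it. The helper name `load_or_create` and the stand-in functions below are hypothetical and only illustrate the control flow.

```python
def load_or_create(load, create, not_found_marker="ResourceNotFound"):
    """Call load(); if it fails with a not-found error, call create() instead.

    Returns a (resource, created) tuple. Any other error is re-raised.
    """
    try:
        return load(), False
    except Exception as ex:
        if not_found_marker in str(ex):
            return create(), True
        raise


# Stand-ins for Experiment.load / Experiment.create, for illustration only.
def fake_load():
    raise RuntimeError("ResourceNotFound: no such experiment")


def fake_create():
    return "new-experiment"


resource, created = load_or_create(fake_load, fake_create)
print(resource, created)
```

With the real SDK, `load` and `create` would be lambdas wrapping `Experiment.load(...)` and `Experiment.create(...)` (or the `Trial` equivalents).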

## Training

For this example, we use the `SKlearn` framework in script mode with SageMaker. Let us explore in more detail the different components we need to define.

### Training script and SageMaker environment

The `./source_dir/train.py` script provides all the code we need for training a SageMaker model. The training script is very similar to a training script you might run outside of SageMaker, but you can access useful properties about the training environment through various environment variables, such as:

* `SM_MODEL_DIR`: A string representing the path to the directory to write model artifacts to. These artifacts are uploaded to S3 for model hosting.
* `SM_CHANNEL_TRAIN`: A string representing the path to the directory containing data in the 'train' channel.
* `SM_CHANNEL_TEST`: A string representing the path to the directory containing data in the 'test' channel.


For more information about training environment variables, please visit 
[SageMaker Training Toolkit](https://github.com/aws/sagemaker-training-toolkit/blob/master/ENVIRONMENT_VARIABLES.md).

We want to highlight in particular `SM_TRAINING_ENV` since it provides all the training information as a JSON-encoded dictionary (see [here](https://github.com/aws/sagemaker-training-toolkit/blob/master/ENVIRONMENT_VARIABLES.md#sm_training_env) for more details).
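
As a rough illustration of how a script-mode training script can consume these variables, the sketch below reads the channel and model paths from the environment and extracts the job name from `SM_TRAINING_ENV`. It is not the actual `./source_dir/train.py`; the argument names and defaults are assumptions made for this example.

```python
import argparse
import json
import os


def parse_args(argv=None):
    """Parse hyperparameters and data paths the way a script-mode job might."""
    parser = argparse.ArgumentParser()
    # Hyperparameters are passed to the script as command-line arguments.
    parser.add_argument("--n-estimators", type=int, default=100)
    parser.add_argument("--min-samples-leaf", type=int, default=3)
    # Paths default to the environment variables set by SageMaker.
    parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", default=os.environ.get("SM_CHANNEL_TEST"))
    # Ignore any extra arguments this sketch does not declare.
    args, _ = parser.parse_known_args(argv)
    return args


def training_job_name():
    """Return the job name from SM_TRAINING_ENV, or None outside SageMaker."""
    training_env = json.loads(os.environ.get("SM_TRAINING_ENV", "{}"))
    return training_env.get("job_name")


args = parse_args([])
print(args.n_estimators, args.min_samples_leaf, training_job_name())
```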

#### Hyperparameters

We are using the `RandomForestRegressor` algorithm from the SKlearn framework. For the purpose of this exercise, we are only using a subset of the hyperparameters supported by this algorithm, i.e. `n-estimators` and `min-samples-leaf`.

If you would like to know more about the different hyperparameters for this algorithm, please refer to the [`RandomForestRegressor` official documentation](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html).

Furthermore, it is important to note that for the purpose of this exercise, we omit the feature engineering step entirely, even though it is an essential step in any machine learning problem.

#### MLFlow interaction

To interact with the MLFlow server, we use the mlflow SDK, which allows us to set the tracking URI and the experiment name. Once this initial setup is completed, we can store the parameters used (`mlflow.log_params(params)`), the model that is generated (`mlflow.sklearn.log_model(model, "model")`) and its associated metrics (`mlflow.log_metric(f'AE-at-{str(q)}th-percentile', np.percentile(a=abs_err, q=q))`).
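
Put together, the MLFlow part of a training script could look roughly like the sketch below. It is not the actual `train.py`: the function names `ae_metric_name` and `log_training_run` are hypothetical, and printing each metric to the log is an assumption made so that SageMaker can pick it up from the job output.

```python
def ae_metric_name(q):
    """Name of the absolute-error metric at the q-th percentile."""
    return f"AE-at-{q}th-percentile"


def log_training_run(tracking_uri, experiment_name, params, model, abs_err_percentiles):
    """Log parameters, percentile metrics and the fitted model to MLflow.

    abs_err_percentiles maps a percentile (e.g. 50) to its absolute-error value.
    Returns the MLflow run ID.
    """
    # Imported here so the helper above can be used without mlflow installed.
    import mlflow
    import mlflow.sklearn

    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run() as run:
        mlflow.log_params(params)
        for q, value in abs_err_percentiles.items():
            mlflow.log_metric(ae_metric_name(q), value)
            # Also write the metric to stdout, one per line.
            print(f"{ae_metric_name(q)}: {value}")
        mlflow.sklearn.log_model(model, "model")
        return run.info.run_id


print(ae_metric_name(50))
```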

TODO: explain the `mlflow.autolog()` and the <i>System Tags</i> (add link) and how to overwrite them to have the right reference in SageMaker

#### SageMaker

In [None]:
!pygmentize ./source_dir/train.py

### SKlearn container

For this example, we use the `SKlearn` framework in script mode with SageMaker. For more information, please refer to [the official documentation](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/using_sklearn.html).

Our training script uses third-party libraries, e.g. `mlflow`, which are not installed by default in the `Sklearn` container SageMaker provides. This is easily solved by supplying a `requirements.txt` file in the `source_dir` folder, which SageMaker will `pip`-install before executing the training script.

### Metric definition

SageMaker emits every log to CloudWatch. Since we are using script mode, we need to specify a metric definition object that defines, via a regex, the format of the metric we are interested in, so that SageMaker knows how to extract this metric from the CloudWatch logs of the training job.

In our case, our custom metric is as follows:

```python
metric_definitions = [{'Name': 'median-AE', 'Regex': "AE-at-50th-percentile: ([0-9.]+).*$"}]
```
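
To see what this regex captures, here is a minimal check using Python's `re` module. The log line is made up for illustration; it only mimics the shape the regex expects.

```python
import re

regex = "AE-at-50th-percentile: ([0-9.]+).*$"

# A made-up log line in the shape the regex expects.
log_line = "AE-at-50th-percentile: 0.2134"

match = re.search(regex, log_line)
# The first capture group holds the numeric value of the metric.
median_ae = float(match.group(1)) if match else None
print(median_ae)
```

Lines that do not contain the `AE-at-50th-percentile: <number>` pattern produce no match and are ignored.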

In [None]:
metric_definitions = [{'Name': 'median-AE', 'Regex': "AE-at-50th-percentile: ([0-9.]+).*$"}]

hyperparameters = {
    'tracking_uri': tracking_uri,
    'experiment_name': experiment_name,
    'secret_name': mlflow_secret_name,
    'region': region,
    'n-estimators': 100,
    'min-samples-leaf': 3,
    'features': 'MedInc HouseAge AveRooms AveBedrms Population AveOccup',
    'target': 'target'
}

estimator = SKLearn(
    entry_point='train.py',
    source_dir='source_dir',
    role=role,
    metric_definitions=metric_definitions,
    hyperparameters=hyperparameters,
    instance_count=1,
    instance_type='ml.m5.large',  # to run SageMaker in a managed infrastructure
    framework_version='1.0-1',
    base_job_name='mlflow',
)

Now we are ready to run the training job on the SageMaker managed infrastructure, which in turn will save its execution data to the MLFlow server. After initializing an `SKlearn` estimator object, all we need to do is call the `.fit` method, specifying where the training and testing data are located.

In [None]:
estimator.fit({'train':train_path, 'test': test_path}, experiment_config=experiment_config)

### From SageMaker to MLFlow

Load the <i>TrialComponent</i> associated with the `estimator`.

In [None]:
training_job_name = estimator.latest_training_job.name

trial_component = TrialComponent.load(f"{training_job_name}-aws-training-job")
mlflow_run_url = trial_component.parameters["mlflow-run-url"]

In [None]:
from IPython.display import HTML
HTML('<a href="{}">link to MLFlow run</a>'.format(mlflow_run_url))

### From MLFlow to SageMaker Experiments

Within SageMaker Experiments, we have enriched the <i>TrialComponent</i> with information specific to MLFlow. For example

* the experiment ID in MLFlow
* the MLFlow run ID corresponding to the SageMaker training job
* any additional MLFlow parameters and metrics generated by MLFlow
* the list of output artifacts generated by MLFlow (e.g., the output model) with their full path to S3

A visual inspection of the SageMaker Studio UI for the output artifacts can be seen below

![MLFlow Output Artifacts in SageMaker Experiments](./../../images/trialcomponent-output-artifacts-mlflow.png)

## Register the model to MLFlow

At the end of the training, our model has been saved to the MLflow server and we are ready to register the model, i.e. assign it to a model package and create a version. Please refer to the [official MLFlow documentation](https://www.mlflow.org/docs/latest/model-registry.html) for further information.

In [None]:
def retrieve_credentials(region_name, secret_name):
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    
    kwarg = {'SecretId': secret_name}
    secret = client.get_secret_value(**kwarg)
    credentials = {}

    credentials['username'] = json.loads(secret['SecretString'])['username']
    credentials['password'] = json.loads(secret['SecretString'])['password']
    
    return credentials

# Setting the MLFLOW_TRACKING_USERNAME and MLFLOW_TRACKING_PASSWORD env variables lets the mlflow SDK send the header "Authorization: Basic <credentials>" to authenticate.
credentials = retrieve_credentials(region, mlflow_secret_name)
os.environ['MLFLOW_TRACKING_USERNAME'] = credentials['username']
os.environ['MLFLOW_TRACKING_PASSWORD'] = credentials['password']
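
For reference, HTTP Basic authentication encodes `username:password` in Base64 and sends it in the `Authorization` header. The sketch below shows that encoding in isolation; the helper name and the placeholder credentials are for illustration only, and the mlflow SDK builds this header for you.

```python
import base64


def basic_auth_header(username, password):
    """Build the value of an HTTP Basic 'Authorization' header."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return f"Basic {token}"


# Placeholder credentials, not real ones.
print(basic_auth_header("user", "pass"))
```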

In [None]:
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)
client = MlflowClient()

run = mlflow.get_run(run_id=trial_component.parameters["run_id"])

try:
    client.create_registered_model(model_name)
except Exception:
    # Most likely the registered model was created by a previous execution
    print("Registered model already exists")

model_version = client.create_model_version(
    name=model_name,
    source="{}/model".format(run.info.artifact_uri),
    run_id=run.info.run_id
)

print("model_version: {}".format(model_version))

## Local Predictions

We are now ready to make predictions with our model locally for testing purposes.

In [None]:
# get the model URI from the MLFlow registry
model_uri = model_version.source
print("Model URI: {}".format(model_uri))

# Load model as a Sklearn model.
loaded_model = mlflow.sklearn.load_model(model_uri)

# get a random index to test the prediction from the test data
index = random.randrange(0, len(testX))
print("Random index value: {}".format(index))

# Prepare data on a Pandas DataFrame to make a prediction.
data = testX.drop(['Latitude','Longitude','target'], axis=1).iloc[[index]]

print("#######\nData for prediction \n{}".format(data))

y_hat = loaded_model.predict(data)[0]
y = y_test[index]

print("Predicted value: {}".format(y_hat))
print("Actual value: {}".format(y))

# Tune a Scikit-Learn model in SageMaker and track with MLFlow

At this point, we are going to offload the training to the remote infrastructure managed by SageMaker. We now want to leverage SageMaker's hyperparameter tuning to kick off multiple training jobs with different hyperparameter combinations and find the set with the best model performance. This is an important step in the machine learning process, as hyperparameter settings can have a large impact on model accuracy. In this example, we'll use the SageMaker Python SDK to create a hyperparameter tuning job for an SKlearn estimator.

## Training
We are again using `SKlearn` in script mode, with the same training script we have used in the previous section, i.e. `./source_dir/train.py`.

In [None]:
hyperparameters = {
    'tracking_uri': tracking_uri,
    'experiment_name': experiment_name,
    'secret_name': mlflow_secret_name,
    'region': region,
    'features': 'MedInc HouseAge AveRooms AveBedrms Population AveOccup',
    'target': 'target'
}

metric_definitions = [{'Name': 'median-AE', 'Regex': "AE-at-50th-percentile: ([0-9.]+).*$"}]

estimator = SKLearn(
    entry_point='train.py',
    source_dir='source_dir',
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
    framework_version='1.0-1',
    py_version='py3'
)

## Hyperparameter tuning

Once we've defined our estimator we can specify the hyperparameters we'd like to tune and their possible values.  We have three different types of hyperparameters.
- Categorical parameters need to take one value from a discrete set.  We define this by passing the list of possible values to `CategoricalParameter(list)`.
- Continuous parameters can take any real number value between the minimum and maximum value, defined by `ContinuousParameter(min, max)`.
- Integer parameters can take any integer value between the minimum and maximum value, defined by `IntegerParameter(min, max)`.

*Note, if possible, it's almost always best to specify a value as the least restrictive type.  For example, tuning `thresh` as a continuous value between 0.01 and 0.2 is likely to yield a better result than tuning as a categorical parameter with possible values of 0.01, 0.1, 0.15, or 0.2.*

In [None]:
hyperparameter_ranges = {
    'n-estimators': IntegerParameter(50, 200),
    'min-samples-leaf': IntegerParameter(1, 10)
}

Next we'll specify the objective metric that we'd like to tune and its definition. This refers to the regular expression (Regex) needed to extract that metric from the CloudWatch logs of our training job we defined earlier, as well as whether we are looking to `Maximize` or `Minimize` the objective metric.

In [None]:
objective_metric_name = 'median-AE'
objective_type = 'Minimize'

Now, we'll create a `HyperparameterTuner` object, which we pass:
- The SKLearn estimator we created earlier
- Our hyperparameter ranges
- Objective metric name and type
- Number of training jobs to run in total and how many training jobs should be run simultaneously.  More parallel jobs will finish tuning sooner, but may sacrifice accuracy.  We recommend you set the parallel jobs value to less than 10% of the total number of training jobs (we'll set it higher just for this example to keep it short).
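
The 10% guideline from the last bullet can be turned into a small helper. This is only a sketch of that rule of thumb; the function name is hypothetical and the result is floored at one job so that tuning can always start.

```python
def suggested_parallel_jobs(max_jobs):
    """Roughly 10% of max_jobs, rounded down, but never fewer than 1."""
    return max(1, max_jobs // 10)


for total in (5, 50, 100):
    print(total, suggested_parallel_jobs(total))
```

For the five jobs used in this example, the guideline would suggest running them one at a time; the notebook runs all five in parallel to keep the demo short.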

In [None]:
max_jobs = 5
max_parallel_jobs = 5

tuner = HyperparameterTuner(estimator,
                            objective_metric_name,
                            hyperparameter_ranges,
                            metric_definitions,
                            max_jobs=max_jobs,
                            max_parallel_jobs=max_parallel_jobs,
                            objective_type=objective_type,
                            base_tuning_job_name='mlflow')

And finally, we can start our tuning job by calling `.fit()` and passing in the S3 paths to our train and test datasets.

In [None]:
tuner.fit({'train':train_path, 'test': test_path})

We can now query the MLFlow server to see the different models and their metrics that have been stored.
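
One way to run such a query is `mlflow.search_runs`, sorted by the metric logged by the training script. The sketch below assumes the tracking URI and credentials have already been configured as shown earlier; the helper names are hypothetical, and the backtick quoting of the metric name is used because the name contains hyphens.

```python
def order_by_clause(metric, ascending=True):
    """Build an order_by expression for a metric, quoting its name with backticks."""
    direction = "ASC" if ascending else "DESC"
    return f"metrics.`{metric}` {direction}"


def best_runs(experiment_name, metric="AE-at-50th-percentile", max_results=5):
    """Return a pandas DataFrame with the runs that have the lowest metric value."""
    # Imported here so order_by_clause can be used without mlflow installed.
    import mlflow

    return mlflow.search_runs(
        experiment_names=[experiment_name],
        order_by=[order_by_clause(metric)],
        max_results=max_results,
    )


print(order_by_clause("AE-at-50th-percentile"))
```

Calling `best_runs(experiment_name)` in the notebook would then list the top runs of the tuning job together with their parameters and metrics.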

# Deploy an MLflow model with SageMaker

We are finally ready to deploy an MLFlow model to a SageMaker hosted endpoint, ready to be consumed for online predictions.

## Build MLflow docker image to serve the model with SageMaker

We first need to build a new MLflow Sagemaker image, assign it a name, and push to ECR.

The `mlflow sagemaker build-and-push-container` command does exactly that. It first builds an MLflow Docker image. The image is built locally and requires Docker to run. Then, the image is pushed to ECR under the currently active AWS account and AWS region. More information on this command can be found in the official [MLflow CLI documentation for SageMaker](https://www.mlflow.org/docs/latest/cli.html#mlflow-sagemaker).

Make sure that the `mlflow-pyfunc` container has already been pushed to `ECR` from the `Cloud9` environment from which you deployed the CDK stacks.

In [None]:
# URL of the ECR-hosted Docker image the model should be deployed into: make sure to include the tag 2.3.1
image_uri = "{}.dkr.ecr.{}.amazonaws.com/mlflow-pyfunc:{}".format(account, region, mlflow.__version__)
print("image URI: {}".format(image_uri))

## Deploy a SageMaker endpoint with our scikit-learn model

We first need to get the best performing model stored in MLFlow. Once it has been identified, we register it to the Registry and then deploy it to a SageMaker managed endpoint via the MLflow SDK. More information can be found [here](https://www.mlflow.org/docs/latest/python_api/mlflow.sagemaker.html).

In [None]:
best_training_job_name = tuner.best_training_job()

best_trial_component = TrialComponent.load(f"{best_training_job_name}-aws-training-job")
best_mlflow_run_url = best_trial_component.parameters["mlflow-run-url"]

In [None]:
from IPython.display import HTML
HTML('<a href="{}">MLFlow run corresponding to best training job</a>'.format(best_mlflow_run_url))

In [None]:
experiment = mlflow.get_experiment_by_name(experiment_name)
experiment_id = experiment.experiment_id

run = mlflow.get_run(run_id=best_trial_component.parameters["run_id"])

try:
    client.create_registered_model(model_name)
except Exception:
    # Most likely the registered model was created by a previous execution
    print("Registered model already exists")

model_version = client.create_model_version(
    name=model_name,
    source="{}/model".format(run.info.artifact_uri),
    run_id=run.info.run_id
)

print("model_version: {}".format(model_version))

In [None]:
from mlflow.deployments import get_deploy_client

model_uri = "models:/{}/{}".format(model_version.name, model_version.version)

endpoint_name = 'california-housing'

config={
    'execution_role_arn': role,
    'image_url': image_uri,
    'instance_type': 'ml.m5.xlarge',
    'instance_count': 1,
    'region_name': region
}

client = get_deploy_client("sagemaker")

client.create_deployment(
    name=endpoint_name,
    model_uri=model_uri,
    flavor='python_function',
    config=config
)

## Predict

We are now ready to make predictions against the endpoint.

In [None]:
# load the California housing test dataset
data = pd.read_csv('./california_test.csv')
df_y = data[['target']]
df = data.drop(['Latitude','Longitude','target'], axis=1)

client = get_deploy_client(f"sagemaker:/{region}")

for _ in range(0,2):
    # Randomly pick a row to test the prediction
    index = random.randrange(0, len(df_y))
    payload = df.iloc[[index]]
    y = df_y['target'][index]
    print(f"payload: {payload}")
    prediction = client.predict(endpoint_name, payload)
    print(f'This is the real value of the house we want to predict (expressed in $100,000): {y}')
    print(f"This is the predicted value from our model (expressed in $100,000): {prediction['predictions'][0]}")
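
The MLflow deployment client hides the request format. As a hedged alternative sketch, the endpoint could also be called directly through the SageMaker runtime API with `boto3`. This assumes the MLflow 2.x scoring server accepts JSON in the `dataframe_split` layout; the helper names and the feature values below are made up for illustration.

```python
import json


def dataframe_split_payload(columns, rows):
    """Serialize rows into the 'dataframe_split' JSON layout."""
    body = {"dataframe_split": {"columns": list(columns), "data": [list(r) for r in rows]}}
    return json.dumps(body)


def invoke(endpoint_name, region, payload):
    """Send a JSON payload to a SageMaker endpoint and decode the JSON response."""
    # Imported here so the payload helper can be used without boto3 installed.
    import boto3

    runtime = boto3.client("sagemaker-runtime", region_name=region)
    response = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=payload,
    )
    return json.loads(response["Body"].read())


# Made-up feature values for a single house.
columns = ["MedInc", "HouseAge", "AveRooms", "AveBedrms", "Population", "AveOccup"]
payload = dataframe_split_payload(columns, [[3.5, 20.0, 5.0, 1.0, 1000.0, 3.0]])
print(payload)
```

In the notebook, `invoke(endpoint_name, region, payload)` would then return the decoded response of the endpoint.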

## Delete endpoint

In order to avoid unwanted costs, make sure you delete the endpoint.

In [None]:
client.delete_deployment(endpoint_name, config=config)

### Delete experiments (Optional)

In [None]:
my_experiment.delete_all(action="--force")