# Air Quality Forecasting ML Pipeline [manual]

---

Once you are familiar with using the Amazon SageMaker built-in algorithm [DeepAR](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html) for [air quality forecasting model training](./01_train_and_evaluate_air_quality_deepar_model.ipynb), we will build an ML pipeline that automates the workflow with the [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io).

First, let's take a quick look at the architecture design.

![Air Quality Forecasting Architecture Design](./img/air_quality_forecasting_architecture.png)

In the architecture diagram:
* A preprocessing job performs data integration:
  * A table is created in Amazon Athena to query the open air quality data. Visit [Open AQ](https://openaq.org/) for details.
  * Amazon Athena is queried to collect air quality data for Sydney, Australia.
  * Data cleansing and feature engineering are applied.
  * The training and test data sets are separated; the last 30 days of data are kept as the test set.
  * The batch transform test data is constructed from the latest 100 records in the test set.
* Hyperparameter optimization (HPO) is optional.
  * In the pipeline, the HPO branch only runs hyperparameter optimization; it does not continue to batch transform.
* The model is trained with tuned hyperparameters.
  * For example, you may take the hyperparameters of the best candidate from an HPO job.
* A batch transform job is triggered to forecast air quality.
  * In this example, we run batch inference on the latest 100 records in the test data set.
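The branching described above can be summarized as a small plain-Python sketch. It is not part of the Step Functions SDK: `planned_steps` is a hypothetical helper that lists the task steps a run would visit (choice states omitted), reusing the step names defined later in this notebook and assuming the `ToDoHPO` and `ToDoTraining` execution inputs.

```python
from typing import List


def planned_steps(to_do_hpo: bool, to_do_training: bool) -> List[str]:
    """Return the ordered task-step names a run would visit (hypothetical helper)."""
    # Preprocessing always runs first.
    steps = ["AirQualityForecasting Preprocessing Step"]
    if to_do_hpo:
        # The HPO branch ends after tuning; no batch transform follows.
        steps.append("HPO Step")
        return steps
    if to_do_training:
        # Train a new model, then register it as a SageMaker model.
        steps += ["Training Step", "Save Model"]
    else:
        # Reuse a previously trained model artifact.
        steps.append("Existing Model")
    steps.append("Batch Transform Step")
    return steps


print(planned_steps(to_do_hpo=False, to_do_training=False))
# ['AirQualityForecasting Preprocessing Step', 'Existing Model', 'Batch Transform Step']
```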

In this notebook, we demonstrate how to create the workflow step by step. Below is the Step Functions workflow for the ML pipeline when HPO is skipped and an existing trained model is used:

![Air Quality Forecasting ML Pipeline](./img/air_quality_forecasting_ml_pipeline.png)

## ML Pipeline Creation
---
To create the ML pipeline, we use Step Functions Data Science SDK v2.0.0rc1, which is compatible with SageMaker SDK 2.x.

Pipeline creation covers the following parts:
* Environment initialization
* Build Docker image for SageMaker Processing
* Create ML Pipeline with Step Functions Data Science SDK (v2.0.0rc1)

### Initialize Environment

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import sys
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker>=2.0.0" # 2.0.0
!{sys.executable} -m pip install -qU "stepfunctions==2.0.0rc1"
!{sys.executable} -m pip show sagemaker stepfunctions

In [None]:
# include imports, bucket_name, role and the existing trained model URI.
from pipeline.ml_pipeline_dependencies import *

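Set up the workflow execution role. The role ARN is read from the `/ml_pipeline/workflow_execution_role` parameter in AWS Systems Manager Parameter Store; you can also find it in the Outputs tab of the CloudFormation stack.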

In [None]:
ssm = boto3.client('ssm')
response = ssm.get_parameter(Name = "/ml_pipeline/workflow_execution_role")
WORKFLOW_EXECUTION_ROLE = response['Parameter']['Value']

In [None]:
if not WORKFLOW_EXECUTION_ROLE:
    raise Exception("ML Pipeline parameters in AWS Systems Manager are not set up properly. Please check whether the ml-pipeline stack has been created.")
else:
    print(f"Workflow execution IAM service role: {WORKFLOW_EXECUTION_ROLE}")

### Build Docker image for SageMaker Processing

Define your own processing container and install related dependencies.

Below, we walk through how to create a processing container and how to use a `ScriptProcessor` to run your own code within it. The container supports data preprocessing, feature engineering, and model evaluation.

The following code builds the container with the `docker` command, creates an Amazon Elastic Container Registry (Amazon ECR) repository if needed, and pushes the image to Amazon ECR.

In [None]:
# define repository name and uri variables
ecr_repository = 'air-quality-forecasting-preprocessing'
tag = ':latest'
uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
processing_repository_uri = f'{account_id}.dkr.ecr.{region}.{uri_suffix}/{ecr_repository + tag}'
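The same URI logic can be wrapped in a small reusable function, which makes the China-region special case easy to check. This is a sketch: `ecr_image_uri` and the account ID `123456789012` are illustrative placeholders, and, like the cell above, it only special-cases the two China regions listed there.

```python
CHINA_REGIONS = {"cn-north-1", "cn-northwest-1"}


def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build an Amazon ECR image URI for the given account, region, and repository."""
    # AWS China regions use the amazonaws.com.cn domain; the others use amazonaws.com.
    suffix = "amazonaws.com.cn" if region in CHINA_REGIONS else "amazonaws.com"
    return f"{account_id}.dkr.ecr.{region}.{suffix}/{repository}:{tag}"


print(ecr_image_uri("123456789012", "ap-southeast-2", "air-quality-forecasting-preprocessing"))
# 123456789012.dkr.ecr.ap-southeast-2.amazonaws.com/air-quality-forecasting-preprocessing:latest
```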

In [None]:
# build the image.
!docker build -t $ecr_repository -f ./pipeline/ml_pipeline_preprocessing_Dockerfile .

In [None]:
# The ECR repository should have been created by the CloudFormation stack. Uncomment the line below to create it if it wasn't.
# !aws ecr create-repository --repository-name $ecr_repository

# Log in to Amazon ECR and push the built Docker image
!aws ecr get-login-password --region $region | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.{uri_suffix}
!docker tag {ecr_repository + tag} $processing_repository_uri
!docker push $processing_repository_uri

### Create ML Pipeline with Step Functions Data Science SDK (v2.0.0rc1)

---

#### Create Processing Step for data preprocessing

We will now create the [ProcessingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/sagemaker.html#stepfunctions.steps.sagemaker.ProcessingStep) that will launch a SageMaker Processing Job.

The processing job script `./pipeline/ml_pipeline_preprocessing.py` performs the following actions:

* Create Athena table with external source - OpenAQ
* Query Sydney OpenAQ data 
* Feature engineering on the dataset
* Split training and test data 
* Store the data in the S3 bucket.
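The split step above can be sketched as follows. The actual script is not shown in this notebook, so this is a hypothetical illustration: `to_deepar_record` and `split_last_days` are invented names, the data is assumed to be hourly, and records use the DeepAR JSON Lines fields `start` and `target`. Following one common DeepAR arrangement, the training record stops before the holdout window while the test record keeps the full series as context; the real script may split differently.

```python
import json
from datetime import datetime
from typing import Dict, List, Tuple


def to_deepar_record(start: datetime, values: List[float]) -> Dict:
    """Format one hourly series as a DeepAR JSON Lines record ("start" + "target")."""
    return {"start": start.strftime("%Y-%m-%d %H:%M:%S"), "target": values}


def split_last_days(start: datetime, values: List[float], split_days: int = 30,
                    points_per_day: int = 24) -> Tuple[Dict, Dict]:
    """Hold out the last `split_days` days of an hourly series for testing."""
    holdout = split_days * points_per_day
    if len(values) <= holdout:
        raise ValueError("series is shorter than the holdout window")
    # Training data excludes the holdout window.
    train = to_deepar_record(start, values[:-holdout])
    # Test data keeps the full series so the holdout has its history as context.
    test = to_deepar_record(start, values)
    return train, test


start = datetime(2020, 1, 1)
series = [float(i % 24) for i in range(60 * 24)]  # 60 days of synthetic hourly values
train, test = split_last_days(start, series)
train_line = json.dumps(train)  # one line of a JSON Lines training file
print(len(train["target"]), len(test["target"]))  # 720 1440
```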

Upload the preprocessing script.

In [None]:
PREPROCESSING_SCRIPT_LOCATION = "./pipeline/ml_pipeline_preprocessing.py"
input_code_uri = sagemaker_session.upload_data(
    PREPROCESSING_SCRIPT_LOCATION,
    bucket = bucket_name,
    key_prefix = "preprocessing/code",
)

The `ScriptProcessor` class lets you run a command inside the container, which you can use to run your own script.

In [None]:
preprocessing_processor = ScriptProcessor(
    command = ['python3'],
    image_uri = processing_repository_uri,
    role = role,
    instance_count = 1,
    instance_type = 'ml.m5.2xlarge',
    max_runtime_in_seconds = 1200
)

Define the S3 location for the preprocessing output, which holds the training, test, and all-features data.

In [None]:
output_data = f"s3://{bucket_name}/preprocessing/output"

The processing step uses the `ScriptProcessor` defined above together with the following input and output objects.

In [None]:
inputs = [
    ProcessingInput(
        source = input_code_uri,
        destination = "/opt/ml/processing/input/code",
        input_name = "code"
    )
]

outputs = [
    ProcessingOutput(
        source = "/opt/ml/processing/output/all",
        destination = f"{output_data}/all",
        output_name = "all_data"
    ),
    ProcessingOutput(
        source = "/opt/ml/processing/output/train",
        destination = f"{output_data}/train",
        output_name = "train_data"
    ),
    ProcessingOutput(
        source = "/opt/ml/processing/output/test",
        destination = f"{output_data}/test",
        output_name = "test_data"
    )
]

In [None]:
# Workflow Execution parameters
execution_input = ExecutionInput(
    schema = {
        "PreprocessingJobName": str,
        "ToDoHPO": bool,
        "ToDoTraining": bool,
        "TrainingJobName": str,
        "TuningJobName": str,
        "ModelName": str,
        "EndpointName": str,
        "TransformJobName": str
    }
)
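Every placeholder in this schema must be supplied when the workflow is executed, so a local pre-flight check can catch missing or mistyped values before anything runs in AWS. `check_execution_inputs` is a hypothetical helper, not part of the Step Functions Data Science SDK, and it does not claim to reproduce the SDK's own validation.

```python
from typing import Any, Dict

# Same names and Python types as the ExecutionInput schema defined above.
EXECUTION_SCHEMA: Dict[str, type] = {
    "PreprocessingJobName": str,
    "ToDoHPO": bool,
    "ToDoTraining": bool,
    "TrainingJobName": str,
    "TuningJobName": str,
    "ModelName": str,
    "EndpointName": str,
    "TransformJobName": str,
}


def check_execution_inputs(inputs: Dict[str, Any], schema: Dict[str, type] = EXECUTION_SCHEMA) -> None:
    """Raise if a schema key is missing or holds a value of the wrong Python type."""
    missing = sorted(set(schema) - set(inputs))
    if missing:
        raise KeyError(f"missing execution inputs: {missing}")
    for key, expected_type in schema.items():
        if not isinstance(inputs[key], expected_type):
            raise TypeError(f"{key} should be {expected_type.__name__}, got {type(inputs[key]).__name__}")


# Build a valid example: placeholder strings for names, booleans for the flags.
example = {key: "placeholder-name" for key, t in EXECUTION_SCHEMA.items() if t is str}
example.update(ToDoHPO=False, ToDoTraining=False)
check_execution_inputs(example)  # passes silently
```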

The `ProcessingStep` queries OpenAQ data for Sydney, Australia with Amazon Athena and stores the query results in our own bucket. If you have configured the default workgroup in Amazon Athena, make sure ***Override client-side settings*** is unchecked so that the results location set by the script is used.

In [None]:
processing_step = ProcessingStep(
    "AirQualityForecasting Preprocessing Step",
    processor = preprocessing_processor,
    job_name = execution_input["PreprocessingJobName"],
    inputs = inputs,
    outputs = outputs,
    container_arguments = ["--split-days", "30", "--region", region, "--bucket-name", bucket_name],
    container_entrypoint = ["python3", "/opt/ml/processing/input/code/ml_pipeline_preprocessing.py"]
)
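SageMaker Processing runs `container_entrypoint` followed by `container_arguments`, so the script receives `--split-days 30 --region ... --bucket-name ...` on its command line. The actual `ml_pipeline_preprocessing.py` is not shown here; the following is a hypothetical sketch of how such a script could parse these arguments with `argparse`, using placeholder region and bucket values.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the arguments passed through container_arguments (hypothetical sketch)."""
    parser = argparse.ArgumentParser(description="Air quality preprocessing")
    parser.add_argument("--split-days", type=int, default=30,
                        help="number of trailing days kept as the test set")
    parser.add_argument("--region", type=str, required=True)
    parser.add_argument("--bucket-name", type=str, required=True)
    # argparse maps --split-days to split_days and --bucket-name to bucket_name.
    return parser.parse_args(argv)


# Equivalent to: python3 .../ml_pipeline_preprocessing.py --split-days 30 --region ap-southeast-2 --bucket-name my-bucket
args = parse_args(["--split-days", "30", "--region", "ap-southeast-2", "--bucket-name", "my-bucket"])
print(args.split_days, args.region, args.bucket_name)  # 30 ap-southeast-2 my-bucket
```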

#### Create Hyperparameter Tuning Step

Set up the tuning step. A choice state later decides whether HPO runs.

In [None]:
tuning_output_path = f's3://{bucket_name}/sagemaker/tuning/output'
image_uri = sagemaker.image_uris.retrieve('forecasting-deepar', region, '1')
ml_instance_type = 'ml.g4dn.8xlarge'

tuning_estimator = sagemaker.estimator.Estimator(
        sagemaker_session = sagemaker_session,
        image_uri = image_uri,
        role = role,
        instance_count = 1,
        instance_type = ml_instance_type,
        base_job_name = 'deepar-openaq-demo',
        output_path = tuning_output_path
)

##### Set static hyperparameters
The static parameters are the ones we know to be the best based on previously run HPO jobs, as well as the non-tunable parameters like prediction length and time frequency that are set according to requirements.

In [None]:
hpo = dict(
    time_freq= '1H'
    ,early_stopping_patience= 40
    ,prediction_length= 48
    ,num_eval_samples= 10

    # default quantiles [0.1, 0.2, 0.3, ..., 0.9] is used
    #,test_quantiles= quantiles
    
    # not setting these since HPO will use range of values
    #,epochs= 400
    #,context_length= 3
    #,num_cells= 157
    #,num_layers= 4
    #,dropout_rate= 0.04
    #,embedding_dimension= 12
    #,mini_batch_size= 633
    #,learning_rate= 0.0005
)

##### Set hyperparameter ranges
The hyperparameter ranges define the parameters we want the tuner to search across.

> Explore: Look in the [user guide](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar_hyperparameters.html) for DeepAR and compare the recommended range for `embedding_dimension` with the one used in the cell below.

In [None]:
hpo_ranges = dict(
    epochs= IntegerParameter(1, 1000)
    ,context_length= IntegerParameter(7, 48)
    ,num_cells= IntegerParameter(30,200)
    ,num_layers= IntegerParameter(1,8)
    ,dropout_rate= ContinuousParameter(0.0, 0.2)
    ,embedding_dimension= IntegerParameter(1, 50)
    ,mini_batch_size= IntegerParameter(32, 1028)
    ,learning_rate= ContinuousParameter(.00001, .1)
)
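To make concrete what these ranges mean, here is a simplified stand-in that draws candidate configurations by plain random search. SageMaker's `HyperparameterTuner` uses a Bayesian strategy by default, so this is not how the tuner chooses values; `IntRange`, `FloatRange`, and `sample_config` are hypothetical, and sampling `learning_rate` on a log scale is an assumption (the cell above leaves the scaling type at its default).

```python
import math
import random
from dataclasses import dataclass
from typing import Dict, Union


@dataclass
class IntRange:
    low: int
    high: int


@dataclass
class FloatRange:
    low: float
    high: float
    log_scale: bool = False


def sample_config(ranges: Dict[str, Union[IntRange, FloatRange]], rng: random.Random) -> Dict[str, Union[int, float]]:
    """Draw one random configuration from the given ranges (random search, not Bayesian)."""
    config: Dict[str, Union[int, float]] = {}
    for name, r in ranges.items():
        if isinstance(r, IntRange):
            config[name] = rng.randint(r.low, r.high)  # inclusive bounds
        elif r.log_scale:
            # Uniform in log space spreads samples across orders of magnitude.
            config[name] = math.exp(rng.uniform(math.log(r.low), math.log(r.high)))
        else:
            config[name] = rng.uniform(r.low, r.high)
    return config


ranges = {
    "epochs": IntRange(1, 1000),
    "context_length": IntRange(7, 48),
    "num_cells": IntRange(30, 200),
    "num_layers": IntRange(1, 8),
    "dropout_rate": FloatRange(0.0, 0.2),
    "embedding_dimension": IntRange(1, 50),
    "mini_batch_size": IntRange(32, 1028),
    "learning_rate": FloatRange(1e-5, 0.1, log_scale=True),
}
rng = random.Random(0)
for _ in range(2):  # the tuner in this notebook uses max_jobs = 2
    print(sample_config(ranges, rng))
```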

##### Create HPO tuning job step
We first define the HPO tuner and then use it to define the tuning step.

In [None]:
tuning_estimator.set_hyperparameters(**hpo)

hpo_tuner = HyperparameterTuner(
    estimator = tuning_estimator, 
    objective_metric_name = 'train:final_loss',
    objective_type = 'Minimize',
    hyperparameter_ranges = hpo_ranges,
    max_jobs = 2,
    max_parallel_jobs = 1
)

hpo_data = dict(
    train = f"{output_data}/train",
    test = f"{output_data}/test"
)
# When HPO is selected, wait for the tuning job to complete.
tuning_step = TuningStep(
    "HPO Step",
    tuner = hpo_tuner,
    job_name = execution_input["TuningJobName"],
    data = hpo_data,
    wait_for_completion = True
)

#### Create Model Training Step

We create a DeepAR estimator, which is used to create a TrainingStep that runs the training job in the workflow.

##### Set up the training job step

In [None]:
training_output_path = f's3://{bucket_name}/sagemaker/training/output'
training_estimator = sagemaker.estimator.Estimator(
        sagemaker_session = sagemaker_session,
        image_uri = image_uri,
        role = role,
        instance_count = 1,
        instance_type = ml_instance_type,
        base_job_name = 'deepar-openaq-demo',
        output_path = training_output_path
)


In [None]:
# best hyperparameters found by previously run HPO jobs
hpo = dict(
    time_freq= '1H'
    ,early_stopping_patience= 40
    ,prediction_length= 48
    ,num_eval_samples= 10
    #,test_quantiles= quantiles
    ,epochs= 400
    ,context_length= 3
    ,num_cells= 157
    ,num_layers= 4
    ,dropout_rate= 0.04
    ,embedding_dimension= 12
    ,mini_batch_size= 633
    ,learning_rate= 0.0005
)
training_estimator.set_hyperparameters(**hpo)

In [None]:
# use all the features for training.
data = dict(train = f"{output_data}/all/all_features.json")
training_step = TrainingStep(
    "Training Step",
    estimator = training_estimator,
    data = data,
    job_name = execution_input["TrainingJobName"],
    wait_for_completion = True
)

#### Create Model Step

In the following cell, we define a model step that will create a model in Amazon SageMaker using the artifacts created during the TrainingStep. See [ModelStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) in the AWS Step Functions Data Science SDK documentation to learn more.

The model creation step typically follows the training step. The Step Functions SDK provides the [get_expected_model](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep.get_expected_model) method in the TrainingStep class to provide a reference for the trained model artifacts. Please note that this method is only useful when the ModelStep directly follows the TrainingStep.

In [None]:
model_step = ModelStep(
    "Save Model",
    model = training_step.get_expected_model(),
    model_name = execution_input["ModelName"],
    result_path = "$.ModelStepResults"
)

# for deploying an existing trained model
existing_model_name = f"aqf-model-{uuid.uuid1().hex}"
existing_model = Model(
    model_data = EXISTING_MODEL_URI,
    image_uri = image_uri,
    role = role,
    name = existing_model_name
)
existing_model_step = ModelStep(
    "Existing Model",
    model = existing_model,
    model_name = execution_input["ModelName"]
)

#### Create Endpoint Configuration Step

> The Endpoint Configuration Step isn't used in the workflow because this lab demonstrates batch transform.

In the following cell we create an endpoint configuration step. See [EndpointConfigStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
endpoint_config_step = EndpointConfigStep(
    "Create Model Endpoint Config",
    endpoint_config_name = execution_input["ModelName"],
    model_name = execution_input["ModelName"],
    initial_instance_count = 1,
    instance_type = 'ml.c5.xlarge'
)

#### Create Endpoint Step

> The Endpoint Step isn't used in the workflow because this lab demonstrates batch transform.

In the following cell, we create the Endpoint step to deploy the new model as a managed API endpoint. With `update = False`, the step creates a new endpoint; setting `update = True` updates an existing SageMaker endpoint instead.

In [None]:
endpoint_step = EndpointStep(
    "Model Endpoint",
    endpoint_name = execution_input["EndpointName"],
    endpoint_config_name = execution_input["ModelName"],
    update = False
)

#### Batch Transform Step
In the following cells, we create the Batch Transform step to do batch inference.

In [None]:
# Assume we only check the '0.5' quantile (median) predictions.
environment_param = {
    'num_samples': 20,
    'output_types': ['quantiles'],
    'quantiles': ['0.5']
}

In [None]:
transformer = Transformer(
    execution_input["ModelName"],
    1,
    'ml.c5.2xlarge',
    output_path=f's3://{bucket_name}/sagemaker/batch_transform/output',
    sagemaker_session=sagemaker_session,
    strategy='MultiRecord',
    assemble_with='Line',
    env = {
        'DEEPAR_INFERENCE_CONFIG': json.dumps(environment_param)
    }
)

transformStep = TransformStep(
    state_id = "Batch Transform Step",
    transformer = transformer,
    job_name = execution_input["TransformJobName"],
    model_name = execution_input["ModelName"],
    data = f"{output_data}/test/batch_transform_test.json",
    split_type = 'Line'
)
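The notebook does not show how `batch_transform_test.json` is built or how the results are read back. The sketch below is hypothetical: it assumes hourly data and one DeepAR JSON record per line, keeps the last 100 observations (shifting `start` to match), and assumes each output line has the shape `{"quantiles": {"0.5": [...]}}`, which is what the `DEEPAR_INFERENCE_CONFIG` above requests. Check a real output file before relying on that shape.

```python
import json
from datetime import datetime, timedelta
from typing import List


def build_transform_line(start: datetime, target: List[float], last_n: int = 100) -> str:
    """Build one JSON Lines request from the last `last_n` hourly observations."""
    offset = max(len(target) - last_n, 0)
    # Shift "start" so it matches the first value kept (assumes time_freq='1H').
    new_start = start + timedelta(hours=offset)
    return json.dumps({"start": new_start.strftime("%Y-%m-%d %H:%M:%S"), "target": target[offset:]})


def median_forecast(output_line: str) -> List[float]:
    """Read the '0.5' quantile from one output line (assumed output shape)."""
    return json.loads(output_line)["quantiles"]["0.5"]


request = build_transform_line(datetime(2020, 1, 1), [float(v) for v in range(150)])
print(json.loads(request)["start"])  # 2020-01-03 02:00:00
print(median_forecast('{"quantiles": {"0.5": [12.5, 13.0]}}'))  # [12.5, 13.0]
```

Batch transform typically writes each result file under `output_path` with an `.out` suffix appended to the input file name.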

#### Setup Workflow Process

Create `Fail` state to mark the workflow failed in case any of the steps fail.

In [None]:
failed_state_sagemaker_pipeline_step_failure = Fail(
    "ML Workflow Failed", cause = "SageMakerPipelineStepFailed"
)

In [None]:
training_path = Chain([training_step, model_step, transformStep])
deploy_existing_model_path = Chain([existing_model_step, transformStep])

##### Choice Step Configuration

Now we set up choice states that decide whether to run HPO and whether to run model training. See *Choice Rules* in the [AWS Step Functions Data Science SDK documentation](https://aws-step-functions-data-science-sdk.readthedocs.io).

In [None]:
hpo_choice = Choice(
    "To do HPO?"
)
training_choice = Choice(
    "To do Model Training?"
)

# Reference execution input values through the context object path ($$.Execution.Input[...]); the syntax is verbose but required.
hpo_choice.add_choice(
    rule = ChoiceRule.BooleanEquals(variable = "$$.Execution.Input['ToDoHPO']", value = True),
    next_step = tuning_step
)
hpo_choice.add_choice(
    rule = ChoiceRule.BooleanEquals(variable = "$$.Execution.Input['ToDoHPO']", value = False),
    next_step = training_choice
)
training_choice.add_choice(
    rule = ChoiceRule.BooleanEquals(variable = "$$.Execution.Input['ToDoTraining']", value = True),
    next_step = training_path
)
training_choice.add_choice(
    rule = ChoiceRule.BooleanEquals(variable = "$$.Execution.Input['ToDoTraining']", value = False),
    next_step = deploy_existing_model_path
)
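Each `Choice` above ends up as an Amazon States Language (ASL) Choice state in the state machine definition. The dictionary below is a hand-written approximation of that JSON for the HPO choice, not output captured from the SDK; `choice_state` is a hypothetical helper. You can compare it with the definition shown in the Step Functions console after the workflow is created.

```python
import json


def choice_state(flag: str, when_true: str, when_false: str) -> dict:
    """Approximate ASL Choice state branching on a boolean execution input."""
    variable = f"$$.Execution.Input['{flag}']"
    return {
        "Type": "Choice",
        "Choices": [
            {"Variable": variable, "BooleanEquals": True, "Next": when_true},
            {"Variable": variable, "BooleanEquals": False, "Next": when_false},
        ],
    }


hpo_choice_json = choice_state("ToDoHPO", "HPO Step", "To do Model Training?")
print(json.dumps(hpo_choice_json, indent=2))
```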

##### Error Handling in the Workflow

In [None]:
catch_state_processing = Catch(
    error_equals = ["States.TaskFailed"],
    next_step = failed_state_sagemaker_pipeline_step_failure   
)
processing_step.add_catch(catch_state_processing)
tuning_step.add_catch(catch_state_processing)
training_step.add_catch(catch_state_processing)
model_step.add_catch(catch_state_processing)
endpoint_config_step.add_catch(catch_state_processing)
endpoint_step.add_catch(catch_state_processing)
existing_model_step.add_catch(catch_state_processing)
transformStep.add_catch(catch_state_processing)
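The effect of these `Catch` rules can be simulated locally. In Step Functions, a task with a matching `Catch` transitions to the `Fail` state ("ML Workflow Failed"), while a task without one fails the execution directly with its own error. The sketch below is a plain-Python simulation only: `TaskFailed` stands in for the `States.TaskFailed` error, and `run_chain` is a hypothetical helper in which every step has a catch.

```python
from typing import Callable, List, Tuple

FAIL_STATE = "ML Workflow Failed"


class TaskFailed(Exception):
    """Local stand-in for the States.TaskFailed error of a failed task."""


def run_chain(steps: List[Tuple[str, Callable[[], None]]]) -> str:
    """Run steps in order; a TaskFailed is routed to the Fail state, like the Catch above."""
    for name, step in steps:
        try:
            step()
        except TaskFailed:
            return f"{FAIL_STATE} (from {name})"
    return "SUCCEEDED"


def ok() -> None:
    pass


def broken() -> None:
    raise TaskFailed("SageMaker job failed")


print(run_chain([("Existing Model", ok), ("Batch Transform Step", broken)]))
# ML Workflow Failed (from Batch Transform Step)
```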

#### Create and execute the Workflow

In [None]:
# execution input parameter values
preprocessing_job_name = f"aqf-preprocessing-{uuid.uuid1().hex}"
tuning_job_name = f"aqf-tuning-{uuid.uuid1().hex[:21]}"  # tuning job names are limited to 32 characters
training_job_name = f"aqf-training-{uuid.uuid1().hex}"
model_job_name = f"aqf-model-{uuid.uuid1().hex}"
endpoint_job_name = f"aqf-endpoint-{uuid.uuid1().hex}"
batch_transform_job_name = f"aqf-transform-{uuid.uuid1().hex}"
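SageMaker job names must match the pattern `^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}` (at most 63 characters), and hyperparameter tuning job names are limited to 32 characters. A hypothetical helper, `make_job_name`, that appends a `uuid1` suffix and truncates it to fit a length cap could look like this; truncating the suffix makes collisions slightly more likely, which is acceptable for a demo.

```python
import re
import uuid

# Pattern used for SageMaker job names such as TrainingJobName and ProcessingJobName.
JOB_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")


def make_job_name(prefix: str, max_length: int = 63) -> str:
    """Return prefix + '-' + uuid1 hex, truncating the hex so the name fits max_length."""
    room = max_length - len(prefix) - 1
    if room < 1:
        raise ValueError(f"prefix {prefix!r} leaves no room within {max_length} characters")
    name = f"{prefix}-{uuid.uuid1().hex[:room]}"
    if not JOB_NAME_PATTERN.match(name):
        raise ValueError(f"invalid job name: {name!r}")
    return name


print(make_job_name("aqf-preprocessing"))          # 50 characters
print(make_job_name("aqf-tuning", max_length=32))  # exactly 32 characters
```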

In [None]:
# variables
WORKFLOW_NAME = "manual-aqf-ml-pipeline"
TO_DO_HPO = False
TO_DO_TRAINING = False

In [None]:
workflow_graph = Chain([processing_step, hpo_choice])
workflow = Workflow(
    name = WORKFLOW_NAME,
    definition = workflow_graph,
    role = WORKFLOW_EXECUTION_ROLE
)
workflow.create()
# create() only returns the ARN if the state machine already exists, so call update() to apply the latest definition.
workflow.update(definition = workflow_graph) 

# execute workflow
execution = workflow.execute(
    inputs = {
        "PreprocessingJobName": preprocessing_job_name,
        "ToDoHPO": TO_DO_HPO,
        "ToDoTraining": TO_DO_TRAINING,
        "TrainingJobName": training_job_name,
        "TuningJobName": tuning_job_name,
        "ModelName": model_job_name,
        "EndpointName": endpoint_job_name,
        "TransformJobName": batch_transform_job_name
    }
)

In [None]:
response = execution.describe()
execution_id = response['name']
# display the Step Functions console link for this execution
display_state_machine_advice(WORKFLOW_NAME, execution_id)

Run the cell below multiple times to observe the workflow execution progress. Note that the execution may take 15-20 minutes when the existing model is used for batch transform.

In [None]:
execution.render_progress(portrait = True)
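`render_progress` is convenient inside Jupyter; outside a notebook, a polling loop does the same job. This is a sketch: `wait_for_execution` is a hypothetical helper that accepts any callable returning a dict with a `status` key, such as `execution.describe`, and stops on the terminal Step Functions statuses `SUCCEEDED`, `FAILED`, `TIMED_OUT`, and `ABORTED`.

```python
import time
from typing import Callable, Dict

TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(describe: Callable[[], Dict], poll_seconds: float = 30.0,
                       timeout_seconds: float = 3600.0) -> str:
    """Poll `describe()` until the execution reaches a terminal status, then return it."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe()["status"]
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"execution still {status} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)


# In this notebook (not executed here): final_status = wait_for_execution(execution.describe)
```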