# Use data from Amazon Redshift in a scheduled training pipeline with Amazon SageMaker

Customers in many different domains tend to work with multiple data sources: object storage like Amazon S3, relational databases like Amazon RDS, and data warehouses like Amazon Redshift. Machine learning practitioners, however, are often pushed by the frameworks they use to work with objects and files rather than databases and tables, and they prefer local copies of those files to minimize access latency.

Nevertheless, ML engineers and data scientists may need to extract data directly from data warehouses with SQL queries to build the datasets they use to train their models.

In this blog post, we use the Amazon SageMaker Processing API to run a query against a Redshift cluster, create CSV files, and perform distributed processing. Then, we train a simple model to predict the total sales for new events, and build a pipeline with Amazon SageMaker Pipelines that can be run on a schedule.

## Prerequisites

This blog post uses the sample data that is available when creating a Free Tier cluster in Amazon Redshift. We assume that your Redshift cluster has already been created and that you have attached an IAM role with the correct permissions to it. To learn how to do both, see the following links:

- Create the cluster with the sample dataset - [Link](https://docs.aws.amazon.com/redshift/latest/gsg/sample-data-load.html)
- Associate the role with the cluster - [Link](https://docs.aws.amazon.com/redshift/latest/mgmt/data-api.html#data-api-access)

Once the cluster is ready, you can open the notebooks in your IDE of choice. The content has been developed and tested in SageMaker Studio on an `ml.t3.medium` instance. To learn more about SageMaker Studio, see:

- What is Amazon SageMaker Studio - [Link](https://aws.amazon.com/sagemaker/studio/)
- How to set up your Amazon SageMaker Studio domain - [Link](https://docs.aws.amazon.com/sagemaker/latest/dg/gs-studio-onboard.html)
- Cloning a repository in SageMaker Studio - [Link](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-tasks-git.html)
- Changing instance type on a SageMaker Notebook - [Link](https://docs.aws.amazon.com/sagemaker/latest/dg/notebooks-run-and-manage-switch-instance-type.html)

## Setup: configure your cluster access, your role ARN, your output S3 URI

In the following cell, you will be prompted for information about your Redshift cluster and the S3 output path.

In [None]:
import sagemaker

session = sagemaker.Session()
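try:
    # Works inside SageMaker (Studio, notebook instances); raises ValueError when the identity is not a role
    role = sagemaker.get_execution_role()
except ValueError:
    role = input("ARN of the Execution Role for this notebook:")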

###### CLUSTER CONFIGURATION
cluster_id = input("The name of your Redshift cluster:")
cluster_role_name = input("The name of the Role you've associated to your Redshift Cluster (not the ARN, default: myRedshiftRole):") or "myRedshiftRole"
cluster_role_arn = f'arn:aws:iam::{session.account_id()}:role/service-role/{cluster_role_name}'
database = input("The database of your Redshift cluster (default: dev):") or 'dev'
db_user = input("The user of your Redshift cluster (default: awsuser):") or 'awsuser'

###### OUTPUT S3 PATH
bucket = input("Your S3 bucket (leave empty for default):") or session.default_bucket()
key_prefix = input("The path where to save the output of the Redshift query in S3 (default: redshift-demo/redshift2processing/data/):") or "redshift-demo/redshift2processing/data/"
output_s3_uri = f's3://{bucket}/{key_prefix}'

# Output the info
print(f'\n\nCluster ID: {cluster_id}\nRole ARN: {cluster_role_arn}\nOutput S3 URI: {output_s3_uri}')
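
The setup cell builds `cluster_role_arn` by assuming the role lives under the `service-role/` IAM path. A role created in the IAM console gets the default `/` path instead, and the resulting ARN would not match. The helpers below are a hypothetical sketch (the names `build_role_arn` and `build_s3_uri` are ours, not part of the SageMaker SDK) that make the IAM path explicit and normalize the S3 output URI.

```python
import re


def build_role_arn(account_id: str, role_name: str, path: str = "/service-role/") -> str:
    """Build an IAM role ARN; IAM paths must start and end with '/'."""
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError(f"Not a 12-digit AWS account ID: {account_id!r}")
    if not (path.startswith("/") and path.endswith("/")):
        raise ValueError(f"IAM path must start and end with '/': {path!r}")
    return f"arn:aws:iam::{account_id}:role{path}{role_name}"


def build_s3_uri(bucket: str, key_prefix: str) -> str:
    """Join a bucket and a key prefix into an s3:// URI ending with exactly one '/'."""
    bucket = bucket.removeprefix("s3://").strip("/")
    prefix = key_prefix.strip("/")
    return f"s3://{bucket}/{prefix}/" if prefix else f"s3://{bucket}/"
```

For a role with the default path, you would call `build_role_arn(session.account_id(), cluster_role_name, path="/")`. If you prefer not to guess the path at all, `boto3.client("iam").get_role(RoleName=cluster_role_name)["Role"]["Arn"]` returns the exact ARN, provided your execution role is allowed to call `iam:GetRole`.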

Now that the Redshift cluster is configured, we can prepare the SQL query string. In this example, we want to predict total sales for an event given its venue, category, date, and holiday information. The query is deliberately basic and can be extended as needed.

In [None]:
###### QUERY STRING
query_string = """
-- Total tickets sold and amount paid per venue, category, and date, plus the holiday flag
SELECT sum(s.qtysold) AS total_sold, sum(s.pricepaid) AS total_paid, e.venueid, e.catid, d.caldate, d.holiday
FROM sales s, event e, date d
WHERE s.eventid = e.eventid AND e.dateid = d.dateid
GROUP BY e.venueid, e.catid, d.caldate, d.holiday
""" # this will work on the default Free Tier Redshift cluster. Change if needed.

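To see what the query returns, the sketch below runs it against a tiny, made-up in-memory SQLite copy of the three TICKIT tables it touches (`sales`, `event`, `date`), keeping only the columns the query needs. SQLite stands in for Redshift here and stores `holiday` as 0/1 instead of a boolean. The result shows that rows are aggregated per venue, category, date, and holiday flag: two events at the same venue and category on the same day collapse into a single row. If you need one row per event, one option is to add `e.eventid` to both the `SELECT` and the `GROUP BY`.

```python
import sqlite3

# Same aggregation as the notebook query; table contents below are toy values.
QUERY = """
SELECT sum(s.qtysold) AS total_sold, sum(s.pricepaid) AS total_paid, e.venueid, e.catid, d.caldate, d.holiday
FROM sales s, event e, date d
WHERE s.eventid = e.eventid AND e.dateid = d.dateid
GROUP BY e.venueid, e.catid, d.caldate, d.holiday
"""


def run_on_toy_data():
    """Create minimal TICKIT-like tables in memory and return the query rows sorted by venueid."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE event (eventid INTEGER, venueid INTEGER, catid INTEGER, dateid INTEGER);
        CREATE TABLE date (dateid INTEGER, caldate TEXT, holiday INTEGER);
        CREATE TABLE sales (salesid INTEGER, eventid INTEGER, qtysold INTEGER, pricepaid REAL);
        -- Events 1 and 2 share venue, category, and date, so they end up in one output row.
        INSERT INTO event VALUES (1, 10, 7, 100), (2, 10, 7, 100), (3, 20, 8, 101);
        INSERT INTO date VALUES (100, '2008-12-25', 1), (101, '2008-12-26', 0);
        INSERT INTO sales VALUES (1, 1, 2, 100.0), (2, 2, 3, 150.0), (3, 3, 1, 50.0);
    """)
    rows = conn.execute(QUERY).fetchall()
    conn.close()
    return sorted(rows, key=lambda row: row[2])
```

With this toy data, `run_on_toy_data()` returns `[(5, 250.0, 10, 7, '2008-12-25', 1), (1, 50.0, 20, 8, '2008-12-26', 0)]`.
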
## Reading from Redshift with SageMaker Processing

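The first step is to create a `RedshiftDatasetDefinition`. This class is part of the SageMaker Python SDK and defines how SageMaker Processing reads data from Redshift: the cluster, database, and user to connect with, the query to run, and the S3 location and format for the query results.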

In [None]:
from sagemaker.dataset_definition.inputs import RedshiftDatasetDefinition

rdd = RedshiftDatasetDefinition(
    cluster_id=cluster_id,
    database=database,
    db_user=db_user,
    query_string=query_string,
    cluster_role_arn=cluster_role_arn,
    output_format='CSV',
    output_s3_uri=output_s3_uri
)

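Then, you can define the `DatasetDefinition`. This object defines how SageMaker Processing uses the dataset loaded from Redshift: where it is made available inside the container (`local_path`) and how it is distributed across instances (`data_distribution_type`).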

In [None]:
from sagemaker.dataset_definition.inputs import DatasetDefinition

dd = DatasetDefinition(
    data_distribution_type='ShardedByS3Key',
    local_path='/opt/ml/processing/input/data/',
    redshift_dataset_definition=rdd
)
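
With `ShardedByS3Key`, each processing instance receives a subset of the S3 objects that the query results were unloaded to, instead of a full copy of every object as with `FullyReplicated`. The exact assignment is handled by SageMaker; the sketch below uses a simple round-robin split as an assumption, only to illustrate the consequences: each instance sees roughly 1/N of the files, an instance can end up with no files when there are fewer objects than instances, and the processing script should not assume it sees the whole dataset (for example when computing global statistics). The object keys in the usage example are hypothetical.

```python
from typing import Dict, List


def shard_keys(keys: List[str], instance_count: int) -> Dict[int, List[str]]:
    """Assign S3 object keys to instances round-robin (illustrative, not SageMaker's exact algorithm)."""
    if instance_count < 1:
        raise ValueError("instance_count must be at least 1")
    shards: Dict[int, List[str]] = {i: [] for i in range(instance_count)}
    for position, key in enumerate(sorted(keys)):
        shards[position % instance_count].append(key)
    return shards
```

For example, `shard_keys(["data/0000_part_00", "data/0001_part_00", "data/0002_part_00"], 2)` returns `{0: ["data/0000_part_00", "data/0002_part_00"], 1: ["data/0001_part_00"]}`.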

Finally, you can use this object as the input of your processor of choice. Here, we have written a simple scikit-learn script that cleans the dataset, applies some transformations, and splits it into train and test sets. You can check the code in the file `processing.py`. You can now define the `SKLearnProcessor` and pass the `dataset_definition`.

In [None]:
from sagemaker.sklearn import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

skp = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type='ml.m5.large',
    instance_count=2
)
skp.run(
    code='processing/processing.py',
    inputs=[ProcessingInput(
        input_name='source',
        dataset_definition=dd,
        destination='/opt/ml/processing/input/data/',
        s3_data_distribution_type='ShardedByS3Key'
    )],
    outputs = [
        ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test"),
    ]
)
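
The `processing.py` script is not reproduced in this notebook. As a hedged, stdlib-only sketch of what a shard-aware version could look like (the cleaning rule, the 80/20 split, the header-less input, and the helper and file names are all assumptions), each instance reads only the files it received, splits them, and writes its outputs with its host name in the file name. Outputs from every instance are uploaded to the same S3 prefix, so identically named files from different instances could overwrite each other. The host name is read from `/opt/ml/config/resourceconfig.json`, which SageMaker Processing makes available in the container.

```python
import csv
import json
import random
from pathlib import Path
from typing import Dict, List


def current_host(config_path: str = "/opt/ml/config/resourceconfig.json") -> str:
    """Return this instance's host name, or 'local' when not running in SageMaker."""
    try:
        return json.loads(Path(config_path).read_text())["current_host"]
    except (OSError, KeyError, ValueError):
        return "local"


def read_shard(input_dir: str) -> List[List[str]]:
    """Read every file this instance received; unloaded files may lack a .csv suffix."""
    rows: List[List[str]] = []
    for path in sorted(Path(input_dir).rglob("*")):
        if path.is_file():
            with path.open(newline="") as f:
                rows.extend(csv.reader(f))
    return rows


def clean_and_split(rows: List[List[str]], test_fraction: float = 0.2, seed: int = 42) -> Dict[str, List[List[str]]]:
    """Drop empty or incomplete rows, then shuffle deterministically and split."""
    clean = [row for row in rows if row and all(field.strip() for field in row)]
    random.Random(seed).shuffle(clean)
    n_test = int(len(clean) * test_fraction)
    return {"train": clean[n_test:], "test": clean[:n_test]}


def write_splits(splits: Dict[str, List[List[str]]], output_dir: str, host: str) -> None:
    """Write each split to <output_dir>/<split>/<split>-<host>.csv."""
    for name, split_rows in splits.items():
        out_dir = Path(output_dir) / name
        out_dir.mkdir(parents=True, exist_ok=True)
        with (out_dir / f"{name}-{host}.csv").open("w", newline="") as f:
            csv.writer(f).writerows(split_rows)


# Inside the processing container, the script could run:
# write_splits(clean_and_split(read_shard("/opt/ml/processing/input/data")),
#              "/opt/ml/processing/output", current_host())
```

Keeping the split logic in a pure function like `clean_and_split` makes it easy to test locally without a processing job.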

With the outputs created by the Processing job, we can move on to the training step, using the `sagemaker.sklearn.SKLearn()` estimator.

In [None]:
from sagemaker.sklearn import SKLearn

s = SKLearn(
    entry_point='training/script.py',
    framework_version='0.23-1',
    instance_type='ml.m5.large',
    instance_count=1,
    role=role
)
s.fit({
    'train':skp.latest_job.outputs[0].destination, 
    'test':skp.latest_job.outputs[1].destination
})
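
`s.fit()` maps each dictionary key to a training channel. Inside the SKLearn training container, each channel is downloaded to its own directory, whose path is exposed through an `SM_CHANNEL_<NAME>` environment variable (here `SM_CHANNEL_TRAIN` and `SM_CHANNEL_TEST`). Since `training/script.py` is not shown here, the sketch below (stdlib only, hypothetical helper names, and assuming header-less CSV files whose first column is the target) illustrates how a script could locate and read those channels.

```python
import csv
import os
from pathlib import Path
from typing import List, Mapping, Tuple


def channel_dir(name: str, env: Mapping[str, str] = os.environ) -> Path:
    """Directory of a training channel, from SM_CHANNEL_<NAME> or the conventional default."""
    return Path(env.get(f"SM_CHANNEL_{name.upper()}", f"/opt/ml/input/data/{name}"))


def load_channel(name: str, env: Mapping[str, str] = os.environ) -> List[List[str]]:
    """Read every non-empty CSV row from all files in the channel directory."""
    rows: List[List[str]] = []
    for path in sorted(channel_dir(name, env).rglob("*")):
        if path.is_file():
            with path.open(newline="") as f:
                rows.extend(row for row in csv.reader(f) if row)
    return rows


def split_target(rows: List[List[str]], target_index: int = 0) -> Tuple[List[List[str]], List[float]]:
    """Separate a numeric target column (assumed to be column 0, total_sold) from the features."""
    X = [[value for i, value in enumerate(row) if i != target_index] for row in rows]
    y = [float(row[target_index]) for row in rows]
    return X, y
```

In a training script, `X_train, y_train = split_target(load_channel("train"))` would then feed whichever scikit-learn estimator you choose.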

## Putting the pieces together in a Pipeline

SageMaker Pipelines lets us build a workflow out of steps, each one referring to a specific SageMaker feature.

In [None]:
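from sagemaker.workflow.parameters import ParameterString, ParameterInteger
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.inputs import TrainingInput, CreateModelInput
# SKLearnModel is needed by the model step below
from sagemaker.sklearn import SKLearn, SKLearnModel, SKLearnProcessor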

The first step in the pipeline preprocesses the data to prepare it for training. We create an `SKLearnProcessor` object similar to the one created previously, but now parameterized so that we can track and change the job configuration separately, for example to increase the instance type and count to accommodate a growing dataset.

In [None]:
#### PROCESSING STEP #####

# PARAMETERS
processing_instance_type = ParameterString(name='ProcessingInstanceType', default_value='ml.m5.large')
processing_instance_count = ParameterInteger(name='ProcessingInstanceCount', default_value=2)

# PROCESSOR
skp = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count
)

# DEFINE THE STEP
processing_step = ProcessingStep(
    name='ProcessingStep',
    processor=skp,
    code='processing/processing.py',
    inputs=[ProcessingInput(
        dataset_definition=dd,
        destination='/opt/ml/processing/input/data/',
        s3_data_distribution_type='ShardedByS3Key'
    )],
    outputs = [
        ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test"),
    ]
)

The following code sets up a pipeline step for a training job. We specify an `Estimator` object and define a `TrainingStep` that inserts the training job into the pipeline, using the outputs of the previous SageMaker Processing step as its inputs. Because the training script is basic, it does not accept any hyperparameters; you can change this by extending the `ArgumentParser` in the [`script.py`](training/script.py) file and passing the hyperparameters to the estimator below.

In [None]:
#### TRAINING STEP ####

# PARAMETERS
training_instance_type = ParameterString(name='TrainingInstanceType', default_value='ml.m5.large')
training_instance_count = ParameterInteger(name='TrainingInstanceCount', default_value=1)

# ESTIMATOR
s = SKLearn(
    entry_point='training/script.py',
    framework_version='0.23-1',
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=role
)

# TRAININGSTEP
training_step = TrainingStep(
    name='TrainingStep',
    estimator=s,
    inputs={
        "train": TrainingInput(s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri),
        "test": TrainingInput(s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri)
    }
)
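
In script mode, SageMaker passes each entry of the estimator's `hyperparameters` dictionary to the entry point as a `--key value` command-line argument. The sketch below shows how the `ArgumentParser` in `script.py` could accept them; the hyperparameter names `n-estimators` and `max-depth` are hypothetical, and `parse_known_args` is used so that unexpected extra arguments do not stop the job.

```python
import argparse
import os
from typing import Mapping, Optional, Sequence


def parse_args(argv: Optional[Sequence[str]] = None, env: Mapping[str, str] = os.environ) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    # Hypothetical hyperparameters: each key of `hyperparameters={...}` arrives as `--key value`.
    parser.add_argument("--n-estimators", type=int, default=100)
    parser.add_argument("--max-depth", type=int, default=None)
    # Locations injected by the SageMaker training environment, with the usual container defaults.
    parser.add_argument("--model-dir", default=env.get("SM_MODEL_DIR", "/opt/ml/model"))
    parser.add_argument("--train", default=env.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    parser.add_argument("--test", default=env.get("SM_CHANNEL_TEST", "/opt/ml/input/data/test"))
    # Ignore any arguments this parser does not declare instead of exiting with an error.
    args, _unknown = parser.parse_known_args(argv)
    return args
```

On the notebook side, the values would then be supplied with, for example, `SKLearn(entry_point='training/script.py', ..., hyperparameters={'n-estimators': 200, 'max-depth': 5})`.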

As a final step, we create a SageMaker `Model` object that wraps the model artifact and associates it with the prebuilt SageMaker scikit-learn inference container, so that it can be used later, e.g. in an inference pipeline.

In [None]:
#### MODEL STEP ####

# PARAMETERS
inference_instance_type = ParameterString(name='InferenceInstanceType', default_value='ml.m5.large')

# MODEL
model = SKLearnModel(
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    entry_point="training/script.py",
    framework_version="0.23-1",
    sagemaker_session=session
)

# MODELSTEP
model_step = CreateModelStep(
    name="Model",
    model=model,
    inputs=CreateModelInput(instance_type=inference_instance_type)
)
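
Because `SKLearnModel` reuses `training/script.py` as its `entry_point`, the prebuilt scikit-learn serving container imports that file and calls its `model_fn(model_dir)` function to load the model. The script therefore needs to define `model_fn` and keep its training code under `if __name__ == "__main__":`. A minimal sketch, assuming (hypothetically) that training saved the model with `pickle` as `model.pkl` in `SM_MODEL_DIR`; if the script uses `joblib` or another file name, adapt it accordingly.

```python
import os
import pickle
from typing import Any

MODEL_FILE = "model.pkl"  # hypothetical name; must match what the training code writes


def model_fn(model_dir: str) -> Any:
    """Load the model from the directory where the container extracted model.tar.gz."""
    with open(os.path.join(model_dir, MODEL_FILE), "rb") as f:
        return pickle.load(f)


def predict_fn(input_data: Any, model: Any) -> Any:
    """Optional: the container's default already calls model.predict; shown here for clarity."""
    return model.predict(input_data)
```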

With all of the pipeline steps now defined, we can define the pipeline itself as a `Pipeline` object comprising those steps. Steps that do not depend on each other can run in parallel, and a `ConditionStep` can add branching logic.

In [None]:
#### PIPELINE ####

pipeline = Pipeline(
    name = 'Redshift2Pipeline',
    parameters = [
        processing_instance_type, processing_instance_count,
        training_instance_type, training_instance_count,
        inference_instance_type
    ],
    steps = [
        processing_step, 
        training_step,
        model_step
    ]
)
pipeline.upsert(role_arn=role)
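
SageMaker Pipelines infers the execution order from data dependencies: because `training_step` reads `processing_step.properties` and `model` reads `training_step.properties.ModelArtifacts`, the three steps run in sequence. As a toy model of that scheduling (not the service's actual scheduler), the sketch below uses Python's `graphlib` to group steps into waves; steps in the same wave have no dependency on each other and could run concurrently. The `EvaluationStep` in the usage note is hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each step maps to the steps whose outputs it consumes, mirroring the pipeline above.
dependencies: Dict[str, Set[str]] = {
    "ProcessingStep": set(),
    "TrainingStep": {"ProcessingStep"},
    "Model": {"TrainingStep"},
}


def execution_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into waves; all steps in a wave have their dependencies satisfied."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves
```

`execution_waves(dependencies)` gives `[["ProcessingStep"], ["TrainingStep"], ["Model"]]`; adding a hypothetical `"EvaluationStep": {"TrainingStep"}` places it in the same wave as `Model`.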

After upserting its definition, we can start the pipeline with the `start()` method of the `Pipeline` object and wait for the execution to finish:

In [None]:
execution = pipeline.start()
execution.wait()

Once the pipeline starts, you can follow the run in SageMaker Studio.

To view it, choose the SageMaker Components and registries button, and select Pipelines from the Components and registries drop-down. Choose the `Redshift2Pipeline` pipeline, then double-click the execution to see it running. Choose each step to see additional details such as its output and logs. This pipeline typically takes about 10 minutes to complete.

You can check the execution's status and metadata by calling its `describe()` method.

In [None]:
execution.describe()
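
`describe()` returns the execution's metadata, such as `PipelineExecutionStatus`; per-step details are available from `execution.list_steps()`, which returns the entries of the `ListPipelineExecutionSteps` API. The formatter below is a small, hypothetical helper for those entries.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def _duration(start: Optional[datetime], end: Optional[datetime]) -> str:
    """Format a step's wall-clock duration when both timestamps are present."""
    return f" ({(end - start).total_seconds():.0f}s)" if start and end else ""


def summarize_steps(steps: List[Dict[str, Any]]) -> List[str]:
    """One line per step: name, status, duration, and failure reason if any."""
    lines = []
    for step in steps:
        line = f"{step['StepName']}: {step['StepStatus']}"
        line += _duration(step.get("StartTime"), step.get("EndTime"))
        if step.get("FailureReason"):
            line += f" - {step['FailureReason']}"
        lines.append(line)
    return lines
```

In the notebook, you could run `print("\n".join(summarize_steps(execution.list_steps())))` after `execution.wait()`.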

## Conclusions

Congratulations! You have now created a complete training pipeline for your Redshift data! 

A few things you can do now:

- schedule the execution of this pipeline with Amazon EventBridge Rules - [Link](https://docs.aws.amazon.com/sagemaker/latest/dg/automating-sagemaker-with-eventbridge.html)
- create a new scheduled SageMaker Pipeline for inference with the [TransformStep](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-transform)
- use the model you've created to update an existing real-time endpoint [manually](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_UpdateEndpoint.html) or as part of a [SageMaker Project](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-whatis.html)
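
For the EventBridge option, a scheduled rule can target the pipeline directly. The sketch below only builds the request payloads for the boto3 `events` client's `put_rule` and `put_targets` calls; the rule name, schedule, target `Id`, and role ARN are placeholders, and the role must allow EventBridge to call `sagemaker:StartPipelineExecution`.

```python
from typing import Dict, Optional, Tuple


def build_schedule_requests(
    rule_name: str,
    schedule_expression: str,
    pipeline_arn: str,
    events_role_arn: str,
    parameters: Optional[Dict[str, object]] = None,
) -> Tuple[dict, dict]:
    """Return (put_rule kwargs, put_targets kwargs) for a scheduled pipeline run."""
    put_rule = {"Name": rule_name, "ScheduleExpression": schedule_expression, "State": "ENABLED"}
    target = {"Id": "sagemaker-pipeline", "Arn": pipeline_arn, "RoleArn": events_role_arn}
    if parameters:
        # Pipeline parameter overrides are passed to EventBridge as strings.
        target["SageMakerPipelineParameters"] = {
            "PipelineParameterList": [{"Name": k, "Value": str(v)} for k, v in parameters.items()]
        }
    return put_rule, {"Rule": rule_name, "Targets": [target]}


# Example usage in the notebook (needs AWS credentials, so left commented out):
# import boto3
# events = boto3.client("events")
# rule_kwargs, target_kwargs = build_schedule_requests(
#     "redshift2pipeline-daily", "rate(1 day)",
#     pipeline.describe()["PipelineArn"], "<events-role-arn>",
#     {"ProcessingInstanceCount": 2},
# )
# events.put_rule(**rule_kwargs)
# events.put_targets(**target_kwargs)
```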

For additional notebooks to explore, see:

- how to use the Redshift Data API from within a SageMaker Notebook - [Link](extra-content/data-api-discovery.ipynb)
- how to integrate the Redshift Data API in a Lambda function to have more granular control, and add this step to a SageMaker Pipeline - [Link](extra-content/pipeline.ipynb) 