# Amazon SageMaker Processing - automation with Amazon SageMaker Pipelines

Amazon SageMaker Processing lets you run data pre- or post-processing, feature engineering, data validation, and model evaluation workloads on Amazon SageMaker. Processing jobs read their input from Amazon S3 and write their output back to Amazon S3.

![processing](https://sagemaker.readthedocs.io/en/stable/_images/amazon_sagemaker_processing_image1.png)

Here, we'll import the dataset and transform it with SageMaker Processing, which can process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, notebooks are used only for prototyping and can run on relatively inexpensive, less powerful instances, while processing, training, and model hosting tasks run on separate, more powerful SageMaker-managed instances. SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks.

### Step 0 - Set up

Let's set up the session, bucket, and local directories we need for this use case.

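First we'll load the Boston Housing dataset, save the raw feature data, and upload it to Amazon S3 for transformation by SageMaker Processing. We'll also save the labels for training and testing. Note that `load_boston` was removed in scikit-learn 1.2, so the cell below requires an earlier version.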

In [1]:
import os
import boto3
import sagemaker

sess = sagemaker.session.Session()
bucket = sess.default_bucket() 
region = boto3.Session().region_name

data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

train_dir = os.path.join(os.getcwd(), 'data/train')
os.makedirs(train_dir, exist_ok=True)

test_dir = os.path.join(os.getcwd(), 'data/test')
os.makedirs(test_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), 'data/raw')
os.makedirs(raw_dir, exist_ok=True)


In [2]:
import numpy as np
from sklearn.datasets import load_boston
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

data = load_boston()
X, y = data['data'], data['target']
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)
np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)
np.save(os.path.join(raw_dir, 'y_train.npy'), y_train)
np.save(os.path.join(raw_dir, 'y_test.npy'), y_test)
s3_prefix = 'scheduled-processing'
rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)
print(raw_s3)


s3://sagemaker-eu-west-1-859755744029/scheduled-processing/data/raw


To use SageMaker Processing, simply supply a Python data preprocessing script as shown below.  For this example, we're using a SageMaker prebuilt Scikit-learn container, which includes many common functions for processing data.  There are few limitations on what kinds of code and operations you can run, and only a minimal contract:  input and output data must be placed in specified directories.  If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete.

In [3]:
%%writefile preprocessing.py

import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

if __name__=='__main__':
    
    input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))
    print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
    scaler = StandardScaler()
    x_train = np.load(os.path.join('/opt/ml/processing/input', 'x_train.npy'))
    scaler.fit(x_train)
    for file in input_files:
        raw = np.load(file)
        # only transform feature columns
        if 'y_' not in file:
            transformed = scaler.transform(raw)
        if 'train' in file:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/train', 'y_train.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TRAINING DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TRAINING DATA FILE\n')
        else:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/test', 'y_test.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TEST DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TEST DATA FILE\n')

Overwriting preprocessing.py
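The routing logic in the script above (labels are copied unchanged, features are scaled, and each file lands in the `train` or `test` output directory) can be checked locally without starting a job. The following is a minimal sketch under stated assumptions: `route_file` is a hypothetical helper that is not part of the original script, and it matches on the file's base name rather than on the full path as the script does.

```python
import posixpath


def route_file(path):
    """Return (output_dir, needs_scaling) for one input file.

    Hypothetical helper mirroring the branching in preprocessing.py:
    files named y_* are labels and are saved unchanged, all other files
    are features and are scaled; 'train' in the name selects the split.
    """
    name = posixpath.basename(path)
    is_label = name.startswith("y_")
    split = "train" if "train" in name else "test"
    return posixpath.join("/opt/ml/processing", split), not is_label


for name in ["x_train.npy", "y_train.npy", "x_test.npy", "y_test.npy"]:
    print(name, route_file(name))
```

Keeping the decision in one small function makes it easy to unit test the contract (which directory, scaled or not) before paying for a Processing job.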


Finally, we need some parameters for the pipeline. 

You can introduce variables into your pipeline definition using parameters. Parameters that you define can be referenced throughout your pipeline definition. Parameters have a default value, which you can override by specifying parameter values when starting a pipeline execution. The default value must be an instance matching the parameter type. All parameters used in step definitions must be defined in your pipeline definition. Amazon SageMaker Model Building Pipelines supports the following parameter types:

- `ParameterString` – Representing a str Python type.
- `ParameterInteger` – Representing an int Python type.
- `ParameterFloat` – Representing a float Python type.

The following parameters are defined here in order to use them in the SageMaker `SKLearnProcessor` object, as well as in the pipeline. 

In [4]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

## PARAMETERS
processed_s3 = f"s3://{bucket}/{s3_prefix}/data/processed"
# Path to S3
input_data = ParameterString(name="InputData", default_value=raw_s3)
processed_data = ParameterString(name="ProcessedData", default_value=processed_s3)
# processing step parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
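To make the default-and-override behaviour described above concrete, here is a small pure-Python sketch. It does not use the SageMaker SDK: `Param` and `resolve` are hypothetical names introduced only for illustration, and the checks are a simplification of what the service does.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Param:
    """Hypothetical stand-in for a pipeline parameter: name, type, default."""
    name: str
    type_: type
    default: object


def resolve(params, overrides=None):
    """Merge execution-time overrides into the declared defaults."""
    overrides = overrides or {}
    unknown = set(overrides) - {p.name for p in params}
    if unknown:
        # Every parameter used must be declared in the pipeline definition.
        raise KeyError(f"Undefined parameters: {sorted(unknown)}")
    resolved = {}
    for p in params:
        value = overrides.get(p.name, p.default)
        if not isinstance(value, p.type_):
            raise TypeError(
                f"{p.name} expects {p.type_.__name__}, got {type(value).__name__}"
            )
        resolved[p.name] = value
    return resolved


params = [
    Param("ProcessingInstanceType", str, "ml.m5.xlarge"),
    Param("ProcessingInstanceCount", int, 1),
]
resolved = resolve(params, {"ProcessingInstanceCount": 2})
print(resolved)
```

With the real SDK, overrides of this kind are passed when starting an execution, for example `pipeline.start(parameters={"ProcessingInstanceCount": 2})`.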


### Step 1 - Define the Processor

Before starting the SageMaker Processing job, we instantiate a `SKLearnProcessor` object. This object lets you specify the instance type to use in the job, as well as how many instances. Both values come from the pipeline parameters defined above. The Boston Housing dataset is quite small, so the default of a single instance is enough here; a larger job can be scaled out simply by raising `ProcessingInstanceCount`.

In [6]:
import boto3
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type, #ml.m5.xlarge
    instance_count=processing_instance_count, #1
    base_job_name="scheduled-processing",
    sagemaker_session=sess,
    role=role,
)


### Step 2 - Define the Processing Step in the SageMaker Pipelines

This step in the pipeline preprocesses the data to prepare it for training. We wrap the `SKLearnProcessor` defined above in a `ProcessingStep`, together with its inputs, outputs, and script. Because the instance type and count are pipeline parameters, we can track and change the job configuration separately as needed, for example to increase the instance size and count to accommodate a growing dataset.

In [7]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

## PROCESSING STEP
step_process = ProcessingStep(
    name="ScheduledProcessing",
    processor=sklearn_processor,
    inputs=[
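        # 'ShardedByS3Key' gives each instance only a subset of the input objects. preprocessing.py reads
        # x_train.npy on every instance, so use 'FullyReplicated' if ProcessingInstanceCount is raised above 1.
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input", s3_data_distribution_type='ShardedByS3Key'),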
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=processed_data),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=processed_data),
    ],
    code="./preprocessing.py",
)
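The step above sets `s3_data_distribution_type='ShardedByS3Key'`, which splits the input objects across instances instead of copying all of them to each one (`FullyReplicated`). The sketch below illustrates why that matters for a script that expects a specific file. The round-robin assignment is an assumption made purely for illustration, not the documented behaviour of SageMaker, and `shard_keys` is a hypothetical helper.

```python
def shard_keys(keys, instance_count):
    """Distribute object keys across instances (illustrative round-robin only)."""
    shards = [[] for _ in range(instance_count)]
    for i, key in enumerate(sorted(keys)):
        shards[i % instance_count].append(key)
    return shards


keys = ["x_train.npy", "x_test.npy", "y_train.npy", "y_test.npy"]
shards = shard_keys(keys, 2)
for idx, shard in enumerate(shards):
    # preprocessing.py fits the scaler on x_train.npy, so every instance needs it.
    print(f"instance {idx}: {shard} has x_train.npy: {'x_train.npy' in shard}")
```

Under this assumed assignment, one of the two instances never receives `x_train.npy`, so the scaler could not be fitted there. With the default of one instance the issue does not arise, because that instance receives every object.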


### Step 3 - Create the Pipeline

With all of the pipeline steps now defined, we can define the pipeline itself as a `Pipeline` object comprising a series of those steps. This pipeline has a single step, but pipelines can also contain steps that run in parallel and conditional steps (`ConditionStep`).

In [8]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "ScheduledProcessingPipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_type, 
        processing_instance_count,
        processed_data,
    ],
    steps=[
        step_process
    ],
    sagemaker_session=sess
)

We can inspect the pipeline definition in JSON format:

In [9]:
import json

definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-eu-west-1-859755744029/scheduled-processing/data/raw'},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'ProcessedData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-eu-west-1-859755744029/scheduled-processing/data/processed'}],
 'Steps': [{'Name': 'ScheduledProcessing',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '141502667606.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/
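Because the definition is plain JSON, its parameters and steps can be listed with ordinary dictionary access. The sketch below works on an abbreviated, hand-written sample that mirrors only some of the fields visible in the output above; it is not the full definition, and the variable names are chosen for illustration.

```python
import json

# Abbreviated sample with the same top-level keys as the real definition.
definition_json = """
{
  "Version": "2020-12-01",
  "Parameters": [
    {"Name": "InputData", "Type": "String"},
    {"Name": "ProcessingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"},
    {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1}
  ],
  "Steps": [{"Name": "ScheduledProcessing", "Type": "Processing"}]
}
"""

sample = json.loads(definition_json)

# Map each parameter name to its default (None when no default is present).
defaults = {p["Name"]: p.get("DefaultValue") for p in sample["Parameters"]}
# Collect (name, type) for every step in the pipeline.
steps = [(s["Name"], s["Type"]) for s in sample["Steps"]]

print(defaults)
print(steps)
```

The same two comprehensions can be applied to the `definition` dictionary loaded in the cell above.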

After upserting its definition, we can start the pipeline with the Pipeline object's `start` method:

In [10]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

We can now confirm that the pipeline is executing. In the log output below, we wait while `PipelineExecutionStatus` is `Executing`.

In [11]:
from time import sleep

ex_desc = execution.describe()
status = ex_desc['PipelineExecutionStatus']
print(status+ " ", end="")

# Poll every 10 seconds until the execution leaves the 'Executing' state
while status == 'Executing':
    sleep(10)
    status = execution.describe()['PipelineExecutionStatus']
    print(".", end="")
    
print(f"\n{status}")

Executing ........................
Succeeded
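The polling loop above runs until the status changes, however long that takes. As a hedged variation, the sketch below adds a timeout and takes the `describe` callable as an argument so it can be exercised without AWS. `wait_for_completion` and `fake_describe` are hypothetical names; the only assumption about the response is the `PipelineExecutionStatus` key used above.

```python
import time


def wait_for_completion(describe, poll_seconds=10, timeout_seconds=3600, sleep=time.sleep):
    """Poll describe() until the status is no longer 'Executing' or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe()["PipelineExecutionStatus"]
        if status != "Executing":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still executing after {timeout_seconds} seconds")
        sleep(poll_seconds)


# Fake describe() that reports 'Executing' twice and then 'Succeeded'.
_responses = iter(["Executing", "Executing", "Succeeded"])


def fake_describe():
    return {"PipelineExecutionStatus": next(_responses)}


final_status = wait_for_completion(fake_describe, poll_seconds=0)
print(final_status)
```

Against a real execution, the call would be `wait_for_completion(execution.describe)`. The execution object in the SDK also provides a `wait()` method that may be sufficient when no custom progress output is needed.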


You can now visualize the pipeline in SageMaker Studio. Hit the triangle button on the left, then select `Pipelines` in the dropdown menu, then `ScheduledProcessingPipeline`.

![pipelines](images/pipelines.png)
![pipeline-execution](images/pipeline-execution.png)