# SageMaker Processing Job Image URI Restriction

This example notebook can be used for testing the solution provided.

To use this notebook, create the CloudFormation stack and provide the following value for the `ProcessingContainers` parameter:

**ProcessingContainers**: sagemaker-scikit-learn:0.23-1-cpu-py3

With this value, the solution blocks any SageMaker Processing Job that uses the SageMaker container for SKLearn v0.23-1.
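
The matching logic of the deployed solution is not shown in this notebook. As a minimal sketch of the idea, assuming the parameter holds a comma-separated list of `repository:tag` entries and that the check compares them with the last path segment of the image URI, it could look like this. The helper names `parse_restricted` and `is_restricted` are hypothetical, and the two image URIs are only illustrative examples for `us-east-1`.

```python
def parse_restricted(parameter_value):
    """Split a comma-separated parameter value into a set of 'repository:tag' entries."""
    return {item.strip() for item in parameter_value.split(",") if item.strip()}


def is_restricted(image_uri, restricted):
    """Return True if the 'repository:tag' suffix of image_uri is in the restricted set."""
    # An ECR image URI has the shape <account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>,
    # so the part after the last "/" is "<repository>:<tag>".
    repository_and_tag = image_uri.rsplit("/", 1)[-1]
    return repository_and_tag in restricted


# Value used for the ProcessingContainers parameter in this notebook
restricted = parse_restricted("sagemaker-scikit-learn:0.23-1-cpu-py3")

# Illustrative URIs (assumed account and region), one restricted and one allowed
blocked_uri = "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3"
allowed_uri = "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3"

print(is_restricted(blocked_uri, restricted))
print(is_restricted(allowed_uri, restricted))
```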

## Dataset

We are using a subset of ~20,000 records of synthetic transactions, each of which is labeled as fraudulent or not fraudulent.
We'd like to train a model based on the features of these transactions so that we can predict risky or fraudulent transactions in the future.

This is a binary classification problem:

* 1 - Fraud
* 0 - No Fraud

In [None]:
! rm -rf ./data && mkdir -p ./data

! aws s3 cp s3://sagemaker-sample-files/datasets/tabular/synthetic_credit_card_transactions/user0_credit_card_transactions.csv ./data/data.csv

***

## Prerequisites

Install the latest version of the SageMaker Python SDK.

In [None]:
! pip install 'sagemaker' --upgrade

***

## Part 1/3 - Setup

Here we'll import some libraries and define some variables.

In [None]:
import boto3
import json
import logging
import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

In [None]:
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

In [None]:
s3_client = boto3.client("s3")

In [None]:
sagemaker_session = sagemaker.Session()
region = boto3.session.Session().region_name
role = sagemaker.get_execution_role()

bucket_name = sagemaker_session.default_bucket()

### Upload Dataset in the Default Amazon S3 Bucket

To make the data available to the Processing Job, we upload the downloaded dataset to the default S3 bucket.

In [None]:
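# Remove a previously uploaded copy of the dataset, if any.
# delete_object expects a full object key, not a prefix.
s3_client.delete_object(Bucket=bucket_name, Key="sg-container-restriction/data/input/data.csv")

input_data = sagemaker_session.upload_data(
    "./data/data.csv", key_prefix="sg-container-restriction/data/input"
)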

input_data

***

### Create Processing Script

We create the file `processing.py`, which is the script executed by the SageMaker Processing Job.

In [None]:
%%writefile processing.py

import argparse
import csv
import logging
import numpy as np
import os
from os import listdir
from os.path import isfile, join
import pandas as pd
from sklearn.model_selection import train_test_split
import traceback

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

BASE_PATH = os.path.join("/", "opt", "ml")
PROCESSING_PATH = os.path.join(BASE_PATH, "processing")
PROCESSING_PATH_INPUT = os.path.join(PROCESSING_PATH, "input")
PROCESSING_PATH_OUTPUT = os.path.join(PROCESSING_PATH, "output")

def extract_data(file_path, percentage=100):
    try:
        files = [f for f in listdir(file_path) if isfile(join(file_path, f)) and f.endswith(".csv")]
        LOGGER.info("{}".format(files))

        frames = []

        for file in files:
            df = pd.read_csv(
                os.path.join(file_path, file),
                sep=",",
                quotechar='"',
                quoting=csv.QUOTE_ALL,
                escapechar='\\',
                encoding='utf-8',
                error_bad_lines=False
            )

            df = df.head(int(len(df) * (percentage / 100)))

            frames.append(df)

        df = pd.concat(frames)

        return df
    except Exception as e:
        stacktrace = traceback.format_exc()
        LOGGER.error("{}".format(stacktrace))

        raise e

def load_data(df, file_path, file_name):
    try:
        if not os.path.exists(file_path):
            os.makedirs(file_path)

        path = os.path.join(file_path, file_name + ".csv")

        LOGGER.info("Saving file in {}".format(path))

        df.to_csv(
            path,
            index=False,
            header=True,
            quoting=csv.QUOTE_ALL,
            encoding="utf-8",
            escapechar="\\",
            sep=","
        )
    except Exception as e:
        stacktrace = traceback.format_exc()
        LOGGER.error("{}".format(stacktrace))

        raise e

def transform_data(df):
    try:
        df = df[df['Is Fraud?'].notna()]

        df.insert(0, 'ID', range(1, len(df) + 1))

        df["Errors?"].fillna('', inplace=True)
        df['Errors?'] = df['Errors?'].map(lambda x: x.strip())
        df["Errors?"] = df["Errors?"].map({
            "Insufficient Balance": 0,
            "Technical Glitch": 1,
            "Bad PIN": 2,
            "Bad Expiration": 3,
            "Bad Card Number": 4,
            "Bad CVV": 5,
            "Bad PIN,Insufficient Balance": 6,
            "Bad PIN,Technical Glitch": 7,
            "": 8
        })

        df["Use Chip"].fillna('', inplace=True)
        df['Use Chip'] = df['Use Chip'].map(lambda x: x.strip())
        df["Use Chip"] = df["Use Chip"].map({
            "Swipe Transaction": 0,
            "Chip Transaction": 1,
            "Online Transaction": 2
        })

        df['Is Fraud?'] = df['Is Fraud?'].map(lambda x: x.replace("'", ""))
        df['Is Fraud?'] = df['Is Fraud?'].map(lambda x: x.strip())
        df['Is Fraud?'] = df['Is Fraud?'].replace('', np.nan)
        df['Is Fraud?'] = df['Is Fraud?'].replace(' ', np.nan)

        df["Is Fraud?"] = df["Is Fraud?"].map({"No": 0, "Yes": 1})

        df = df.rename(
            columns={'Card': 'card', 'MCC': 'mcc', "Errors?": "errors", "Use Chip": "use_chip", "Is Fraud?": "labels"})

        df = df[["card", "mcc", "errors", "use_chip", "labels"]]

        return df

    except Exception as e:
        stacktrace = traceback.format_exc()
        LOGGER.error("{}".format(stacktrace))

        raise e

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset-percentage", type=int, required=False, default=100)
    args = parser.parse_args()

    LOGGER.info("Arguments: {}".format(args))

    df = extract_data(PROCESSING_PATH_INPUT, args.dataset_percentage)

    df = transform_data(df)

    data_train, data_test = train_test_split(df, test_size=0.2, shuffle=True)

    load_data(data_train, os.path.join(PROCESSING_PATH_OUTPUT, "train"), "train")
    load_data(data_test, os.path.join(PROCESSING_PATH_OUTPUT, "test"), "test")
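
The label cleaning in `transform_data` can be hard to follow inside the chain of pandas calls. The following plain-Python sketch mirrors the same steps for a single value (remove single quotes, strip whitespace, map `No`/`Yes` to `0`/`1`). The function name `normalize_label` is hypothetical and is not part of `processing.py`. Where the pandas version produces `NaN` for an unrecognized value, this sketch returns `None`.

```python
def normalize_label(raw):
    """Map a raw 'Is Fraud?' value to 1 (fraud), 0 (no fraud), or None if unrecognized."""
    if raw is None:
        return None
    # Same cleaning steps as in transform_data: drop single quotes, then strip whitespace
    cleaned = raw.replace("'", "").strip()
    # Unknown or empty values have no entry in the mapping and yield None
    return {"No": 0, "Yes": 1}.get(cleaned)


samples = ["No", " Yes ", "'Yes'", "", "maybe"]
print([normalize_label(s) for s in samples])  # prints [0, 1, 1, None, None]
```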

### Global Parameters

In this section, we define the parameters for the SageMaker Processing Job. As framework version, we use SKLearn 1.0-1, which is not in the restricted list, and check that the SageMaker Processing Job can be executed.

In [None]:
processing_input_files_path = "sg-container-restriction/data/input"
processing_output_files_path = "sg-container-restriction/data/output"
processing_framework_version = "1.0-1"

processing_instance_count = 1
processing_instance_type = "ml.t3.large"

### SageMaker Processing Job

In [None]:
processor = SKLearnProcessor(
    framework_version=processing_framework_version,
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type
)

In [None]:
processor.run(
    code="./processing.py",
    inputs=[
        ProcessingInput(
            source="s3://{}/{}".format(bucket_name, processing_input_files_path),
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="output",
            source="/opt/ml/processing/output",
            destination="s3://{}/{}".format(bucket_name, processing_output_files_path),
        )
    ]
)
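
Once the job completes, `processing.py` will have written `train/train.csv` and `test/test.csv` under `/opt/ml/processing/output`. Assuming SageMaker uploads that folder to the configured destination and keeps the subfolder layout, the resulting objects should be found at the URIs built below. This is only a sketch: the helper name `expected_output_uris` is hypothetical and `example-bucket` is a placeholder for the value of `bucket_name`.

```python
def expected_output_uris(bucket, output_prefix):
    """Build the S3 URIs where the train and test CSV files are expected after the job."""
    base = "s3://{}/{}".format(bucket, output_prefix.strip("/"))
    # processing.py saves each split as <split>/<split>.csv inside the output folder
    return {split: "{}/{}/{}.csv".format(base, split, split) for split in ("train", "test")}


# "example-bucket" is a placeholder; in the notebook, pass bucket_name instead
uris = expected_output_uris("example-bucket", "sg-container-restriction/data/output")
for split, uri in uris.items():
    print(split, uri)
```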

***

## Change Framework Version

If we change the SKLearn version to 0.23-1, we expect the provided solution to automatically stop the SageMaker Processing Job, since it uses a container version included in the restricted list provided through `ProcessingContainers`.

In [None]:
processing_framework_version = "0.23-1"

In [None]:
processor = SKLearnProcessor(
    framework_version=processing_framework_version,
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type
)

In [None]:
processor.run(
    code="./processing.py",
    inputs=[
        ProcessingInput(
            source="s3://{}/{}".format(bucket_name, processing_input_files_path),
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="output",
            source="/opt/ml/processing/output",
            destination="s3://{}/{}".format(bucket_name, processing_output_files_path),
        )
    ]
)
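
To confirm that the restriction took effect, one option is to poll the job status through the SageMaker `describe_processing_job` API until it reaches a terminal state. The sketch below is an assumption about how this check could be done and is not part of the provided solution. The function name `wait_for_terminal_status` and its polling defaults are hypothetical. If the solution stops the job as described above, the final status would presumably be `Stopped`.

```python
import time

# Terminal values of ProcessingJobStatus; "InProgress" and "Stopping" are transient
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_terminal_status(sm_client, job_name, poll_seconds=15, max_polls=80):
    """Poll describe_processing_job until the job reaches a terminal status.

    Returns the last status seen, which may be non-terminal if max_polls is exhausted.
    """
    status = None
    for _ in range(max_polls):
        response = sm_client.describe_processing_job(ProcessingJobName=job_name)
        status = response["ProcessingJobStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    return status


# Possible usage in this notebook (assumes the run above has been submitted):
# status = wait_for_terminal_status(boto3.client("sagemaker"), processor.latest_job.job_name)
# LOGGER.info("Final status: {}".format(status))
```

The client is passed in as an argument so the function can be exercised with a stub object, without calling AWS.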