# Setup for Pipelines lab

In [None]:
import sagemaker

In [None]:
%store -r bucket
%store -r prefix
%store -r region
%store -r docker_image_name

In [None]:
# S3 prefix under which every lab script below is uploaded.
s3uri_code = "s3://{}/{}/code".format(bucket, prefix)
s3uri_code  # echo the URI as the cell output

---

# Create and upload the Python scripts (created in the labs) to S3

## (If you have run some of the labs but not all, just pick the parts you skipped.)

### 1-DataPrep

In [None]:
%%writefile preprocess.py
"""Feature engineers the customer churn dataset."""
import argparse
import logging
import pathlib

import boto3
import numpy as np
import pandas as pd

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

if __name__ == "__main__":
    logger.info("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)
    input_data = args.input_data
    print(input_data)
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])

    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/raw-data.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)

    logger.info("Reading downloaded data.")

    # read in csv
    df = pd.read_csv(fn)

    # drop the "Phone" feature column
    df = df.drop(["Phone"], axis=1)

    # Change the data type of "Area Code"
    df["Area Code"] = df["Area Code"].astype(object)

    # Drop several other columns
    df = df.drop(["Day Charge", "Eve Charge", "Night Charge", "Intl Charge"], axis=1)

    # Convert categorical variables into dummy/indicator variables.
    model_data = pd.get_dummies(df)

    # Create one binary classification target column
    model_data = pd.concat(
        [
            model_data["Churn?_True."],
            model_data.drop(["Churn?_False.", "Churn?_True."], axis=1),
        ],
        axis=1,
    )

    # Split the data
    train_data, validation_data, test_data = np.split(
        model_data.sample(frac=1, random_state=1729),
        [int(0.7 * len(model_data)), int(0.9 * len(model_data))],
    )

    pd.DataFrame(train_data).to_csv(
        f"{base_dir}/train/train.csv", header=False, index=False
    )
    pd.DataFrame(validation_data).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test_data).to_csv(
        f"{base_dir}/test/test.csv", header=False, index=False
    )

In [None]:
# Upload the preprocessing script to the code prefix, then persist its S3 URI
# with %store so other notebooks can restore it (see the %store -r cell above).
s3_dataprep_code_uri = sagemaker.s3.S3Uploader.upload("preprocess.py", s3uri_code)
%store s3_dataprep_code_uri
s3_dataprep_code_uri

---
### 2-Modeling

In [None]:
%%writefile xgboost_customer_churn.py
import argparse
import json
import os
import pickle
import random
import tempfile
import urllib.request

import xgboost
from smdebug import SaveConfig
from smdebug.xgboost import Hook


def parse_args():
    """Parse hyperparameters, smdebug settings and data/model locations.

    The train/validation/model-dir options default to the ``SM_CHANNEL_TRAIN``,
    ``SM_CHANNEL_VALIDATION`` and ``SM_MODEL_DIR`` environment variables; each
    can be overridden on the command line (e.g. for a local run).

    Returns:
        argparse.Namespace with one attribute per option defined below.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("--max_depth", type=int, default=5)
    parser.add_argument("--eta", type=float, default=0.2)
    # gamma and min_child_weight are real-valued XGBoost parameters; parsing
    # them as float accepts fractional values while "4" / "6" still work.
    parser.add_argument("--gamma", type=float, default=4)
    parser.add_argument("--min_child_weight", type=float, default=6)
    parser.add_argument("--subsample", type=float, default=0.8)
    parser.add_argument("--verbosity", type=int, default=0)
    parser.add_argument("--objective", type=str, default="binary:logistic")
    parser.add_argument("--num_round", type=int, default=50)
    parser.add_argument("--smdebug_path", type=str, default=None)
    parser.add_argument("--smdebug_frequency", type=int, default=1)
    parser.add_argument("--smdebug_collections", type=str, default='metrics')
    parser.add_argument("--output_uri", type=str, default="/opt/ml/output/tensors",
                        help="S3 URI of the bucket where tensor data will be stored.")

    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    # .get() instead of [] so an explicit --model-dir works even when
    # SM_MODEL_DIR is unset (indexing raised KeyError before argparse ran).
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))

    args = parser.parse_args()

    if args.model_dir is None:
        parser.error("--model-dir is required when SM_MODEL_DIR is not set")

    return args


def create_smdebug_hook(out_dir, train_data=None, validation_data=None, frequency=1, collections=None,):
    """Build an smdebug XGBoost ``Hook`` configured with a fixed save interval.

    Args:
        out_dir: location the hook writes its tensor data to.
        train_data: training DMatrix handed to the hook (optional).
        validation_data: validation DMatrix handed to the hook (optional).
        frequency: ``save_interval`` given to the hook's ``SaveConfig``.
        collections: smdebug collection names to include, or None.

    Returns:
        The configured ``Hook``; ``main`` passes it to ``xgboost.train`` as a
        callback.
    """
    return Hook(
        out_dir=out_dir,
        train_data=train_data,
        validation_data=validation_data,
        save_config=SaveConfig(save_interval=frequency),
        include_collections=collections,
    )


def main():
    """Train an XGBoost model with an smdebug hook and pickle it to the model dir.

    Reads the train/validation CSV data (label in column 0), trains for
    ``--num_round`` boosting rounds while the hook records the requested
    collections, then writes the Booster to ``<model_dir>/xgboost-model``.
    """
    args = parse_args()

    # URI suffix telling XGBoost to parse CSV with the label in column 0.
    parse_csv = "?format=csv&label_column=0"
    dtrain = xgboost.DMatrix(args.train + parse_csv)
    dval = xgboost.DMatrix(args.validation + parse_csv)

    watchlist = [(dtrain, "train"), (dval, "validation")]

    params = {
        "max_depth": args.max_depth,
        "eta": args.eta,
        "gamma": args.gamma,
        "min_child_weight": args.min_child_weight,
        "subsample": args.subsample,
        "verbosity": args.verbosity,
        "objective": args.objective,
    }

    # Destination for the smdebug tensors: --smdebug_path takes precedence
    # over --output_uri.
    output_uri = (
        args.smdebug_path
        if args.smdebug_path is not None
        else args.output_uri
    )

    collections = (
        args.smdebug_collections.split(',')
        if args.smdebug_collections is not None
        else None
    )

    hook = create_smdebug_hook(
        out_dir=output_uri,
        frequency=args.smdebug_frequency,
        collections=collections,
        train_data=dtrain,
        validation_data=dval,
    )

    bst = xgboost.train(
        params=params,
        dtrain=dtrain,
        evals=watchlist,
        num_boost_round=args.num_round,
        callbacks=[hook])

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(args.model_dir, exist_ok=True)

    model_location = os.path.join(args.model_dir, 'xgboost-model')
    # Close the handle deterministically so the pickled bytes are flushed
    # to disk immediately rather than whenever the file object is collected.
    with open(model_location, 'wb') as model_file:
        pickle.dump(bst, model_file)


# Training runs only when the file is executed as a script; importing it
# (e.g. to reach model_fn below) does not start training.
if __name__ == "__main__":
    main()


class ModelLoadInferenceError(Exception):
    """Raised by ``model_fn`` when no model file in the directory can be loaded."""


def model_fn(model_dir):
    """Load the trained XGBoost model for inference.

    For the XGBoost framework a default model-loading function is not
    provided, so the script supplies this ``model_fn``. The first regular file
    in *model_dir* (sorted by name, for a deterministic choice) is loaded,
    first as a pickled Booster (the format ``main`` writes) and, if that
    fails, with ``xgboost.Booster.load_model``.

    Args:
        model_dir: directory where the model is saved.

    Returns:
        tuple: ``(booster, model_format)`` where ``model_format`` is
        ``"pkl_format"`` or ``"xgb_format"`` depending on which loader worked.

    Raises:
        ModelLoadInferenceError: if *model_dir* contains no regular file, or
            neither loader can read the chosen file.
    """
    model_files = sorted(
        name for name in os.listdir(model_dir)
        if os.path.isfile(os.path.join(model_dir, name))
    )
    if not model_files:
        raise ModelLoadInferenceError("No model file found in {}".format(model_dir))
    model_path = os.path.join(model_dir, model_files[0])

    try:
        # SECURITY: unpickling can execute arbitrary code; only load model
        # artifacts produced by this project's own training job.
        with open(model_path, 'rb') as model_file:
            booster = pickle.load(model_file)
        model_format = 'pkl_format'
    except Exception as exp_pkl:
        try:
            booster = xgboost.Booster()
            booster.load_model(model_path)
            model_format = 'xgb_format'
        except Exception as exp_xgb:
            raise ModelLoadInferenceError(
                "Unable to load model: {} {}".format(str(exp_pkl), str(exp_xgb))
            ) from exp_xgb
    booster.set_param('nthread', 1)
    return booster, model_format


In [None]:
!tar -czf sourcedir.tar.gz xgboost_customer_churn.py

In [None]:
# Upload the source archive built by the tar cell above and persist both the
# entry-point name and the archive's S3 URI with %store for other notebooks.
# train_script_name must match the file packed into sourcedir.tar.gz.
train_script_name = "xgboost_customer_churn.py"
script_artifact = "sourcedir.tar.gz"
s3_modeling_code_uri = sagemaker.s3.S3Uploader.upload(script_artifact, s3uri_code)

%store train_script_name
%store s3_modeling_code_uri

train_script_name, s3_modeling_code_uri

---
### 3-Evaluation

In [None]:
%%writefile evaluate.py
"""Evaluation script for measuring model accuracy."""

import json
import os
import tarfile
import logging
import pickle

import pandas as pd
import xgboost

# Configure the root logger: attach a stderr StreamHandler and emit INFO and
# above (the DEBUG-level messages in the script body stay silent).
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

# May need to import additional metrics depending on what you are measuring.
# See https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score

def get_dataset(dir_path, dataset_name) -> pd.DataFrame:
    """Read every file in *dir_path* as header-less CSV and stack them.

    Files are read in sorted filename order so the row order is deterministic.
    Subdirectories are skipped.

    Args:
        dir_path: local directory holding the dataset's CSV file(s).
        dataset_name: channel/dataset name, used only in the error message.

    Returns:
        pd.DataFrame with the rows of all files concatenated.

    Raises:
        ValueError: if *dir_path* contains no files.
    """
    files = sorted(
        os.path.join(dir_path, name)
        for name in os.listdir(dir_path)
        if os.path.isfile(os.path.join(dir_path, name))
    )
    if not files:
        # Report the directory that was searched; formatting the (empty)
        # file list would only ever print "[]".
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(dir_path, dataset_name))
    return pd.concat([pd.read_csv(path, header=None) for path in files])

if __name__ == "__main__":
    model_path = "/opt/ml/processing/model/model.tar.gz"
    # Extract into the same directory the model is loaded from below; the two
    # paths must agree regardless of the process's working directory.
    model_dir = "."
    with tarfile.open(model_path) as tar:
        tar.extractall(path=model_dir)

    logger.debug("Loading xgboost model.")
    # SECURITY: pickle.load can execute arbitrary code; the archive must come
    # from this pipeline's own training step.
    with open(os.path.join(model_dir, "xgboost-model"), "rb") as model_file:
        model = pickle.load(model_file)

    logger.info("Loading test input data")
    test_path = "/opt/ml/processing/test"
    df = get_dataset(test_path, "test_set")

    logger.debug("Reading test data.")
    # Column 0 holds the label; the remaining columns are the features.
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)

    logger.info("Performing predictions against test data.")
    predictions_probs = model.predict(X_test)
    # Round the predicted values to the nearest integer to get 0/1 labels.
    predictions = predictions_probs.round()

    logger.info("Creating classification evaluation report")
    acc = accuracy_score(y_test, predictions)
    auc = roc_auc_score(y_test, predictions_probs)

    # The metrics reported can change based on the model used, but each must
    # use one of the specific names listed in
    # https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    report_dict = {
        "binary_classification_metrics": {
            "accuracy": {
                "value": acc,
                "standard_deviation": "NaN",
            },
            "auc": {"value": auc, "standard_deviation": "NaN"},
        },
    }

    logger.info("Classification report:\n%s", report_dict)

    evaluation_dir = "/opt/ml/processing/evaluation"
    # Ensure the output directory exists before writing the report.
    os.makedirs(evaluation_dir, exist_ok=True)
    evaluation_output_path = os.path.join(evaluation_dir, "evaluation.json")
    logger.info("Saving classification report to %s", evaluation_output_path)

    with open(evaluation_output_path, "w") as f:
        f.write(json.dumps(report_dict))


In [None]:
# Upload the evaluation script to the code prefix and persist its S3 URI with
# %store so other notebooks can restore it.
s3_evaluation_code_uri = sagemaker.s3.S3Uploader.upload("evaluate.py", s3uri_code)

%store s3_evaluation_code_uri
s3_evaluation_code_uri

---
# Create functions to get processors and estimators (created in the labs)

## (If you have run some of the labs but not all, just pick the parts you skipped.)

### 1-DataPrep

In [None]:
%%writefile ../my_labs_solutions/dataprep_solution.py
# DataPrep
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor

def get_dataprep_processor(
    processing_instance_type,
    processing_instance_count,
    role,
    base_job_prefix="CustomerChurn"
) -> SKLearnProcessor:
    """Return the scikit-learn processor for the feature-engineering step.

    Args:
        processing_instance_type: instance type for the processing job.
        processing_instance_count: number of processing instances.
        role: IAM role the processing job runs as.
        base_job_prefix: prefix prepended to the processor's base job name.

    Returns:
        SKLearnProcessor (framework version 0.23-1) bound to a fresh session.
    """
    # NOTE(review): SageMaker job names normally may not contain "/"; confirm
    # that the pipeline step overrides this base name before running the
    # processor directly.
    job_name = f"{base_job_prefix}/sklearn-CustomerChurn-preprocess"  # choose any name
    return SKLearnProcessor(
        framework_version="0.23-1",
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        base_job_name=job_name,
        sagemaker_session=sagemaker.session.Session(),
        role=role,
    )

### 2-Modeling

In [None]:
%%writefile ../my_labs_solutions/modeling_solution.py
# Modeling
import boto3
import sagemaker
from sagemaker.inputs import TrainingInput

from sagemaker.debugger import rule_configs, Rule, DebuggerHookConfig

def get_modeling_estimator(bucket,
                           prefix,
                           s3_modeling_code_uri,
                           docker_image_name,
                           role,
                           entry_point_script = 'xgboost_customer_churn.py',
                           instance_type='ml.m4.xlarge',
                           instance_count=1) -> sagemaker.estimator.Estimator:
    """Build the XGBoost training estimator for the modeling step.

    The ``sagemaker_program`` / ``sagemaker_submit_directory`` hyperparameters
    name the entry-point script and the S3 source archive to run; the rest
    are the model hyperparameters. Three Debugger rules (loss not decreasing,
    overtraining, overfit) are attached.

    Args:
        bucket: S3 bucket for model output (``s3://{bucket}/{prefix}/output``).
        prefix: key prefix for model output.
        s3_modeling_code_uri: S3 URI of the uploaded training source archive.
        docker_image_name: training image URI.
        role: IAM role the training job runs as.
        entry_point_script: script inside the archive to execute.
        instance_type: training instance type (defaults to the value used by
            the lab, ``ml.m4.xlarge``).
        instance_count: number of training instances (defaults to 1).

    Returns:
        sagemaker.estimator.Estimator ready to be used in a training step.
    """
    sm_sess = sagemaker.session.Session()

    # Input configs
    hyperparams = {"sagemaker_program": entry_point_script,
                   "sagemaker_submit_directory": s3_modeling_code_uri,
                   "max_depth": 5,
                   "subsample": 0.8,
                   "num_round": 600,
                   "eta": 0.2,
                   "gamma": 4,
                   "min_child_weight": 6,
                   "objective": 'binary:logistic',
                   "verbosity": 0
                  }

    # Debugger configs
    debug_rules = [
        Rule.sagemaker(rule_configs.loss_not_decreasing()),
        Rule.sagemaker(rule_configs.overtraining()),
        Rule.sagemaker(rule_configs.overfit())
    ]

    # Estimator configs
    xgb = sagemaker.estimator.Estimator(image_uri=docker_image_name,
                                        role=role,
                                        hyperparameters=hyperparams,
                                        instance_count=instance_count,
                                        instance_type=instance_type,
                                        output_path=f's3://{bucket}/{prefix}/output',
                                        base_job_name='pipeline-xgboost-customer-churn',
                                        sagemaker_session=sm_sess,
                                        rules=debug_rules)

    return xgb


### 3-Evaluation

In [None]:
%%writefile ../my_labs_solutions/evaluation_solution.py
# Evaluation
import sagemaker
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)

def get_evaluation_processor(docker_image_name, role,
                             instance_type="ml.m5.xlarge",
                             instance_count=1) -> ScriptProcessor:
    """Build the script processor that runs the evaluation script with python3.

    Args:
        docker_image_name: image URI the processing job runs in.
        role: IAM role the processing job runs as.
        instance_type: processing instance type (defaults to the value used
            by the lab, ``ml.m5.xlarge``).
        instance_count: number of processing instances (defaults to 1).

    Returns:
        ScriptProcessor bound to a fresh SageMaker session.
    """
    sm_sess = sagemaker.session.Session()

    # Processing step for evaluation
    # NOTE(review): SageMaker job names normally may not contain "/"; confirm
    # that the pipeline step overrides this base name before running directly.
    processor = ScriptProcessor(
        image_uri=docker_image_name,
        command=["python3"],
        instance_type=instance_type,
        instance_count=instance_count,
        base_job_name="CustomerChurn/eval-script",
        sagemaker_session=sm_sess,
        role=role,
    )

    return processor


Save the variables for later use:

In [None]:
%store -r s3uri_raw

In [None]:
# Execution role used by this notebook session; stored with the other
# solution variables in the next cell.
role = sagemaker.get_execution_role()
role

In [None]:
import json

# Gather the restored (%store -r) and newly computed values into one dict and
# write it as JSON under ../my_labs_solutions/.
my_vars = dict(
    bucket=bucket,
    prefix=prefix,
    region=region,
    docker_image_name=docker_image_name,
    s3uri_raw=s3uri_raw,
    s3_dataprep_code_uri=s3_dataprep_code_uri,
    s3_modeling_code_uri=s3_modeling_code_uri,
    train_script_name=train_script_name,
    s3_evaluation_code_uri=s3_evaluation_code_uri,
    role=role,
)

with open("../my_labs_solutions/my-solution-vars.json", "w") as f:
    json.dump(my_vars, f)


### [You can now go back to the main notebook for this lab](../pipelines.ipynb)