from __future__ import print_function import argparse import csv import json import os import shutil import sys import time from io import StringIO import numpy as np import pandas as pd from sagemaker_containers.beta.framework import ( content_types, encoders, env, modules, transformer, worker, ) from sklearn.compose import ColumnTransformer from sklearn.externals import joblib from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import Binarizer, OneHotEncoder, StandardScaler # Since we get a headerless CSV file we specify the column names here. feature_columns_names = [ "YEAR_BUILT", "SQUARE_FEET", "NUM_BEDROOMS", "NUM_BATHROOMS", "LOT_ACRES", "GARAGE_SPACES", "FRONT_PORCH", "DECK", ] label_column = "PRICE" feature_columns_dtype = { "YEAR_BUILT": str, "SQUARE_FEET": np.float64, "NUM_BEDROOMS": np.float64, "NUM_BATHROOMS": np.float64, "LOT_ACRES": np.float64, "GARAGE_SPACES": np.float64, "FRONT_PORCH": str, "DECK": str, } label_column_dtype = {"PRICE": np.float64} if __name__ == "__main__": parser = argparse.ArgumentParser() # Sagemaker specific arguments. Defaults are set in the environment variables. parser.add_argument("--output-data-dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"]) parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"]) parser.add_argument("--train", type=str, default=os.environ["SM_CHANNEL_TRAIN"]) args = parser.parse_args() # Take the set of files and read them all into a single pandas dataframe input_files = [os.path.join(args.train, file) for file in os.listdir(args.train)] if len(input_files) == 0: raise ValueError( ( "There are no files in {}.\n" + "This usually indicates that the train channel was incorrectly specified,\n" + "the data specification in S3 was incorrectly specified or the role specified\n" + "does not have permission to access the data.".format(args.train) ) ) for file in input_files: print("file :", file) raw_data = [pd.read_csv(file, header=None, names=feature_columns_names + [label_column])] concat_data = pd.concat(raw_data) print(concat_data) # This section is adapted from the scikit-learn example of using preprocessing pipelines: # # https://scikit-learn.org/stable/auto_examples/compose/plot_column_transformer_mixed_types.html # numeric_features = list(feature_columns_names) numeric_features.remove("FRONT_PORCH") numeric_features.remove("DECK") numeric_transformer = Pipeline(steps=[("scaler", StandardScaler())]) categorical_features = ["FRONT_PORCH", "DECK"] categorical_transformer = Pipeline(steps=[("onehot", OneHotEncoder(handle_unknown="ignore"))]) preprocessor = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features), ], remainder="drop", ) preprocessor.fit(concat_data) joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib")) print("saved model!") def input_fn(input_data, content_type): """Parse input data payload We currently only take csv input. Since we need to process both labelled and unlabelled data we first determine whether the label column is present by looking at how many columns were provided. """ if content_type == "text/csv": # Read the raw input data as CSV. df = pd.read_csv(StringIO(input_data), header=None) if len(df.columns) == len(feature_columns_names) + 1: # This is a labelled example, includes the ring label df.columns = feature_columns_names + [label_column] elif len(df.columns) == len(feature_columns_names): # This is an unlabelled example. df.columns = feature_columns_names return df else: raise ValueError("{} not supported by script!".format(content_type)) def output_fn(prediction, accept): """Format prediction output The default accept/content-type between containers for serial inference is JSON. We also want to set the ContentType or mimetype as the same value as accept so the next container can read the response payload correctly. """ if accept == "application/json": instances = [] for row in prediction.tolist(): instances.append({"features": row}) json_output = {"instances": instances} return worker.Response(json.dumps(json_output), mimetype=accept) elif accept == "text/csv": return worker.Response(encoders.encode(prediction, accept), mimetype=accept) else: raise RuntimeException("{} accept type is not supported by this script.".format(accept)) def predict_fn(input_data, model): """Preprocess input data We implement this because the default uses .predict(), but our model is a preprocessor so we want to use .transform(). The output is returned in the following order: rest of features either one hot encoded or standardized """ print("Input data type ", type(input_data)) print(input_data) features = model.transform(input_data) print("features type ", type(features)) print(features) features_array = features print("features_array ", type(features_array)) print(features_array) if label_column in input_data: # Return the label (as the first column) and the set of features. return np.insert(features_array, 0, input_data[label_column], axis=1) else: # Return only the set of features return features def model_fn(model_dir): """Deserialize fitted model""" preprocessor = joblib.load(os.path.join(model_dir, "model.joblib")) return preprocessor