#!/usr/bin/env python3

# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: LicenseRef-.amazon.com.-AmznSL-1.0
# Licensed under the Amazon Software License http://aws.amazon.com/asl/

# Based on original Copyright 2018 H2O.ai.
# SPDX-License-Identifier: Apache-2.0

from __future__ import print_function

import json
import os
import socket
import sys
import time
import traceback
from collections import Counter

import h2o
import helper_functions
from h2o.estimators.gbm import H2OGradientBoostingEstimator


def _connect_to_cluster():
    """Wait until the local H2O-3 node accepts connections, then attach the Python client."""
    print("Creating Connection to H2O-3")
    h2o_launched = False
    i = 0
    while not h2o_launched:
        s = socket.socket()
        try:
            s.connect(("127.0.0.1", 54321))
            h2o_launched = True
        except Exception as e:
            time.sleep(6)
            if i % 5 == 0:
                print("Attempt {}: H2O-3 not running yet...".format(i))
            if i > 30:
                raise Exception(
                    "Could not connect to H2O Cluster in {} attempts. "
                    "Last Error: {}".format(i, e)
                )
            i += 1
        finally:
            s.close()
    # Attach the Python client only once the cluster port is reachable.
    h2o.init()


def _get_data(channel_name):
    """Import the files of a SageMaker input channel into an H2OFrame."""
    prefix = "/opt/ml/"
    input_path = prefix + "input/data"
    training_path = os.path.join(input_path, channel_name)

    data_files = [
        os.path.join(training_path, filename)
        for filename in os.listdir(training_path)
        if not filename.startswith(".")
    ]

    if len(data_files) == 0:
        raise ValueError(
            (
                "There are no files in {}.\n"
                "This usually indicates that the channel ({}) was incorrectly specified,\n"
                "the data specification in S3 was incorrectly specified or the role specified\n"
                "does not have permission to access the data."
            ).format(training_path, channel_name)
        )
    elif len(data_files) == 1:
        import_data = h2o.import_file(path=data_files[0])
    else:
        # Multiple part files: import the whole directory, keeping only files
        # that share the common path prefix and the most frequent suffix.
        prefix = os.path.commonprefix(data_files)
        suffix_counter = Counter()
        for filename in data_files:
            suffix_counter[filename.split(".")[-1]] += 1
        suffix = suffix_counter.most_common(1)[0][0]
        import_data = h2o.import_file(
            path=training_path, pattern="{}.*\\.{}".format(prefix, suffix)
        )
    return import_data
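

# Illustrative note (the filenames below are hypothetical, not from the original
# repository): with part files such as
#   /opt/ml/input/data/training/train_part_a.csv
#   /opt/ml/input/data/training/train_part_b.csv
# os.path.commonprefix() yields ".../train_part_" and the most frequent suffix
# is "csv", so _get_data() imports the directory with a pattern equivalent to
# "/opt/ml/input/data/training/train_part_.*\.csv".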


def _train_model(hyperparameters={}, resource_params={}):
    training_params_str, gbm_params = helper_functions._parse_hyperparameters(hyperparameters)
    training_params = json.loads(training_params_str)

    prefix = "/opt/ml/"
    output_path = os.path.join(prefix, "output")
    model_path = os.path.join(prefix, "model")

    gbm_params["distribution"] = training_params["distribution"]

    print("Beginning Model Training")
    try:
        response_label = training_params.get("target")
        # Comma-separated column lists; default to "" so .split() does not fail
        # when a key is absent, and drop empty entries.
        categorical_columns = [
            c for c in training_params.get("categorical_columns", "").split(",") if c
        ]
        ignored_columns = [
            c for c in training_params.get("ignored_columns", "").split(",") if c
        ]

        train_data = _get_data("training")
        validation_data = _get_data("validation")

        if len(ignored_columns) > 0:
            print("Ignored columns for model training are:")
            print(",".join(ignored_columns))
            train_data = train_data.drop(index=ignored_columns)
            validation_data = validation_data.drop(index=ignored_columns)

        X = train_data.columns
        y = response_label

        # We don't want the target column present in the training features
        try:
            X.remove(y)
        except ValueError:
            raise ValueError(
                'Incorrect target - column "%s" does not exist in the data!' % response_label
            )

        print(
            "Setting Response Column to Categorical based on family: {}".format(
                training_params.get("distribution", "AUTO")
            )
        )
        if training_params.get("distribution", "AUTO") in ["bernoulli", "multinomial"]:
            print(
                "Family is {}: setting target value to categorical".format(
                    training_params.get("distribution", "AUTO")
                )
            )
            train_data[y] = train_data[y].asfactor()
            validation_data[y] = validation_data[y].asfactor()
        else:
            print(
                "Family is {}: Value can be continuous. Nothing to do".format(
                    training_params.get("distribution", "AUTO")
                )
            )

        print("Converting specified columns to categorical values:")
        print(categorical_columns)
        for col in categorical_columns:
            train_data[col] = train_data[col].asfactor()
            validation_data[col] = validation_data[col].asfactor()

        # SageMaker passes hyperparameter values as strings; cast them to the
        # types H2O expects.
        for i, v in gbm_params.items():
            if i in {
                "ntrees",
                "min_rows",
                "max_depth",
                "stopping_rounds",
                "score_tree_interval",
                "seed",
            }:
                gbm_params[i] = int(v)
            elif i in {"learn_rate", "stopping_tolerance", "sample_rate", "col_sample_rate"}:
                gbm_params[i] = float(v)
            elif i in {"balance_classes", "calibrate_model"}:
                # bool("false") would be True, so compare the string explicitly.
                gbm_params[i] = str(v).lower() in ("true", "1")

        if gbm_params.get("calibrate_model"):
            print("Splitting Training data from model calibration...")
            train_data, calib = train_data.split_frame(ratios=[0.9], seed=1)
            gbm_params["calibration_frame"] = calib
        else:
            print("Model calibration is set as false")
            gbm_params["calibrate_model"] = False

        # "model_type" selects the artifact format (MOJO vs. binary) and is not
        # an H2O GBM parameter, so remove it before constructing the estimator.
        model_type = gbm_params.pop("model_type", "MOJO")

        # Train H2O model using GBM Estimator
        gbm_model = H2OGradientBoostingEstimator(**gbm_params)
        gbm_model.train(x=X, y=y, training_frame=train_data, validation_frame=validation_data)
        print("Training is completed.")

        # Export H2O Model Artifact
        if model_type == "MOJO":
            gbm_model.download_mojo(model_path)
            print("MOJO Model Path with Name: {}".format(model_path))
        else:
            model_path = h2o.save_model(gbm_model, path=model_path)
            print("Binary Model Path with Name: {}".format(model_path))
        print("Model artifacts are exported.")

        print(gbm_model.model_performance())
    except Exception as e:
        # Write out an error file. This will be returned as the failureReason
        # in the DescribeTrainingJob result.
        trc = traceback.format_exc()
        with open(os.path.join(output_path, "failure"), "w") as s:
            s.write("Exception during training: " + str(e) + "\n" + trc)
        # Printing this causes the exception to be in the training job logs
        print("Exception during training: " + str(e) + "\n" + trc, file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)


def main():
    hyperparameters, resource_params = helper_functions._get_parameters()
    helper_functions._create_h2o_cluster(resource_params)
    # Only the leader host attaches the Python client and runs training.
    if resource_params["current_host"] == resource_params["hosts"][0]:
        _connect_to_cluster()
        _train_model(hyperparameters, resource_params)


if __name__ == "__main__":
    main()

    # A zero exit code causes the job to be marked as Succeeded.
    sys.exit(0)
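
# Illustration (hypothetical values, not taken from the original repository):
# the raw hyperparameter layout depends on helper_functions._parse_hyperparameters(),
# which is not shown here, but after parsing _train_model() only reads fields
# shaped roughly like:
#
#   training_params = {
#       "target": "label",                      # response column
#       "distribution": "bernoulli",            # triggers .asfactor() on target
#       "categorical_columns": "state,product", # comma-separated column names
#       "ignored_columns": "customer_id",       # comma-separated column names
#   }
#   gbm_params = {
#       "ntrees": "100",             # cast to int in _train_model()
#       "max_depth": "5",            # cast to int in _train_model()
#       "learn_rate": "0.1",         # cast to float in _train_model()
#       "calibrate_model": "false",  # cast to bool in _train_model()
#       "model_type": "MOJO",        # export format, popped before training
#   }
#
# SageMaker passes hyperparameter values as strings, which is why
# _train_model() casts numeric and boolean values before training.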