{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Hosting Models on SageMaker and Automate the Workflow\n", "\n", "In this module you will:\n", "- Host a pretrained SKLearn model on SageMaker\n", "- Enable autoscaling on your endpoint \n", "- Monitor your model\n", "- Perform hyperparameter tuning\n", "- Redploy a new model to the endpoint\n", "- Automate the pipeline using the notebook runner toolkit\n", "\n", "Let's get started! \n", "\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1. Access your model artifact\n", "First, you should see a `model.tar.gz` file in this repository. Let's get that in your S3 bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import os\n", "\n", "sess = sagemaker.Session()\n", "\n", "# sagemaker will check to make sure this is a valid tar.gz object\n", "local_model_file = 'model.tar.gz'\n", "\n", "bucket = sess.default_bucket()\n", "\n", "prefix = 'model-hosting'\n", "\n", "s3_path = 's3://{}/{}/'.format(bucket, prefix)\n", "\n", "msg = 'aws s3 cp {} {}'.format(local_model_file, s3_path)\n", "\n", "os.system(msg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. Load your pretrained model artifact into SageMaker\n", "Now, we know that this model was trained using the SKLearn container within SageMaker. All we need to do get this into a SageMaker-managed endpoint is set it up as a model. Let's do that here!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_data = '{}{}'.format(s3_path, local_model_file)\n", "print (model_data)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile train.py\n", "\n", "import argparse\n", "import pandas as pd\n", "import numpy as np\n", "import os\n", "\n", "from sklearn.metrics import confusion_matrix\n", "from sklearn.neural_network import MLPClassifier\n", "from sklearn.externals import joblib\n", "\n", "def model_fn(model_dir):\n", " \"\"\"Deserialized and return fitted model\n", "\n", " Note that this should have the same name as the serialized model in the main method\n", " \"\"\"\n", " regr = joblib.load(os.path.join(model_dir, \"model.joblib\"))\n", " return regr\n", "\n", "def predict_fn(input_data, model):\n", " '''return the class and the probability of the class'''\n", " prediction = model.predict(input_data)\n", " pred_prob = model.predict_proba(input_data) #a numpy array\n", " return np.array(pred_prob)\n", "\n", "def parse_args():\n", " \n", " # Hyperparameters are described here. In this simple example we are just including one hyperparameter.\n", "\n", " parser = argparse.ArgumentParser()\n", " \n", " parser.add_argument('--max_leaf_nodes', type=int, default=-1)\n", "\n", " # Sagemaker specific arguments. Defaults are set in the environment variables.\n", " parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])\n", " parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])\n", " parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])\n", " parser.add_argument('--test', type=str, default = os.environ['SM_CHANNEL_TEST'])\n", " \n", " # hyperparameters for tuning\n", " parser.add_argument('--batch-size', type=int, default=256)\n", " parser.add_argument('--lr', type=float, default = 0.001)\n", " \n", " args = parser.parse_args()\n", " \n", " return args\n", "\n", "def train(args):\n", " \n", " # Take the set of files and read them all into a single pandas dataframe\n", " train_data=pd.read_csv(os.path.join(args.train, 'train_set.csv'), engine='python')\n", "\n", " # labels are in the first column\n", " train_y = train_data['truth']\n", " train_X = train_data[train_data.columns[1:len(train_data)]]\n", "\n", " # Now use scikit-learn's MLP Classifier to train the model.\n", "\n", " regr = MLPClassifier(random_state=1, max_iter=500, batch_size = args.batch_size, learning_rate_init = args.lr, solver='lbfgs').fit(train_X, train_y)\n", " regr.get_params()\n", "\n", " # Print the coefficients of the trained classifier, and save the coefficients\n", " joblib.dump(regr, os.path.join(args.model_dir, \"model.joblib\")) \n", " \n", " return regr\n", " \n", "def accuracy(y_pred, y_true):\n", " \n", " cm = confusion_matrix(y_pred, y_true)\n", " \n", " diagonal_sum = cm.trace()\n", " sum_of_all_elements = cm.sum()\n", " \n", " rt = diagonal_sum / sum_of_all_elements\n", " \n", " print ('Accuracy: {}'.format(rt))\n", " \n", " return rt\n", " \n", " \n", "def test(regr, args):\n", " test_data=pd.read_csv(os.path.join(args.test, 'test_set.csv'), engine='python')\n", "\n", " # labels are in the first column\n", " y_true = test_data['truth']\n", " test_x = test_data[test_data.columns[1:len(test_data)]]\n", " \n", " y_pred = regr.predict(test_x)\n", " \n", " accuracy(y_pred, y_true)\n", " \n", "if __name__ == '__main__':\n", "\n", " args = parse_args()\n", " \n", " regr = train(args)\n", " \n", " test(regr, args)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.sklearn.model import SKLearnModel\n", "\n", "role = sagemaker.get_execution_role()\n", "\n", "model = SKLearnModel(model_data = model_data,\n", " role = role, \n", " framework_version = '0.20.0', \n", " py_version='py3',\n", " entry_point = 'train.py')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3. Create an Endpoint on SageMaker\n", "Now, here comes the complex maneuver. Kidding, it's dirt simple. Let's turn your model into a RESTful API!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor = model.deploy(1, 'ml.m4.2xlarge')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "\n", "from sagemaker.sklearn.model import SKLearnPredictor\n", "sess = sagemaker.Session()\n", "\n", "# optional. If your kernel times out, or your need to refresh, here's how you can easily point to an existing endpoint\n", "endpoint_name = 'sagemaker-scikit-learn-2020-10-14-15-12-50-644'\n", "\n", "predictor = SKLearnPredictor(endpoint_name = endpoint_name, sagemaker_session = sess)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's get some predictions from that endpoint. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_set = pd.read_csv('test_set.csv')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y_true = test_set['truth']\n", "\n", "test_set.drop('truth', inplace=True, axis=1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "y_pred = pd.DataFrame(predictor.predict(test_set))\n", "\n", "assert len(y_pred) == test_set.shape[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4. Enable Autoscaling on your Endpoint\n", "For the sake of argument, let's say we're happy with this model and want to continue supporting it in prod. Our next step might be to enable autoscaling. Let's do that right here." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "\n", "def get_resource_id(endpoint_name):\n", "\n", " client = boto3.client('sagemaker')\n", "\n", " response = client.describe_endpoint(\n", " EndpointName=endpoint_name)\n", "\n", " variant_name = response['ProductionVariants'][0]['VariantName']\n", " resource_id = 'endpoint/{}/variant/{}'.format(endpoint_name, variant_name)\n", " \n", " return resource_id\n", "\n", "resource_id = get_resource_id(endpoint_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "\n", "role = sagemaker.get_execution_role()\n", "\n", "def set_scaling_policy(resource_id, min_capacity = 1, max_capacity = 8, role = role):\n", "\n", " scaling_client = boto3.client('application-autoscaling')\n", "\n", " response = scaling_client.register_scalable_target(\n", " ServiceNamespace='sagemaker',\n", " ResourceId=resource_id,\n", " ScalableDimension='sagemaker:variant:DesiredInstanceCount',\n", " MinCapacity=min_capacity,\n", " MaxCapacity=max_capacity,\n", " RoleARN=role)\n", " \n", " return response\n", "\n", "res = set_scaling_policy(resource_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5. Enable Model Monitor on your Endpoint\n", "Now that you have a model up and running, with autoscaling enabled, let's set up model monitor on that endpoint. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import os\n", "\n", "sess = sagemaker.Session()\n", "\n", "bucket = sess.default_bucket()\n", "\n", "prefix = 'model-hosting'\n", "\n", "s3_capture_upload_path = 's3://{}/{}/model-monitor'.format(bucket, prefix)\n", "\n", "print ('about to set up monitoring for endpoint named {}'.format(endpoint_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let's set up a data capture config." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model_monitor import DataCaptureConfig\n", "\n", "data_capture_config = DataCaptureConfig(\n", " enable_capture = True,\n", " sampling_percentage=50,\n", " destination_s3_uri=s3_capture_upload_path,\n", " capture_options=[\"REQUEST\", \"RESPONSE\"],\n", " csv_content_types=[\"text/csv\"],\n", " json_content_types=[\"application/json\"])\n", "\n", "# Now it is time to apply the new configuration and wait for it to be applied\n", "predictor.update_data_capture_config(data_capture_config=data_capture_config)\n", "\n", "sess.wait_for_endpoint(endpoint=endpoint_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next step here is to pass in our training data, and ask SageMaker to learn baseline thresholds for all of our features. \n", "\n", "First, let's make sure the data we used to train our model is stored in S3." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "msg = 'aws s3 cp train_set.csv s3://{}/{}/train/'.format(bucket, prefix)\n", "os.system(msg)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# todo - show them how to get access to this training data\n", "s3_training_data_path = 's3://{}/{}/train/train_set.csv'.format(bucket, prefix)\n", "\n", "s3_baseline_results = 's3://{}/{}/model-monitor/baseline-results'.format(bucket, prefix)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model_monitor import DefaultModelMonitor\n", "from sagemaker.model_monitor.dataset_format import DatasetFormat\n", "\n", "my_default_monitor = DefaultModelMonitor(\n", " role=role,\n", " instance_count=1,\n", " instance_type='ml.m5.xlarge',\n", " volume_size_in_gb=20,\n", " max_runtime_in_seconds=3600,\n", ")\n", "\n", "my_default_monitor.suggest_baseline(\n", " baseline_dataset=s3_training_data_path,\n", " \n", " # change header to false if not included\n", " dataset_format=DatasetFormat.csv(header=False),\n", " output_s3_uri=s3_baseline_results,\n", " wait=True\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you like, you can download the results from S3 and analyze. In the interest of time, we'll move on to setting up the monitoring schedule. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model_monitor import CronExpressionGenerator\n", "from time import gmtime, strftime\n", "\n", "mon_schedule_name = 'bi-hourly'\n", "s3_report_path = 's3://{}/{}/model-monitor/monitoring-job-results'.format(bucket, prefix)\n", "\n", "my_default_monitor.create_monitoring_schedule(\n", " monitor_schedule_name=mon_schedule_name,\n", " endpoint_input=endpoint_name,\n", " output_s3_uri=s3_report_path,\n", " statistics=my_default_monitor.baseline_statistics(),\n", " constraints=my_default_monitor.suggested_constraints(),\n", " schedule_cron_expression=CronExpressionGenerator.daily(),\n", " enable_cloudwatch_metrics=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "# Tune your model and re-deploy onto the SageMaker Endpoint\n", "\n", "Alright, we made it pretty far already! Now that we have monitoring enabled on this endpoint, let's imagine that something goes awry. We realize that we need a new model hosted on this RESTful API. How are we going to do that?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, let's go about getting a new model. Given that the dataset here is pretty small, less than even 500 rows on the training set, why not try out AutoGluon? AutoGluon is a competitive choice here because it will actually augment our data for us. Said another way, Autogluon will make our original dataset larger by using Transformers and masking columns. Pretty cool!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir src" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile src/requirements.txt\n", "\n", "autogluon\n", "sagemaker\n", "awscli \n", "boto3\n", "PrettyTable\n", "bokeh\n", "numpy==1.16.1\n", "matplotlib\n", "sagemaker-experiments" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile src/train.py\n", "\n", "import ast\n", "import argparse\n", "import logging\n", "import warnings\n", "import os\n", "import json\n", "import glob\n", "import subprocess\n", "import sys\n", "import boto3\n", "import pickle\n", "import pandas as pd\n", "from collections import Counter\n", "from timeit import default_timer as timer\n", "import time\n", "\n", "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial\n", "from smexperiments.trial_component import TrialComponent\n", "from smexperiments.tracker import Tracker\n", "\n", "sys.path.insert(0, 'package')\n", "with warnings.catch_warnings():\n", " warnings.filterwarnings(\"ignore\",category=DeprecationWarning)\n", " from prettytable import PrettyTable\n", " import autogluon as ag\n", " from autogluon import TabularPrediction as task\n", " from autogluon.task.tabular_prediction import TabularDataset\n", " \n", "# ------------------------------------------------------------ #\n", "# Training methods #\n", "# ------------------------------------------------------------ #\n", "\n", "def du(path):\n", " \"\"\"disk usage in human readable format (e.g. '2,1GB')\"\"\"\n", " return subprocess.check_output(['du','-sh', path]).split()[0].decode('utf-8')\n", "\n", "def __load_input_data(path: str) -> TabularDataset:\n", " \"\"\"\n", " Load training data as dataframe\n", " :param path:\n", " :return: DataFrame\n", " \"\"\"\n", " input_data_files = os.listdir(path)\n", " try:\n", " input_dfs = [pd.read_csv(f'{path}/{data_file}') for data_file in input_data_files]\n", " return task.Dataset(df=pd.concat(input_dfs))\n", " except:\n", " print(f'No csv data in {path}!')\n", " return None\n", "\n", "def train(args):\n", " \n", " is_distributed = len(args.hosts) > 1\n", " host_rank = args.hosts.index(args.current_host) \n", " dist_ip_addrs = args.hosts\n", " dist_ip_addrs.pop(host_rank)\n", " ngpus_per_trial = 1 if args.num_gpus > 0 else 0\n", "\n", " # load training and validation data\n", " print(f'Train files: {os.listdir(args.train)}')\n", " train_data = __load_input_data(args.train)\n", " print(f'Label counts: {dict(Counter(train_data[args.label]))}')\n", " \n", " predictor = task.fit(\n", " train_data=train_data,\n", " label=args.label, \n", " output_directory=args.model_dir,\n", " problem_type=args.problem_type,\n", " eval_metric=args.eval_metric,\n", " stopping_metric=args.stopping_metric,\n", " auto_stack=args.auto_stack, # default: False\n", " hyperparameter_tune=args.hyperparameter_tune, # default: False\n", " feature_prune=args.feature_prune, # default: False\n", " holdout_frac=args.holdout_frac, # default: None\n", " num_bagging_folds=args.num_bagging_folds, # default: 0\n", " num_bagging_sets=args.num_bagging_sets, # default: None\n", " stack_ensemble_levels=args.stack_ensemble_levels, # default: 0\n", " cache_data=args.cache_data,\n", " time_limits=args.time_limits,\n", " num_trials=args.num_trials, # default: None\n", " search_strategy=args.search_strategy, # default: 'random'\n", " search_options=args.search_options,\n", " visualizer=args.visualizer,\n", " verbosity=args.verbosity\n", " )\n", " \n", " # Results summary\n", " predictor.fit_summary(verbosity=1)\n", "\n", " # Leaderboard on optional test data\n", " if args.test:\n", " print(f'Test files: {os.listdir(args.test)}')\n", " test_data = __load_input_data(args.test) \n", " print('Running model on test data and getting Leaderboard...')\n", " leaderboard = predictor.leaderboard(dataset=test_data, silent=True)\n", " def format_for_print(df):\n", " table = PrettyTable(list(df.columns))\n", " for row in df.itertuples():\n", " table.add_row(row[1:])\n", " return str(table)\n", " print(format_for_print(leaderboard), end='\\n\\n')\n", "\n", " # Files summary\n", " print(f'Model export summary:')\n", " print(f\"/opt/ml/model/: {os.listdir('/opt/ml/model/')}\")\n", " models_contents = os.listdir('/opt/ml/model/models')\n", " print(f\"/opt/ml/model/models: {models_contents}\")\n", " print(f\"/opt/ml/model directory size: {du('/opt/ml/model/')}\\n\")\n", "\n", "# ------------------------------------------------------------ #\n", "# Training execution #\n", "# ------------------------------------------------------------ #\n", "\n", "def str2bool(v):\n", " return v.lower() in ('yes', 'true', 't', '1')\n", "\n", "def parse_args():\n", "\n", " parser = argparse.ArgumentParser(\n", " formatter_class=argparse.ArgumentDefaultsHelpFormatter)\n", " parser.register('type','bool',str2bool) # add type keyword to registries\n", "\n", " parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS'])) \n", " parser.add_argument('--current-host', type=str, default=os.environ['SM_CURRENT_HOST'])\n", " parser.add_argument('--num-gpus', type=int, default=os.environ['SM_NUM_GPUS'])\n", " parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR']) # /opt/ml/model\n", " parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAINING'])\n", " parser.add_argument('--test', type=str, default='') # /opt/ml/input/data/test\n", " parser.add_argument('--label', type=str, default='truth',\n", " help=\"Name of the column that contains the target variable to predict.\")\n", " \n", " parser.add_argument('--problem_type', type=str, default=None,\n", " help=(\"Type of prediction problem, i.e. is this a binary/multiclass classification or \"\n", " \"regression problem options: 'binary', 'multiclass', 'regression'). \"\n", " \"If `problem_type = None`, the prediction problem type is inferred based \"\n", " \"on the label-values in provided dataset.\"))\n", " parser.add_argument('--eval_metric', type=str, default=None,\n", " help=(\"Metric by which predictions will be ultimately evaluated on test data.\"\n", " \"AutoGluon tunes factors such as hyperparameters, early-stopping, ensemble-weights, etc. \"\n", " \"in order to improve this metric on validation data. \"\n", " \"If `eval_metric = None`, it is automatically chosen based on `problem_type`. \"\n", " \"Defaults to 'accuracy' for binary and multiclass classification and \"\n", " \"'root_mean_squared_error' for regression. \"\n", " \"Otherwise, options for classification: [ \"\n", " \" 'accuracy', 'balanced_accuracy', 'f1', 'f1_macro', 'f1_micro', 'f1_weighted', \"\n", " \" 'roc_auc', 'average_precision', 'precision', 'precision_macro', 'precision_micro', 'precision_weighted', \"\n", " \" 'recall', 'recall_macro', 'recall_micro', 'recall_weighted', 'log_loss', 'pac_score']. \"\n", " \"Options for regression: ['root_mean_squared_error', 'mean_squared_error', \"\n", " \"'mean_absolute_error', 'median_absolute_error', 'r2']. \"\n", " \"For more information on these options, see `sklearn.metrics`: \"\n", " \"https://scikit-learn.org/stable/modules/classes.html#sklearn-metrics-metrics \"\n", " \"You can also pass your own evaluation function here as long as it follows formatting of the functions \"\n", " \"defined in `autogluon/utils/tabular/metrics/`. \"))\n", " parser.add_argument('--stopping_metric', type=str, default=None,\n", " help=(\"Metric which models use to early stop to avoid overfitting. \"\n", " \"`stopping_metric` is not used by weighted ensembles, instead weighted ensembles maximize `eval_metric`. \"\n", " \"Defaults to `eval_metric` value except when `eval_metric='roc_auc'`, where it defaults to `log_loss`.\")) \n", " parser.add_argument('--auto_stack', type='bool', default=False,\n", " help=(\"Whether to have AutoGluon automatically attempt to select optimal \"\n", " \"num_bagging_folds and stack_ensemble_levels based on data properties. \"\n", " \"Note: Overrides num_bagging_folds and stack_ensemble_levels values. \"\n", " \"Note: This can increase training time by up to 20x, but can produce much better results. \"\n", " \"Note: This can increase inference time by up to 20x.\"))\n", " parser.add_argument('--hyperparameter_tune', type='bool', default=False,\n", " help=(\"Whether to tune hyperparameters or just use fixed hyperparameter values \"\n", " \"for each model. Setting as True will increase `fit()` runtimes.\"))\n", " parser.add_argument('--feature_prune', type='bool', default=False,\n", " help=\"Whether or not to perform feature selection.\")\n", " parser.add_argument('--holdout_frac', type=float, default=None, \n", " help=(\"Fraction of train_data to holdout as tuning data for optimizing hyperparameters \"\n", " \"(ignored unless `tuning_data = None`, ignored if `num_bagging_folds != 0`). \"\n", " \"Default value is selected based on the number of rows in the training data. \"\n", " \"Default values range from 0.2 at 2,500 rows to 0.01 at 250,000 rows. \"\n", " \"Default value is doubled if `hyperparameter_tune = True`, up to a maximum of 0.2. \"\n", " \"Disabled if `num_bagging_folds >= 2`.\")) \n", " parser.add_argument('--num_bagging_folds', type=int, default=0, \n", " help=(\"Number of folds used for bagging of models. When `num_bagging_folds = k`, \"\n", " \"training time is roughly increased by a factor of `k` (set = 0 to disable bagging). \"\n", " \"Disabled by default, but we recommend values between 5-10 to maximize predictive performance. \"\n", " \"Increasing num_bagging_folds will result in models with lower bias but that are more prone to overfitting. \"\n", " \"Values > 10 may produce diminishing returns, and can even harm overall results due to overfitting. \"\n", " \"To further improve predictions, avoid increasing num_bagging_folds much beyond 10 \"\n", " \"and instead increase num_bagging_sets. \")) \n", " parser.add_argument('--num_bagging_sets', type=int, default=None,\n", " help=(\"Number of repeats of kfold bagging to perform (values must be >= 1). \"\n", " \"Total number of models trained during bagging = num_bagging_folds * num_bagging_sets. \"\n", " \"Defaults to 1 if time_limits is not specified, otherwise 20 \"\n", " \"(always disabled if num_bagging_folds is not specified). \"\n", " \"Values greater than 1 will result in superior predictive performance, \"\n", " \"especially on smaller problems and with stacking enabled. \"\n", " \"Increasing num_bagged_sets reduces the bagged aggregated variance without \"\n", " \"increasing the amount each model is overfit.\"))\n", " parser.add_argument('--stack_ensemble_levels', type=int, default=0, \n", " help=(\"Number of stacking levels to use in stack ensemble. \"\n", " \"Roughly increases model training time by factor of `stack_ensemble_levels+1` \" \n", " \"(set = 0 to disable stack ensembling). \"\n", " \"Disabled by default, but we recommend values between 1-3 to maximize predictive performance. \"\n", " \"To prevent overfitting, this argument is ignored unless you have also set `num_bagging_folds >= 2`.\"))\n", " parser.add_argument('--hyperparameters', type=lambda s: ast.literal_eval(s), default=None,\n", " help=\"Refer to docs: https://autogluon.mxnet.io/api/autogluon.task.html\")\n", " parser.add_argument('--cache_data', type='bool', default=True,\n", " help=(\"Whether the predictor returned by this `fit()` call should be able to be further trained \"\n", " \"via another future `fit()` call. \"\n", " \"When enabled, the training and validation data are saved to disk for future reuse.\"))\n", " parser.add_argument('--time_limits', type=int, default=None, \n", " help=(\"Approximately how long `fit()` should run for (wallclock time in seconds).\"\n", " \"If not specified, `fit()` will run until all models have completed training, \"\n", " \"but will not repeatedly bag models unless `num_bagging_sets` is specified.\"))\n", " parser.add_argument('--num_trials', type=int, default=None, \n", " help=(\"Maximal number of different hyperparameter settings of each \"\n", " \"model type to evaluate during HPO. (only matters if \"\n", " \"hyperparameter_tune = True). If both `time_limits` and \"\n", " \"`num_trials` are specified, `time_limits` takes precedent.\")) \n", " parser.add_argument('--search_strategy', type=str, default='random',\n", " help=(\"Which hyperparameter search algorithm to use. \"\n", " \"Options include: 'random' (random search), 'skopt' \"\n", " \"(SKopt Bayesian optimization), 'grid' (grid search), \"\n", " \"'hyperband' (Hyperband), 'rl' (reinforcement learner)\")) \n", " parser.add_argument('--search_options', type=lambda s: ast.literal_eval(s), default=None,\n", " help=\"Auxiliary keyword arguments to pass to the searcher that performs hyperparameter optimization.\")\n", " parser.add_argument('--nthreads_per_trial', type=int, default=None,\n", " help=\"How many CPUs to use in each training run of an individual model. This is automatically determined by AutoGluon when left as None (based on available compute).\")\n", " parser.add_argument('--ngpus_per_trial', type=int, default=None,\n", " help=\"How many GPUs to use in each trial (ie. single training run of a model). This is automatically determined by AutoGluon when left as None.\")\n", " parser.add_argument('--dist_ip_addrs', type=list, default=None,\n", " help=\"List of IP addresses corresponding to remote workers, in order to leverage distributed computation.\") \n", " parser.add_argument('--visualizer', type=str, default='none',\n", " help=(\"How to visualize the neural network training progress during `fit()`. \"\n", " \"Options: ['mxboard', 'tensorboard', 'none'].\")) \n", " parser.add_argument('--verbosity', type=int, default=2, \n", " help=(\"Verbosity levels range from 0 to 4 and control how much information is printed during fit(). \"\n", " \"Higher levels correspond to more detailed print statements (you can set verbosity = 0 to suppress warnings). \"\n", " \"If using logging, you can alternatively control amount of information printed via `logger.setLevel(L)`, \"\n", " \"where `L` ranges from 0 to 50 (Note: higher values of `L` correspond to fewer print statements, \"\n", " \"opposite of verbosity levels\"))\n", " parser.add_argument('--debug', type='bool', default=False,\n", " help=(\"Whether to set logging level to DEBUG\")) \n", " \n", " parser.add_argument('--feature_importance', type='bool', default=True)\n", "\n", " return parser.parse_args()\n", "\n", "\n", "def set_experiment_config(experiment_basename = None):\n", " '''\n", " Optionally takes an base name for the experiment. Has a hard dependency on boto3 installation. \n", " Creates a new experiment using the basename, otherwise simply uses autogluon as basename.\n", " May run into issues on Experiments' requirements for basename config downstream.\n", " '''\n", " now = int(time.time())\n", " \n", " if experiment_basename:\n", " experiment_name = '{}-autogluon-{}'.format(experiment_basename, now)\n", " else:\n", " experiment_name = 'autogluon-{}'.format(now)\n", " \n", " try:\n", " client = boto3.Session().client('sagemaker')\n", " except:\n", " print ('You need to install boto3 to create an experiment. Try pip install --upgrade boto3')\n", " return ''\n", " \n", " try:\n", " Experiment.create(experiment_name=experiment_name, \n", " description=\"Running AutoGluon Tabular with SageMaker Experiments\", \n", " sagemaker_boto_client=client)\n", " print ('Created an experiment named {}, you should be able to see this in SageMaker Studio right now.'.format(experiment_name))\n", " \n", " except:\n", " print ('Could not create the experiment. Is your basename properly configured? Also try installing the sagemaker experiments SDK with pip install sagemaker-experiments.')\n", " return ''\n", " \n", " return experiment_name\n", "\n", "if __name__ == \"__main__\":\n", " start = timer()\n", "\n", " args = parse_args()\n", " \n", " # Print SageMaker args\n", " print('\\n====== args ======')\n", " for k,v in vars(args).items():\n", " print(f'{k}, type: {type(v)}, value: {v}')\n", " print()\n", " \n", " train()\n", "\n", " # Package inference code with model export\n", " subprocess.call('mkdir /opt/ml/model/code'.split())\n", " subprocess.call('cp /opt/ml/code/inference.py /opt/ml/model/code/'.split())\n", " \n", " elapsed_time = round(timer()-start,3)\n", " print(f'Elapsed time: {elapsed_time} seconds') \n", " print('===== Training Completed =====')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.mxnet.estimator import MXNet\n", "from sagemaker import get_execution_role\n", "\n", "role = get_execution_role()\n", "\n", "estimator = MXNet(source_dir = 'src',\n", " entry_point = 'train.py',\n", " role=role,\n", " framework_version = '1.7.0',\n", " py_version = 'py3',\n", " instance_count=1,\n", " instance_type='ml.m5.2xlarge',\n", " volume_size=100) \n", "\n", "s3_path = 's3://sagemaker-us-east-1-181880743555/model-hosting/test_set.csv'\n", "\n", "estimator.fit(s3_path, wait=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# from sagemaker.sklearn.estimator import SKLearn\n", "# from sagemaker import get_execution_role\n", "\n", "# script_path = 'train.py'\n", "\n", "# # first, let's get the estimator defined \n", "# est = SKLearn(entry_point=script_path,\n", "# instance_type=\"ml.c4.xlarge\",\n", "# instance_count = 1,\n", "# role=role,\n", "# sagemaker_session=sess,\n", "# py_version = 'py3',\n", "# framework_version = '0.20.0')\n", "\n", "# # then, let's set up the tuning framework \n", "# from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner\n", "\n", "# hyperparameter_ranges = {'lr': ContinuousParameter(0.00001, 0.001),\n", "# 'batch-size': IntegerParameter(25, 300)}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# objective_metric_name = 'Accuracy'\n", "# objective_type = 'Maximize'\n", "# metric_definitions = [{'Name': 'Accuracy',\n", "# 'Regex': 'Accuracy: ([0-9\\\\.]+)'}]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# tuner = HyperparameterTuner(est,\n", "# objective_metric_name,\n", "# hyperparameter_ranges,\n", "# metric_definitions,\n", "# max_jobs=20,\n", "# max_parallel_jobs=3,\n", "# objective_type=objective_type)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# msg = 'aws s3 cp test_set.csv s3://{}/{}/ && aws s3 cp train_set.csv s3://{}/{}/'.format(bucket, prefix, bucket, prefix)\n", "# os.system(msg)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # may complain about not wanting headers \n", "# inputs = {'train': 's3://{}/{}/train_set.csv'.format(bucket, prefix),\n", "# 'test': 's3://{}/{}/test_set.csv'.format(bucket, prefix)}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# tuner.fit(inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Redeploy to existing SageMaker Endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.tuner import HyperparameterTuner\n", "\n", "job_name = 'sagemaker-scikit-lea-201014-1830'\n", "\n", "tuner = HyperparameterTuner.attach(job_name)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "best_estimator = tuner.best_estimator()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model = best_estimator.create_model()\n", "model_name = model.name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import random\n", "import time\n", "import datetime\n", "\n", "def create_model(model, now):\n", " sm_client = boto3.client('sagemaker')\n", " \n", " x = random.randint(1, 100)\n", " \n", " model_name = '{}-{}'.format(model.name, now)\n", " \n", " response = sm_client.create_model(ModelName=model_name,\n", " PrimaryContainer={'ContainerHostname': 'string','Image': model.image_uri, 'ModelDataUrl': model.model_data},\n", " ExecutionRoleArn= 'arn:aws:iam::181880743555:role/service-role/AmazonSageMaker-ExecutionRole-20200929T125134')\n", "\n", " return response\n", "\n", "def get_endpoint_config(model_name, now):\n", " \n", " sm_client = boto3.client('sagemaker')\n", "\n", " endpoint_config_name = 'ec-{}-{}'.format(model_name, now)\n", " \n", " response = sm_client.create_endpoint_config(EndpointConfigName= endpoint_config_name,\n", " ProductionVariants=[{'VariantName': 'v-{}'.format(model_name),\n", " 'ModelName': model_name,\n", " 'InitialInstanceCount': 1,\n", " 'InstanceType':'ml.m5.large'}])\n", " return endpoint_config_name\n", "\n", "def update_endpoint(model_name, endpoint_name, now):\n", " \n", " sm_client = boto3.client('sagemaker')\n", "\n", " endpoint_config = get_endpoint_config(model_name, now)\n", " \n", " # deregister a scaling policy \n", " resource_id = get_resource_id(endpoint_name)\n", " \n", " client = boto3.client('application-autoscaling')\n", " \n", " \n", " try:\n", " response = client.deregister_scalable_target(ServiceNamespace='sagemaker',\n", " ResourceId=resource_id,\n", " ScalableDimension='sagemaker:variant:DesiredInstanceCount')\n", " \n", " except:\n", " print ('no autoscaling policy to deregister, continuing')\n", " # get monitoring schedules\n", " \n", " try:\n", " response = sm_client.list_monitoring_schedules(EndpointName=endpoint_name,\n", " MaxResults=10,\n", " StatusEquals='Scheduled')\n", " # delete monitoring schedules \n", " for each in response['MonitoringScheduleSummaries']:\n", " name = each['MonitoringScheduleName']\n", " response = sm_client.delete_monitoring_schedule(MonitoringScheduleName=name)\n", " \n", " except:\n", " print ('already deleted the monitoring schedules')\n", " \n", " response = sm_client.update_endpoint(EndpointName=endpoint_name,\n", " EndpointConfigName=endpoint_config)\n", " \n", " return response\n", "\n", " \n", "now = str(datetime.datetime.now()).split('.')[-1]\n", " \n", "endpoint_name = 'sagemaker-scikit-learn-2020-10-14-15-12-50-644'\n", "\n", "create_model(model, now)\n", "\n", "update_endpoint(model_name, endpoint_name, now)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "# Automate with Notebook Runner\n", "Now we're able to monitor new endpoints, we want the ability to automate this whole flow so that we can do it rapidly. As it so happens, a simple and fast way of doing that is using SageMaker processing jobs, CloudWatch, and Lambda. Luckily we can import all of the infrastructure we need using a simple toolkit, which we'll step through here.\n", "\n", "GitHub notes are right here: https://github.com/aws-samples/sagemaker-run-notebook" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# todo - make sure they have the right execution role here, add cfn all access, then a trust relationship, then inlines to allow create stack, plus codebuild create project nad start build " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# !wget https://github.com/aws-samples/sagemaker-run-notebook/releases/download/v0.15.0/sagemaker_run_notebook-0.15.0.tar.gz" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# !pip install sagemaker_run_notebook-0.15.0.tar.gz" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# !run-notebook create-infrastructure --update" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile requirements.txt\n", "awscli\n", "boto3\n", "sagemaker\n", "pandas\n", "sklearn" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# !run-notebook create-container --requirements requirements.txt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# !wget https://github.com/aws-samples/sagemaker-run-notebook/releases/download/v0.15.0/install-run-notebook.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, __you need to open a system terminal on Studio, cd into the directory where we just downloaded `install-run-notebook.sh`, and run the command `bash install-run-notebook.sh`.__ This will run for a few minutes, then prompt you to refresh your web browser. Do that, and you'll see a new Jupyter Widget!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After restarting your Studio page, click on the spaceship widget on the top lefthand side of your Stuio domain view. Make sure you're actually looking at an ipython notebook while you do this.\n", "\n", "Using the widget is super simple. Paste in your execution role, which you can find by running `sagemaker.get_execution_role()` locally. Then paste in your ECR image repository, which you can find by opening up the ECR page in the AWS console. It should default to `notebook-runner`, so you can just paste that in directly.\n", "\n", "Then click the big blue `run now` button, and __this entire notebook is going to run on a SageMaker processing job.__ " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before you do that, you'll want to comment-out those last few cells you ran to install this toolkit and get the infrastructure up and running. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you want, you can parameterize this entire notebook using Papermill. Read more about how to do that with the following resources:\n", "- Blog post: https://aws.amazon.com/blogs/machine-learning/scheduling-jupyter-notebooks-on-sagemaker-ephemeral-instances/\n", "- GitHub repository: https://github.com/aws-samples/sagemaker-run-notebook" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }