{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Train and deploy model on Kubeflow in Notebooks\n", "\n", "This examples comes from a upstream fairing [example](https://github.com/kubeflow/fairing/tree/master/examples/prediction).\n", "\n", "\n", "Please check Kaggle competiton [\n", "House Prices: Advanced Regression Techniques](https://www.kaggle.com/c/house-prices-advanced-regression-techniques)\n", "for details about the ML problem we want to resolve.\n", "\n", "This notebook introduces you to using Kubeflow Fairing to train and deploy a model to Kubeflow on Amazon EKS. This notebook demonstrate how to:\n", "\n", "* Train an XGBoost model in a local notebook,\n", "* Use Kubeflow Fairing to train an XGBoost model remotely on Kubeflow,\n", "* Use Kubeflow Fairing to deploy a trained model to Kubeflow,\n", "* Call the deployed endpoint for predictions.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Install python dependencies" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile requirements.txt\n", "pandas\n", "joblib\n", "numpy\n", "xgboost\n", "scikit-learn>=0.21.0\n", "seldon-core\n", "tornado>=6.0.3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install -r requirements.txt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Develop your model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import argparse\n", "import logging\n", "import joblib\n", "import sys\n", "import pandas as pd\n", "import numpy as np\n", "from sklearn.metrics import mean_absolute_error\n", "from sklearn.model_selection import train_test_split\n", "from sklearn.impute import SimpleImputer\n", "from xgboost import XGBRegressor\n", "\n", "logging.basicConfig(format='%(message)s')\n", "logging.getLogger().setLevel(logging.INFO)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def read_input(file_name, test_size=0.25):\n", " \"\"\"Read input data and split it into train and test.\"\"\"\n", " data = pd.read_csv(file_name)\n", " data.dropna(axis=0, subset=['SalePrice'], inplace=True)\n", "\n", " y = data.SalePrice\n", " X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])\n", "\n", " train_X, test_X, train_y, test_y = train_test_split(X.values,\n", " y.values,\n", " test_size=test_size,\n", " shuffle=False)\n", "\n", " imputer = SimpleImputer()\n", " train_X = imputer.fit_transform(train_X)\n", " test_X = imputer.transform(test_X)\n", "\n", " return (train_X, train_y), (test_X, test_y)\n", "\n", "def train_model(train_X,\n", " train_y,\n", " test_X,\n", " test_y,\n", " n_estimators,\n", " learning_rate):\n", " \"\"\"Train the model using XGBRegressor.\"\"\"\n", " model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)\n", "\n", " model.fit(train_X,\n", " train_y,\n", " early_stopping_rounds=40,\n", " eval_set=[(test_X, test_y)])\n", "\n", " print(\"Best RMSE on eval: %.2f with %d rounds\" %\n", " (model.best_score,\n", " model.best_iteration+1))\n", " return model\n", "\n", "def eval_model(model, test_X, test_y):\n", " \"\"\"Evaluate the model performance.\"\"\"\n", " predictions = model.predict(test_X)\n", " logging.info(\"mean_absolute_error=%.2f\", mean_absolute_error(predictions, test_y))\n", "\n", "def save_model(model, model_file):\n", " \"\"\"Save XGBoost model for serving.\"\"\"\n", " joblib.dump(model, 
model_file)\n", " logging.info(\"Model export success: %s\", model_file)\n", " \n", " \n", "class HousingServe(object):\n", " \n", " def __init__(self):\n", " self.train_input = \"ames_dataset/train.csv\"\n", " self.n_estimators = 50\n", " self.learning_rate = 0.1\n", " self.model_file = \"trained_ames_model.dat\"\n", " self.model = None\n", "\n", " def train(self):\n", " (train_X, train_y), (test_X, test_y) = read_input(self.train_input)\n", " model = train_model(train_X,\n", " train_y,\n", " test_X,\n", " test_y,\n", " self.n_estimators,\n", " self.learning_rate)\n", "\n", " eval_model(model, test_X, test_y)\n", " save_model(model, self.model_file)\n", "\n", " def predict(self, X, feature_names=None):\n", " \"\"\"Predict using the model for given ndarray.\"\"\"\n", " if not self.model:\n", " self.model = joblib.load(self.model_file)\n", " # Do any preprocessing\n", " prediction = self.model.predict(data=X)\n", " # Do any postprocessing\n", " return prediction" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train an XGBoost model in a notebook" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model = HousingServe()\n", "model.train()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create an S3 bucket to store pipeline data\n", "> Note: Be sure to change the HASH variable to random hash before running next cell\n", "\n", "> Note: if you use `us-east-1`, please use command `!aws s3 mb s3://{HASH}'-kubeflow-pipeline-data' --region $AWS_REGION --endpoint-url https://s3.us-east-1.amazonaws.com`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import random, string\n", "HASH = ''.join([random.choice(string.ascii_lowercase) for n in range(16)] + [random.choice(string.digits) for n in range(16)])\n", "AWS_REGION = 'us-west-2'\n", "!aws s3 mb s3://{HASH}'-kubeflow-pipeline-data' --region $AWS_REGION" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set up Kubeflow Fairing for training and predictions\n", "\n", "> Note: remember to change `kubeflow-pipeline-data` to your own s3 bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from kubeflow import fairing\n", "from kubeflow.fairing import TrainJob\n", "from kubeflow.fairing.backends import KubeflowAWSBackend\n", "\n", "\n", "from kubeflow import fairing\n", "\n", "FAIRING_BACKEND = 'KubeflowAWSBackend'\n", "\n", "AWS_ACCOUNT_ID = fairing.cloud.aws.guess_account_id()\n", "AWS_REGION = 'us-west-2'\n", "DOCKER_REGISTRY = '{}.dkr.ecr.{}.amazonaws.com'.format(AWS_ACCOUNT_ID, AWS_REGION)\n", "S3_BUCKET = f'{HASH}-kubeflow-pipeline-data'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import importlib\n", "\n", "if FAIRING_BACKEND == 'KubeflowAWSBackend':\n", " from kubeflow.fairing.builders.cluster.s3_context import S3ContextSource\n", " BuildContext = S3ContextSource(\n", " aws_account=AWS_ACCOUNT_ID, region=AWS_REGION,\n", " bucket_name=S3_BUCKET\n", " )\n", "\n", "BackendClass = getattr(importlib.import_module('kubeflow.fairing.backends'), FAIRING_BACKEND)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train an XGBoost model remotely on Kubeflow\n", "Import the `TrainJob` and use the configured backend class. Kubeflow Fairing packages the `HousingServe` class, the training data, and the training job's software prerequisites as a Docker image. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Train an XGBoost model remotely on Kubeflow\n", "Import `TrainJob` and use the configured backend class. Kubeflow Fairing packages the `HousingServe` class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Kubeflow.\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from kubeflow.fairing import TrainJob\n", "\n", "train_job = TrainJob(HousingServe, input_files=['ames_dataset/train.csv', 'requirements.txt'],\n", "                     docker_registry=DOCKER_REGISTRY,\n", "                     backend=BackendClass(build_context_source=BuildContext))\n", "train_job.submit()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Deploy the trained model to Kubeflow for predictions\n", "\n", "Import `PredictionEndpoint` and use the configured backend class. Kubeflow Fairing packages the `HousingServe` class, the trained model, and the prediction endpoint's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the prediction endpoint on Kubeflow." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from kubeflow.fairing import PredictionEndpoint\n", "\n", "endpoint = PredictionEndpoint(HousingServe, input_files=['trained_ames_model.dat', 'requirements.txt'],\n", "                              docker_registry=DOCKER_REGISTRY,\n", "                              service_type='ClusterIP',\n", "                              backend=BackendClass(build_context_source=BuildContext))\n", "endpoint.create()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Call the prediction endpoint\n", "Create a test dataset, then call the endpoint on Kubeflow for predictions." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait a while for the service to be ready, then replace <service-hostname>\n", "# below with the fairing-service hostname printed by the previous step.\n", "# Example: !nc -vz fairing-service-srwh2.anonymous.svc.cluster.local 5000\n", "\n", "!nc -vz <service-hostname> 5000" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get sample data and query the endpoint.\n", "(train_X, train_y), (test_X, test_y) = read_input(\"ames_dataset/train.csv\")\n", "\n", "# Workaround for https://github.com/kubeflow/fairing/pull/376: point the\n", "# client at `:5000/predict` explicitly. Replace the hostname with your own\n", "# fairing-service name from the deploy step.\n", "endpoint.url = 'http://fairing-service-n8qv2.anonymous.svc.cluster.local:5000/predict'\n", "\n", "endpoint.predict_nparray(test_X)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Clean up the prediction endpoint\n", "Delete the prediction endpoint created by this notebook." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint.delete()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Clean up the S3 bucket and ECR repository\n", "Delete the S3 bucket and ECR repository that were created for this exercise." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 rb s3://$S3_BUCKET --force\n", "!aws ecr delete-repository --repository-name fairing-job --region $AWS_REGION --force" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 2 }