{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install Pillow --upgrade --user"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "import string\n",
    "from src.mnist.src import katib_launch_args, converter, resource_provider, tfjoblaunch_args_provider\n",
    "\n",
    "import kfp\n",
    "from kfp import components\n",
    "from kfp.components import func_to_container_op\n",
    "from kfp import dsl"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Prerequisites\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "1. Create an `aws-secret` in the `kubeflow` namespace holding AWS credentials that have the `AmazonS3FullAccess` policy.\n",
    "\n",
    "```yaml\n",
    "apiVersion: v1\n",
    "kind: Secret\n",
    "metadata:\n",
    "  name: aws-secret\n",
    "  namespace: kubeflow\n",
    "type: Opaque\n",
    "data:\n",
    "  AWS_ACCESS_KEY_ID: YOUR_BASE64_ACCESS_KEY\n",
    "  AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS\n",
    "```\n",
    "\n",
    "> Note: To get the base64 string, run `echo -n $AWS_ACCESS_KEY_ID | base64`\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Replace the example with your S3 bucket name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mnist_bucket = \"e2e-mnist-example\"\n",
    "s3_bucket_path = 's3://{}'.format(mnist_bucket)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Replace aws_region with your region"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "aws_region = \"us-west-2\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build Kubeflow Pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "namespace = 'kubeflow'\n",
    "\n",
    "\n",
    "@dsl.pipeline(\n",
    "    name=\"End to end pipeline\",\n",
    "    description=\"An end to end example including hyperparameter tuning, train and inference.\"\n",
    ")\n",
    "def mnist_pipeline(\n",
    "        name=\"mnist-{{workflow.uid}}\",\n",
    "        namespace=namespace,\n",
    "        step=\"1000\",\n",
    "        s3bucketexportpath=\"\",\n",
    "        ttlSecondsAfterFinished=-1,\n",
    "        tfjobTimeoutMinutes=60,\n",
    "        deleteAfterDone=False):\n",
    "    \"\"\"Tune, train, serve and expose an MNIST model in a single pipeline.\n",
    "\n",
    "    The pipeline chains a Katib experiment, a converter for its result, a\n",
    "    TFJob launcher, and Kubernetes resources for TensorFlow Serving and the\n",
    "    MNIST UI. It also reads the notebook-level `aws_region` variable.\n",
    "\n",
    "    Args:\n",
    "        name: Used as both the Katib experiment name and the TFJob name.\n",
    "        namespace: Namespace passed to both launchers and to every resource builder.\n",
    "        step: Forwarded as `step` to `tfjoblauncher_args`\n",
    "            (presumably the number of training steps; confirm in that module).\n",
    "        s3bucketexportpath: S3 path forwarded to the TFJob spec builder and to\n",
    "            the TensorFlow Serving deployment builder.\n",
    "        ttlSecondsAfterFinished: Forwarded to the TFJob launcher as `ttl_seconds_after_finished`.\n",
    "        tfjobTimeoutMinutes: Forwarded to the TFJob launcher as `tfjob_timeout_minutes`.\n",
    "        deleteAfterDone: Forwarded to the TFJob launcher as `delete_finished_tfjob`.\n",
    "    \"\"\"\n",
    "\n",
    "    # Step 1: create a Katib experiment to tune hyperparameters.\n",
    "    (objective_config, algorithm_config, parameters,\n",
    "     trial_template, metrics_collector_spec) = katib_launch_args.argugments_provide(\n",
    "        objective_type=\"minimize\",\n",
    "        objective_goal=0.001,\n",
    "        objective_metrics=\"loss\",\n",
    "        algorithm=\"random\",\n",
    "        parameters_lr_min=\"0.01\",\n",
    "        parameters_lr_max=\"0.03\",\n",
    "        parameters_batchsize=[\"16\", \"32\", \"64\"],\n",
    "        tf_train_steps=\"200\",\n",
    "        image=\"chuckshow/mnist-tf-pipeline:latest\",\n",
    "        worker_num=3)\n",
    "\n",
    "    # NOTE(review): this component is loaded from the `master` branch, so its\n",
    "    # interface can change underneath the notebook; consider pinning the URL\n",
    "    # to a commit SHA.\n",
    "    katib_experiment_launcher_op = components.load_component_from_url(\n",
    "        'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml')\n",
    "    katib_task = katib_experiment_launcher_op(\n",
    "        experiment_name=name,\n",
    "        experiment_namespace=namespace,\n",
    "        parallel_trial_count=3,\n",
    "        max_trial_count=12,\n",
    "        objective=str(objective_config),\n",
    "        algorithm=str(algorithm_config),\n",
    "        trial_template=str(trial_template),\n",
    "        parameters=str(parameters),\n",
    "        metrics_collector=str(metrics_collector_spec),\n",
    "        delete_finished_experiment=False)\n",
    "\n",
    "    # Step 1.5: convert the best Katib parameters into a string.\n",
    "    convert_op = func_to_container_op(converter.convert_mnist_experiment_result)\n",
    "    best_params_task = convert_op(katib_task.output)\n",
    "\n",
    "    # Step 2: launch a TFJob that trains the model with the hyperparameters tuned by Katib.\n",
    "    tfjob_launcher_op = components.load_component_from_file(\"./src/mnist/launcher/component.yaml\")\n",
    "\n",
    "    chief, worker = tfjoblaunch_args_provider.tfjoblauncher_args(\n",
    "        step=step,\n",
    "        s3bucketexportpath=s3bucketexportpath,\n",
    "        args=best_params_task.output,\n",
    "        aws_region=aws_region)\n",
    "\n",
    "    train = tfjob_launcher_op(\n",
    "        name=name,\n",
    "        namespace=namespace,\n",
    "        ttl_seconds_after_finished=ttlSecondsAfterFinished,\n",
    "        worker_spec=worker,\n",
    "        chief_spec=chief,\n",
    "        tfjob_timeout_minutes=tfjobTimeoutMinutes,\n",
    "        delete_finished_tfjob=deleteAfterDone,\n",
    "    )\n",
    "\n",
    "    # Step 3: serve the trained model with TensorFlow Serving.\n",
    "    # The suffix is 16 lowercase letters followed by 16 digits. It is drawn when\n",
    "    # this function body executes (pipeline compilation), not once per run of an\n",
    "    # already compiled pipeline.\n",
    "    name_suffix = ''.join(random.choices(string.ascii_lowercase, k=16)\n",
    "                          + random.choices(string.digits, k=16))\n",
    "    servingdeploy_name = 'mnist-model' + name_suffix\n",
    "\n",
    "    deploy = resource_provider.tfservingdeploy_resource(\n",
    "        namespace=namespace,\n",
    "        s3bucketexportpath=s3bucketexportpath,\n",
    "        servingdeploy_name=servingdeploy_name,\n",
    "        aws_region=aws_region)\n",
    "\n",
    "    deployment = dsl.ResourceOp(\n",
    "        name=\"deploy\",\n",
    "        k8s_resource=deploy,\n",
    "    ).after(train)\n",
    "\n",
    "    # Unlike the deployment, the Service name carries no random suffix: the\n",
    "    # prediction cell of this notebook addresses it as `mnist-service`.\n",
    "    servingsvc_name = 'mnist-service'\n",
    "    serviceresource = resource_provider.tfservingsvc_resource(\n",
    "        namespace=namespace,\n",
    "        servingdeploy_name=servingdeploy_name,\n",
    "        servingsvc_name=servingsvc_name)\n",
    "\n",
    "    dsl.ResourceOp(\n",
    "        name=\"service\",\n",
    "        k8s_resource=serviceresource\n",
    "    ).after(deployment)\n",
    "\n",
    "    # Step 4: deploy the MNIST UI (deployment -> service -> virtual service).\n",
    "    ui_name = 'mnist-ui' + name_suffix\n",
    "    uideployresource = resource_provider.uideploy_resource(\n",
    "        namespace=namespace,\n",
    "        ui_name=ui_name)\n",
    "\n",
    "    uideploy = dsl.ResourceOp(\n",
    "        name=\"uideploy\",\n",
    "        k8s_resource=uideployresource\n",
    "    ).after(train)\n",
    "\n",
    "    uiserviceresource = resource_provider.uisvc_resource(\n",
    "        namespace=namespace,\n",
    "        ui_name=ui_name)\n",
    "\n",
    "    uiservice = dsl.ResourceOp(\n",
    "        name=\"uiservice\",\n",
    "        k8s_resource=uiserviceresource\n",
    "    ).after(uideploy)\n",
    "\n",
    "    uivirtualserviceresource = resource_provider.uivirtualsvc_resource(\n",
    "        namespace=namespace,\n",
    "        ui_name=ui_name)\n",
    "\n",
    "    dsl.ResourceOp(\n",
    "        name=\"uivirtualservice\",\n",
    "        k8s_resource=uivirtualserviceresource\n",
    "    ).after(uiservice)"
   ]
  },
  {
   "cell_type":
"markdown",
   "metadata": {},
   "source": [
    "### Submit the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline = kfp.Client().create_run_from_pipeline_func(\n",
    "    mnist_pipeline,\n",
    "    arguments={\"s3bucketexportpath\": '{}/export'.format(s3_bucket_path)})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Invoke serving API via Python client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Standard library\n",
    "import argparse\n",
    "import json\n",
    "import os\n",
    "import random\n",
    "import subprocess\n",
    "\n",
    "# Third-party\n",
    "import numpy as np\n",
    "import requests\n",
    "import tensorflow as tf\n",
    "from tensorflow import keras\n",
    "\n",
    "\n",
    "# In-cluster URL of the `mnist-service` Service created by the pipeline.\n",
    "# NOTE(review): 8500 is TensorFlow Serving's default gRPC port (REST defaults\n",
    "# to 8501); confirm the Service exposes the REST API on 8500.\n",
    "endpoint = \"http://mnist-service.{}.svc.cluster.local:8500/v1/models/mnist:predict\".format(namespace)\n",
    "\n",
    "\n",
    "# Prepare test dataset\n",
    "mnist = keras.datasets.mnist\n",
    "(train_images, train_labels), (test_images, test_labels) = mnist.load_data()\n",
    "\n",
    "# Scale the pixel values to the range 0.0 to 1.0\n",
    "train_images = train_images / 255.0\n",
    "test_images = test_images / 255.0\n",
    "\n",
    "# Reshape for feeding into the model\n",
    "train_images = train_images.reshape(train_images.shape[0], 28, 28, 1)\n",
    "test_images = test_images.reshape(test_images.shape[0], 28, 28, 1)\n",
    "\n",
    "class_names = ['0','1','2','3','4','5','6','7','8','9']\n",
    "\n",
    "# Pick one random test image\n",
    "sample_index = random.randrange(len(test_images))\n",
    "data = json.dumps({\"signature_name\": \"serving_default\", \"instances\": test_images[sample_index:sample_index+1].tolist()})\n",
    "print('Data: {} ... {}'.format(data[:50], data[-52:]))\n",
    "\n",
    "# HTTP call\n",
    "headers = {\"content-type\": \"application/json\"}\n",
    "json_response = requests.post(endpoint, data=data, headers=headers)\n",
    "# Surface HTTP failures directly instead of a KeyError on 'predictions' below.\n",
    "json_response.raise_for_status()\n",
    "predictions = json_response.json()['predictions']\n",
    "\n",
    "print(predictions)\n",
    "\n",
    "# The first placeholder is the model's prediction, the second the true label.\n",
    "title = 'The model thought this was a class {}, and it was actually a class {}'.format(\n",
    "    predictions[0]['classes'], test_labels[sample_index])\n",
    "print('\\n')\n",
    "print(title)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Invoke serving API via UI\n",
    "\n",
    "Open your ALB endpoint followed by `/mnist/${namespace}/ui/` to visit the MNIST UI page."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Clean up resources in the terminal before re-running the Pipeline\n",
    "\n",
    "```shell\n",
    "kubectl delete svc mnist-service -n kubeflow\n",
    "```"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  },
  "pycharm": {
   "stem_cell": {
    "cell_type": "raw",
    "metadata": {
     "collapsed": false
    },
    "source": []
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}