In [None]:
!pip3 install Pillow --upgrade --user

In [None]:
import random
import string
from src.mnist.src import katib_launch_args, converter, resource_provider, tfjoblaunch_args_provider

import kfp
from kfp import components
from kfp.components import func_to_container_op
from kfp import dsl

### Prerequisite:


1. Create an `aws-secret` with the `AmazonS3FullAccess` policy in the `kubeflow` namespace.

```yaml
# Secret holding the AWS credentials used by the pipeline steps.
# `apiVersion` is mandatory; without it `kubectl apply` rejects the manifest.
apiVersion: v1
kind: Secret
metadata:
  name: aws-secret
  namespace: kubeflow
type: Opaque
data:
  # Values under `data` must be base64-encoded.
  AWS_ACCESS_KEY_ID: YOUR_BASE64_ACCESS_KEY
  AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS
```

> Note: To get the base64-encoded value of a key, run `echo -n $AWS_ACCESS_KEY_ID | base64`.


### Replace the example with your S3 bucket name

In [None]:
# Name of the S3 bucket used by this example (replace with your own bucket).
mnist_bucket = 'e2e-mnist-example'

# Bucket URI in ``s3://<bucket>`` form, derived from the bucket name above.
s3_bucket_path = 's3://' + mnist_bucket

### Replace aws_region with your region

In [None]:
# AWS region passed to the training and serving resources built by the pipeline.
aws_region = 'us-west-2'

### Build Kubeflow Pipeline

In [None]:
# Default Kubernetes namespace for every resource created by the pipeline.
namespace = 'kubeflow'


# End-to-end MNIST pipeline:
#   1. run a Katib experiment to tune hyperparameters,
#   2. train with a TFJob using the converted Katib result,
#   3. deploy the exported model (serving Deployment + Service),
#   4. deploy the MNIST UI (Deployment + Service + VirtualService).
@dsl.pipeline(
    name="End to end pipeline",
    description="An end to end example including hyperparameter tuning, train and inference."
)
def mnist_pipeline(
        name="mnist-{{workflow.uid}}",
        namespace=namespace,
        step="1000",
        s3bucketexportpath="",
        ttlSecondsAfterFinished=-1,
        tfjobTimeoutMinutes=60,
        deleteAfterDone=False):
    # Parameters:
    #   name                    -- used as both the Katib experiment name and the TFJob name.
    #   namespace               -- namespace handed to every launcher / resource builder.
    #   step                    -- forwarded to ``tfjoblauncher_args`` as ``step``.
    #   s3bucketexportpath      -- export location forwarded to the TFJob args and the serving deployment.
    #   ttlSecondsAfterFinished -- forwarded to the TFJob launcher as ``ttl_seconds_after_finished``.
    #   tfjobTimeoutMinutes     -- forwarded to the TFJob launcher as ``tfjob_timeout_minutes``.
    #   deleteAfterDone         -- forwarded to the TFJob launcher as ``delete_finished_tfjob``.

    # ---- Step 1: create a Katib experiment to tune hyperparameters ----
    katib_specs = katib_launch_args.argugments_provide(
        objective_type="minimize",
        objective_goal=0.001,
        objective_metrics="loss",
        algorithm="random",
        parameters_lr_min="0.01",
        parameters_lr_max="0.03",
        parameters_batchsize=["16", "32", "64"],
        tf_train_steps="200",
        image="chuckshow/mnist-tf-pipeline:latest",
        worker_num=3,
    )
    objective_cfg, algorithm_cfg, search_space, trial_tpl, metrics_collector = katib_specs

    launch_katib = components.load_component_from_url(
        'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml'
    )
    tuning_task = launch_katib(
        experiment_name=name,
        experiment_namespace=namespace,
        parallel_trial_count=3,
        max_trial_count=12,
        objective=str(objective_cfg),
        algorithm=str(algorithm_cfg),
        trial_template=str(trial_tpl),
        parameters=str(search_space),
        metrics_collector=str(metrics_collector),
        delete_finished_experiment=False,
    )

    # ---- Step 1.5: convert Katib's best parameters into a string ----
    to_arg_string = func_to_container_op(converter.convert_mnist_experiment_result)
    best_params_task = to_arg_string(tuning_task.output)

    # ---- Step 2: launch a TFJob that trains with the tuned hyperparameters ----
    launch_tfjob = components.load_component_from_file("./src/mnist/launcher/component.yaml")

    chief_spec, worker_spec = tfjoblaunch_args_provider.tfjoblauncher_args(
        step=step,
        s3bucketexportpath=s3bucketexportpath,
        args=best_params_task.output,
        aws_region=aws_region,
    )

    train_task = launch_tfjob(
        name=name,
        namespace=namespace,
        ttl_seconds_after_finished=ttlSecondsAfterFinished,
        worker_spec=worker_spec,
        chief_spec=chief_spec,
        tfjob_timeout_minutes=tfjobTimeoutMinutes,
        delete_finished_tfjob=deleteAfterDone,
    )

    # ---- Step 3: model inference with TensorFlow Serving ----
    # Random suffix (16 lowercase letters followed by 16 digits), generated when
    # this function body runs, shared by the serving and UI resource names.
    random_letters = [random.choice(string.ascii_lowercase) for _ in range(16)]
    random_digits = [random.choice(string.digits) for _ in range(16)]
    HASH = ''.join(random_letters + random_digits)
    servingdeploy_name = 'mnist-model' + HASH

    serving_deploy_manifest = resource_provider.tfservingdeploy_resource(
        namespace=namespace,
        s3bucketexportpath=s3bucketexportpath,
        servingdeploy_name=servingdeploy_name,
        aws_region=aws_region,
    )
    serving_deploy_task = dsl.ResourceOp(
        name="deploy",
        k8s_resource=serving_deploy_manifest,
    ).after(train_task)

    # The serving Service name is fixed (no random suffix).
    servingsvc_name = 'mnist-service'
    serving_svc_manifest = resource_provider.tfservingsvc_resource(
        namespace=namespace,
        servingdeploy_name=servingdeploy_name,
        servingsvc_name=servingsvc_name,
    )
    serving_svc_task = dsl.ResourceOp(
        name="service",
        k8s_resource=serving_svc_manifest,
    ).after(serving_deploy_task)

    # ---- Step 4: deploy the MNIST UI ----
    ui_name = 'mnist-ui' + HASH

    ui_deploy_manifest = resource_provider.uideploy_resource(
        namespace=namespace,
        ui_name=ui_name,
    )
    ui_deploy_task = dsl.ResourceOp(
        name="uideploy",
        k8s_resource=ui_deploy_manifest,
    ).after(train_task)

    ui_svc_manifest = resource_provider.uisvc_resource(
        namespace=namespace,
        ui_name=ui_name,
    )
    ui_svc_task = dsl.ResourceOp(
        name="uiservice",
        k8s_resource=ui_svc_manifest,
    ).after(ui_deploy_task)

    ui_virtualsvc_manifest = resource_provider.uivirtualsvc_resource(
        namespace=namespace,
        ui_name=ui_name,
    )
    ui_virtualsvc_task = dsl.ResourceOp(
        name="uivirtualservice",
        k8s_resource=ui_virtualsvc_manifest,
    ).after(ui_svc_task)
 

### Submit the pipeline

In [None]:
# Submit ``mnist_pipeline`` as a run through the KFP client, overriding the
# ``s3bucketexportpath`` pipeline parameter with ``<s3_bucket_path>/export``.
pipeline = kfp.Client().create_run_from_pipeline_func(
    mnist_pipeline,
    arguments={"s3bucketexportpath": s3_bucket_path + '/export'},
)

### Invoke serving API via Python client

In [None]:
import tensorflow as tf
from tensorflow import keras

# Helper libraries
import numpy as np
import os
import subprocess
import argparse

import random
import json
import requests


# Predict URL of the in-cluster ``mnist-service`` Service in the pipeline's namespace.
endpoint = "http://mnist-service.{}.svc.cluster.local:8500/v1/models/mnist:predict".format(namespace)


# Prepare test dataset (this is MNIST, not Fashion-MNIST)
mnist_dataset = keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist_dataset.load_data()

# scale the values to 0.0 to 1.0
train_images = train_images / 255.0
test_images = test_images / 255.0

# reshape for feeding into the model
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1)
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1)

class_names = ['0','1','2','3','4','5','6','7','8','9']

# Pick one random test image and wrap it in a one-instance predict request
rando = random.randint(0, len(test_images) - 1)
data = json.dumps({"signature_name": "serving_default", "instances": test_images[rando:rando+1].tolist()})
print('Data: {} ... {}'.format(data[:50], data[-52:]))

# HTTP call
headers = {"content-type": "application/json"}
json_response = requests.post(endpoint, data=data, headers=headers)
# Fail loudly on a non-2xx reply instead of hitting a confusing KeyError below.
json_response.raise_for_status()
predictions = json_response.json()['predictions']

print(predictions)

# First placeholder is the model's prediction, second is the ground-truth label.
title = 'The model thought this was a class {}, and it was actually a class {}'.format(
    predictions[0]['classes'], test_labels[rando])
print('\n')
print(title)


### Invoke serving API via UI

Open your_ALB_endpoint + `/mnist/${namespace}/ui/` in a browser to visit the MNIST UI page.

### Clean up resources in the terminal before re-running the Pipeline

```bash
kubectl delete svc mnist-service -n kubeflow
```