{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# MNIST classification pipeline using SageMaker\n", "\n", "This sample runs a pipeline that trains a classification model on the MNIST dataset using the K-Means algorithm on SageMaker." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "All required steps are included here; for other details, such as how to get the source data, please check the [documentation](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/mnist-kmeans-sagemaker).\n", "\n", "\n", "This sample is based on [Train a Model with a Built-in Algorithm and Deploy it](https://docs.aws.amazon.com/sagemaker/latest/dg/ex1.html).\n", "\n", "The sample performs hyperparameter optimization, trains a model, deploys a live endpoint, and performs batch inference based on the [MNIST dataset](http://www.deeplearning.net/tutorial/gettingstarted.html). It takes approximately 35 minutes for the pipeline to run end to end.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "1. Install the SageMaker, KFP, and boto3 SDKs. \n", "\n", "> Note: Be sure to use the KFP SDK version specified in this notebook. The notebook has been tested against the kfp v0.1.29 release" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install sagemaker https://storage.googleapis.com/ml-pipeline/release/0.1.29/kfp.tar.gz --upgrade " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "2. Create an S3 bucket to store pipeline data\n", "\n", "> Note: Be sure to set the HASH variable to a random string and set AWS_REGION to your region before running the next cell\n", "\n", "> Note: If you use us-east-1, please use the command `!aws s3 mb s3://$S3_BUCKET --region $AWS_REGION --endpoint-url https://s3.us-east-1.amazonaws.com`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import random, string\n", "HASH = ''.join([random.choice(string.ascii_lowercase) for n in range(16)] + [random.choice(string.digits) for n in range(16)])\n", "AWS_REGION = 'us-west-2'\n", "S3_BUCKET = '{}-kubeflow-pipeline-data'.format(HASH)\n", "!aws s3 mb s3://$S3_BUCKET --region $AWS_REGION" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "3. Prepare dataset\n", "\n", "> Download the MNIST dataset, convert the training, test, and validation sets to the formats required by K-Means, and upload them to your S3 bucket."
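, "\n", "\n", "> Before preparing the data, you can optionally confirm that the bucket created above is reachable with your current credentials. This is a minimal sketch using boto3; it assumes the previous cells in this notebook have already been run:\n", "\n", "```python\n", "import boto3\n", "\n", "# head_bucket raises an error if the bucket does not exist or is not accessible\n", "boto3.client('s3', region_name=AWS_REGION).head_bucket(Bucket=S3_BUCKET)\n", "print('Bucket {} is reachable'.format(S3_BUCKET))\n", "```"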
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pickle, gzip, numpy, urllib.request, json\n", "from urllib.parse import urlparse\n", "\n", "# Load the dataset\n", "urllib.request.urlretrieve(\"http://deeplearning.net/data/mnist/mnist.pkl.gz\", \"mnist.pkl.gz\")\n", "with gzip.open('mnist.pkl.gz', 'rb') as f:\n", " train_set, valid_set, test_set = pickle.load(f, encoding='latin1')\n", "\n", "\n", "# Upload dataset to S3\n", "from sagemaker.amazon.common import write_numpy_to_dense_tensor\n", "import io\n", "import boto3\n", "\n", "###################################################################\n", "# This is the only thing that you need to change to run this code \n", "# Give the name of your S3 bucket \n", "bucket = S3_BUCKET \n", "\n", "# If you are going to use the default values of the pipeline, then \n", "# give a bucket name which is in the us-west-2 region \n", "###################################################################\n", "\n", "train_data_key = 'mnist_kmeans_example/train_data'\n", "test_data_key = 'mnist_kmeans_example/test_data'\n", "train_data_location = 's3://{}/{}'.format(bucket, train_data_key)\n", "test_data_location = 's3://{}/{}'.format(bucket, test_data_key)\n", "print('Training data will be uploaded to: {}'.format(train_data_location))\n", "print('Test data will be uploaded to: {}'.format(test_data_location))\n", "\n", "# Convert the training data into the format required by the SageMaker KMeans algorithm\n", "buf = io.BytesIO()\n", "write_numpy_to_dense_tensor(buf, train_set[0], train_set[1])\n", "buf.seek(0)\n", "\n", "boto3.resource('s3').Bucket(bucket).Object(train_data_key).upload_fileobj(buf)\n", "\n", "# Convert the test data into the format required by the SageMaker KMeans algorithm\n", "# (use a fresh buffer so the test object does not also contain the training records)\n", "buf = io.BytesIO()\n", "write_numpy_to_dense_tensor(buf, test_set[0], test_set[1])\n", "buf.seek(0)\n", "\n", "boto3.resource('s3').Bucket(bucket).Object(test_data_key).upload_fileobj(buf)\n", "\n", "# Convert the valid data into the CSV format used as batch inference input\n", "numpy.savetxt('valid-data.csv', valid_set[0], delimiter=',', fmt='%g')\n", "s3_client = boto3.client('s3')\n", "input_key = \"{}/valid_data.csv\".format(\"mnist_kmeans_example/input\")\n", "s3_client.upload_file('valid-data.csv', bucket, input_key)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "4. Grant SageMaker permissions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> Typically, in a production environment, you would assign fine-grained permissions depending on the nature of the actions you take and leverage tools like [IAM Roles for Service Accounts](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html) to secure access to AWS resources. For simplicity, we will assign the AmazonSageMakerFullAccess and AmazonS3FullAccess IAM policies here. You can read more about granular policies [here](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html). \n", "\n", "> In order to run this pipeline, we need two levels of IAM permissions:\n", "\n", "> a) Create a Kubernetes secret **aws-secret** with SageMaker and S3 policies. 
Please make sure to create `aws-secret` in the kubeflow namespace.\n", "\n", "```yaml\n", "apiVersion: v1\n", "kind: Secret\n", "metadata:\n", " name: aws-secret\n", " namespace: kubeflow\n", "type: Opaque\n", "data:\n", " AWS_ACCESS_KEY_ID: YOUR_BASE64_ACCESS_KEY\n", " AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS\n", "```\n", "> Note: To get the base64-encoded string, try `echo -n $AWS_ACCESS_KEY_ID | base64`\n", "\n", "> b) Create an IAM execution role for SageMaker so that the job can assume this role in order to perform SageMaker and S3 actions. Make a note of this role's ARN, as you will need it during the pipeline creation step\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "5. Install Kubeflow Pipelines SDK\n", "> You can skip this step if it's already installed. You can validate that the SDK is installed by running `!pip show kfp`. The notebook has been tested with the kfp v0.1.29 release" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Build pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "1. Run the following command to load the Kubeflow Pipelines SDK" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import kfp\n", "from kfp import components\n", "from kfp import dsl\n", "from kfp.aws import use_aws_secret" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "2. Load the reusable SageMaker components." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_train_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/train/component.yaml')\n", "sagemaker_model_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/model/component.yaml')\n", "sagemaker_deploy_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/deploy/component.yaml')\n", "sagemaker_batch_transform_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/batch_transform/component.yaml')\n", "sagemaker_hpo_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/hyperparameter_tuning/component.yaml')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "3. Create the pipeline. \n", "\n", "We will run a hyperparameter tuning job first and feed its best hyperparameters into a training job. Once the training job is done, it will persist the trained model to S3. \n", "\n", "Then a job will be kicked off to create a `Model` manifest in SageMaker. \n", "\n", "With this model, a batch transform job can predict on other datasets, and a prediction service can create an endpoint from it.\n", "\n", "\n", "> Note: remember to pass your **role_arn** to successfully run the job.\n", "\n", "> Note: If you use a different region, please replace `us-west-2` with your region. 
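You can look up the K-Means registry account for your region in the table below.\n", "\n", "> For example, the `image` pipeline parameter for another region could be built as follows (an illustrative sketch; the account ID shown is the eu-west-1 entry from the table below):\n", "\n", "```python\n", "# Pick the registry account ID for your region from the table below\n", "region = 'eu-west-1'\n", "account = '438346466558'\n", "kmeans_image = '{}.dkr.ecr.{}.amazonaws.com/kmeans:1'.format(account, region)\n", "print(kmeans_image)\n", "```\n", "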
\n", "\n", "> Note: ECR Images for k-means algorithm\n", "\n", "|Region| ECR Image|\n", "|------|----------|\n", "|us-west-1|632365934929.dkr.ecr.us-west-1.amazonaws.com|\n", "|us-west-2|174872318107.dkr.ecr.us-west-2.amazonaws.com|\n", "|us-east-1|382416733822.dkr.ecr.us-east-1.amazonaws.com|\n", "|us-east-2|404615174143.dkr.ecr.us-east-2.amazonaws.com|\n", "|us-gov-west-1|226302683700.dkr.ecr.us-gov-west-1.amazonaws.com|\n", "|ap-east-1|286214385809.dkr.ecr.ap-east-1.amazonaws.com|\n", "|ap-northeast-1|351501993468.dkr.ecr.ap-northeast-1.amazonaws.com|\n", "|ap-northeast-2|835164637446.dkr.ecr.ap-northeast-2.amazonaws.com|\n", "|ap-south-1|991648021394.dkr.ecr.ap-south-1.amazonaws.com|\n", "|ap-southeast-1|475088953585.dkr.ecr.ap-southeast-1.amazonaws.com|\n", "|ap-southeast-2|712309505854.dkr.ecr.ap-southeast-2.amazonaws.com|\n", "|ca-central-1|469771592824.dkr.ecr.ca-central-1.amazonaws.com|\n", "|eu-central-1|664544806723.dkr.ecr.eu-central-1.amazonaws.com|\n", "|eu-north-1|669576153137.dkr.ecr.eu-north-1.amazonaws.com|\n", "|eu-west-1|438346466558.dkr.ecr.eu-west-1.amazonaws.com|\n", "|eu-west-2|644912444149.dkr.ecr.eu-west-2.amazonaws.com|\n", "|eu-west-3|749696950732.dkr.ecr.eu-west-3.amazonaws.com|\n", "|me-south-1|249704162688.dkr.ecr.me-south-1.amazonaws.com|\n", "|sa-east-1|855470959533.dkr.ecr.sa-east-1.amazonaws.com|" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Configure your s3 bucket.\n", "S3_BUCKET = '{}-kubeflow-pipeline-data'.format(HASH)\n", "S3_PIPELINE_PATH='s3://{}/mnist_kmeans_example'.format(S3_BUCKET)\n", "\n", "# Configure your Sagemaker execution role.\n", "SAGEMAKER_ROLE_ARN=''\n", "\n", "\n", "@dsl.pipeline(\n", " name='MNIST Classification pipeline',\n", " description='MNIST Classification using KMEANS in SageMaker'\n", ")\n", "def mnist_classification(region='us-west-2',\n", " image='174872318107.dkr.ecr.us-west-2.amazonaws.com/kmeans:1',\n", " training_input_mode='File',\n", " hpo_strategy='Bayesian',\n", " hpo_metric_name='test:msd',\n", " hpo_metric_type='Minimize',\n", " hpo_early_stopping_type='Off',\n", " hpo_static_parameters='{\"k\": \"10\", \"feature_dim\": \"784\"}',\n", " hpo_integer_parameters='[{\"Name\": \"mini_batch_size\", \"MinValue\": \"500\", \"MaxValue\": \"600\"}, {\"Name\": \"extra_center_factor\", \"MinValue\": \"10\", \"MaxValue\": \"20\"}]',\n", " hpo_continuous_parameters='[]',\n", " hpo_categorical_parameters='[{\"Name\": \"init_method\", \"Values\": [\"random\", \"kmeans++\"]}]',\n", " hpo_channels='[{\"ChannelName\": \"train\", \\\n", " \"DataSource\": { \\\n", " \"S3DataSource\": { \\\n", " \"S3Uri\": \"' + S3_PIPELINE_PATH + '/train_data\", \\\n", " \"S3DataType\": \"S3Prefix\", \\\n", " \"S3DataDistributionType\": \"FullyReplicated\" \\\n", " } \\\n", " }, \\\n", " \"ContentType\": \"\", \\\n", " \"CompressionType\": \"None\", \\\n", " \"RecordWrapperType\": \"None\", \\\n", " \"InputMode\": \"File\"}, \\\n", " {\"ChannelName\": \"test\", \\\n", " \"DataSource\": { \\\n", " \"S3DataSource\": { \\\n", " \"S3Uri\": \"' + S3_PIPELINE_PATH + '/test_data\", \\\n", " \"S3DataType\": \"S3Prefix\", \\\n", " \"S3DataDistributionType\": \"FullyReplicated\" \\\n", " } \\\n", " }, \\\n", " \"ContentType\": \"\", \\\n", " \"CompressionType\": \"None\", \\\n", " \"RecordWrapperType\": \"None\", \\\n", " \"InputMode\": \"File\"}]',\n", " hpo_spot_instance='False',\n", " hpo_max_wait_time='3600',\n", " hpo_checkpoint_config='{}',\n", " output_location=S3_PIPELINE_PATH + '/output',\n", " 
output_encryption_key='',\n", " instance_type='ml.p3.2xlarge',\n", " instance_count='1',\n", " volume_size='50',\n", " hpo_max_num_jobs='9',\n", " hpo_max_parallel_jobs='2',\n", " max_run_time='3600',\n", " endpoint_url='',\n", " network_isolation='True',\n", " traffic_encryption='False',\n", " train_channels='[{\"ChannelName\": \"train\", \\\n", " \"DataSource\": { \\\n", " \"S3DataSource\": { \\\n", " \"S3Uri\": \"' + S3_PIPELINE_PATH + '/train_data\", \\\n", " \"S3DataType\": \"S3Prefix\", \\\n", " \"S3DataDistributionType\": \"FullyReplicated\" \\\n", " } \\\n", " }, \\\n", " \"ContentType\": \"\", \\\n", " \"CompressionType\": \"None\", \\\n", " \"RecordWrapperType\": \"None\", \\\n", " \"InputMode\": \"File\"}]',\n", " train_spot_instance='False',\n", " train_max_wait_time='3600',\n", " train_checkpoint_config='{}',\n", " batch_transform_instance_type='ml.m4.xlarge',\n", " batch_transform_input=S3_PIPELINE_PATH + '/input',\n", " batch_transform_data_type='S3Prefix',\n", " batch_transform_content_type='text/csv',\n", " batch_transform_compression_type='None',\n", " batch_transform_ouput=S3_PIPELINE_PATH + '/output',\n", " batch_transform_max_concurrent='4',\n", " batch_transform_max_payload='6',\n", " batch_strategy='MultiRecord',\n", " batch_transform_split_type='Line',\n", " role_arn=SAGEMAKER_ROLE_ARN\n", " ):\n", "\n", " hpo = sagemaker_hpo_op(\n", " region=region,\n", " endpoint_url=endpoint_url,\n", " image=image,\n", " training_input_mode=training_input_mode,\n", " strategy=hpo_strategy,\n", " metric_name=hpo_metric_name,\n", " metric_type=hpo_metric_type,\n", " early_stopping_type=hpo_early_stopping_type,\n", " static_parameters=hpo_static_parameters,\n", " integer_parameters=hpo_integer_parameters,\n", " continuous_parameters=hpo_continuous_parameters,\n", " categorical_parameters=hpo_categorical_parameters,\n", " channels=hpo_channels,\n", " output_location=output_location,\n", " output_encryption_key=output_encryption_key,\n", " instance_type=instance_type,\n", " instance_count=instance_count,\n", " volume_size=volume_size,\n", " max_num_jobs=hpo_max_num_jobs,\n", " max_parallel_jobs=hpo_max_parallel_jobs,\n", " max_run_time=max_run_time,\n", " network_isolation=network_isolation,\n", " traffic_encryption=traffic_encryption,\n", " spot_instance=hpo_spot_instance,\n", " max_wait_time=hpo_max_wait_time,\n", " checkpoint_config=hpo_checkpoint_config,\n", " role=role_arn,\n", " ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))\n", "\n", " training = sagemaker_train_op(\n", " region=region,\n", " endpoint_url=endpoint_url,\n", " image=image,\n", " training_input_mode=training_input_mode,\n", " hyperparameters=hpo.outputs['best_hyperparameters'],\n", " channels=train_channels,\n", " instance_type=instance_type,\n", " instance_count=instance_count,\n", " volume_size=volume_size,\n", " max_run_time=max_run_time,\n", " model_artifact_path=output_location,\n", " output_encryption_key=output_encryption_key,\n", " network_isolation=network_isolation,\n", " traffic_encryption=traffic_encryption,\n", " spot_instance=train_spot_instance,\n", " max_wait_time=train_max_wait_time,\n", " checkpoint_config=train_checkpoint_config,\n", " role=role_arn,\n", " ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))\n", "\n", " create_model = sagemaker_model_op(\n", " region=region,\n", " endpoint_url=endpoint_url,\n", " model_name=training.outputs['job_name'],\n", " image=training.outputs['training_image'],\n", " 
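# The Model manifest below is built from the training job's name, image, and model artifact\n", " 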
model_artifact_url=training.outputs['model_artifact_url'],\n", " network_isolation=network_isolation,\n", " role=role_arn\n", " ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))\n", "\n", " prediction = sagemaker_deploy_op(\n", " region=region,\n", " endpoint_url=endpoint_url,\n", " model_name_1=create_model.output,\n", " ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))\n", "\n", " batch_transform = sagemaker_batch_transform_op(\n", " region=region,\n", " endpoint_url=endpoint_url,\n", " model_name=create_model.output,\n", " instance_type=batch_transform_instance_type,\n", " instance_count=instance_count,\n", " max_concurrent=batch_transform_max_concurrent,\n", " max_payload=batch_transform_max_payload,\n", " batch_strategy=batch_strategy,\n", " input_location=batch_transform_input,\n", " data_type=batch_transform_data_type,\n", " content_type=batch_transform_content_type,\n", " split_type=batch_transform_split_type,\n", " compression_type=batch_transform_compression_type,\n", " output_location=batch_transform_ouput\n", " ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "4. Compile your pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kfp.compiler.Compiler().compile(mnist_classification, 'mnist-classification-pipeline.zip')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "5. Deploy your pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client = kfp.Client()\n", "aws_experiment = client.create_experiment(name='aws')\n", "my_run = client.run_pipeline(aws_experiment.id, 'mnist-classification-pipeline', \n", " 'mnist-classification-pipeline.zip')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prediction\n", "\n", "Once your pipeline has finished, open the SageMaker console and find the name of the endpoint it created, then replace the `ENDPOINT_NAME` value in the cell below with it. The prediction payload reuses the `train_set` that was downloaded in the dataset preparation section.
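\n", "\n", "> If you prefer not to use the console, you can also list recent endpoints programmatically. This is a minimal sketch using boto3; it assumes the credentials used by this notebook are allowed to call `sagemaker:ListEndpoints`:\n", "\n", "```python\n", "import boto3\n", "\n", "sm = boto3.client('sagemaker', region_name='us-west-2')\n", "# Most recently created endpoints are listed first\n", "for ep in sm.list_endpoints(SortBy='CreationTime', SortOrder='Descending')['Endpoints']:\n", "    print(ep['EndpointName'], ep['EndpointStatus'])\n", "```\n", "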
\n", "\n", "\n", "> Note: make sure to attach a policy with the `sagemaker:InvokeEndpoint` permission to the worker node group that is running this Jupyter notebook.\n", "\n", "```json\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"sagemaker:InvokeEndpoint\"\n", " ],\n", " \"Resource\": \"*\"\n", " }\n", " ]\n", "}\n", "\n", "```\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Find your Endpoint name in AWS Console\n", "\n", "Open the AWS console, go to the SageMaker service, and find the endpoint name as the following picture shows.\n", "\n", "![download-pipeline](./images/sm-endpoint.jpg)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pickle, gzip, numpy, urllib.request, json\n", "from urllib.parse import urlparse\n", "import io\n", "import boto3\n", "\n", "# Replace the endpoint name with yours.\n", "ENDPOINT_NAME='Endpoint-20190916223205-Y635'\n", "\n", "# We will use the same dataset that was downloaded at the beginning of the notebook.\n", "\n", "# Simple function to create a csv from our numpy array\n", "def np2csv(arr):\n", " csv = io.BytesIO()\n", " numpy.savetxt(csv, arr, delimiter=',', fmt='%g')\n", " return csv.getvalue().decode().rstrip()\n", "\n", "runtime = boto3.Session(region_name='us-west-2').client('sagemaker-runtime')\n", "\n", "payload = np2csv(train_set[0][30:31])\n", "\n", "response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME,\n", " ContentType='text/csv',\n", " Body=payload)\n", "result = json.loads(response['Body'].read().decode())\n", "print(result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up\n", "\n", "Go to the SageMaker console and delete the `endpoint` and `model`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean up S3 bucket\n", "Delete the S3 bucket that was created for this exercise" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 rb s3://$S3_BUCKET --force" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" }, "nteract": { "version": "0.22.4" } }, "nbformat": 4, "nbformat_minor": 2 }