{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import base64\n", "import boto3\n", "from datetime import datetime\n", "from io import BytesIO\n", "import json\n", "import numpy as np\n", "from PIL import Image\n", "import sagemaker\n", "from sagemaker.tensorflow import TensorFlowModel\n", "import tarfile\n", "import tensorflow as tf\n", "import time\n", "import os" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "## sagemaker configuration\n", "sm_client = boto3.client('sagemaker')\n", "sm_session = sagemaker.Session()\n", "sm_role = sagemaker.get_execution_role()\n", "bucket = sm_session.default_bucket()\n", "s3_client = boto3.client('s3')" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "## load label dictionary\n", "with open(\"../data/label_dictionary.json\", \"r\") as f:\n", " label_dictionary = json.load(f)\n", "with open(\"../data/inverted_label_dictionary.json\", \"r\") as f:\n", " inverted_label_dictionary = json.load(f)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get model output location\n", "In the cell below, we will locate the model artifact produced by the training job that was launched in the previous notebook. 
After training, the model is stored in S3, and its URI can be found in the SageMaker console by clicking on `Training Jobs` on the left-hand frame, then clicking on the training job, then finally scrolling down to the `Output` section where you will find `S3 model artifact` (see images below for reference).\n", "\n", "The S3 key of the model artifact will be assigned to the variable `s3key_model_artifact` in this notebook.\n", "\n", "\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Local inference" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "## get the output path of the training job\n", "# pull the path from the training job using the sagemaker client\n", "s3uri_model_artifact = sm_client.describe_training_job(\n", " TrainingJobName='cinic-demo-horovod-2021-02-12-20-05-02-420')['ModelArtifacts']['S3ModelArtifacts']\n", "s3key_model_artifact = '/'.join(s3uri_model_artifact.split('/')[3:])\n", "\n", "# or find output path in the console using the instructions in the figure above\n", "base_prefix = 'distributed_training_demo/model'\n", "# s3key_model_artifact = f'{base_prefix}/cinic-demo-horovod-2021-02-12-20-05-02-420/output/model.tar.gz'" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "## download model\n", "local_model_dir = '../data/models'\n", "# create the directory from the same variable the untar cell reads, so the two paths cannot drift apart\n", "os.makedirs(local_model_dir, exist_ok=True)\n", "s3_client.download_file(bucket, s3key_model_artifact, os.path.join(local_model_dir, 'horovod_model.tar.gz'))" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "## untar and load models\n", "tar_filepath = os.path.join(local_model_dir, 'horovod_model.tar.gz')\n", "extracted_dir = os.path.join(local_model_dir, 'horovod_model')\n", "# NOTE: extractall() writes whatever member paths the archive contains; only run it on artifacts you trust\n", "with tarfile.open(tar_filepath) as tarred_file:\n", " tarred_file.extractall(extracted_dir)\n", "model_path = os.path.join(extracted_dir, 'cinic10_classifier', '1')\n", "model = 
tf.keras.models.load_model(model_path)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "## load test dataset for evaluation\n", "\n", "test_path = '../data/sharded_tfrecords/test/'\n", "# sort the shard paths so the records are read in the same order on every run\n", "files = sorted(os.path.join(root, name) for root, _dirs, names in os.walk(test_path) for name in names)\n", "test_set = tf.data.TFRecordDataset(files)\n", "\n", "def _dataset_parser(value):\n", "    \"\"\"Parse one serialized example into the (inputs, label) pair fed to the model.\n", "\n", "    Args:\n", "        value: serialized example holding an 'image' bytes feature and a 'label' int64 feature.\n", "\n", "    Returns:\n", "        A tuple (sample_data, label). sample_data is a dict whose 'image_input' entry is the\n", "        image decoded as uint8, reshaped to (32,32,3), cast to float32 and passed through\n", "        resnet_v2 preprocess_input. label is the one-hot encoding (depth 10) of the 'label' feature.\n", "    \"\"\"\n", "    # create a dictionary describing the features\n", "    sample_feature_description = {\n", "        'image': tf.io.FixedLenFeature([], tf.string),\n", "        'label': tf.io.FixedLenFeature([], tf.int64),\n", "    }\n", "\n", "    # parse to tf\n", "    example = tf.io.parse_single_example(value, sample_feature_description)\n", "\n", "    # decode from bytes to tf types\n", "    image = tf.io.decode_raw(example['image'], tf.uint8)\n", "    image = tf.reshape(image, (32,32,3))\n", "\n", "    # preprocess for resnet\n", "    # see https://www.tensorflow.org/api_docs/python/tf/keras/applications/resnet_v2/preprocess_input\n", "    image = tf.cast(image, tf.float32)\n", "    image = tf.keras.applications.resnet_v2.preprocess_input(image)\n", "\n", "    # parse for input to neural network and loss function\n", "    # NOTE: the dictionary key must match the name of the Input layer in the keras model\n", "    sample_data = {'image_input': image}\n", "\n", "    label = tf.cast(example['label'], tf.int32)\n", "    label = tf.one_hot(indices=label, depth=10)\n", "\n", "    return sample_data, label\n", "\n", "test_set = test_set.map(_dataset_parser, num_parallel_calls=tf.data.experimental.AUTOTUNE)\n", "test_set = test_set.batch(128)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "211/211 [==============================] - 41s 196ms/step - loss: 0.7631 - categorical_accuracy: 0.7422\n" ] } ], "source": [ "## inference on test set\n", "_ = model.evaluate(test_set)" ] 
}, { "cell_type": "markdown", "metadata": {}, "source": [ "## Real-time endpoint inference" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "update_endpoint is a no-op in sagemaker>=2.\n", "See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.\n", "Using already existing model: cinic-10-classifier\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "---------------!CPU times: user 25.3 s, sys: 2.39 s, total: 27.7 s\n", "Wall time: 8min\n" ] } ], "source": [ "%%time\n", "## create model for batch transform (can also be used for real-time inference)\n", "endpoint_name = 'cinic-10-classifier'\n", "# the SageMaker model is registered under the same name as the endpoint\n", "model_name = endpoint_name\n", "instance_type = 'ml.g4dn.xlarge'\n", "\n", "## if specifying the exact container, use \"image_uri\"\n", "# https://github.com/aws/deep-learning-containers/blob/master/available_images.md\n", "# NOTE(review): this URI is pinned to us-east-1 -- confirm it matches the region of sm_session\n", "image_uri = '763104351884.dkr.ecr.us-east-1.amazonaws.com/tensorflow-inference:2.3.1-gpu-py37-cu102-ubuntu18.04'\n", "\n", "## if specifying only the framework version, for which the container is subject to change, use \"framework_version\"\n", "# https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/sagemaker.tensorflow.html#tensorflow-serving-model\n", "framework_version = '2.3'\n", "\n", "## create model and deploy\n", "sm_model = TensorFlowModel(\n", " model_data=f's3://{bucket}/{s3key_model_artifact}',\n", " image_uri=image_uri,\n", "# framework_version=framework_version,\n", " source_dir='../source_directory/inference',\n", " entry_point='inference.py',\n", " role=sm_role,\n", " sagemaker_session=sm_session,\n", " name=model_name,\n", ")\n", "\n", "sm_predictor = sm_model.deploy(\n", " instance_type=instance_type,\n", " initial_instance_count=1,\n", " endpoint_name=endpoint_name,\n", ")" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "## setup to invoke endpoint\n", 
"sm_runtime_client = boto3.client('sagemaker-runtime')\n", "# `high` is exclusive, so 256 is required to cover the full 8-bit pixel range [0, 255]\n", "image_np = np.random.randint(low=0, high=256, size=(32,32,3))\n", "image_list = image_np.tolist()\n", "image_json = json.dumps(image_list)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "## invoke endpoint for inference\n", "response = sm_runtime_client.invoke_endpoint(\n", " EndpointName=endpoint_name,\n", " Body=image_json,\n", " ContentType=\"application/json\",\n", ")\n", "endpoint_vector = json.loads(response['Body'].read().decode('utf-8'))['predictions'][0]" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "## local inference\n", "# keep `image_np` untouched so re-running this cell does not stack another batch axis\n", "image_batch = np.expand_dims(image_np, axis=0)\n", "local_vector = model.predict(tf.keras.applications.resnet_v2.preprocess_input(image_batch))\n", "local_vector = local_vector.tolist()[0]" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "All elements are within relative tolerance of 1e-05\n" ] } ], "source": [ "## check where (if) the endpoint prediction and local prediction disagree\n", "rtol = 1e-5\n", "# ~ negates the boolean mask; np.where then yields the indices of the components that differ\n", "not_close_idx = np.where(~np.isclose(endpoint_vector, local_vector, rtol=rtol))[0]\n", "for i in not_close_idx:\n", "    print(\"Component {} does not agree:\".format(i))\n", "    print(\"\\tendpoint_vector: {}\".format(endpoint_vector[i]))\n", "    print(\"\\tlocal_vector: {}\".format(local_vector[i]))\n", "if not_close_idx.size < 1:\n", "    print(\"All elements are within relative tolerance of {}\".format(rtol))" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "## delete endpoint and model\n", "sm_predictor.delete_endpoint()\n", "sm_predictor.delete_model()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Batch job inference" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": 
"stream", "text": [ "........................................................................................................!\n", "CPU times: user 25.8 s, sys: 2.38 s, total: 28.2 s\n", "Wall time: 9min 7s\n" ] } ], "source": [ "%%time\n", "env = {'SAGEMAKER_TFS_ENABLE_BATCHING': 'true',\n", " 'SAGEMAKER_TFS_BATCH_TIMEOUT_MICROS': '50000',\n", " 'SAGEMAKER_TFS_MAX_BATCH_SIZE': '16'}\n", "\n", "sm_model_transformer = sm_model.transformer(\n", " instance_count=2,\n", " strategy='SingleRecord',\n", " instance_type='ml.p3.2xlarge',\n", " max_concurrent_transforms=64,\n", " output_path=f's3://{bucket}/distributed_training_demo/batch_transform_output/',\n", " env=env,\n", " assemble_with='Line')\n", "\n", "sm_model_transformer.transform(\n", " data=f's3://{bucket}/distributed_training_demo/data/test/',\n", " data_type='S3Prefix',\n", " split_type='TFRecord',\n", " content_type='application/x-tfexample',\n", " wait=True,\n", " logs=False,)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "dog\n" ] } ], "source": [ "## parse and confirm data in tfrecords\n", "\n", "test_index = 21\n", "\n", "# load and test the tf record files\n", "# use a separate name so the `test_set` built earlier for model.evaluate is not overwritten\n", "test_image_s3key = 'distributed_training_demo/data/test/test_000.tfrecords'\n", "sample_set = tf.data.TFRecordDataset(f's3://{bucket}/{test_image_s3key}')\n", "\n", "# Create a dictionary describing the features.\n", "sample_feature_description = {\n", " 'image': tf.io.FixedLenFeature([], tf.string),\n", " 'label': tf.io.FixedLenFeature([], tf.int64),\n", "}\n", "\n", "def _parse_sample_function(example_proto):\n", " return tf.io.parse_single_example(example_proto, sample_feature_description)\n", "\n", "sample_set = sample_set.map(_parse_sample_function)\n", "sample_set = sample_set.batch(1)\n", "for i, sample_features in enumerate(sample_set):\n", " image = 
tf.io.decode_raw(sample_features['image'], tf.uint8)\n", " image = tf.reshape(image, (32,32,3))\n", " \n", " label = sample_features['label']\n", " \n", " if i==test_index: break\n", "\n", "display(Image.fromarray(image.numpy()).resize((128,128)))\n", "print(inverted_label_dictionary[str(int(label.numpy()))])" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "dog\n" ] } ], "source": [ "## local inference\n", "processed_image = tf.reshape(tf.keras.applications.resnet_v2.preprocess_input(image.numpy()), (1,32,32,3))\n", "local_vector = model.predict(processed_image)[0]\n", "local_prediction = np.argmax(local_vector)\n", "print(inverted_label_dictionary[str(int(local_prediction))])" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "dog\n" ] } ], "source": [ "## get inference from batch transform job\n", "test_image_s3key = 'distributed_training_demo/batch_transform_output/test_000.tfrecords.out'\n", "bt_inference = s3_client.get_object(Bucket=bucket, Key=test_image_s3key)\n", "predictions = bt_inference['Body'].read().decode(\"utf-8\")\n", "bt_vector = np.array(json.loads(predictions.split()[test_index])['predictions'][0])\n", "bt_prediction = np.argmax(bt_vector)\n", "print(inverted_label_dictionary[str(int(bt_prediction))])" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "All elements are within relative tolerance of 1e-05\n" ] } ], "source": [ "## check where (if) the batch transform prediction and local prediction disagree\n", "rtol = 1e-5\n", "# ~ negates the boolean mask; np.where then yields the indices of the components that differ\n", "not_close_idx = np.where(~np.isclose(local_vector, bt_vector, rtol=rtol))[0]\n", "for i in not_close_idx:\n", "    print(\"Component {} does not agree:\".format(i))\n", "    # report the batch transform vector being compared here, not the earlier endpoint result\n", "    print(\"\\tbt_vector: {}\".format(bt_vector[i]))\n", "    print(\"\\tlocal_vector: 
{}\".format(local_vector[i]))\n", "if not_close_idx.size < 1:\n", " print(\"All elements are within relative tolerance of {}\".format(rtol))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow2_p36", "language": "python", "name": "conda_tensorflow2_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }