## Ensemble model inference with NVIDIA Triton Inference Server and NVIDIA DALI on Amazon SageMaker


---

This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. 

![This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-2/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

---


Deep learning applications are often complex, requiring multi-stage data loading and pre-processing pipelines. Optimizing these pre-processing steps is critical to achieving the best inference performance. In a computer vision application, pre-processing pipelines may include steps like image loading, image decoding, resizing, cropping and other image augmentations. These data processing pipelines can become a bottleneck that limits the performance and scalability of deep learning inference. Pre-processing implementations can also make inference workloads harder to port and the code harder to maintain.

In this notebook, we take a deep dive into implementing an NVIDIA DALI pre-processing pipeline for the Inception V3 model. The pipeline implements image pre-processing steps such as decoding, resizing and cropping. We then serialize the pipeline and create a model configuration so that it can be deployed with NVIDIA Triton Inference Server. Finally, we deploy the Inception V3 model to an Amazon SageMaker real-time endpoint using the Triton Inference Server Deep Learning Containers.

### NVIDIA DALI

The NVIDIA Data Loading Library (DALI) is a library for data loading and pre-processing to accelerate deep learning applications. It provides a collection of highly optimized building blocks for loading and processing image, video and audio data. It can be used as a portable drop-in replacement for the built-in data loaders and data iterators in popular deep learning frameworks.

DALI addresses the problem of the CPU bottleneck by offloading data preprocessing to the GPU. Additionally, DALI relies on its own execution engine, built to maximize the throughput of the input pipeline. Features such as prefetching, parallel execution, and batch processing are handled transparently for the user. Data processing pipelines implemented using DALI are portable because they can easily be retargeted to TensorFlow, PyTorch, MXNet and PaddlePaddle.

<img src="images/dali.png" alt="DALI" width="700" />



#### Highlights

- Easy integration with NVIDIA Triton Inference Server.

- Support for multiple data formats: RecordIO, TFRecord, COCO, JPEG, etc.

- Portable across popular deep learning frameworks: TensorFlow, PyTorch, MXNet.

- Supports CPU and GPU execution.

- Scalable across multiple GPUs.

- Flexible graphs let developers create custom pipelines.

## Triton Model Ensembles

Triton Inference Server simplifies the deployment of AI models at scale in production. It also simplifies building pre-processing and post-processing pipelines: Triton provides the ensemble scheduler, which pipelines the models participating in an inference request, passing the output tensors of one model to the inputs of the next while optimizing for efficiency and throughput.

<img src="images/triton-ensemble.png" alt="triton-ensemble" width="500" align="left"/>


## Set up

Install the dependencies required to package the model and run inferences using SageMaker Triton server.

In [None]:
!pip install -qU pip awscli boto3 sagemaker --quiet
!pip install nvidia-pyindex --quiet
!pip install tritonclient[http] --quiet

Execute the following command to install the latest DALI build for the specified CUDA version.

### Note: The step below installs the CUDA build of NVIDIA DALI. You need to run this notebook on a GPU-based instance.

In [None]:
!pip install --extra-index-url https://developer.download.nvidia.com/compute/redist --upgrade nvidia-dali-cuda110

### Imports

In [None]:
import boto3, json, sagemaker, time
from sagemaker import get_execution_role
import nvidia.dali as dali
import nvidia.dali.types as types

### Variables

In [None]:
# SageMaker variables
sm_client = boto3.client(service_name="sagemaker")
runtime_sm_client = boto3.client("sagemaker-runtime")
sagemaker_session = sagemaker.Session(boto_session=boto3.Session())
role = get_execution_role()

# Other Variables
instance_type = "ml.g4dn.4xlarge"
sm_model_name = "triton-tf-dali-ensemble-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
endpoint_config_name = "triton-tf-dali-ensemble-" + time.strftime(
    "%Y-%m-%d-%H-%M-%S", time.gmtime()
)
endpoint_name = "triton-tf-dali-ensemble-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())

## Download models and set up pre-processing pipeline with DALI

Create the directories that hold the ensemble's models in the model repository. The following example shows the model repository directory structure, containing a DALI pre-processing model, the TensorFlow Inception V3 model, and the model ensemble.

<img src="images/model-repo.png" alt="model-repo" width="300" align="left"/>

In [None]:
!mkdir -p model_repository/inception_graphdef/1
!mkdir -p model_repository/dali/1
!mkdir -p model_repository/ensemble_dali_inception/1
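
The same layout can be created from Python, which is handy when the model names are held in a list. The following is a minimal sketch: the helper `create_repo_layout` is a hypothetical name introduced here, and the demo runs in a temporary directory so it does not touch the real `model_repository`.

```python
import tempfile
from pathlib import Path


def create_repo_layout(root, model_names, version="1"):
    """Create <root>/<model>/<version>/ for each model and return the paths."""
    created = []
    for name in model_names:
        version_dir = Path(root) / name / version
        # parents/exist_ok give the same behaviour as `mkdir -p`
        version_dir.mkdir(parents=True, exist_ok=True)
        created.append(version_dir)
    return created


# Demonstrate in a throwaway directory (hypothetical location, not the notebook's).
with tempfile.TemporaryDirectory() as tmp:
    dirs = create_repo_layout(
        Path(tmp) / "model_repository",
        ["inception_graphdef", "dali", "ensemble_dali_inception"],
    )
    layout = sorted(d.relative_to(tmp).as_posix() for d in dirs)
    print(layout)
```

Each model gets its own folder, and the numeric sub-folder is the model version that the artifacts are placed in.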

Next, we download the Inception V3 model, an image classification neural network.

In [None]:
!wget -O /tmp/inception_v3_2016_08_28_frozen.pb.tar.gz \
     https://storage.googleapis.com/download.tensorflow.org/models/inception_v3_2016_08_28_frozen.pb.tar.gz

Place the downloaded Inception V3 model in the model repository under the `inception_graphdef` folder.

In [None]:
!(cd /tmp && tar xzf inception_v3_2016_08_28_frozen.pb.tar.gz)
!mv /tmp/inception_v3_2016_08_28_frozen.pb model_repository/inception_graphdef/1/model.graphdef

The model configuration for the ensemble, which chains DALI pre-processing and image classification, is shown below.

In [None]:
%%writefile model_repository/ensemble_dali_inception/config.pbtxt
name: "ensemble_dali_inception"
platform: "ensemble"
max_batch_size: 256
input [
  {
    name: "INPUT"
    data_type: TYPE_UINT8
    dims: [ -1 ]
  }
]
output [
  {
    name: "OUTPUT"
    data_type: TYPE_FP32
    dims: [ 1001 ]
  }
]
ensemble_scheduling {
  step [
    {
      model_name: "dali"
      model_version: -1
      input_map {
        key: "DALI_INPUT_0"
        value: "INPUT"
      }
      output_map {
        key: "DALI_OUTPUT_0"
        value: "preprocessed_image"
      }
    },
    {
      model_name: "inception_graphdef"
      model_version: -1
      input_map {
        key: "input"
        value: "preprocessed_image"
      }
      output_map {
        key: "InceptionV3/Predictions/Softmax"
        value: "OUTPUT"
      }
    }
  ]
}
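
The ensemble only works if every tensor name a step reads is produced earlier, either by the ensemble input or by a previous step. The sketch below mirrors the two steps of the configuration above as Python dictionaries and checks that wiring. The names `STEPS` and `check_wiring` are hypothetical, and the check assumes the steps are listed in data-flow order, as they are here.

```python
# Python mirror of the ensemble input/output names and the `step` entries above.
ENSEMBLE_INPUTS = {"INPUT"}
ENSEMBLE_OUTPUTS = {"OUTPUT"}
STEPS = [
    {
        "model_name": "dali",
        "input_map": {"DALI_INPUT_0": "INPUT"},
        "output_map": {"DALI_OUTPUT_0": "preprocessed_image"},
    },
    {
        "model_name": "inception_graphdef",
        "input_map": {"input": "preprocessed_image"},
        "output_map": {"InceptionV3/Predictions/Softmax": "OUTPUT"},
    },
]


def check_wiring(ensemble_inputs, ensemble_outputs, steps):
    """Return a list of problems; an empty list means every tensor is connected."""
    problems = []
    available = set(ensemble_inputs)  # tensors that can be read so far
    produced = set()  # tensors written by some step
    for step in steps:
        for model_input, tensor in step["input_map"].items():
            if tensor not in available:
                problems.append(
                    f"{step['model_name']}.{model_input} reads unknown tensor '{tensor}'"
                )
        outputs = set(step["output_map"].values())
        available |= outputs
        produced |= outputs
    for tensor in ensemble_outputs:
        if tensor not in produced:
            problems.append(f"ensemble output '{tensor}' is never produced")
    return problems


print(check_wiring(ENSEMBLE_INPUTS, ENSEMBLE_OUTPUTS, STEPS))
```

Here `preprocessed_image` is the intermediate tensor that connects the DALI output to the Inception input; renaming it on one side only would make the check report a problem.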

Model configuration for dali backend

In [None]:
%%writefile model_repository/dali/config.pbtxt
name: "dali"
backend: "dali"
max_batch_size: 256
input [
  {
    name: "DALI_INPUT_0"
    data_type: TYPE_UINT8
    dims: [ -1 ]
  }
]
output [
  {
    name: "DALI_OUTPUT_0"
    data_type: TYPE_FP32
    dims: [ 299, 299, 3 ]
  }
]
parameters: [
  {
    key: "num_threads"
    value: { string_value: "12" }
  }
]

Model configuration for the Inception model graph definition

In [None]:
%%writefile model_repository/inception_graphdef/config.pbtxt
name: "inception_graphdef"
platform: "tensorflow_graphdef"
max_batch_size: 256
input [
  {
    name: "input"
    data_type: TYPE_FP32
    format: FORMAT_NHWC
    dims: [ 299, 299, 3 ]
  }
]
output [
  {
    name: "InceptionV3/Predictions/Softmax"
    data_type: TYPE_FP32
    dims: [ 1001 ]
    label_filename: "inception_labels.txt"
  }
]

We will copy the Inception classification model labels to the `inception_graphdef` directory in the model repository. The labels file contains 1000 class labels of the [ImageNet](https://image-net.org/download.php) classification dataset.

In [None]:
!mkdir -p model_repository/inception_graphdef
s3_client = boto3.client("s3")
s3_client.download_file(
    f"sagemaker-example-files-prod-{sagemaker_session.boto_region_name}",
    "datasets/labels/inception_labels.txt",
    "model_repository/inception_graphdef/inception_labels.txt",
)

### DALI Pipeline

In DALI, every data processing task is built around a central object called the Pipeline. A Pipeline object is an instance of `nvidia.dali.Pipeline`. It encapsulates the data processing graph and the execution engine. You can define a DALI pipeline by implementing a function that uses DALI operators and decorating it with the `pipeline_def()` decorator.

DALI pipelines are executed in stages. The stages correspond to the `device` parameter that can be specified for each operator, and are executed in the following order:

1. 'cpu' - operators that accept CPU inputs and produce CPU outputs.

2. 'mixed' - operators that accept CPU inputs and produce GPU outputs, for example nvidia.dali.fn.decoders.image().

3. 'gpu' - operators that accept GPU inputs and produce GPU outputs.

#### Parameters
1. batch_size - Maximum batch size of the pipeline
2. num_threads - Number of CPU threads used by the pipeline
3. device_id - Id of GPU used by the pipeline

In [None]:
@dali.pipeline_def(batch_size=3, num_threads=1, device_id=0)
def pipe():
    """Create a pipeline that takes encoded images, decodes, resizes and normalizes them."""
    images = dali.fn.external_source(device="cpu", name="DALI_INPUT_0")
    images = dali.fn.decoders.image(images, device="mixed", output_type=types.RGB)
    images = dali.fn.resize(
        images, resize_x=299, resize_y=299
    )  # resize image to the default 299x299 size
    images = dali.fn.crop_mirror_normalize(
        images,
        dtype=types.FLOAT,
        output_layout="HWC",
        crop=(299, 299),  # crop image to the default 299x299 size
        mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],  # per-channel mean subtracted from each pixel
        std=[0.229 * 255, 0.224 * 255, 0.225 * 255],  # per-channel standard deviation used as the divisor
    )
    return images
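
The `mean` and `std` arguments make `crop_mirror_normalize` compute `(pixel - mean) / std` for each channel. The following plain-Python sketch applies the same arithmetic to a single RGB pixel, using the values from the pipeline above. The helper `normalize_pixel` is a hypothetical name, and this only illustrates the formula, not how DALI executes it on the GPU.

```python
# Per-channel statistics, scaled to the 0-255 pixel range as in the pipeline above.
MEAN = [0.485 * 255, 0.456 * 255, 0.406 * 255]
STD = [0.229 * 255, 0.224 * 255, 0.225 * 255]


def normalize_pixel(rgb):
    """Apply (value - mean) / std to each channel of one RGB pixel."""
    return [(value - m) / s for value, m, s in zip(rgb, MEAN, STD)]


# A pixel equal to the per-channel mean maps to zero in every channel.
print(normalize_pixel(MEAN))  # [0.0, 0.0, 0.0]

# A pure white pixel ends up a little over two standard deviations above the mean.
white = normalize_pixel([255, 255, 255])
print(white)
```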

Serialize the pipeline to a Protobuf string. The `filename` argument is the file to which the serialized pipeline is written.

In [None]:
pipe().serialize(filename="model_repository/dali/1/model.dali")

## Get Triton Inference Server Container image

Now that we have set up the DALI pipeline, we will get the SageMaker Triton image from ECR and use it to deploy the Inception V3 model to an Amazon SageMaker real-time endpoint.

In [None]:
account_id_map = {
    "us-east-1": "785573368785",
    "us-east-2": "007439368137",
    "us-west-1": "710691900526",
    "us-west-2": "301217895009",
    "eu-west-1": "802834080501",
    "eu-west-2": "205493899709",
    "eu-west-3": "254080097072",
    "eu-north-1": "601324751636",
    "eu-south-1": "966458181534",
    "eu-central-1": "746233611703",
    "ap-east-1": "110948597952",
    "ap-south-1": "763008648453",
    "ap-northeast-1": "941853720454",
    "ap-northeast-2": "151534178276",
    "ap-southeast-1": "324986816169",
    "ap-southeast-2": "355873309152",
    "cn-northwest-1": "474822919863",
    "cn-north-1": "472730292857",
    "sa-east-1": "756306329178",
    "ca-central-1": "464438896020",
    "me-south-1": "836785723513",
    "af-south-1": "774647643957",
}

In [None]:
region = boto3.Session().region_name
if region not in account_id_map:
    # Raising a plain string is a TypeError in Python 3, so raise a real exception
    raise ValueError(f"Unsupported region: {region}")

In [None]:
base = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
triton_image_uri = "{account_id}.dkr.ecr.{region}.{base}/sagemaker-tritonserver:21.08-py3".format(
    account_id=account_id_map[region], region=region, base=base
)
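
The URI construction above can be wrapped in a small function so the partition logic is easy to test. This is a sketch of the same string formatting, with `triton_image_uri_for` being a hypothetical helper name. The account IDs in the demo are copied from `account_id_map`.

```python
def triton_image_uri_for(region, account_id, tag="21.08-py3"):
    """Build the SageMaker Triton ECR image URI for a region."""
    # China regions use a different DNS suffix than the other regions.
    base = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
    return f"{account_id}.dkr.ecr.{region}.{base}/sagemaker-tritonserver:{tag}"


print(triton_image_uri_for("us-west-2", "301217895009"))
print(triton_image_uri_for("cn-north-1", "472730292857"))
```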

Let's create the model artifact 

In [None]:
!tar -cvzf model.tar.gz -C model_repository .
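
The important detail of the `tar` command is `-C model_repository .`, which archives the contents of the repository rather than the `model_repository` folder itself, so the model folders sit at the root of the archive. A rough Python equivalent using the standard `tarfile` module is sketched below. The helper `pack_model_repository` is a hypothetical name, and the demo uses a tiny placeholder repository in a temporary directory. Unlike the shell command, the member names here carry no leading `./`.

```python
import tarfile
import tempfile
from pathlib import Path


def pack_model_repository(repo_dir, archive_path):
    """Archive the contents of repo_dir (not the directory itself) as .tar.gz."""
    with tarfile.open(archive_path, "w:gz") as tar:
        for entry in sorted(Path(repo_dir).iterdir()):
            # arcname drops the repo_dir prefix so models sit at the archive root.
            tar.add(entry, arcname=entry.name)
    # Re-open the archive to list what was actually written.
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(tar.getnames())


with tempfile.TemporaryDirectory() as tmp:
    repo = Path(tmp) / "model_repository"
    (repo / "dali" / "1").mkdir(parents=True)
    (repo / "dali" / "config.pbtxt").write_text('name: "dali"\n')
    (repo / "dali" / "1" / "model.dali").write_bytes(b"placeholder")
    names = pack_model_repository(repo, Path(tmp) / "model.tar.gz")
    print(names)
```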

Once the contents of the model repository directory are packed into the `model.tar.gz` file, we upload the model artifact to the `model_uri` S3 location.

In [None]:
model_uri = sagemaker_session.upload_data(
    path="model.tar.gz", key_prefix="triton-serve-tf-dali-ensemble"
)
print(model_uri)

### Create SageMaker Endpoint

We start off by creating a [SageMaker model](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateModel.html) from the model artifacts we uploaded to S3 in the previous step.

In this step we also provide an additional environment variable, `SAGEMAKER_TRITON_DEFAULT_MODEL_NAME`, which specifies the name of the model to be loaded by Triton. **The value of this key should match the folder name in the model package uploaded to S3**. This variable is optional for a single model. For ensemble models, this key **has to be** specified for Triton to start up in SageMaker.

Additionally, customers can set `SAGEMAKER_TRITON_BUFFER_MANAGER_THREAD_COUNT` and `SAGEMAKER_TRITON_THREAD_COUNT` for optimizing the thread counts.
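
As an illustration, the optional thread-count variables can be added to the container definition alongside the default model name. The sketch below assumes a hypothetical helper `build_container`. The thread counts, image URI and S3 URL in the demo are placeholders, not recommended or real values. The `Environment` map takes string values, so numbers are converted with `str`.

```python
def build_container(
    image_uri,
    model_data_url,
    default_model,
    thread_count=None,
    buffer_manager_thread_count=None,
):
    """Return a PrimaryContainer dict, adding thread-count variables only when given."""
    env = {"SAGEMAKER_TRITON_DEFAULT_MODEL_NAME": default_model}
    if thread_count is not None:
        env["SAGEMAKER_TRITON_THREAD_COUNT"] = str(thread_count)
    if buffer_manager_thread_count is not None:
        env["SAGEMAKER_TRITON_BUFFER_MANAGER_THREAD_COUNT"] = str(buffer_manager_thread_count)
    return {"Image": image_uri, "ModelDataUrl": model_data_url, "Environment": env}


# Placeholder arguments for illustration only.
example = build_container(
    "<triton-image-uri>",
    "s3://<bucket>/<prefix>/model.tar.gz",
    "ensemble_dali_inception",
    thread_count=8,
    buffer_manager_thread_count=2,
)
print(example["Environment"])
```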

**Note**: The current release of Triton (21.08-py3) on SageMaker doesn't support running instances of different models on the same server, except in case of [ensembles](https://github.com/triton-inference-server/server/blob/main/docs/architecture.md#ensemble-models). Only multiple model instances of the same model are supported, which can be specified under the [instance-groups](https://github.com/triton-inference-server/server/blob/main/docs/model_configuration.md#instance-groups) section of the config.pbtxt file.

In [None]:
container = {
    "Image": triton_image_uri,
    "ModelDataUrl": model_uri,
    "Environment": {"SAGEMAKER_TRITON_DEFAULT_MODEL_NAME": "ensemble_dali_inception"},
}

create_model_response = sm_client.create_model(
    ModelName=sm_model_name, ExecutionRoleArn=role, PrimaryContainer=container
)

model_arn = create_model_response["ModelArn"]

print(f"Model Arn: {model_arn}")

Using the model above, we create an [endpoint configuration](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateEndpointConfig.html) where we can specify the type and number of instances we want in the endpoint.

In [None]:
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "InstanceType": instance_type,
            "InitialVariantWeight": 1,
            "InitialInstanceCount": 1,
            "ModelName": sm_model_name,
            "VariantName": "AllTraffic",
        }
    ],
)

endpoint_config_arn = create_endpoint_config_response["EndpointConfigArn"]

print(f"Endpoint Config Arn: {endpoint_config_arn}")

Using the above endpoint configuration, we create a new SageMaker endpoint and wait for the deployment to finish. The status will change to **InService** once the deployment is successful.

In [None]:
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)

endpoint_arn = create_endpoint_response["EndpointArn"]

print(f"Endpoint Arn: {endpoint_arn}")

In [None]:
rv = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = rv["EndpointStatus"]
print(f"Endpoint Creation Status: {status}")

while status == "Creating":
    time.sleep(60)
    rv = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = rv["EndpointStatus"]
    print(f"Endpoint Creation Status: {status}")

endpoint_arn = rv["EndpointArn"]

print(f"Endpoint Arn: {endpoint_arn}")
print(f"Endpoint Status: {status}")
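
The loop above polls until the status is no longer `Creating`, with no upper bound on the wait. A variant with a timeout is sketched below. The function `wait_for_endpoint` and its default timeout are assumptions made for illustration, and the demo uses a fake `describe` callable so it runs without AWS access.

```python
import time


def wait_for_endpoint(
    describe, endpoint_name, poll_seconds=60, timeout_seconds=1800, sleep=time.sleep
):
    """Poll until the endpoint leaves 'Creating' or the timeout elapses."""
    waited = 0
    while True:
        status = describe(EndpointName=endpoint_name)["EndpointStatus"]
        if status != "Creating":
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"{endpoint_name} still Creating after {waited}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Fake describe call (hypothetical) that reports Creating twice, then InService.
_responses = iter(["Creating", "Creating", "InService"])


def fake_describe(EndpointName):
    return {"EndpointStatus": next(_responses)}


final_status = wait_for_endpoint(fake_describe, "demo-endpoint", sleep=lambda s: None)
print(final_status)  # InService
```

With the real client, the call would be `wait_for_endpoint(sm_client.describe_endpoint, endpoint_name)`. The returned status still needs to be checked, since a failed deployment also leaves the `Creating` state.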

### Prepare inference payload

Let's download an image from a SageMaker S3 bucket to use for Inception V3 model inference. This image will go through the DALI pre-processing pipeline and then the Inception model, chained together by the ensemble scheduler provided by Triton Inference Server.

In [None]:
sample_img_fname = "shiba_inu_dog.jpg"

In [None]:
import numpy as np

s3_client.download_file(
    f"sagemaker-example-files-prod-{sagemaker_session.boto_region_name}",
    "datasets/image/pets/shiba_inu_dog.jpg",
    sample_img_fname,
)

In [None]:
def load_image(img_path):
    """
    Loads image as an encoded array of bytes.
    This is a typical approach you want to use in DALI backend
    """
    with open(img_path, "rb") as f:
        # Interpret the raw file bytes as uint8 without building a Python list first
        return np.frombuffer(f.read(), dtype=np.uint8)

In [None]:
rv = load_image(sample_img_fname)
print(f"Shape of image {rv.shape}")

In [None]:
rv2 = np.expand_dims(rv, 0)
print(f"Shape of expanded image array {rv2.shape}")

Prepare the input payload with the name, shape, datatype and the data as a list. This payload will be used to invoke the endpoint and get the prediction results.

In [None]:
payload = {
    "inputs": [
        {
            "name": "INPUT",
            "shape": rv2.shape,
            "datatype": "UINT8",
            "data": rv2.tolist(),
        }
    ]
}

### Run inference

Once the endpoint is running, we can use the downloaded sample image to run an inference request with JSON as the payload format. For the inference request format, Triton uses the KFServing community standard [inference protocols](https://github.com/triton-inference-server/server/blob/main/docs/protocol/README.md).

In [None]:
%%timeit

response = runtime_sm_client.invoke_endpoint(
    EndpointName=endpoint_name, ContentType="application/octet-stream", Body=json.dumps(payload)
)

print(json.loads(response["Body"].read().decode("utf8")))

We can also use binary+json as the payload format to get better performance for the inference call. The specification of this format is provided [here](https://github.com/triton-inference-server/server/blob/main/docs/protocol/extension_binary_data.md).

**Note:** With the `binary+json` format, we have to specify the length of the request metadata in the header to allow Triton to correctly parse the binary payload. This is done using a custom Content-Type header `application/vnd.sagemaker-triton.binary+json;json-header-size={}`.

Please note that this is different from using the `Inference-Header-Content-Length` header on a stand-alone Triton server, since custom headers are not allowed in SageMaker.
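
To make the role of `json-header-size` concrete, the sketch below assembles such a request by hand: a JSON header followed directly by the raw tensor bytes, with the header length carried in the Content-Type. The header fields follow our reading of Triton's binary data extension (`binary_data_size`, `binary_data`). The helpers `build_binary_request` and `split_body` are hypothetical names, the four-byte input is placeholder data, and the result has not been verified against a live endpoint.

```python
import json


def build_binary_request(input_name, output_name, raw_bytes):
    """Return (body, content_type) for a single UINT8 input of shape [1, N]."""
    header = {
        "inputs": [
            {
                "name": input_name,
                "shape": [1, len(raw_bytes)],
                "datatype": "UINT8",
                # How many raw bytes after the JSON header belong to this input.
                "parameters": {"binary_data_size": len(raw_bytes)},
            }
        ],
        "outputs": [{"name": output_name, "parameters": {"binary_data": True}}],
    }
    header_bytes = json.dumps(header).encode("utf-8")
    content_type = (
        "application/vnd.sagemaker-triton.binary+json;"
        f"json-header-size={len(header_bytes)}"
    )
    return header_bytes + raw_bytes, content_type


def split_body(body, content_type):
    """Use the header size from the Content-Type to separate JSON from raw bytes."""
    size = int(content_type.split("json-header-size=")[1])
    return json.loads(body[:size]), body[size:]


placeholder_bytes = bytes([0xFF, 0xD8, 0xFF, 0xE0])  # stand-in for an encoded image
body, content_type = build_binary_request("INPUT", "OUTPUT", placeholder_bytes)
header, tail = split_body(body, content_type)
print(content_type)
print(header["inputs"][0]["parameters"], tail == placeholder_bytes)
```

Without the size in the Content-Type, the receiver would have no way to tell where the JSON ends and the binary tensor data begins.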

The `tritonclient` package provides utility methods to generate the payload without having to know the details of the specification. We'll use the following methods to convert our inference request into a binary format which provides lower latencies for inference.

In [None]:
import tritonclient.http as httpclient


def get_sample_image_binary(img_path, input_name, output_name):
    inputs = []
    outputs = []
    input_data = load_image(img_path)
    input_data = np.expand_dims(input_data, axis=0)
    inputs.append(httpclient.InferInput(input_name, input_data.shape, "UINT8"))
    inputs[0].set_data_from_numpy(input_data, binary_data=True)
    outputs.append(httpclient.InferRequestedOutput(output_name, binary_data=True))
    request_body, header_length = httpclient.InferenceServerClient.generate_request_body(
        inputs, outputs=outputs
    )
    return request_body, header_length

In [None]:
request_body, header_length = get_sample_image_binary(sample_img_fname, "INPUT", "OUTPUT")

In [None]:
%%timeit

response = runtime_sm_client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/vnd.sagemaker-triton.binary+json;json-header-size={}".format(
        header_length
    ),
    Body=request_body,
)

We use `invoke_endpoint` to pass in the payload in binary json format to the endpoint.

In [None]:
response = runtime_sm_client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/vnd.sagemaker-triton.binary+json;json-header-size={}".format(
        header_length
    ),
    Body=request_body,
)

# Parse json header size length from the response
header_length_prefix = "application/vnd.sagemaker-triton.binary+json;json-header-size="
header_length_str = response["ContentType"][len(header_length_prefix) :]

# Read response body
result = httpclient.InferenceServerClient.parse_response_body(
    response["Body"].read(), header_length=int(header_length_str)
)
output0_data = result.as_numpy("OUTPUT")
print(output0_data)
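
The printed output is a vector of class probabilities, which is easier to read as a short ranked list. The sketch below picks the top entries from a list of probabilities. The helper `top_k` and the toy labels are hypothetical. To use it with the real output, one would pass something like `output0_data[0].tolist()` together with the lines of `inception_labels.txt`, assuming line `i` of that file corresponds to class index `i`.

```python
def top_k(probabilities, labels, k=3):
    """Return the k (label, probability) pairs with the highest probability."""
    ranked = sorted(enumerate(probabilities), key=lambda pair: pair[1], reverse=True)
    return [(labels[index], prob) for index, prob in ranked[:k]]


# Toy data for illustration; not real model output.
toy_labels = ["background", "cat", "dog", "fox"]
toy_probs = [0.01, 0.09, 0.7, 0.2]
best = top_k(toy_probs, toy_labels, k=2)
print(best)  # [('dog', 0.7), ('fox', 0.2)]
```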

### Delete endpoint and model artifacts

Finally, we clean up the model artifacts i.e. SageMaker model, endpoint configuration and the endpoint.

In [None]:
sm_client.delete_model(ModelName=sm_model_name)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm_client.delete_endpoint(EndpointName=endpoint_name)
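
If one of the delete calls fails, for example because a resource was already removed, the calls after it never run. The sketch below attempts all three and reports which ones failed. The helper `cleanup` and the `FakeClient` class are hypothetical, and the broad `except` is a deliberate simplification for a clean-up step.

```python
def cleanup(client, model_name, endpoint_config_name, endpoint_name):
    """Attempt every delete call and return the names of the steps that failed."""
    steps = [
        ("endpoint", lambda: client.delete_endpoint(EndpointName=endpoint_name)),
        (
            "endpoint_config",
            lambda: client.delete_endpoint_config(EndpointConfigName=endpoint_config_name),
        ),
        ("model", lambda: client.delete_model(ModelName=model_name)),
    ]
    failed = []
    for label, call in steps:
        try:
            call()
        except Exception as error:  # broad on purpose: keep going with the other deletes
            print(f"Could not delete {label}: {error}")
            failed.append(label)
    return failed


class FakeClient:
    """Stand-in for the SageMaker client so the sketch runs without AWS access."""

    def delete_endpoint(self, EndpointName):
        pass

    def delete_endpoint_config(self, EndpointConfigName):
        raise RuntimeError("simulated failure")

    def delete_model(self, ModelName):
        pass


failed = cleanup(FakeClient(), "demo-model", "demo-config", "demo-endpoint")
print(failed)  # ['endpoint_config']
```

With the notebook's variables, the call would be `cleanup(sm_client, sm_model_name, endpoint_config_name, endpoint_name)`.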

### Conclusion

In this notebook, we implemented a model ensemble using NVIDIA Triton Inference Server and pre-processed images using an NVIDIA DALI pipeline. Running pre-processing on the GPU inside the same server as the model can reduce overall latency and improve throughput. Try it out!

## Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

![This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-2/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ca-central-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/sa-east-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This eu-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This eu-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-2/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-3/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-central-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-north-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This ap-southeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-2/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-2/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)

![This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-south-1/sagemaker-triton|ensemble|dali-tf-inception|tf-dali-ensemble-cv.ipynb)
