# Multiple Ensembles with GPU models using Amazon SageMaker in MME mode

In this notebook, we reuse two examples from the parent folder `../ensemble/` and deploy them together on a SageMaker multi-model endpoint (MME). To keep this example self-contained and for clarity, the relevant parts of those notebooks are reproduced here.

#### A. TF+DALI Ensemble

In this ensemble, the DALI pipeline pre-processes the input images (decoding, resizing, and normalization) using CPU. Its output is fed into the TF Inception model, which runs on GPU.

#### B. TRT+Python Ensemble

In this ensemble, a TRT model (BERT) and the post-processing Python model run on GPU, whereas the pre-processing model runs on CPU.

#### In both examples, one or more GPU models run on the same host, and each example is an ensemble of models working together as a pipeline that behaves like a single model

## 1. Setup

In [None]:
!pip install -qU pip awscli boto3 sagemaker --quiet
!pip install nvidia-pyindex --quiet
!pip install "tritonclient[http]" --quiet

In [None]:
# Note: The step below installs the CUDA 11 build of NVIDIA DALI, so this notebook must run on a GPU-based instance.
!pip install --extra-index-url https://developer.download.nvidia.com/compute/redist --upgrade nvidia-dali-cuda110

In [None]:
import boto3, json, sagemaker, time
from sagemaker import get_execution_role
import nvidia.dali as dali
import nvidia.dali.types as types

In [None]:
# SageMaker variables
sm_client = boto3.client(service_name="sagemaker")
runtime_sm_client = boto3.client("sagemaker-runtime")
sagemaker_session = sagemaker.Session(boto_session=boto3.Session())
role = get_execution_role()

# Other Variables
instance_type = "ml.g4dn.4xlarge"
# Use a single timestamp so the model, endpoint config, and endpoint names match
timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
sm_model_name = "triton-tf-dali-ensemble-" + timestamp
endpoint_config_name = "triton-tf-dali-ensemble-" + timestamp
endpoint_name = "triton-tf-dali-ensemble-" + timestamp

## 2. TF+DALI Ensemble

In [None]:
!mkdir -p model_repository/inception_graphdef/1
!mkdir -p model_repository/dali/1
!mkdir -p model_repository/ensemble_dali_inception/1

In [None]:
!wget -O /tmp/inception_v3_2016_08_28_frozen.pb.tar.gz \
 https://storage.googleapis.com/download.tensorflow.org/models/inception_v3_2016_08_28_frozen.pb.tar.gz

In [None]:
!(cd /tmp && tar xzf inception_v3_2016_08_28_frozen.pb.tar.gz)
!mv /tmp/inception_v3_2016_08_28_frozen.pb model_repository/inception_graphdef/1/model.graphdef

Write the model config for the ensemble, which feeds the output of the `dali` model into the `inception_graphdef` model

In [None]:
%%writefile model_repository/ensemble_dali_inception/config.pbtxt
name: "ensemble_dali_inception"
platform: "ensemble"
max_batch_size: 256
input [
 {
 name: "INPUT"
 data_type: TYPE_UINT8
 dims: [ -1 ]
 }
]
output [
 {
 name: "OUTPUT"
 data_type: TYPE_FP32
 dims: [ 1001 ]
 }
]
ensemble_scheduling {
 step [
 {
 model_name: "dali"
 model_version: -1
 input_map {
 key: "DALI_INPUT_0"
 value: "INPUT"
 }
 output_map {
 key: "DALI_OUTPUT_0"
 value: "preprocessed_image"
 }
 },
 {
 model_name: "inception_graphdef"
 model_version: -1
 input_map {
 key: "input"
 value: "preprocessed_image"
 }
 output_map {
 key: "InceptionV3/Predictions/Softmax"
 value: "OUTPUT"
 }
 }
 ]
}
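The `input_map`/`output_map` entries are what wire the steps together: each key is a tensor name of the step's own model, and each value is an ensemble-level tensor name. As a rough sanity check before packaging, the sketch below walks the steps (transcribed by hand from the config above, not parsed from it) and reports any step that reads a tensor nothing has produced yet. It is stricter than Triton, which schedules ensemble steps by data dependency rather than by the order in which they are listed.

```python
# Steps transcribed by hand from ensemble_dali_inception/config.pbtxt.
# Parsing the protobuf text format itself is out of scope for this sketch.
ENSEMBLE_INPUTS = {"INPUT"}
ENSEMBLE_OUTPUTS = {"OUTPUT"}

# key = the step model's own tensor name, value = the ensemble-level tensor name
STEPS = [
    {
        "model_name": "dali",
        "input_map": {"DALI_INPUT_0": "INPUT"},
        "output_map": {"DALI_OUTPUT_0": "preprocessed_image"},
    },
    {
        "model_name": "inception_graphdef",
        "input_map": {"input": "preprocessed_image"},
        "output_map": {"InceptionV3/Predictions/Softmax": "OUTPUT"},
    },
]


def check_ensemble_dataflow(steps, inputs, outputs):
    """Return a list of wiring problems; an empty list means every tensor is produced before use."""
    available = set(inputs)
    problems = []
    for step in steps:
        for model_tensor, ensemble_tensor in step["input_map"].items():
            if ensemble_tensor not in available:
                problems.append(
                    f"{step['model_name']}: input '{model_tensor}' reads "
                    f"'{ensemble_tensor}', which no earlier step produces"
                )
        # Outputs of this step become available to later steps
        available.update(step["output_map"].values())
    for name in outputs:
        if name not in available:
            problems.append(f"ensemble output '{name}' is never produced")
    return problems


print(check_ensemble_dataflow(STEPS, ENSEMBLE_INPUTS, ENSEMBLE_OUTPUTS))
```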

Model config for the DALI backend

In [None]:
%%writefile model_repository/dali/config.pbtxt
name: "dali"
backend: "dali"
max_batch_size: 256
input [
 {
 name: "DALI_INPUT_0"
 data_type: TYPE_UINT8
 dims: [ -1 ]
 }
]
output [
 {
 name: "DALI_OUTPUT_0"
 data_type: TYPE_FP32
 dims: [ 299, 299, 3 ]
 }
]
parameters: [
 {
 key: "num_threads"
 value: { string_value: "12" }
 }
]

Model config for the Inception model, running on GPU

In [None]:
%%writefile model_repository/inception_graphdef/config.pbtxt
name: "inception_graphdef"
platform: "tensorflow_graphdef"
max_batch_size: 256
input [
 {
 name: "input"
 data_type: TYPE_FP32
 format: FORMAT_NHWC
 dims: [ 299, 299, 3 ]
 }
]
output [
 {
 name: "InceptionV3/Predictions/Softmax"
 data_type: TYPE_FP32
 dims: [ 1001 ]
 label_filename: "inception_labels.txt"
 }
]
instance_group [
 {
 kind: KIND_GPU
 }
]
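Because `max_batch_size` is greater than 0, Triton prepends a batch dimension to the `dims` listed above, so a full batch for `inception_graphdef` has shape `[256, 299, 299, 3]`. A quick calculation of the FP32 input size can help when sizing batches against GPU memory. This sketch counts only the input tensor; model weights and intermediate activations come on top of it.

```python
import math

FP32_BYTES = 4


def tensor_bytes(dims, batch_size, bytes_per_element=FP32_BYTES):
    """Bytes needed for one batched tensor whose per-sample shape is `dims`."""
    return batch_size * math.prod(dims) * bytes_per_element


per_image = tensor_bytes([299, 299, 3], batch_size=1)
full_batch = tensor_bytes([299, 299, 3], batch_size=256)  # max_batch_size from the config
print(f"one image: {per_image / 2**20:.2f} MiB, batch of 256: {full_batch / 2**20:.1f} MiB")
```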

Download `inception_labels.txt`, which is referenced by `label_filename` in the Inception config

In [None]:
region = boto3.Session().region_name

s3_client = boto3.client("s3")
s3_client.download_file(
    f"sagemaker-example-files-prod-{region}",
    "datasets/labels/inception_labels.txt",
    "model_repository/inception_graphdef/inception_labels.txt",
)

Create the DALI pipeline and serialize it into the model repository

In [None]:
@dali.pipeline_def(batch_size=3, num_threads=1, device_id=0)
def pipe():
 """Create a pipeline that decodes encoded images, resizes them to 299x299, and normalizes them."""
 images = dali.fn.external_source(device="cpu", name="DALI_INPUT_0")
 images = dali.fn.decoders.image(images, device="mixed", output_type=types.RGB)
 images = dali.fn.resize(
 images, resize_x=299, resize_y=299
 ) # resize image to the default 299x299 size
 images = dali.fn.crop_mirror_normalize(
 images,
 dtype=types.FLOAT,
 output_layout="HWC",
 crop=(299, 299), # crop image to the default 299x299 size
 mean=[0.485 * 255, 0.456 * 255, 0.406 * 255], # ImageNet per-channel mean, scaled to 0-255
 std=[0.229 * 255, 0.224 * 255, 0.225 * 255], # ImageNet per-channel std, scaled to 0-255
 )
 return images


pipe().serialize(filename="model_repository/dali/1/model.dali")
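`crop_mirror_normalize` computes `(pixel - mean) / std` for each channel. The plain-Python sketch below applies the same ImageNet statistics to a single pixel, which shows the range of values the Inception model receives. It mirrors only the normalization step, not decoding, resizing, or cropping.

```python
# Same per-channel statistics as the DALI pipeline above, scaled to the 0-255 pixel range
MEAN = [0.485 * 255, 0.456 * 255, 0.406 * 255]
STD = [0.229 * 255, 0.224 * 255, 0.225 * 255]


def normalize_pixel(rgb):
    """Normalize one (R, G, B) pixel with values in [0, 255]: (value - mean) / std per channel."""
    return [(value - m) / s for value, m, s in zip(rgb, MEAN, STD)]


print(normalize_pixel((0, 0, 0)))        # darkest possible pixel
print(normalize_pixel((255, 255, 255)))  # brightest possible pixel
```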

Upload model artifacts to S3

In [None]:
!tar -cvzf model_tf_dali.tar.gz -C model_repository .
model_uri = sagemaker_session.upload_data(
 path="model_tf_dali.tar.gz", key_prefix="triton-mme-gpu-ensemble"
)
print("S3 model uri: {}".format(model_uri))
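In MME mode, each archive is a self-contained Triton model repository holding the ensemble and all of its member models. A small check like the sketch below can catch a missing `config.pbtxt` before uploading. It assumes the archive was built with `-C <repo> .` as above, so member names start with `./`; any top-level file, not just a model directory, also shows up as an entry.

```python
import os
import tarfile


def models_in_archive(archive_path):
    """Map each top-level entry of a model archive to whether it contains a config.pbtxt."""
    models = {}
    with tarfile.open(archive_path, "r:gz") as tar:
        for name in tar.getnames():
            # Drop empty and "." components, e.g. "./dali/config.pbtxt" -> ["dali", "config.pbtxt"]
            parts = [p for p in name.split("/") if p not in ("", ".")]
            if not parts:
                continue
            models.setdefault(parts[0], False)
            if parts[1:] == ["config.pbtxt"]:
                models[parts[0]] = True
    return models


# Only inspect archives that exist locally
for archive in ("model_tf_dali.tar.gz", "model_trt_python.tar.gz"):
    if os.path.exists(archive):
        print(archive, models_in_archive(archive))
```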

## 3. TRT + Python Ensemble

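For this example, we download a pretrained model (`sentence-transformers/all-MiniLM-L6-v2`) from the Hugging Face `transformers` library and convert it into a TensorRT engine using the `generate_model_trt.sh` script inside an NVIDIA PyTorch container. The remaining models, i.e. pre-processing and post-processing, along with the `config.pbtxt` for every model, are included in the `ensemble_hf` folder.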

In [None]:
model_id = "sentence-transformers/all-MiniLM-L6-v2"

In [None]:
! docker run --gpus=all --rm -it -v `pwd`/workspace:/workspace nvcr.io/nvidia/pytorch:23.03-py3 /bin/bash generate_model_trt.sh $model_id

In [None]:
! mkdir -p ensemble_hf/bert-trt/1 && mv workspace/model.plan ensemble_hf/bert-trt/1/model.plan && rm -rf workspace/model.onnx workspace/core*

Create a custom Python conda environment with the required dependencies, and copy the packed environment into both the pre-processing and post-processing model directories

In [None]:
!bash conda_dependencies.sh
!cp processing_env.tar.gz ensemble_hf/postprocess/ && cp processing_env.tar.gz ensemble_hf/preprocess/
!rm processing_env.tar.gz

Upload model artifacts to S3

In [None]:
!tar -C ensemble_hf/ -czf model_trt_python.tar.gz .
model_uri = sagemaker_session.upload_data(
 path="model_trt_python.tar.gz", key_prefix="triton-mme-gpu-ensemble"
)

print("S3 model uri: {}".format(model_uri))

## 4. Run ensembles on SageMaker MME GPU instance

In [None]:
account_id_map = {
 "us-east-1": "785573368785",
 "us-east-2": "007439368137",
 "us-west-1": "710691900526",
 "us-west-2": "301217895009",
 "eu-west-1": "802834080501",
 "eu-west-2": "205493899709",
 "eu-west-3": "254080097072",
 "eu-north-1": "601324751636",
 "eu-south-1": "966458181534",
 "eu-central-1": "746233611703",
 "ap-east-1": "110948597952",
 "ap-south-1": "763008648453",
 "ap-northeast-1": "941853720454",
 "ap-northeast-2": "151534178276",
 "ap-southeast-1": "324986816169",
 "ap-southeast-2": "355873309152",
 "cn-northwest-1": "474822919863",
 "cn-north-1": "472730292857",
 "sa-east-1": "756306329178",
 "ca-central-1": "464438896020",
 "me-south-1": "836785723513",
 "af-south-1": "774647643957",
}

In [None]:
region = boto3.Session().region_name
if region not in account_id_map:
 raise ValueError(f"Unsupported region: {region}")

In [None]:
base = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
triton_image_uri = "{account_id}.dkr.ecr.{region}.{base}/sagemaker-tritonserver:23.03-py3".format(
 account_id=account_id_map[region], region=region, base=base
)

In [None]:
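# MME needs the S3 prefix that holds all model archives. str.rstrip() strips a set of
# characters rather than a suffix, so split off the archive name explicitly instead.
models_s3_location = model_uri.rsplit("/", 1)[0] + "/"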

In [None]:
models_s3_location
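With `Mode` set to `MultiModel`, `ModelDataUrl` points at the S3 prefix, and each request's `TargetModel` is an archive's path relative to that prefix (here `model_tf_dali.tar.gz` or `model_trt_python.tar.gz`). The helper below is a sketch that derives those names from full S3 URIs; the bucket name in the example is hypothetical.

```python
def target_model_names(prefix, archive_uris):
    """Return TargetModel values for archives stored under the given S3 prefix."""
    if not prefix.endswith("/"):
        prefix += "/"
    names = []
    for uri in archive_uris:
        if not uri.startswith(prefix):
            raise ValueError(f"{uri} is not under {prefix}")
        names.append(uri[len(prefix):])  # path relative to ModelDataUrl
    return names


print(
    target_model_names(
        "s3://example-bucket/triton-mme-gpu-ensemble/",  # hypothetical bucket
        [
            "s3://example-bucket/triton-mme-gpu-ensemble/model_tf_dali.tar.gz",
            "s3://example-bucket/triton-mme-gpu-ensemble/model_trt_python.tar.gz",
        ],
    )
)
```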

In [None]:
container = {
 "Image": triton_image_uri,
 "ModelDataUrl": models_s3_location,
 "Mode": "MultiModel",
 "Environment": {"SAGEMAKER_TRITON_DEFAULT_MODEL_NAME": "ensemble_dali_inception"},
}

create_model_response = sm_client.create_model(
 ModelName=sm_model_name, ExecutionRoleArn=role, PrimaryContainer=container
)

model_arn = create_model_response["ModelArn"]

print(f"Model Arn: {model_arn}")

In [None]:
create_endpoint_config_response = sm_client.create_endpoint_config(
 EndpointConfigName=endpoint_config_name,
 ProductionVariants=[
 {
 "InstanceType": instance_type,
 "InitialVariantWeight": 1,
 "InitialInstanceCount": 1,
 "ModelName": sm_model_name,
 "VariantName": "AllTraffic",
 }
 ],
)

endpoint_config_arn = create_endpoint_config_response["EndpointConfigArn"]

print(f"Endpoint Config Arn: {endpoint_config_arn}")

In [None]:
create_endpoint_response = sm_client.create_endpoint(
 EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)

endpoint_arn = create_endpoint_response["EndpointArn"]

print(f"Endpoint Arn: {endpoint_arn}")

In [None]:
rv = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = rv["EndpointStatus"]
print(f"Endpoint Creation Status: {status}")

while status == "Creating":
 time.sleep(60)
 rv = sm_client.describe_endpoint(EndpointName=endpoint_name)
 status = rv["EndpointStatus"]
 print(f"Endpoint Creation Status: {status}")

endpoint_arn = rv["EndpointArn"]

print(f"Endpoint Arn: {endpoint_arn}")
print(f"Endpoint Status: {status}")
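The loop above keeps polling only while the status is `Creating`, and it reports `Failed` the same way as `InService`. Below is a sketch of a stricter helper that raises with the endpoint's `FailureReason`. boto3 also ships a built-in waiter, `sm_client.get_waiter("endpoint_in_service").wait(EndpointName=endpoint_name)`, if you prefer not to maintain your own loop.

```python
import time

# Statuses during which the endpoint is still changing and polling should continue
TRANSITIONAL_STATUSES = {"Creating", "Updating", "SystemUpdating", "RollingBack"}


def wait_for_endpoint(client, endpoint_name, poll_seconds=60, sleep=time.sleep):
    """Poll describe_endpoint until the endpoint settles.

    Returns the final describe_endpoint response if the endpoint is InService,
    and raises RuntimeError (including FailureReason, if present) otherwise.
    """
    while True:
        desc = client.describe_endpoint(EndpointName=endpoint_name)
        status = desc["EndpointStatus"]
        print(f"Endpoint status: {status}")
        if status not in TRANSITIONAL_STATUSES:
            break
        sleep(poll_seconds)
    if status != "InService":
        reason = desc.get("FailureReason", "no FailureReason returned")
        raise RuntimeError(f"Endpoint {endpoint_name} ended in {status}: {reason}")
    return desc
```

Usage: `desc = wait_for_endpoint(sm_client, endpoint_name)`.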

## 5. Create inference payload and send requests to respective models

### 5.1. TF + DALI Ensemble

In [None]:
import numpy as np

sample_img_fname = "shiba_inu_dog.jpg"

region = boto3.Session().region_name
s3_client = boto3.client("s3")
s3_client.download_file(
 f"sagemaker-example-files-prod-{region}", "datasets/image/pets/shiba_inu_dog.jpg", sample_img_fname
)


def load_image(img_path):
    """
    Load an image file as a 1-D array of its encoded (e.g. JPEG) bytes.
    The DALI pipeline decodes the image on the server, so the client sends the raw file bytes.
    """
    with open(img_path, "rb") as f:
        img = f.read()
    return np.frombuffer(img, dtype=np.uint8)


rv = load_image(sample_img_fname)
print(f"Shape of image {rv.shape}")

rv2 = np.expand_dims(rv, 0)
print(f"Shape of expanded image array {rv2.shape}")

payload = {
 "inputs": [
 {
 "name": "INPUT",
 "shape": rv2.shape,
 "datatype": "UINT8",
 "data": rv2.tolist(),
 }
 ]
}
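The same request can be built without NumPy. This sketch wraps encoded image bytes as a one-row `UINT8` tensor in the KServe v2 JSON format used above; the three bytes in the example are a hypothetical stand-in for a real image (the first three bytes of a typical JPEG file). JSON spells every byte as decimal text, so the request body is typically a few times larger than the image itself.

```python
import json


def build_v2_json_payload(encoded_image, input_name="INPUT"):
    """Wrap encoded image bytes as a [1, num_bytes] UINT8 tensor in a KServe v2 JSON request."""
    return {
        "inputs": [
            {
                "name": input_name,
                "shape": [1, len(encoded_image)],
                "datatype": "UINT8",
                "data": [list(encoded_image)],  # iterating over bytes yields ints 0-255
            }
        ]
    }


# Hypothetical stand-in for a JPEG file
sketch_payload = build_v2_json_payload(b"\xff\xd8\xff")
print(json.dumps(sketch_payload))
```

For a real file, `build_v2_json_payload(open(sample_img_fname, "rb").read())` should carry the same data as `payload`.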

In [None]:
# Run inference. The first request takes substantially longer than subsequent ones because the model is loaded on the first request.

response = runtime_sm_client.invoke_endpoint(
 EndpointName=endpoint_name,
 ContentType="application/octet-stream",
 Body=json.dumps(payload),
 TargetModel="model_tf_dali.tar.gz",
)

print(json.loads(response["Body"].read().decode("utf8")))
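The response follows the KServe v2 JSON format, with the `OUTPUT` scores returned as a `data` list of 1001 values for a single image. A minimal sketch for picking the top-k classes follows; pass the lines of `inception_labels.txt` as `labels` to attach class names, assuming line `i` corresponds to score index `i`. The response and labels in the example are small synthetic stand-ins.

```python
def top_k(response, labels=None, k=3, output_name="OUTPUT"):
    """Return the k highest-scoring (index, score, label) tuples for a single image."""
    output = next(o for o in response["outputs"] if o["name"] == output_name)
    scores = output["data"]
    if scores and isinstance(scores[0], list):  # tolerate nested row-major data
        scores = [v for row in scores for v in row]
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
    return [(i, scores[i], labels[i] if labels else None) for i in ranked]


# Synthetic response with 4 classes instead of 1001
example_response = {
    "outputs": [
        {"name": "OUTPUT", "datatype": "FP32", "shape": [1, 4], "data": [0.1, 0.6, 0.05, 0.25]}
    ]
}
print(top_k(example_response, labels=["cat", "shiba inu", "car", "fox"], k=2))
```

Because `response["Body"]` is a stream that can be read only once, store the parsed JSON in a variable before passing it to `top_k`.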

### 5.2. TRT + Python Ensemble

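For this example, we use Triton's binary+JSON payload format, in which a JSON header is followed by the raw tensor bytes and the length of the JSON header is passed in the `Content-Type`. If you prefer not to use binary+JSON, you can send a plain JSON payload similar to the one in the TF + DALI ensemble example above.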
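Under the hood, a binary+JSON request is a JSON header followed by raw tensor bytes, and each `BYTES` element is encoded as a 4-byte little-endian length followed by the string's bytes. The stdlib-only sketch below builds such a body for the `INPUT0` tensor. It approximates what `generate_request_body` produces but is not guaranteed to be byte-for-byte identical. The returned header size is the value that goes into the `json-header-size` field of the `Content-Type`.

```python
import json
import struct


def serialize_bytes_tensor(strings):
    """Encode each element as a 4-byte little-endian length followed by its UTF-8 bytes."""
    encoded = [s.encode("utf-8") for s in strings]
    return b"".join(struct.pack("<I", len(e)) + e for e in encoded)


def build_binary_json_request(texts, input_name="INPUT0", output_name="finaloutput"):
    """Return (body, json_header_size) for a [len(texts), 1] BYTES input."""
    raw = serialize_bytes_tensor(texts)
    header = {
        "inputs": [
            {
                "name": input_name,
                "shape": [len(texts), 1],
                "datatype": "BYTES",
                "parameters": {"binary_data_size": len(raw)},  # tensor bytes follow the header
            }
        ],
        "outputs": [{"name": output_name, "parameters": {"binary_data": True}}],
    }
    header_bytes = json.dumps(header).encode("utf-8")
    return header_bytes + raw, len(header_bytes)


body, header_size = build_binary_json_request(["Sentence 1", "Sentence 2"])
print(f"application/vnd.sagemaker-triton.binary+json;json-header-size={header_size}")
```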

In [None]:
import tritonclient.http as http_client

In [None]:
text_inputs = ["Sentence 1", "Sentence 2"]

# One BYTES input of shape [batch, 1]: one sentence per row
inputs = [http_client.InferInput("INPUT0", [len(text_inputs), 1], "BYTES")]
input0_real = np.array([[text] for text in text_inputs], dtype=np.object_)
inputs[0].set_data_from_numpy(input0_real, binary_data=True)

outputs = []
outputs.append(http_client.InferRequestedOutput("finaloutput"))

request_body, header_length = http_client.InferenceServerClient.generate_request_body(
 inputs, outputs=outputs
)

print(request_body)

In [None]:
# Run inference. The first request takes substantially longer than subsequent ones because the model is loaded on the first request.

response = runtime_sm_client.invoke_endpoint(
 EndpointName=endpoint_name,
 ContentType="application/vnd.sagemaker-triton.binary+json;json-header-size={}".format(
 header_length
 ),
 Body=request_body,
 TargetModel="model_trt_python.tar.gz",
)

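# The response body is binary+JSON (a JSON header followed by raw tensor bytes), so
# json.loads on the whole body fails. Read the JSON header size from the Content-Type
# and let tritonclient parse the body instead.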

header_length_prefix = "application/vnd.sagemaker-triton.binary+json;json-header-size="
header_length_str = response["ContentType"][len(header_length_prefix) :]

# Read response body
result = http_client.InferenceServerClient.parse_response_body(
 response["Body"].read(), header_length=int(header_length_str)
)

outputs_data = result.as_numpy("finaloutput")

for idx, output in enumerate(outputs_data):
 print(text_inputs[idx])
 print(output)
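`parse_response_body` handles the reverse direction. For reference, a simplified stdlib version is sketched below: it reads the JSON header size from the `Content-Type`, parses the header, and slices each output's raw bytes in the order the outputs appear in the header. Decoding as little-endian FP32 is an assumption about `finaloutput`; check the `datatype` field in the header for your model.

```python
import json
import struct

BINARY_JSON_PREFIX = "application/vnd.sagemaker-triton.binary+json;json-header-size="


def split_binary_json_response(content_type, body):
    """Return (json_header, {output_name: raw_bytes}) for a SageMaker Triton response."""
    if not content_type.startswith(BINARY_JSON_PREFIX):
        return json.loads(body), {}  # plain JSON response, no binary section
    header_size = int(content_type[len(BINARY_JSON_PREFIX):])
    header = json.loads(body[:header_size])
    raw_outputs, offset = {}, header_size
    for output in header.get("outputs", []):
        size = output.get("parameters", {}).get("binary_data_size")
        if size is not None:  # outputs returned as JSON data have no binary section
            raw_outputs[output["name"]] = body[offset : offset + size]
            offset += size
    return header, raw_outputs


def decode_fp32(raw):
    """Decode little-endian FP32 bytes (an assumed output dtype) into a list of floats."""
    return list(struct.unpack(f"<{len(raw) // 4}f", raw))
```

Usage: `header, raw = split_binary_json_response(response["ContentType"], response["Body"].read())`, then `decode_fp32(raw["finaloutput"])`.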

## Terminate your resources

In [None]:
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm_client.delete_model(ModelName=sm_model_name)

## Notes

Here are some noteworthy points when developing ensembles on GPU-backed MME:

1. Larger ensembles are not recommended on smaller instance types, because framework backends manage GPU memory differently. For example, the TensorFlow backend does not release GPU memory when a model is unloaded, due to TensorFlow's GPU memory allocation strategy. When combined with other model backends, this may lead to non-deterministic behavior.
2. SageMaker treats each ensemble as a single model, i.e. the model hierarchy is not flat: the `TargetModel` you invoke is the ensemble's archive, not one of its member models.
3. Model names may be reused across ensembles; however, each ensemble must include its own copy of any model whose name is shared.


## Conclusion
We showed how the SageMaker Triton container can host multiple ensembles on a GPU multi-model endpoint, and demonstrated two ways of sending inference payloads to the models: plain JSON and binary+JSON. You can further customize hosting by setting environment variables; the supported variables are listed at https://github.com/triton-inference-server/server/blob/main/docker/sagemaker/serve.