In [None]:
%pip install sagemaker xgboost==1.5.1 scikit-learn install sm-serverless-benchmarking -Uqq

## Train and deploy an XGBoost Model
In this example, we'll train an XGBoost model on a synthetic dataset and then deploy it for benchmarking.

In [None]:
import tarfile


import sagemaker
import xgboost as xgb
import numpy as np
from sagemaker.model import Model
from sagemaker.serializers import CSVSerializer
from sklearn.datasets import make_classification

# IAM role handed to the model and to the benchmarking job further down.
# manually provide role if using non role based identity
role = sagemaker.get_execution_role()

# Session-derived settings reused by later cells.
sess = sagemaker.Session()
region = sess.boto_region_name
bucket = sess.default_bucket()

# XGBoost 1.5-1 container image for the session's region.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.5-1",
)

In [None]:
# Train a small XGBoost model on synthetic data.
# random_state is fixed so the generated dataset (and therefore the trained
# model and the example payloads sliced from X) is identical on every run.
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
data = xgb.DMatrix(data=X, label=y)
# Empty params: train with XGBoost's default settings.
bst = xgb.train(params={}, dtrain=data)
# Written to the file name that the packaging cell adds to model.tar.gz.
bst.save_model("./xgboost-model")

In [None]:
# Bundle the saved model file into a gzip-compressed tarball.
with tarfile.open("model.tar.gz", "w:gz") as archive:
    archive.add("xgboost-model")

# Upload the tarball to S3; the returned value is kept as the model artifact
# location and passed to Model(model_data=...) in the next cell.
model_uri = sess.upload_data(
    "model.tar.gz",
    bucket=bucket,
    key_prefix="sm-sl-bench-xgboost",
)

In [None]:
# Describe the model: container image, uploaded artifact, and execution role.
sm_model = Model(
    image_uri=image_uri,
    model_data=model_uri,
    role=role,
)
# Create the model in SageMaker; later cells refer to it through sm_model.name.
sm_model.create(instance_type="ml.m5.xlarge")

## Validate Endpoint
Before launching a full benchmarking job, it is a good idea to first deploy the model on a test endpoint to ensure everything is functioning as it should. Here we will deploy a temporary endpoint and test it with an example payload. Afterwards, the endpoint is deleted. 

In [None]:
# create a temporary endpoint
from sm_serverless_benchmarking.endpoint import ServerlessEndpoint

# memory_size is presumably in MB -- TODO confirm against the library docs.
endpoint = ServerlessEndpoint(
    model_name=sm_model.name,
    memory_size=6144,
)
endpoint.create_endpoint()

In [None]:
# Serialize the first 10 feature rows as CSV for the test request.
# `ser` is kept as a notebook-level name because a later cell reuses it.
ser = CSVSerializer()
first_ten_rows = X[:10]
payload = ser.serialize(first_ten_rows)

In [None]:
# Send the CSV payload to the temporary endpoint and show what comes back.
test_invoke_args = {"Body": payload, "ContentType": "text/csv"}
response = endpoint.invoke_endpoint(test_invoke_args)

# The response body is read fully and decoded as UTF-8 text before printing.
predictions_text = response["Body"].read().decode("utf-8")
print(predictions_text)

In [None]:
# The temporary endpoint was only needed for the test invocation; remove it
# before launching the benchmark.
endpoint.clean_up() # delete the endpoint

## Launch Benchmarking SageMaker Job

In [None]:
from sm_serverless_benchmarking.utils import convert_invoke_args_to_jsonl
from sm_serverless_benchmarking.sagemaker_runner import run_as_sagemaker_job

# Pick 20 distinct rows of X as example inputs. A seeded generator is used so
# the same row indices are chosen on every run, keeping benchmark runs comparable.
rng = np.random.default_rng(42)
sample_inputs = X[rng.choice(X.shape[0], size=20, replace=False)]

# One invocation-argument dict per sampled row, with the row serialized as CSV.
example_invoke_args = [
    {"Body": ser.serialize(inp), "ContentType": "text/csv"} for inp in sample_inputs
]

# Hand the examples to the library helper (presumably writes them as JSON Lines,
# going by its name); the result is what the benchmark runners take as
# invoke_args_examples_file.
example_invoke_file = convert_invoke_args_to_jsonl(example_invoke_args)

In [None]:
# Arguments for the benchmarking job: the role to run under, the model to
# benchmark, the example requests, and the invocation count for each benchmark.
benchmark_job_kwargs = {
    "role": role,
    "model_name": sm_model.name,
    "invoke_args_examples_file": example_invoke_file,
    "stability_benchmark_invocations": 2500,
    "concurrency_benchmark_invocations": 2500,
}
processor = run_as_sagemaker_job(**benchmark_job_kwargs)

In [None]:
# Destination of the job's first output, taken from the latest job on the processor.
output_destination = processor.latest_job.outputs[0].destination
print(
    f"Once the job is finished, the outputs will be uploaded to {output_destination}"
)

You can optionally run the command below to copy all of the benchmark output artifacts into the current directory. The primary report output will be under the `benchmarking_report/` directory.

In [None]:
# Recursively copy everything under the job's first output destination into the
# current working directory (requires the AWS CLI to be available to the kernel).
!aws s3 cp --recursive {processor.latest_job.outputs[0].destination} .

## Run a Local Benchmarking Job [OPTIONAL]
You can also run the same benchmark locally.

In [None]:
from sm_serverless_benchmarking.benchmark import run_serverless_benchmarks

# Run the benchmark from this kernel against the same model and example
# requests used for the SageMaker job.
report = run_serverless_benchmarks(
    model_name=sm_model.name,
    invoke_args_examples_file=example_invoke_file,
)