# Deploy a scalable token-streaming solution on SageMaker


---

This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. 

![This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-2/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

---

In this notebook, we explore how to host a large language model on SageMaker using the latest container that packages some of the most popular open source libraries for model parallel inference like DeepSpeed and HuggingFace Accelerate. We use DJLServing as the model serving solution in this example. DJLServing is a high-performance universal model serving solution powered by the Deep Java Library (DJL) that is programming language agnostic. To learn more about DJL and DJLServing, you can refer to our [recent blog post](https://aws.amazon.com/blogs/machine-learning/deploy-bloom-176b-and-opt-30b-on-amazon-sagemaker-with-large-model-inference-deep-learning-containers-and-deepspeed/).

In this notebook, we deploy the open source [Cerebras-GPT-2.7B](https://huggingface.co/cerebras/Cerebras-GPT-2.7B) model on an ml.g5.2xlarge instance. We also demonstrate an end-to-end streaming experience: the model writes partial output while it is still generating, so clients can read tokens before generation finishes.

![](images/design_chart.png)


Clients send requests to an AWS Lambda function through Amazon API Gateway. The Lambda function forwards the inference request to the SageMaker endpoint, which immediately returns a session ID and then writes the generated text to DynamoDB in chunks as inference progresses. Finally, the client reads the result from DynamoDB using that session ID.
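The request flow above can be sketched locally without any AWS resources. This is a minimal in-memory sketch, assuming a plain Python dict stands in for the DynamoDB table and a hypothetical `<eos>` string marks the end of the stream; `produce`, `poll_once`, and `simulate` are illustrative names, not part of this notebook or any AWS API.

```python
from typing import Dict, Iterator, List, Tuple

EOS_MARKER = "<eos>"  # hypothetical end-of-stream marker


def produce(cache: Dict[str, str], session_id: str, tokens: List[str], flush_every: int = 5) -> Iterator[None]:
    """Server side: overwrite the cached text with the cumulative output every `flush_every` tokens."""
    generated = ""
    for step, token in enumerate(tokens, start=1):
        generated += token
        if step % flush_every == 0:
            cache[session_id] = generated
            yield  # give the client a chance to poll after each write
    cache[session_id] = generated + EOS_MARKER  # final write marks completion
    yield


def poll_once(cache: Dict[str, str], session_id: str, prev: int) -> Tuple[str, int, bool]:
    """Client side: return (new_text, new_offset, finished) for one read of the cache."""
    text = cache.get(session_id, "")
    finished = text.endswith(EOS_MARKER)
    if finished:
        text = text[: -len(EOS_MARKER)]
    return text[prev:], len(text), finished


def simulate(tokens: List[str], flush_every: int = 5) -> List[str]:
    """Interleave server writes and client reads; return the chunks the client would print."""
    cache: Dict[str, str] = {}
    chunks: List[str] = []
    prev = 0
    for _ in produce(cache, "session-1", tokens, flush_every):
        delta, prev, finished = poll_once(cache, "session-1", prev)
        if delta:
            chunks.append(delta)
        if finished:
            break
    return chunks


print(simulate([f"tok{i} " for i in range(12)]))
```

Because each write stores the full cumulative text rather than a delta, a client that misses a poll still catches up on its next read; it only has to remember how many characters it has already printed.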

## License agreement
- Review the model license (Apache 2.0) before using the model.
- This notebook is a sample notebook and is not intended for production use. Please refer to the license at https://github.com/aws/mit-0.


## Permission

In order to conduct this lab, we will need the following permissions:

- ECR Push/Pull access
- S3 bucket push access
- SageMaker access
- DynamoDB access (create DB and query)

If you plan to build the RESTful service, you also need Lambda, IAM, and API Gateway permissions:

- AWSLambda access (Create lambda function)
- IAM access (Create role, delete role)
- APIGateway (Creation, deletion)

## Let's upgrade SageMaker and import dependencies

In [None]:
%pip install sagemaker boto3 awscli --upgrade --quiet

In [None]:
import boto3
import sagemaker
from sagemaker import Model, serializers, deserializers

role = sagemaker.get_execution_role() # execution role for the endpoint
sess = sagemaker.session.Session() # sagemaker session for interacting with different AWS APIs
region = sess.boto_region_name  # region name of the current SageMaker Studio environment
account_id = sess.account_id() # account_id of the current SageMaker Studio environment

## Bring your own container to ECR repository

*Note: Please make sure you have the permission in AWS credential to push to ECR repository*

In this step, we pull the LMI nightly container from Docker Hub and push it to your ECR repository.

This process may take a while, depending on the container size and your network bandwidth.

In [None]:
%%bash

# The name of our container
repo_name=djlserving-byoc
# Target container
target_container="deepjavalibrary/djl-serving:deepspeed-nightly"

account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration (default to us-west-2 if none defined)
region=$(aws configure get region)
region=${region:-us-west-2}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${repo_name}:latest"
echo "Creating ECR repository ${fullname}"

# If the repository doesn't exist in ECR, create it.

aws ecr describe-repositories --repository-names "${repo_name}" > /dev/null 2>&1

if [ $? -ne 0 ]
then
 aws ecr create-repository --repository-name "${repo_name}" > /dev/null
fi

# Get the login command from ECR and execute it directly
aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin "${account}.dkr.ecr.${region}.amazonaws.com"

# Pull the image from Docker Hub, tag it with the full ECR name, and push it to ECR.
echo "Start pulling container: ${target_container}"

docker pull ${target_container}
docker tag ${target_container} ${fullname}
docker push ${fullname}
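The SageMaker model created later must reference exactly the image URI this cell pushes. The hypothetical helper below (not part of the notebook) makes that format explicit; it simply reproduces the `fullname` string the bash cell builds, and the account ID in the example is made up.

```python
def ecr_image_uri(account_id: str, region: str, repo_name: str, tag: str = "latest") -> str:
    """Build a private ECR image URI in the same format as the bash cell's `fullname`."""
    # AWS account IDs are 12-digit strings
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError(f"expected a 12-digit AWS account id, got {account_id!r}")
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repo_name}:{tag}"


print(ecr_image_uri("123456789012", "us-west-2", "djlserving-byoc"))
```

Note that the bash cell takes its region from `aws configure get region` (defaulting to us-west-2), while the Python cells use the SageMaker session's region. If those differ, the two URIs will not match and the endpoint cannot find the image.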

## Create a SageMaker-compatible model artifact, upload it to S3, and bring your own inference script

SageMaker Large Model Inference (LMI) containers can host models without any custom inference code. This is especially useful when the input data needs no custom pre-processing and the model's predictions need no post-processing.

However, in this notebook, we demonstrate how to deploy a model with custom inference code.

The LMI container expects the following artifacts to set up the model:
- `serving.properties`: the configuration file for the model server.
- `model.py`: a Python file that defines the core inference logic.
- `requirements.txt`: additional pip packages to install at runtime.

For more details and an exhaustive list of configuration options, refer to the documentation: https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints-large-model-configuration.html.
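`serving.properties` is a plain list of `key=value` lines, and `load_model` in `model.py` uses `model_id` when it is present and otherwise falls back to `model_dir`. The hypothetical helpers below mirror both behaviors; they assume, as the `load_model` code implies, that the server hands `option.model_id` to the handler as `model_id`. The `model_dir` value is a made-up placeholder.

```python
from typing import Dict


def render_serving_properties(props: Dict[str, str]) -> str:
    """Render a dict as the key=value lines used by serving.properties."""
    return "".join(f"{key}={value}\n" for key, value in props.items())


def resolve_model_location(properties: Dict[str, str]) -> str:
    """Mirror load_model(): prefer model_id (a Hugging Face Hub id) over the local model_dir."""
    return properties.get("model_id", properties["model_dir"])


print(render_serving_properties({"engine": "Python", "option.model_id": "cerebras/Cerebras-GPT-2.7B"}), end="")
print(resolve_model_location({"model_dir": "/tmp/hypothetical-model-dir", "model_id": "cerebras/Cerebras-GPT-2.7B"}))
```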

In [None]:
%%writefile serving.properties
engine=Python
option.model_id=cerebras/Cerebras-GPT-2.7B

In [None]:
%%writefile model.py
from djl_python import Input, Output
import torch
import logging
from transformers import AutoModelForCausalLM, AutoTokenizer
from djl_python.streaming_utils import StreamingUtils
from paginator import DDBPaginator
import uuid

# End-of-stream marker appended after the final chunk; clients stop polling when they see it
EOS_MARKER = "<eos>"


def load_model(properties):
    model_location = properties["model_dir"]
    if "model_id" in properties:
        model_location = properties["model_id"]
    logging.info(f"Loading model in {model_location}")
    device = "cuda:0"
    model = AutoModelForCausalLM.from_pretrained(
        model_location, low_cpu_mem_usage=True, torch_dtype=torch.float16
    ).to(device)
    tokenizer = AutoTokenizer.from_pretrained(model_location)
    stream_generator = StreamingUtils.get_stream_generator("Accelerate")
    return model, tokenizer, stream_generator


model = None
tokenizer = None
stream_generator = None
paginator = None


def separate_inference(session_id, inputs):
    prompt = inputs["prompt"]
    length = inputs["max_new_tokens"]
    generate_kwargs = dict(max_new_tokens=length, do_sample=True)
    generator = stream_generator(model, tokenizer, prompt, **generate_kwargs)
    generated = ""
    for step, text in enumerate(generator, start=1):
        generated += text[0]
        # Write the partial result to DynamoDB every 5 tokens
        if step % 5 == 0:
            paginator.add_cache(session_id, generated)
    # Final write: full text plus the end-of-stream marker
    paginator.add_cache(session_id, generated + EOS_MARKER)


def handle(inputs: Input):
    global model, tokenizer, stream_generator, paginator
    if model is None:
        model, tokenizer, stream_generator = load_model(inputs.get_properties())
        paginator = DDBPaginator("lmi_test_db")

    if inputs.is_empty():
        # Model server makes an empty call to warm up the model on startup
        return None
    session_id = str(uuid.uuid4())
    # Return the session id right away; separate_inference runs via finalize()
    return (
        Output()
        .add({"session_id": session_id})
        .finalize(separate_inference, session_id, inputs.get_as_json())
    )
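Each flush in `separate_inference` is a DynamoDB `put_item` call, and the table uses on-demand (`PAY_PER_REQUEST`) billing, so the flush interval trades update latency against write volume. A rough count, assuming generation runs all the way to `max_new_tokens` (it can stop earlier on an end-of-sequence token) with one write roughly every 5 tokens plus the final write; `estimated_writes` is a hypothetical helper:

```python
def estimated_writes(max_new_tokens: int, flush_every: int = 5) -> int:
    """Rough number of put_item calls for one request: periodic flushes plus one final write."""
    if flush_every <= 0:
        raise ValueError("flush_every must be positive")
    return max_new_tokens // flush_every + 1


for interval in (1, 5, 20):
    print(interval, estimated_writes(256, interval))
```

Since every write stores the full text so far, later writes are larger than earlier ones, and DynamoDB meters writes in 1 KB units, so long outputs also cost more per write.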

In [None]:
%%writefile paginator.py
import boto3
import logging


class DDBPaginator:
    DEFAULT_KEY_NAME = "cache_id"

    def __init__(self, db_name):
        self.db_name = db_name
        self.ddb_client = boto3.client("dynamodb")
        try:
            self.ddb_client.describe_table(TableName=db_name)
        except self.ddb_client.exceptions.ResourceNotFoundException:
            logging.info(f"Table {db_name} not found")
            self.ddb_client.create_table(
                TableName=db_name,
                AttributeDefinitions=[
                    {"AttributeName": self.DEFAULT_KEY_NAME, "AttributeType": "S"},
                ],
                KeySchema=[{"AttributeName": self.DEFAULT_KEY_NAME, "KeyType": "HASH"}],
                BillingMode="PAY_PER_REQUEST",
            )
            waiter = self.ddb_client.get_waiter("table_exists")
            waiter.wait(TableName=db_name, WaiterConfig={"Delay": 1})

    def add_cache(self, session_id, content):
        return self.ddb_client.put_item(
            TableName=self.db_name,
            Item={self.DEFAULT_KEY_NAME: {"S": session_id}, "content": {"S": content}},
        )
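`DDBPaginator` stores one item per session, keyed by `cache_id`, in DynamoDB's typed attribute format. The hypothetical helpers below (not used by the notebook) build the same `put_item` payload, read it back the way the client code reads `result["Item"]`, and approximate the item size against DynamoDB's 400 KB maximum item size, which bounds how much text one session can cache.

```python
from typing import Dict

DEFAULT_KEY_NAME = "cache_id"
MAX_ITEM_BYTES = 400 * 1024  # approximate DynamoDB item size limit (names + values)


def to_ddb_item(session_id: str, content: str) -> Dict[str, Dict[str, str]]:
    """Build the Item payload that DDBPaginator.add_cache passes to put_item."""
    return {DEFAULT_KEY_NAME: {"S": session_id}, "content": {"S": content}}


def content_of(item: Dict[str, Dict[str, str]]) -> str:
    """Read the cached text the same way the client code reads result["Item"]."""
    return item["content"]["S"]


def approx_item_bytes(item: Dict[str, Dict[str, str]]) -> int:
    """Approximate item size as UTF-8 bytes of attribute names plus string values."""
    return sum(len(name.encode("utf-8")) + len(value["S"].encode("utf-8")) for name, value in item.items())


item = to_ddb_item("1234", "Large language model is ...")
print(content_of(item), approx_item_bytes(item), approx_item_bytes(item) < MAX_ITEM_BYTES)
```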

In [None]:
%%writefile requirements.txt
boto3
transformers==4.27.2

In [None]:
%%sh
mkdir mymodel
mv serving.properties mymodel/
mv model.py mymodel/
mv paginator.py mymodel/
mv requirements.txt mymodel/
tar czvf mymodel.tar.gz mymodel/
rm -rf mymodel
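The uploaded archive is what SageMaker hands to the container, so it can help to confirm it contains the expected files under `mymodel/`. As an alternative to the shell cell, the same archive can be built and inspected with Python's standard `tarfile` module. `build_model_tarball` is a hypothetical helper, and the placeholder files in a temporary directory stand in for the real artifacts.

```python
import tarfile
import tempfile
from pathlib import Path
from typing import List


def build_model_tarball(src_dir: Path, out_path: Path, arcname: str = "mymodel") -> List[str]:
    """Create a gzip-compressed tarball of src_dir under `arcname/` and return the sorted member names."""
    with tarfile.open(out_path, "w:gz") as tar:
        tar.add(src_dir, arcname=arcname)  # recursive by default
    with tarfile.open(out_path, "r:gz") as tar:
        return sorted(tar.getnames())


with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "mymodel"
    src.mkdir()
    for name in ("serving.properties", "model.py", "paginator.py", "requirements.txt"):
        (src / name).write_text("# placeholder\n")
    members = build_model_tarball(src, Path(tmp) / "mymodel.tar.gz")

print(members)
```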

## Start building SageMaker endpoint

### Upload artifact on S3 and create SageMaker model

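The tarball we created is uploaded to the default S3 bucket that the SageMaker session creates for you. We then create a SageMaker model that combines the custom container image in ECR with the uploaded artifact.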

In [None]:
s3_code_prefix = "large-model-lmi/code"
bucket = sess.default_bucket() # bucket to house artifacts
code_artifact = sess.upload_data("mymodel.tar.gz", bucket, s3_code_prefix)
print(f"S3 Code or Model tar ball uploaded to --- > {code_artifact}")

repo_name = "djlserving-byoc"
image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repo_name}:latest"
env = {"HUGGINGFACE_HUB_CACHE": "/tmp", "TRANSFORMERS_CACHE": "/tmp"}

model = Model(image_uri=image_uri, model_data=code_artifact, env=env, role=role)

### Create SageMaker endpoint

Here, we use an ml.g5.2xlarge instance. The endpoint name is generated from the `lmi-model` prefix plus a timestamp.

In [None]:
instance_type = "ml.g5.2xlarge"
endpoint_name = sagemaker.utils.name_from_base("lmi-model")
print(f"endpoint_name is {endpoint_name}")

model.deploy(
    initial_instance_count=1,
    instance_type=instance_type,
    endpoint_name=endpoint_name,
)

# our requests and responses will be in json format so we specify the serializer and the deserializer
predictor = sagemaker.Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sess,
    serializer=serializers.JSONSerializer(),
    deserializer=deserializers.JSONDeserializer(),
)

### This step can take about 10 minutes or longer, so please be patient

## Test and benchmark the inference

Here, we invoke the SageMaker endpoint and use a simple DynamoDB fetcher to retrieve the streamed result:

- send prompt request and receive a session id
- use session id to retrieve the streamed tokens
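The `get_stream` loop in the next cell polls DynamoDB at a fixed 0.1-second interval. A more defensive client might back off while nothing new arrives and give up after a deadline. This is a minimal sketch with hypothetical names (`poll_stream`, `fetch`): `fetch` stands in for any callable that returns the currently cached text or `None` (for example, a wrapper around `get_item`), and the `<eos>` marker is an assumption that must match whatever the server appends when generation finishes.

```python
import time
from typing import Callable, List, Optional

EOS_MARKER = "<eos>"  # assumed end-of-stream marker


def poll_stream(
    fetch: Callable[[], Optional[str]],
    timeout: float = 300.0,
    initial_delay: float = 0.1,
    max_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[str]:
    """Poll `fetch` until the end marker appears or `timeout` elapses; return the new-text chunks."""
    chunks: List[str] = []
    prev = 0
    delay = initial_delay
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        text = fetch() or ""  # None means the item does not exist yet
        finished = text.endswith(EOS_MARKER)
        if finished:
            text = text[: -len(EOS_MARKER)]
        if len(text) > prev:
            chunks.append(text[prev:])
            prev = len(text)
            delay = initial_delay  # new data arrived: poll quickly again
        else:
            delay = min(delay * 2, max_delay)  # nothing new: back off
        if finished:
            break
        sleep(delay)
    return chunks


snapshots = iter([None, "Large", "Large", "Large language", "Large language model<eos>"])
print(poll_stream(lambda: next(snapshots), sleep=lambda seconds: None))
```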

In [None]:
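import time

EOS_MARKER = "<eos>"  # end-of-stream marker; must match the one appended in model.py

session_id = predictor.predict({"prompt": ["Large language model is"], "max_new_tokens": 256})


def get_stream(session_id, timeout=300):
    """Print newly generated text from DynamoDB until the end marker appears or the timeout expires."""
    ddb_client = boto3.client("dynamodb")
    prev = 0
    deadline = time.time() + timeout
    while time.time() < deadline:
        result = ddb_client.get_item(TableName="lmi_test_db", Key={"cache_id": {"S": session_id}})
        if "Item" in result:
            text = result["Item"]["content"]["S"]
            finished = text.endswith(EOS_MARKER)
            if finished:
                text = text[: -len(EOS_MARKER)]
            print(text[prev:], end="")  # print only the text not shown yet
            prev = len(text)
            if finished:
                break
        time.sleep(0.1)


get_stream(session_id["session_id"])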

## Make this as a single RESTful endpoint service

In the previous example, we created an endpoint and invoked it from the notebook with the SageMaker Python SDK. Now, let's build a real-world application using Lambda and API Gateway. Any client server or web application can call this API. Here we use an open-source toolkit from AWS called [Chalice](https://github.com/aws/chalice), which wraps the most commonly used Lambda, DynamoDB, and API Gateway functionality so the stack can be deployed easily.

Chalice requires four major components:

- `app.py`: defines the Lambda function and related services
- `requirements.txt`: the pip packages needed to run the application
- `.chalice/config.json`: a JSON file that defines the app name and the deployment stage settings
- `.chalice/policy-dev.json`: a JSON file that defines the IAM policy attached to the Lambda function's role for the `dev` stage
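With `autogen_policy` set to `false`, Chalice does not generate the Lambda IAM policy from your code; it reads it from `.chalice/policy-<stage>.json` instead, which is why the policy file for the `dev` stage is named `policy-dev.json`. A small hypothetical helper that produces the same `config.json` content and the matching policy file name for a given stage:

```python
import json
from typing import Any, Dict, Tuple


def chalice_project_files(app_name: str, stage: str = "dev", api_gateway_stage: str = "api") -> Tuple[str, str]:
    """Return (config.json text, policy file name) for a stage that uses a hand-written IAM policy."""
    config: Dict[str, Any] = {
        "version": "2.0",
        "app_name": app_name,
        "stages": {stage: {"autogen_policy": False, "api_gateway_stage": api_gateway_stage}},
    }
    return json.dumps(config, indent=2), f"policy-{stage}.json"


config_text, policy_name = chalice_project_files("stream_endpoint")
print(policy_name)
print(config_text)
```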

#### Note: This step requires permissions to create IAM roles, Lambda functions, and API Gateway APIs

In [None]:
%pip install chalice requests --upgrade --quiet

In [None]:
with open("app.py", "w") as f:
    f.write(
        """import boto3
import sagemaker
from sagemaker import serializers, deserializers
from chalice import Chalice

app = Chalice(app_name="stream_endpoint")
TABLE_NAME = "lmi_test_db"
SM_ENDPOINT_NAME = "{endpoint_name}"
sm_predictor = None
""".format(
            endpoint_name=endpoint_name
        )
        + """
@app.route("/query", methods=["POST"])
def run_inference():
    body = app.current_request.json_body
    if "session_id" in body:
        return ddb_fetcher(body["session_id"])
    elif "prompt" in body:
        return get_sm_predictor().predict(body)
    else:
        return {"result": "Error!", "_debug": body}


def ddb_fetcher(session_id):
    ddb_client = boto3.client("dynamodb")
    result = ddb_client.get_item(TableName=TABLE_NAME, Key={"cache_id": {"S": session_id}})
    if "Item" in result:
        return {"result": result["Item"]["content"]["S"]}
    return {"result": "", "_debug": result}


def get_sm_predictor():
    global sm_predictor
    if sm_predictor is None:
        sess = sagemaker.session.Session()
        sm_predictor = sagemaker.Predictor(
            endpoint_name=SM_ENDPOINT_NAME,
            sagemaker_session=sess,
            serializer=serializers.JSONSerializer(),
            deserializer=deserializers.JSONDeserializer(),
        )
    return sm_predictor
"""
    )
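The `/query` route multiplexes two request shapes on one URL: a body with `prompt` starts a new generation and returns a `session_id`, and a body with `session_id` returns the text cached so far. That dispatch can be exercised locally with stubs. `route_query`, `fake_fetch`, and `fake_predict` are hypothetical, framework-free stand-ins for `run_inference`, `ddb_fetcher`, and the predictor; this is not Chalice API.

```python
from typing import Any, Callable, Dict


def route_query(
    body: Dict[str, Any],
    fetch: Callable[[str], Dict[str, Any]],
    predict: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> Dict[str, Any]:
    """Same branching as run_inference(): session_id wins over prompt; anything else is an error."""
    if "session_id" in body:
        return fetch(body["session_id"])
    elif "prompt" in body:
        return predict(body)
    return {"result": "Error!", "_debug": body}


FAKE_CACHE = {"abc": "Large language model is"}


def fake_fetch(session_id: str) -> Dict[str, Any]:
    """Stand-in for ddb_fetcher()."""
    return {"result": FAKE_CACHE.get(session_id, "")}


def fake_predict(body: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for the endpoint call, which returns the session id created in model.py."""
    return {"session_id": "abc"}


print(route_query({"prompt": ["Large language model is"], "max_new_tokens": 8}, fake_fetch, fake_predict))
print(route_query({"session_id": "abc"}, fake_fetch, fake_predict))
print(route_query({}, fake_fetch, fake_predict))
```

Because the client only echoes the first response (`{"session_id": ...}`) back to the same route, it never needs the table name or DynamoDB credentials.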

In [None]:
%%writefile requirements.txt
boto3
sagemaker

In [None]:
%%writefile config.json
{
 "version": "2.0",
 "app_name": "stream_endpoint",
 "stages": {"dev": {"autogen_policy": false, "api_gateway_stage": "api"}}
}

In [None]:
%%writefile policy-dev.json
{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
 "Resource": "arn:aws:logs:*:*:*",
 "Effect": "Allow"
 },
 {
 "Action": ["dynamodb:GetItem", "dynamodb:Scan", "dynamodb:Query"],
 "Resource": ["arn:aws:dynamodb:*:*:table/lmi_test_db*"],
 "Effect": "Allow"
 },
 {
 "Action": ["sagemaker:ListEndpoints", "sagemaker:InvokeEndpoint"],
 "Resource": ["arn:aws:sagemaker:*:*:endpoint/lmi*"],
 "Effect": "Allow"
 }
 ]
}
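The policy scopes access by name prefix: the Lambda role can only invoke endpoints whose names start with `lmi` and read tables whose names start with `lmi_test_db`. The endpoint name generated earlier with `name_from_base("lmi-model")` satisfies that prefix; if you rename the endpoint, the policy has to change too. The check below approximates IAM's `*` wildcard with Python's `fnmatch.fnmatchcase` (an assumption that holds for these ARNs, which contain no `?` or `[` characters); the account ID and endpoint timestamp are made-up examples.

```python
from fnmatch import fnmatchcase

ALLOWED = {
    "dynamodb": "arn:aws:dynamodb:*:*:table/lmi_test_db*",
    "sagemaker": "arn:aws:sagemaker:*:*:endpoint/lmi*",
}


def is_allowed(service: str, arn: str) -> bool:
    """Approximate IAM resource matching for the policy's wildcard ARNs."""
    return fnmatchcase(arn, ALLOWED[service])


examples = [
    ("sagemaker", "arn:aws:sagemaker:us-west-2:123456789012:endpoint/lmi-model-2023-05-01-12-00-00-000"),
    ("sagemaker", "arn:aws:sagemaker:us-west-2:123456789012:endpoint/my-renamed-endpoint"),
    ("dynamodb", "arn:aws:dynamodb:us-west-2:123456789012:table/lmi_test_db"),
]
for service, arn in examples:
    print(is_allowed(service, arn), arn)
```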

Now, let's deploy!

In [None]:
%%bash
rm -rf deployment/
mkdir -p deployment/.chalice
mv app.py deployment/
mv requirements.txt deployment/
mv policy-dev.json deployment/.chalice/
mv config.json deployment/.chalice/
cd deployment/
chalice deploy

### Inference with Lambda endpoint
Now, take the REST API URL printed by `chalice deploy` and use it to send a simple inference request with the `requests` library. Remember to replace the endpoint URL in the code below with your own.

In [None]:
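import time

import requests

# Replace with the REST API URL printed by `chalice deploy`
lambda_endpoint = "https://wcxj19xw4f.execute-api.us-east-1.amazonaws.com/api/" + "query"
EOS_MARKER = "<eos>"  # end-of-stream marker; must match the one appended in model.py

headers = {"content-type": "application/json"}
data = {"prompt": ["Large language model is"], "max_new_tokens": 256}
res = requests.post(lambda_endpoint, headers=headers, json=data)
print(f"First inference response: {res.json()}")
prev = 0
deadline = time.time() + 300  # stop polling after 5 minutes
while time.time() < deadline:
    # Post {"session_id": ...} back to the same route to fetch the text cached so far
    ddb_result = requests.post(lambda_endpoint, headers=headers, json=res.json())
    text = ddb_result.json()["result"]
    finished = text.endswith(EOS_MARKER)
    if finished:
        text = text[: -len(EOS_MARKER)]
    print(text[prev:], end="")
    prev = len(text)
    if finished:
        break
    time.sleep(0.1)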

## Clean up the environment

If you deployed the Lambda and API Gateway stack, delete it as follows:

In [None]:
%%bash
cd deployment/
chalice delete

Clean up the SageMaker endpoint:

In [None]:
sess.delete_endpoint(endpoint_name)
sess.delete_endpoint_config(endpoint_name)
model.delete_model()

Delete the DynamoDB table:

In [None]:
ddb_client = boto3.client("dynamodb")
ddb_client.delete_table(TableName="lmi_test_db")

## Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

![This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-2/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ca-central-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/sa-east-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This eu-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This eu-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-2/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-3/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-central-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-north-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This ap-southeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-2/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-2/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)

![This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-south-1/inference|generativeai|llm-workshop|lab6-stream-with-pagination|stream_pagination_lmi.ipynb)
