# Ingest massive amounts of data to a Vector DB (Amazon OpenSearch)
**_Use of Amazon OpenSearch as a vector database for storing embeddings_**

This notebook works well with the `conda_python3` kernel on a SageMaker Notebook `ml.t3.xlarge` instance.

---
---

## Contents

1. [Objective](#Objective)
1. [Background](#Background-(Problem-Description-and-Approach))
1. [Overall Workflow](#Overall-Workflow)
1. [Create scripts for ingesting data into OpenSearch](#Create-scripts-for-ingesting-data-into-OpenSearch)
1. [Download the data from the web and upload to S3](#Download-the-data-from-the-web-and-upload-to-S3)
1. [Load the data in a OpenSearch index (Local mode)](#Load-the-data-in-a-OpenSearch-index-(Local-mode))
1. [Load the data in a OpenSearch index via SageMaker Processing Job (Distributed mode)](#Load-the-data-in-a-OpenSearch-index-via-SageMaker-Processing-Job-(Distributed-mode))
1. [Conclusion](#Conclusion)

---

## Objective

This notebook illustrates how to use [`langchain`](https://python.langchain.com/en/latest/index.html), Amazon SageMaker Endpoints and Amazon SageMaker Processing Jobs to convert a large amount of data into embeddings and ingest the text data along with its embeddings into an Amazon OpenSearch index.

We use the documents from [sagemaker.readthedocs.io/en/stable](https://sagemaker.readthedocs.io/en/stable/) as the dataset to convert into embeddings. The [`gpt-j-6b`](https://huggingface.co/EleutherAI/gpt-j-6b) large language model (LLM) is used to generate the embeddings.

To understand the code, you might also find it useful to refer to:

- *[The langchain OpenSearch documentation](https://python.langchain.com/en/latest/ecosystem/opensearch.html)*
- *[Amazon OpenSearch service documentation](https://docs.aws.amazon.com/opensearch-service/index.html)*
- *[SageMaker Processing Job](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html)*
---

## Background (Problem Description and Approach)

- **Problem statement**: 

Using LLMs for information retrieval tasks (such as question-answering) requires converting the knowledge corpus as well as user questions into vector embeddings. We want to generate these vector embeddings using an LLM hosted as an Amazon SageMaker Endpoint and store them in a vector database of choice such as Amazon OpenSearch. For large amounts of data (TBs or PBs) we need a scalable system that can convert the documents into embeddings, store them in a vector database and provide low-latency similarity search.

- **Our approach**: 

1. Host the LLM used to generate the embeddings as a SageMaker Endpoint with `instance_count` set to > 1 (the exact number depends on the time taken to generate the embeddings for the amount of data we have and the amount we want to spend; more instances mean greater cost but less time taken).

1. Place the corpus of data in S3 (each document is a file stored as an object in S3).

1. Use a Python script that uses [langchain](https://python.langchain.com/en/latest/index.html) and [Opensearch-py](https://pypi.org/project/opensearch-py/) to ingest the data into OpenSearch. Run the script locally on this notebook for testing.

1. Create a Sagemaker Processing job with `instance_count` set to > 1 (usually matching the `instance_count` for the Sagemaker Endpoint). 

 Each instance of the SageMaker Processing Job runs a script that does the following:
 - Processes a subset of files from S3.
 - Uses langchain to read the files from the local filesystem and convert them into chunks.
 - Creates a langchain `OpenSearchVectorSearch` object and provides it a `SagemakerEndpointEmbeddingsJumpStart` object that enables it to talk to our SageMaker Endpoint.
 - Uses the langchain `OpenSearchVectorSearch` to create or get an existing OpenSearch index and then ingests documents into the index which contain the original `text`, `embeddings` and `metadata`.
 - Does this using Python multiprocessing to achieve parallelization even within a single processing job instance and ensure maximum use of the SageMaker Endpoint instance's GPU.
 > **The advantage of using langchain as a wrapper for interfacing with a vector database is that it provides a generic pattern that can be used with any LLM and any vector store. Langchain automatically uses the OpenSearch bulk ingestion API endpoint for ingesting data rather than ingesting data one record at a time. Furthermore, langchain also provides an opinionated JSON structure that includes text and metadata along with the embeddings themselves for storing embeddings in an OpenSearch index specifically for information retrieval use-cases**.
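
To make the document structure described above more concrete, here is a minimal sketch of how one bulk action per chunk could be assembled. The helper name `build_bulk_actions` is hypothetical, and the field names `vector_field`, `text` and `metadata` are assumptions based on the index mapping this notebook prints during the local run; langchain's internal implementation may differ.

```python
import uuid
from typing import Any, Dict, List


def build_bulk_actions(index_name: str,
                       texts: List[str],
                       embeddings: List[List[float]],
                       metadatas: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Hypothetical helper: build one index action per chunk for a single bulk request."""
    if not (len(texts) == len(embeddings) == len(metadatas)):
        raise ValueError("texts, embeddings and metadatas must have the same length")
    actions = []
    for text, vector, metadata in zip(texts, embeddings, metadatas):
        actions.append({
            "_op_type": "index",          # assumed bulk operation type
            "_index": index_name,
            "_id": str(uuid.uuid4()),     # random document id
            "vector_field": vector,       # the embedding (assumed field name)
            "text": text,                 # the original chunk text
            "metadata": metadata,         # e.g. source file, timestamp
        })
    return actions


actions = build_bulk_actions(
    "example_index",
    ["first chunk", "second chunk"],
    [[0.1, 0.2], [0.3, 0.4]],
    [{"source": "a.html"}, {"source": "b.html"}],
)
print(len(actions), sorted(actions[0].keys()))
```

All actions built this way would be sent in one request, which is what makes bulk ingestion cheaper than indexing one record at a time.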

- **Our tools**: [Amazon SageMaker SDK](https://sagemaker.readthedocs.io/en/stable/), [langchain](https://python.langchain.com/en/latest/index.html) and [Opensearch-py](https://pypi.org/project/opensearch-py/).


---

## Overall Workflow

**Prerequisite**

The following prerequisites need to be in place before running this notebook; they are created by running [this CloudFormation template](./template.yaml).
- A SageMaker Endpoint for generating embeddings.
- An Amazon OpenSearch cluster for storing embeddings.
 - OpenSearch cluster's access credentials (username and password) stored in AWS Secrets Manager by following the steps described [here](https://docs.aws.amazon.com/secretsmanager/latest/userguide/managing-secrets.html).

The overall workflow for this notebook is as follows:
1. Install the required Python packages and store session information in local variables.
1. Download data from source and upload to S3.
1. Run the Python script locally to ingest a subset of data into an OpenSearch index for testing.
1. Run Sagemaker Processing Job which reads all data from S3 and runs the same Python script as above to ingest data into OpenSearch.
 - As part of this step we also create a custom container to package langchain and opensearch Python packages.
1. Do a similarity search with embeddings stored in the OpenSearch index for an input query.
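
As an illustration of what the similarity search in the last step does conceptually, the sketch below runs an exact nearest-neighbor search over a handful of toy vectors. This is only an assumption-laden illustration: OpenSearch performs an approximate k-NN search over an index, the distance metric here (Euclidean) is assumed, and the function names are hypothetical.

```python
import math
from typing import List, Sequence, Tuple


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance between two vectors of equal length."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def top_k(query_vec: Sequence[float],
          doc_vecs: List[Sequence[float]],
          k: int = 2) -> List[Tuple[int, float]]:
    """Return (document position, distance) pairs for the k closest vectors."""
    scored = [(i, l2_distance(query_vec, vec)) for i, vec in enumerate(doc_vecs)]
    scored.sort(key=lambda pair: pair[1])
    return scored[:k]


# toy 2-dimensional "embeddings"; real embeddings have thousands of dimensions
doc_vectors = [[0.0, 0.0], [1.0, 1.0], [5.0, 5.0]]
nearest = top_k([0.9, 1.2], doc_vectors, k=2)
print(nearest)
```

In the real workflow the query text is first converted into an embedding by the same model that embedded the documents, and the vector store returns the stored `text` and `metadata` of the closest matches.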

---

## Step 1: Setup
Install the required packages.

In [1]:
!pip install --upgrade sagemaker --quiet
!pip install ipywidgets==7.0.0 --quiet
!pip install langchain==0.0.149
!pip install opensearch-py==2.2.0

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com


In [2]:
import os
import sys
import time
import json
import logging
import numpy as np
from typing import List
import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker.processing import ProcessingInput
from langchain.document_loaders import ReadTheDocsLoader
from langchain.vectorstores import OpenSearchVectorSearch
from langchain.embeddings import SagemakerEndpointEmbeddings
from langchain.llms.sagemaker_endpoint import ContentHandlerBase
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sagemaker.processing import ScriptProcessor, FrameworkProcessor

Change these parameters if you would like to scrape a different website, customize the chunk size, etc.

In [3]:
# global constants
APP_NAME = "llm-apps-blogs"
WEBSITE="https://sagemaker.readthedocs.io/en/stable/"
DOMAIN="sagemaker.readthedocs.io"
DATA_DIR = "docs"
MAX_OS_DOCS_PER_PUT = 500
IMAGE = "load-data-opensearch-custom"
IMAGE_TAG = "latest"
CHUNK_SIZE_FOR_DOC_SPLIT = 600
CHUNK_OVERLAP_FOR_DOC_SPLIT = 20
CREATE_OS_INDEX_HINT_FILE = "_create_index_hint"

In [64]:
logger = logging.getLogger()
logging.basicConfig(format='%(asctime)s,%(module)s,%(processName)s,%(levelname)s,%(message)s', level=logging.INFO, stream=sys.stderr)

### Read parameters from Cloud Formation stack

Some of the resources needed for this notebook, such as the embeddings LLM endpoint and the Amazon OpenSearch cluster, are created outside of this notebook, typically through a CloudFormation template. We now read the outputs and parameters of the CloudFormation stack created from that template to get the values of these parameters.

The stack name here should match the stack name you used when creating the cloud formation stack.

In [65]:
# if you used a different name while creating the CloudFormation stack, change this to match the name you used
CFN_STACK_NAME = "rag22"

**If you did not use a cloud formation template for creating these resources then set the names of these resources manually in the code below.**

In [67]:
# list_stacks also returns recently deleted stacks, so ignore DELETE_COMPLETE entries
stacks = boto3.client('cloudformation').list_stacks()
stack_found = any(stack['StackName'] == CFN_STACK_NAME and stack['StackStatus'] != 'DELETE_COMPLETE'
                  for stack in stacks['StackSummaries'])

In [70]:
def get_cfn_outputs(stackname: str) -> dict:
    # map each stack output key to its value
    cfn = boto3.client('cloudformation')
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

def get_cfn_parameters(stackname: str) -> dict:
    # map each stack parameter key to its value
    cfn = boto3.client('cloudformation')
    params = {}
    for param in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Parameters']:
        params[param['ParameterKey']] = param['ParameterValue']
    return params

if stack_found is True:
    outputs = get_cfn_outputs(CFN_STACK_NAME)
    params = get_cfn_parameters(CFN_STACK_NAME)
    logger.info(f"cfn outputs={outputs}\nparams={params}")

    embeddings_model_endpoint_name = outputs['EmbeddingEndpointName']
    opensearch_domain_endpoint = f"https://{outputs['OpenSearchDomainEndpoint']}"
    opensearch_index = params['OpenSearchIndexName']
    # ARN of the secret is of the following format arn:aws:secretsmanager:region:account_id:secret:my_path/my_secret_name-autoid
    os_creds_secretid_in_secrets_manager = "-".join(outputs['OpenSearchSecret'].split(":")[-1].split('-')[:-1])
else:
    logger.info(f"cloud formation stack {CFN_STACK_NAME} not found, set parameters manually here")
    # REPLACE THE "placeholder" WITH ACTUAL VALUES IF YOU CREATED THESE RESOURCES WITHOUT USING A CLOUD FORMATION TEMPLATE
    embeddings_model_endpoint_name = "placeholder"
    opensearch_domain_endpoint = "placeholder"
    opensearch_index = "placeholder"
    os_creds_secretid_in_secrets_manager = "placeholder"

2023-05-05 01:22:54,050,3843410561,MainProcess,INFO,cfn outputs={'EmbeddingEndpointName': 'gpt-j-6b-endpoint-27e32820', 'OpenSourceDomainArn': 'arn:aws:es:us-east-1:015469603702:domain/opensearchservi-xesatndxtlnk', 'OpenSearchDomainEndpoint': 'search-opensearchservi-xesatndxtlnk-styan2632w7jxb3ojtngbdbsvu.us-east-1.es.amazonaws.com', 'S3BucketSecureURL': 'https://rag22-s3buckethosting-1thvjqr6739hy.s3.amazonaws.com', 'LLMEndpointName': 'flan-t5-xxl-endpoint-27e32820', 'SageMakerNotebookURL': 'https://console.aws.amazon.com/sagemaker/home?region=us-east-1#/notebook-instances/openNotebook/NotebookInstance-WEP7yM7QaFQy?view=classic', 's3BucketTraining': 'rag22-s3buckettraining-13vbb2sl5r60', 'Region': 'us-east-1', 'OpenSearchDomainName': 'opensearchservi-xesatndxtlnk', 'OpenSearchSecret': 'arn:aws:secretsmanager:us-east-1:015469603702:secret:OpenSearchSecret-rag22-Wy0aDq', 's3BucketHostingBucketName': 'rag22-s3buckethosting-1thvjqr6739hy'}
params={'OpenSearchIndexName': 'llm_apps_worksho

The embeddings model endpoint name, the OpenSearch domain endpoint and the identifier for the OpenSearch credentials stored in Secrets Manager are all available as `Outputs` from the CloudFormation stack; the OpenSearch index name is read from the stack `Parameters`.
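
The secret identifier is derived from the secret's ARN by dropping the random suffix that Secrets Manager appends to the secret name. A small worked example of that string manipulation follows; the function name `secret_id_from_arn` and the account number in the sample ARN are placeholders introduced here for illustration.

```python
def secret_id_from_arn(secret_arn: str) -> str:
    """Strip the ARN prefix and the trailing '-<random suffix>' from a secret ARN."""
    # the last colon-separated field is "<secret name>-<suffix>"
    resource = secret_arn.split(":")[-1]
    # drop the final dash-separated token (the suffix), keep dashes inside the name
    return "-".join(resource.split("-")[:-1])


sample_arn = "arn:aws:secretsmanager:us-east-1:123456789012:secret:OpenSearchSecret-rag22-Wy0aDq"
secret_id = secret_id_from_arn(sample_arn)
print(secret_id)  # OpenSearchSecret-rag22
```

Note that this approach assumes the ARN always ends with such a suffix; an ARN without one would lose the last part of its name.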

In [71]:
logger.info(f"embeddings_model_endpoint_name={embeddings_model_endpoint_name},\nopensearch_domain_endpoint={opensearch_domain_endpoint},\n"
 f"os_creds_secretid_in_secrets_manager={os_creds_secretid_in_secrets_manager},opensearch_index={opensearch_index}")

2023-05-05 01:22:57,546,796691109,MainProcess,INFO,embeddings_model_endpoint_name=gpt-j-6b-endpoint-27e32820,
opensearch_domain_endpoint=https://search-opensearchservi-xesatndxtlnk-styan2632w7jxb3ojtngbdbsvu.us-east-1.es.amazonaws.com,
os_creds_secretid_in_secrets_manager=OpenSearchSecret-rag22,opensearch_index=llm_apps_workshop_embeddings


In [34]:
sagemaker_session = Session()
aws_role = sagemaker_session.get_caller_identity_arn()
aws_region = boto3.Session().region_name
bucket = sagemaker_session.default_bucket()
logger.info(f"aws_role={aws_role}, aws_region={aws_region}, bucket={bucket}")

2023-05-05 00:41:51,581,1323976668,MainProcess,INFO,aws_role=arn:aws:iam::015469603702:role/SageMakerRepoRole, aws_region=us-east-1, bucket=sagemaker-us-east-1-015469603702


---

## Step 2: Download the data from the web and upload to S3

In this step we use `wget` to crawl a website that follows the Python documentation (Read the Docs) style. Only the `html` files are kept, and they are copied into a single flat directory. **This data download takes a few minutes**.

In [15]:
!mkdir -p scripts

In [16]:
DOWNLOAD_DATA = "yes"

In [17]:
%%writefile scripts/get_data.sh
#!/bin/bash
# This script uses wget to crawl the input website and
# save the downloaded files in a given directory.
echo "input args="
echo $@
if [[ "$1" == "yes" ]];
then
 WEBSITE=$2
 DOMAIN=$3
 KB_DIR=$4 
 # delete any existing folder for this data
 rm -rf ${DOMAIN} ${KB_DIR}
 mkdir -p ${KB_DIR}
 
 # download the data, this may take a few minutes or more depending upon the amount of content, network speed etc.
 wget -e robots=off --recursive --no-clobber --page-requisites --html-extension --convert-links --restrict-file-names=windows --domains ${DOMAIN} --no-parent ${WEBSITE}
 
 # we only want to keep the html files
 # and copy them into a new directory with their
 # full path name flattened into a single file
 # so /path/to/a/file becomes path_to_a_file, this
 # is done so that we can upload all files to a single 
 # prefix in S3 which allows the Sagemaker Processing Job
 # to easily split the files between instances 
 for i in `find ${DOMAIN} -name "*.html"`
 do
 flat_i=`echo "${i//\//_}"`
 echo going to copy $i to ${KB_DIR}/$flat_i
 cp $i ${KB_DIR}/$flat_i 
 done
 
 file_count=`ls ${KB_DIR} | wc -l`
 echo there are $file_count files in ${KB_DIR} directory
else
 echo DOWNLOAD_DATA=$1, not downloading new data
fi

Overwriting scripts/get_data.sh
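
The flattening step in `get_data.sh` replaces every `/` in a file's path with `_` so that all files can live under a single S3 prefix. The same idea can be sketched in Python as follows (the function names are hypothetical and only mirror what the shell loop does):

```python
from typing import Iterable, List


def flatten_path(path: str) -> str:
    """Turn a nested path into a single file name: /path/to/a/file -> _path_to_a_file."""
    return path.replace("/", "_")


def flatten_html_files(paths: Iterable[str]) -> List[str]:
    """Keep only the .html files and return their flattened names."""
    return [flatten_path(p) for p in paths if p.endswith(".html")]


crawled = [
    "sagemaker.readthedocs.io/en/stable/algorithms/index.html",
    "sagemaker.readthedocs.io/en/stable/_static/css/theme.css",
]
flat_names = flatten_html_files(crawled)
print(flat_names)
```

Because the original directory structure is encoded in the file name, two pages with the same base name in different directories do not collide after flattening.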


In [18]:
!chmod +x scripts/get_data.sh
!./scripts/get_data.sh $DOWNLOAD_DATA $WEBSITE $DOMAIN $DATA_DIR

input args=
yes https://sagemaker.readthedocs.io/en/stable/ sagemaker.readthedocs.io docs
Both --no-clobber and --convert-links were specified, only --convert-links will be used.
--2023-05-05 00:25:39-- https://sagemaker.readthedocs.io/en/stable/
Resolving sagemaker.readthedocs.io (sagemaker.readthedocs.io)... 104.17.33.82, 104.17.32.82, 2606:4700::6811:2152, ...
Connecting to sagemaker.readthedocs.io (sagemaker.readthedocs.io)|104.17.33.82|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘sagemaker.readthedocs.io/en/stable/index.html’

 [ <=> ] 50,198 --.-K/s in 0.001s 

2023-05-05 00:25:41 (45.5 MB/s) - ‘sagemaker.readthedocs.io/en/stable/index.html’ saved [50198]

--2023-05-05 00:25:41-- https://sagemaker.readthedocs.io/en/stable/_static/css/theme.css
Reusing existing connection to sagemaker.readthedocs.io:443.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/css]
Saving to: ‘sagemaker.readthedocs.io

In [19]:
# create an empty file called _create_index_hint to provide a hint for opensearch index creation
# this is needed for Sagemaker Processing Job when there are multiple instance nodes
# all running the same code for data ingestion but only one node needs to create the index
!touch $DATA_DIR/$CREATE_OS_INDEX_HINT_FILE

# upload this data to S3, to be used when we run the Sagemaker Processing Job
!aws s3 cp --recursive $DATA_DIR/ s3://$bucket/$APP_NAME/$DOMAIN

upload: docs/_create_index_hint to s3://sagemaker-us-east-1-015469603702/llm-apps-blogs/sagemaker.readthedocs.io/_create_index_hint
upload: docs/sagemaker.readthedocs.io_en_stable_algorithms_tabular_catboost.html to s3://sagemaker-us-east-1-015469603702/llm-apps-blogs/sagemaker.readthedocs.io/sagemaker.readthedocs.io_en_stable_algorithms_tabular_catboost.html
upload: docs/sagemaker.readthedocs.io_en_stable_algorithms_index.html to s3://sagemaker-us-east-1-015469603702/llm-apps-blogs/sagemaker.readthedocs.io/sagemaker.readthedocs.io_en_stable_algorithms_index.html
upload: docs/sagemaker.readthedocs.io_en_stable_algorithms_tabular_lightgbm.html to s3://sagemaker-us-east-1-015469603702/llm-apps-blogs/sagemaker.readthedocs.io/sagemaker.readthedocs.io_en_stable_algorithms_tabular_lightgbm.html
upload: docs/sagemaker.readthedocs.io_en_stable_algorithms_text_lda.html to s3://sagemaker-us-east-1-015469603702/llm-apps-blogs/sagemaker.readthedocs.io/sagemaker.readthedocs.io_en_stable_algorithms_

---

## Step 3: Load data into `OpenSearch`

We are now ready to create scripts which will read data from the local directory, use langchain to create embeddings and then upload the embeddings into OpenSearch.

In [20]:
# create directories for storing the scripts and the Dockerfile
# (-p avoids an error when a directory already exists)
!mkdir -p src scripts container


### Read credentials from AWS Secrets Manager

The credentials for the OpenSearch cluster are stored in AWS Secrets Manager. Our code reads the credentials from there and provides them to the opensearch-py package (through the langchain API).
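
As a rough sketch of what happens to the secret once it is retrieved: the secret value is a JSON string, which is parsed and turned into the `(username, password)` tuple used for HTTP basic authentication. The example below uses a fake response dictionary instead of calling Secrets Manager, and the helper name `http_auth_from_secret` is hypothetical.

```python
import json
from typing import Dict, Tuple


def http_auth_from_secret(response: Dict[str, str]) -> Tuple[str, str]:
    """Parse the JSON secret string and return a (username, password) tuple."""
    secret = json.loads(response["SecretString"])
    return secret["username"], secret["password"]


# stand-in for the dictionary returned by get_secret_value (made-up credentials)
fake_response = {
    "SecretString": json.dumps({"username": "admin", "password": "example-password"})
}
http_auth = http_auth_from_secret(fake_response)
print(http_auth[0])
```

Keeping the credentials in Secrets Manager means that neither the notebook nor the container image needs to contain the password.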

In [21]:
%%writefile container/credentials.py

"""
Retrieve the OpenSearch credentials (username and password) from AWS Secrets Manager
"""
import json
import boto3

def get_credentials(secret_id: str, region_name: str) -> dict:
    # the secret is stored as a JSON string, parse it into a dict
    client = boto3.client('secretsmanager', region_name=region_name)
    response = client.get_secret_value(SecretId=secret_id)
    secrets_value = json.loads(response['SecretString'])

    return secrets_value

Overwriting container/credentials.py


### SageMaker embeddings for langchain

langchain provides the `SagemakerEndpointEmbeddings` class, which is a wrapper around the functionality needed to talk to a SageMaker Endpoint to generate embeddings. We will override the `embed_documents` function to define our own batching strategy for sending requests to the model (multiple texts are sent in one model invocation). Similarly, we extend the `EmbeddingsContentHandler` class to provide implementations for the two abstract methods which define how to process (encode/decode) the input data sent to the model and the output received from the model.

We finally create a `SagemakerEndpointEmbeddingsJumpStart` object that puts all this together and can now be used by langchain to talk to an LLM deployed as a Sagemaker Endpoint to generate embeddings.
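
The batching and the encode/decode steps described above can be sketched without a real endpoint. In this example `fake_endpoint` stands in for the SageMaker Endpoint and simply returns a made-up two-dimensional vector per text; the request key `text_inputs` and the response key `embedding` follow the content handler used in this notebook, while all function names are hypothetical.

```python
import json
from typing import Callable, List


def encode_request(batch: List[str]) -> bytes:
    """Serialize a batch of texts into the JSON payload sent to the model."""
    return json.dumps({"text_inputs": batch}).encode("utf-8")


def decode_response(body: bytes) -> List[List[float]]:
    """Extract the list of embeddings from the JSON response."""
    return json.loads(body.decode("utf-8"))["embedding"]


batch_sizes_seen: List[int] = []


def fake_endpoint(request: bytes) -> bytes:
    """Stand-in for the model: returns [len(text), 0.0] for every input text."""
    texts = json.loads(request.decode("utf-8"))["text_inputs"]
    batch_sizes_seen.append(len(texts))
    vectors = [[float(len(t)), 0.0] for t in texts]
    return json.dumps({"embedding": vectors}).encode("utf-8")


def embed_in_batches(texts: List[str],
                     invoke: Callable[[bytes], bytes],
                     batch_size: int = 5) -> List[List[float]]:
    """Send the texts to the model batch_size at a time and collect all embeddings."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    results: List[List[float]] = []
    for i in range(0, len(texts), batch_size):
        response = invoke(encode_request(texts[i:i + batch_size]))
        results.extend(decode_response(response))
    return results


texts = [f"doc {i}" for i in range(12)]
vectors = embed_in_batches(texts, fake_endpoint, batch_size=5)
print(batch_sizes_seen, len(vectors))
```

Twelve texts with a batch size of five result in three invocations, which is the saving that batching provides over one invocation per text.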

In [22]:
%%writefile container/sm_helper.py

"""
Helper functions for using a SageMaker Endpoint via langchain
"""
import time
import json
import logging
from typing import List
from langchain.embeddings import SagemakerEndpointEmbeddings
from langchain.embeddings.sagemaker_endpoint import EmbeddingsContentHandler

logger = logging.getLogger(__name__)

# extend the SagemakerEndpointEmbeddings class from langchain to provide a custom embedding function
class SagemakerEndpointEmbeddingsJumpStart(SagemakerEndpointEmbeddings):
    def embed_documents(
        self, texts: List[str], chunk_size: int = 5
    ) -> List[List[float]]:
        """Compute doc embeddings using a SageMaker Inference Endpoint.

        Args:
            texts: The list of texts to embed.
            chunk_size: The chunk size defines how many input texts will
                be grouped together as one request.

        Returns:
            List of embeddings, one for each text.
        """
        results = []
        # never request more texts per invocation than we actually have
        _chunk_size = len(texts) if chunk_size > len(texts) else chunk_size
        st = time.time()
        for i in range(0, len(texts), _chunk_size):
            response = self._embedding_func(texts[i:i + _chunk_size])
            results.extend(response)
        time_taken = time.time() - st
        logger.info(f"got results for {len(texts)} in {time_taken}s, length of embeddings list is {len(results)}")
        return results


# class for serializing/deserializing requests/responses to/from the embeddings model
class ContentHandler(EmbeddingsContentHandler):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, prompt: List[str], model_kwargs={}) -> bytes:
        # the model expects the batch of texts under the "text_inputs" key
        input_str = json.dumps({"text_inputs": prompt, **model_kwargs})
        return input_str.encode('utf-8')

    def transform_output(self, output: bytes) -> List[List[float]]:
        # the model returns one embedding per input text under the "embedding" key
        response_json = json.loads(output.read().decode("utf-8"))
        embeddings = response_json["embedding"]
        if len(embeddings) == 1:
            return [embeddings[0]]
        return embeddings


def create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name: str, aws_region: str) -> SagemakerEndpointEmbeddingsJumpStart:
    # all set to create the objects for the ContentHandler and
    # SagemakerEndpointEmbeddingsJumpStart classes
    content_handler = ContentHandler()

    # note the name of the LLM Sagemaker endpoint, this is the model that we would
    # be using for generating the embeddings
    embeddings = SagemakerEndpointEmbeddingsJumpStart(
        endpoint_name=embeddings_model_endpoint_name,
        region_name=aws_region,
        content_handler=content_handler
    )
    return embeddings

Overwriting container/sm_helper.py


### Script to load data into OpenSearch

This script puts everything together: it divides the documents into chunks, uses the langchain package to create embeddings (through `SagemakerEndpointEmbeddingsJumpStart`) and then ingests the data into OpenSearch using `OpenSearchVectorSearch`.

To keep things simple the chunk size is fixed: the script defaults to 500 characters with an overlap of 30 characters (the splitter measures length with `len`, so these are characters rather than tokens), and both values can be overridden on the command line. The langchain `OpenSearchVectorSearch` provides a wrapper over the `opensearch-py` package. It uses the `/_bulk` API endpoint for ingesting multiple records in a single request.
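
To illustrate what chunk size and overlap mean, the sketch below splits a string into fixed-size windows that share a few characters with their neighbor. This is a deliberate simplification: `RecursiveCharacterTextSplitter` prefers to break on separators such as paragraphs and sentences, which this hypothetical `split_with_overlap` function does not attempt.

```python
from typing import List


def split_with_overlap(text: str, chunk_size: int = 500, chunk_overlap: int = 30) -> List[str]:
    """Split text into windows of chunk_size characters that overlap by chunk_overlap."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - chunk_overlap
    chunks = []
    start = 0
    while True:
        chunks.append(text[start:start + chunk_size])
        # stop once the current window reaches the end of the text
        if start + chunk_size >= len(text):
            break
        start += step
    return chunks


sample_text = "".join(chr(97 + i % 26) for i in range(1000))  # 1000 characters
sample_chunks = split_with_overlap(sample_text, chunk_size=500, chunk_overlap=30)
print([len(c) for c in sample_chunks])
```

The overlap keeps a little shared context between neighboring chunks, so a sentence that straddles a boundary is not lost to both of them.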

In [23]:
%%writefile container/load_data_into_opensearch.py

import os
import sys

# this is needed because the credentials.py and sm_helper.py
# are in /code directory of the custom container we are going 
# to create for Sagemaker Processing Job
sys.path.insert(1, '/code')

import glob
import time
import json
import logging
import argparse
import numpy as np
import multiprocessing as mp
from itertools import repeat
from functools import partial
import sagemaker, boto3, json
from typing import List, Tuple
from sagemaker.session import Session
from credentials import get_credentials
from opensearchpy.client import OpenSearch
from langchain.document_loaders import ReadTheDocsLoader
from langchain.vectorstores import OpenSearchVectorSearch
from langchain.embeddings import SagemakerEndpointEmbeddings
from sm_helper import create_sagemaker_embeddings_from_js_model
from langchain.llms.sagemaker_endpoint import ContentHandlerBase
from langchain.text_splitter import RecursiveCharacterTextSplitter
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# global constants
MAX_OS_DOCS_PER_PUT = 500
TOTAL_INDEX_CREATION_WAIT_TIME = 60
PER_ITER_SLEEP_TIME = 5
logger = logging.getLogger()
logging.basicConfig(format='%(asctime)s,%(module)s,%(processName)s,%(levelname)s,%(message)s', level=logging.INFO, stream=sys.stderr)

def check_if_index_exists(index_name: str, region: str, host: str, http_auth: Tuple[str, str]) -> bool:
    # region is accepted for symmetry with the caller, it is not needed
    # for a connection that uses basic (username, password) authentication
    aos_client = OpenSearch(
        hosts = [{'host': host.replace("https://", ""), 'port': 443}],
        http_auth = http_auth,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection
    )
    exists = aos_client.indices.exists(index_name)
    logger.info(f"index_name={index_name}, exists={exists}")
    return exists


def process_shard(shard, embeddings_model_endpoint_name, aws_region, os_index_name, os_domain_ep, os_http_auth) -> int:
    # each worker process creates its own embeddings and vector store clients
    logger.info(f'Starting process_shard of {len(shard)} chunks.')
    st = time.time()
    embeddings = create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name, aws_region)
    docsearch = OpenSearchVectorSearch(index_name=os_index_name,
                                       embedding_function=embeddings,
                                       opensearch_url=os_domain_ep,
                                       http_auth=os_http_auth)
    docsearch.add_documents(documents=shard)
    et = time.time() - st
    logger.info(f'Shard completed in {et} seconds.')
    return 0

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--opensearch-cluster-domain", type=str, default=None)
    parser.add_argument("--opensearch-secretid", type=str, default=None)
    parser.add_argument("--opensearch-index-name", type=str, default=None)
    parser.add_argument("--aws-region", type=str, default="us-east-1")
    parser.add_argument("--embeddings-model-endpoint-name", type=str, default=None)
    parser.add_argument("--chunk-size-for-doc-split", type=int, default=500)
    parser.add_argument("--chunk-overlap-for-doc-split", type=int, default=30)
    parser.add_argument("--input-data-dir", type=str, default="/opt/ml/processing/input_data")
    parser.add_argument("--process-count", type=int, default=1)
    parser.add_argument("--create-index-hint-file", type=str, default="_create_index_hint")
    args, _ = parser.parse_known_args()

    logger.info("Received arguments {}".format(args))
    # list all the files
    files = glob.glob(os.path.join(args.input_data_dir, "*.*"))
    logger.info(f"there are {len(files)} files to process in the {args.input_data_dir} folder")

    # retrieve secret to talk to opensearch
    creds = get_credentials(args.opensearch_secretid, args.aws_region)
    http_auth = (creds['username'], creds['password'])

    loader = ReadTheDocsLoader(args.input_data_dir)
    text_splitter = RecursiveCharacterTextSplitter(
        # chunk size and overlap are measured in characters because length_function=len
        chunk_size=args.chunk_size_for_doc_split,
        chunk_overlap=args.chunk_overlap_for_doc_split,
        length_function=len,
    )

    # Stage one: read all the docs, split them into chunks.
    st = time.time()
    logger.info('Loading documents ...')
    docs = loader.load()

    # add custom metadata fields: ingestion timestamp and embeddings model name
    for doc in docs:
        doc.metadata['timestamp'] = time.time()
        doc.metadata['embeddings_model'] = args.embeddings_model_endpoint_name
    chunks = text_splitter.create_documents([doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs])
    et = time.time() - st
    logger.info(f'Time taken: {et} seconds. {len(chunks)} chunks generated')

    # Stage two: split the chunks into shards of roughly MAX_OS_DOCS_PER_PUT documents each
    db_shards = (len(chunks) // MAX_OS_DOCS_PER_PUT) + 1
    print(f'Loading chunks into vector store ... using {db_shards} shards')
    st = time.time()
    shards = np.array_split(chunks, db_shards)

    t1 = time.time()

    # first check if index exists, if it does then call the add_documents function
    # otherwise call the from_documents function which would first create the index
    # and then do a bulk add. Both add_documents and from_documents do a bulk add
    # but it is important to call from_documents first so that the index is created
    # correctly for K-NN
    index_exists = check_if_index_exists(args.opensearch_index_name,
                                         args.aws_region,
                                         args.opensearch_cluster_domain,
                                         http_auth)

    embeddings = create_sagemaker_embeddings_from_js_model(args.embeddings_model_endpoint_name, args.aws_region)

    if index_exists is False:
        # create an index if the create index hint file exists
        path = os.path.join(args.input_data_dir, args.create_index_hint_file)
        if os.path.isfile(path) is True:
            logger.info(f"index {args.opensearch_index_name} does not exist but {path} file is present so will create index")
            # by default langchain would create a k-NN index and the embeddings would be ingested as a k-NN vector type
            docsearch = OpenSearchVectorSearch.from_documents(index_name=args.opensearch_index_name,
                                                              documents=shards[0],
                                                              embedding=embeddings,
                                                              opensearch_url=args.opensearch_cluster_domain,
                                                              http_auth=http_auth)
            # we now need to start the loop below for the second shard
            shard_start_index = 1
        else:
            logger.info(f"index {args.opensearch_index_name} does not exist and {path} file is not present, "
                        f"will wait for some other node to create the index")
            shard_start_index = 0
            # start a loop to wait for index creation by another node
            time_slept = 0
            while True:
                logger.info(f"index {args.opensearch_index_name} still does not exist, sleeping...")
                time.sleep(PER_ITER_SLEEP_TIME)
                index_exists = check_if_index_exists(args.opensearch_index_name,
                                                     args.aws_region,
                                                     args.opensearch_cluster_domain,
                                                     http_auth)
                if index_exists is True:
                    logger.info(f"index {args.opensearch_index_name} now exists")
                    break
                time_slept += PER_ITER_SLEEP_TIME
                if time_slept >= TOTAL_INDEX_CREATION_WAIT_TIME:
                    logger.error(f"time_slept={time_slept} >= {TOTAL_INDEX_CREATION_WAIT_TIME}, not waiting anymore for index creation")
                    break

    else:
        logger.info(f"index={args.opensearch_index_name} already exists, going to call add_documents")
        shard_start_index = 0

    # Stage three: ingest the remaining shards in parallel, one shard per worker process at a time
    with mp.Pool(processes = args.process_count) as pool:
        results = pool.map(partial(process_shard,
                                   embeddings_model_endpoint_name=args.embeddings_model_endpoint_name,
                                   aws_region=args.aws_region,
                                   os_index_name=args.opensearch_index_name,
                                   os_domain_ep=args.opensearch_cluster_domain,
                                   os_http_auth=http_auth),
                           shards[shard_start_index:])

    t2 = time.time()
    logger.info(f'run time in seconds: {t2-t1:.2f}')
    logger.info("all done")

Overwriting container/load_data_into_opensearch.py
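
The coordination between nodes in the script above relies on polling: nodes that do not see the hint file wait until the index shows up or a time budget is exhausted. A minimal sketch of that polling pattern is shown below. The function `wait_for_index` and its parameters are hypothetical; unlike the script, it returns a boolean so that a caller could decide to stop instead of continuing when the index never appears.

```python
import time
from typing import Callable


def wait_for_index(index_exists: Callable[[], bool],
                   total_wait_s: int = 60,
                   poll_interval_s: int = 5,
                   sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll index_exists() until it returns True or total_wait_s has elapsed."""
    waited = 0
    while waited < total_wait_s:
        if index_exists():
            return True
        sleep(poll_interval_s)
        waited += poll_interval_s
    # one final check after the last sleep
    return index_exists()


# simulate an index that appears on the third check, without really sleeping
answers = iter([False, False, True])
sleeps = []
found = wait_for_index(lambda: next(answers), total_wait_s=60,
                       poll_interval_s=5, sleep=sleeps.append)
print(found, sleeps)
```

Passing the `sleep` function as a parameter is only a convenience for testing; in a processing job the default `time.sleep` would be used.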


---

## Load the data in a `OpenSearch` index (Local mode)

We now run our script in local mode, i.e. on this notebook. This takes about 10 minutes. Once we confirm that it works successfully, we will run the same script as a SageMaker Processing Job using multiple nodes.

In [24]:
import subprocess
def run_cmd(cmd: str) -> None:
    """
    Run a shell command. This function exists because often it is
    cumbersome to run a shell command that takes parameters which
    are Python variables.
    """
    MAX_OUTPUT_LEN = 500
    logger.info(f"run_cmd, going to run cmd=\"{cmd}\"")

    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    errcode = p.returncode

    out = out.decode("utf-8")
    err = err.decode("utf-8")
    # keep only the first MAX_OUTPUT_LEN characters so that the log stays readable
    if len(out) > MAX_OUTPUT_LEN:
        out = out[:MAX_OUTPUT_LEN]
    if len(err) > MAX_OUTPUT_LEN:
        err = err[:MAX_OUTPUT_LEN]
    logger.info(f"errcode={errcode}, out={out}, err={err}")
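
The helper above passes a single command string to the shell, which works for the values used in this notebook but would break if a value contained spaces or shell metacharacters. One possible alternative, sketched here under the assumption that the command is always a Python script with `--name value` options, is to build an argument list and quote it only for display; `build_argv` is a hypothetical helper.

```python
import shlex
from typing import Dict, List


def build_argv(script: str, options: Dict[str, object]) -> List[str]:
    """Build an argument list such as ['python', script, '--name', 'value', ...]."""
    argv = ["python", script]
    for name, value in options.items():
        argv.extend([f"--{name}", str(value)])
    return argv


argv = build_argv(
    "container/load_data_into_opensearch.py",
    {"opensearch-index-name": "my index", "process-count": 2},
)
# quote each argument only when a printable command line is needed, e.g. for logging
printable = " ".join(shlex.quote(arg) for arg in argv)
print(printable)
```

An argument list like this can be given to `subprocess.run(argv)` without `shell=True`, so no quoting is needed when the command is actually executed.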


In [35]:
cmd = f"""python container/load_data_into_opensearch.py --opensearch-cluster-domain {opensearch_domain_endpoint} \
--opensearch-secretid {os_creds_secretid_in_secrets_manager} \
--opensearch-index-name {opensearch_index} \
--aws-region {aws_region} \
--embeddings-model-endpoint-name {embeddings_model_endpoint_name} \
--chunk-size-for-doc-split 1000 \
--chunk-overlap-for-doc-split 50 \
--input-data-dir {DATA_DIR} \
--create-index-hint-file {CREATE_OS_INDEX_HINT_FILE} \
--process-count 2
"""
run_cmd(cmd)

2023-05-05 00:42:01,829,4244735302,MainProcess,INFO,run_cmd, going to run cmd="python container/load_data_into_opensearch.py --opensearch-cluster-domain https://search-opensearchservi-xesatndxtlnk-styan2632w7jxb3ojtngbdbsvu.us-east-1.es.amazonaws.com --opensearch-secretid OpenSearchSecret-rag22 --opensearch-index-name llm_apps_workshop_embeddings --aws-region us-east-1 --embeddings-model-endpoint-name gpt-j-6b-endpoint-27e32820 --chunk-size-for-doc-split 1000 --chunk-overlap-for-doc-split 50 --input-data-dir docs --create-index-hint-file _create_index_hint --process-count 2
"
2023-05-05 00:51:41,854,4244735302,MainProcess,INFO,errcode=0, out=Loading chunks into vector store ... using 7 shards
is_appx_search is true
back to original
{'settings': {'index': {'knn': True, 'knn.algo_param.ef_search': 512}}, 'mappings': {'properties': {'vector_field': {'type': 'knn_vector', 'dimension': 4096, err=2023-05-05 00:42:03,626,load_data_into_opensearch,MainProcess,INFO,Received arguments Namespace(

---

## Load the data in a `OpenSearch` index via SageMaker Processing Job (Distributed mode)

We now have a working script that is able to ingest data into an OpenSearch index. But for this to work for massive amounts of data we need to scale up the processing by running this code in a distributed fashion. We will do this using Sagemkaer Processing Job. This involves the following steps:

1. Create a custom container in which we install the `langchain` and `opensearch-py` packages, then upload this container image to Amazon Elastic Container Registry (ECR).
2. Use the SageMaker `ScriptProcessor` class to create a SageMaker Processing Job that runs on multiple nodes.
 - The data files in S3 are automatically distributed across the SageMaker Processing Job instances by setting `s3_data_distribution_type='ShardedByS3Key'` on the `ProcessingInput` provided to the processing job.
 - Each node processes a subset of the files, which brings down the overall time required to ingest the data into OpenSearch.
 - Each node also uses Python `multiprocessing` to parallelize the file processing internally. Thus, **there are two levels of parallelization: one at the cluster level, where the files are divided among the individual nodes, and another at the node level, where the files on a node are further split between multiple processes running on that node**.
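The two levels of parallelization described above can be illustrated with a small, self-contained sketch. It only simulates the assignment of files to nodes and processes. The round-robin split and the file names are assumptions made for illustration; the actual assignment of S3 objects to instances is performed by SageMaker and may differ.

```python
from typing import List, Sequence


def split_round_robin(items: Sequence[str], n_parts: int) -> List[List[str]]:
    """Deal items into n_parts groups whose sizes differ by at most one."""
    if n_parts < 1:
        raise ValueError("n_parts must be >= 1")
    return [list(items[i::n_parts]) for i in range(n_parts)]


# Hypothetical file names; the job in this notebook uses 3 instances and --process-count 2.
files = [f"doc_{i:03d}.html" for i in range(10)]
instance_count = 3
process_count = 2

# Level 1: files are divided between the nodes
# (SageMaker does this for inputs that use ShardedByS3Key).
per_node = split_round_robin(files, instance_count)

# Level 2: each node divides its own files between its worker processes.
plan = [split_round_robin(node_files, process_count) for node_files in per_node]

for node_idx, node_plan in enumerate(plan):
    for proc_idx, proc_files in enumerate(node_plan):
        print(f"node={node_idx} process={proc_idx} files={proc_files}")
```

Every file ends up with exactly one process on exactly one node, which is why adding instances or processes reduces the work per worker.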

### Create custom container

We will now create a container locally and push the container image to ECR. **The container creation process takes about 1 minute**.

1. The container includes all the Python packages we need, i.e. `langchain`, `opensearch-py`, `sagemaker` and `beautifulsoup4`.
1. The container also includes the `credentials.py` script for retrieving credentials from Secrets Manager and `sm_helper.py` for helping to create SageMaker endpoint classes that langchain uses.
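As a rough illustration of what a credentials helper of this kind might do, the sketch below reads a secret and returns a username/password pair. This is not the contents of `credentials.py`: the function name `get_credentials_sketch`, the JSON layout of the secret, and the `FakeSecretsClient` stand-in are all assumptions, used so that the example runs without AWS access. With real credentials, the client would be `boto3.client("secretsmanager", region_name=...)`.

```python
import json
from typing import Any, Dict


def get_credentials_sketch(secrets_client: Any, secret_id: str) -> Dict[str, str]:
    """Fetch a secret and return its username/password pair.

    Assumes the secret is stored as a JSON string with "username" and
    "password" keys; the real credentials.py may be organized differently.
    """
    response = secrets_client.get_secret_value(SecretId=secret_id)
    secret = json.loads(response["SecretString"])
    return {"username": secret["username"], "password": secret["password"]}


class FakeSecretsClient:
    """Offline stand-in for a Secrets Manager client (hypothetical, for illustration)."""

    def get_secret_value(self, SecretId: str) -> Dict[str, str]:
        payload = {"username": "admin", "password": "example-only"}
        return {"SecretString": json.dumps(payload)}


creds = get_credentials_sketch(FakeSecretsClient(), "my-opensearch-secret")
# The pair can then be handed to an OpenSearch client as HTTP basic auth.
http_auth = (creds["username"], creds["password"])
print(http_auth[0])
```

Keeping the OpenSearch password in Secrets Manager means it never has to appear in the notebook, the container image, or the processing job arguments; only the secret id is passed around.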

In [36]:
%%writefile container/Dockerfile

FROM python:3.9-slim-buster

RUN apt-get -y update && apt-get install -y --no-install-recommends \
 wget \
 python3-pip \
 python3-setuptools \
 nginx \
 ca-certificates \
 && rm -rf /var/lib/apt/lists/*

RUN ln -s /usr/bin/python3 /usr/bin/python
RUN ln -s /usr/bin/pip3 /usr/bin/pip

# pip leaves the install caches populated which uses a 
# significant amount of space. These optimizations save a fair 
# amount of space in the image, which reduces start up time.
RUN pip --no-cache-dir install langchain==0.0.149 opensearch-py==2.2.0 sagemaker==2.148.0 beautifulsoup4==4.12.2

# Include python script for retrieving credentials 
# from AWS SecretsManager and Sagemaker helper classes
ADD credentials.py /code/
ADD sm_helper.py /code/

# Set some environment variables. PYTHONUNBUFFERED keeps Python from buffering our standard
# output stream, which means that logs can be delivered to the user quickly. PYTHONDONTWRITEBYTECODE
# keeps Python from writing the .pyc files which are unnecessary in this case.
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE

Overwriting container/Dockerfile


In [37]:
%%writefile scripts/build_and_push.sh

#!/usr/bin/env bash
# This script shows how to build the Docker image and push it to ECR to be ready for use
# by SageMaker.
# The arguments to this script are the path to the Dockerfile, the image name, the tag and the AWS region
# in which the container is to be created. The image name is used for the image on the local
# machine and combined with the account and region to form the repository name for ECR.

# override the built-in echo so that we can have a nice timestamped trace
echo () {
 builtin echo "$(date +'[%m-%d %H:%M:%S]'):" "$@"
}

if [ "$#" -eq 4 ]; then
 dlc_account_id=$(aws sts get-caller-identity | jq .Account)
 path_to_dockerfile=$1
 image=$2
 tag=$3
 region=$4
 
else
 echo "missing mandatory command line arguments, see usage..."
 echo "usage: $0 <path-to-dockerfile> <image-name> <image-tag> <aws-region>"
 exit 1
fi

# Get the account number associated with the current IAM credentials
account=$(aws sts get-caller-identity --query Account --output text)

if [ $? -ne 0 ]
then
 exit 255
fi


fullname="${account}.dkr.ecr.${region}.amazonaws.com/${image}:${tag}"
echo the full image name would be ${fullname}

# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --region ${region} --repository-names "${image}" > /dev/null 2>&1
if [ $? -ne 0 ]; then
 echo "creating ECR repository : ${fullname} "
 aws ecr create-repository --region ${region} --repository-name "${image}" > /dev/null
else
 echo "${image} repo already exists in ECR"
fi

# move to path of dockerfile
cd ${path_to_dockerfile}

# get credentials to login to ECR and, build and tag the image
# note the use of DOCKER_BUILDKIT=1, this is needed for some mount instructions in the Dockerfile
echo "going to start a docker build, image=${image}, using Dockerfile=${path_to_dockerfile}"
aws ecr get-login-password --region ${region} \
| docker login --username AWS --password-stdin ${account}.dkr.ecr.${region}.amazonaws.com
DOCKER_BUILDKIT=1 docker build . -t ${image} --build-arg dlc_account_id=${dlc_account_id} --build-arg region=${region}
docker tag ${image} ${fullname}
echo ${image} created

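# push the image to ECR
# log in again right before the push; the pipeline is run directly because a pipe
# stored in a variable is not interpreted as a pipe when the variable is expanded
echo "going to log in to ECR"
aws ecr get-login-password --region ${region} \
| docker login --username AWS --password-stdin ${account}.dkr.ecr.${region}.amazonaws.com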

cmd="docker push ${fullname}"
echo going to run \"${cmd}\" to push image to ecr
${cmd}
if [ $? -eq 0 ]; then
 echo "Amazon ECR URI: ${fullname}"
else
 echo "Error: Image ${fullname} build and push failed"
 exit 1
fi

echo "all done"


Overwriting scripts/build_and_push.sh


In [38]:
# Run script to build the custom Docker container image and push it to ECR
# Set region and sagemaker URI variables 
session = boto3.session.Session()
client = boto3.client("sts")
account_id = client.get_caller_identity()["Account"]
logger.info(f"region={aws_region}, account_id={account_id}")
!bash scripts/build_and_push.sh $(pwd)/container $IMAGE $IMAGE_TAG $aws_region

2023-05-05 00:51:41,946,383160684,MainProcess,INFO,region=us-east-1, account_id=015469603702


[05-05 00:51:43]: the full image name would be 015469603702.dkr.ecr.us-east-1.amazonaws.com/load-data-opensearch-custom:latest
[05-05 00:51:43]: load-data-opensearch-custom repo already exists in ECR
[05-05 00:51:43]: going to start a docker build, image=load-data-opensearch-custom, using Dockerfile=/home/ec2-user/SageMaker/repos/llm-apps-workshop/blogs/container
https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
[+] Building 0.0s (0/1)
[+] Building 0.1s (3/3)
 => [internal] load build definition from Dockerfile 0.0s
 => => transferring dockerfile: 1.26kB 0.0s
 => [internal] load .dockerignore 0.0s
 => => transferring context: 2B 0.0s
 => [internal] load metadata for docker.io/library/python:3.9-slim-buster 0.1s
[+] Building 0.2s (12/12) FINISHED
 => [internal] load build definition from Dockerfile 0.0s

### Create and run the SageMaker Processing Job

Now we will run the SageMaker Processing Job to ingest the data into OpenSearch.
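The processing job passes its configuration to `load_data_into_opensearch.py` as command-line arguments. The sketch below shows one way such arguments could be parsed with `argparse`. The flag names and the integer types match the flags and the `Namespace` log output shown in this notebook, but the function name `parse_args`, the `required=True` settings, and the example values are assumptions, not the script's actual implementation.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the ingestion script's command-line flags (illustrative sketch)."""
    parser = argparse.ArgumentParser(description="Load documents into OpenSearch")
    # String-valued settings; marking them required is an assumption.
    for flag in ("--opensearch-cluster-domain",
                 "--opensearch-secretid",
                 "--opensearch-index-name",
                 "--aws-region",
                 "--embeddings-model-endpoint-name",
                 "--input-data-dir",
                 "--create-index-hint-file"):
        parser.add_argument(flag, type=str, required=True)
    # Integer-valued settings, converted from the strings passed on the command line.
    parser.add_argument("--chunk-size-for-doc-split", type=int, required=True)
    parser.add_argument("--chunk-overlap-for-doc-split", type=int, required=True)
    parser.add_argument("--process-count", type=int, required=True)
    return parser.parse_args(argv)


# Hypothetical values, shaped like the ones used in this notebook.
args = parse_args([
    "--opensearch-cluster-domain", "https://example-domain.us-east-1.es.amazonaws.com",
    "--opensearch-secretid", "my-opensearch-secret",
    "--opensearch-index-name", "llm_apps_workshop_embeddings",
    "--aws-region", "us-east-1",
    "--embeddings-model-endpoint-name", "my-embeddings-endpoint",
    "--chunk-size-for-doc-split", "600",
    "--chunk-overlap-for-doc-split", "20",
    "--input-data-dir", "/opt/ml/processing/input_data",
    "--create-index-hint-file", "_create_index_hint",
    "--process-count", "2",
])
print(args)
```

Because `processor.run` accepts only strings in `arguments`, numeric settings are passed as `str(...)` and converted back by the parser via `type=int`.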

In [39]:
# setup the parameters for the job
base_job_name = f"{APP_NAME}-job"
tags = [{"Key": "data", "Value": "embeddings-for-llm-apps"}]

# use the custom container we just created
image_uri = f"{account_id}.dkr.ecr.{aws_region}.amazonaws.com/{IMAGE}:{IMAGE_TAG}"

# instance type and count determined via trial and error: how much overall processing time
# and what compute cost works best for your use-case
instance_type = "ml.m5.xlarge"
instance_count = 3
logger.info(f"base_job_name={base_job_name}, tags={tags}, image_uri={image_uri}, instance_type={instance_type}, instance_count={instance_count}")

# setup the ScriptProcessor with the above parameters
processor = ScriptProcessor(base_job_name=base_job_name,
 image_uri=image_uri,
 role=aws_role,
 instance_type=instance_type,
 instance_count=instance_count,
 command=["python3"],
 tags=tags)

# setup input from S3, note the ShardedByS3Key, this ensures that the files in S3
# are divided among the instances, so each instance gets a roughly equal, distinct subset.
inputs = [ProcessingInput(source=f"s3://{bucket}/{APP_NAME}/{DOMAIN}",
 destination='/opt/ml/processing/input_data',
 s3_data_distribution_type='ShardedByS3Key',
 s3_data_type='S3Prefix')]


logger.info(f"creating an opensearch index with name={opensearch_index}")
# ready to run the processing job
st = time.time()
processor.run(code="container/load_data_into_opensearch.py",
 inputs=inputs,
 outputs=[],
 arguments=["--opensearch-cluster-domain", opensearch_domain_endpoint,
 "--opensearch-secretid", os_creds_secretid_in_secrets_manager,
 "--opensearch-index-name", opensearch_index,
 "--aws-region", aws_region,
 "--embeddings-model-endpoint-name", embeddings_model_endpoint_name,
 "--chunk-size-for-doc-split", str(CHUNK_SIZE_FOR_DOC_SPLIT),
 "--chunk-overlap-for-doc-split", str(CHUNK_OVERLAP_FOR_DOC_SPLIT),
 "--input-data-dir", "/opt/ml/processing/input_data",
 "--create-index-hint-file", CREATE_OS_INDEX_HINT_FILE,
 "--process-count", "2"])
time_taken = time.time() - st
logger.info(f"processing job completed, total time taken={time_taken}s")
preprocessing_job_description = processor.jobs[-1].describe()
logger.info(preprocessing_job_description)

2023-05-05 00:51:45,494,2195981164,MainProcess,INFO,base_job_name=llm-apps-blogs-job, tags=[{'Key': 'data', 'Value': 'embeddings-for-llm-apps'}], image_uri=015469603702.dkr.ecr.us-east-1.amazonaws.com/load-data-opensearch-custom:latest, instance_type=ml.m5.xlarge, instance_count=3
2023-05-05 00:51:45,529,2195981164,MainProcess,INFO,creating an opensearch index with name=llm_apps_workshop_embeddings
2023-05-05 00:51:45,805,session,MainProcess,INFO,Creating processing-job with name llm-apps-blogs-job-2023-05-05-00-51-45-530


.......................
2023-05-05 00:55:34,810,load_data_into_opensearch,MainProcess,INFO,Received arguments Namespace(opensearch_cluster_domain='https://search-opensearchservi-xesatndxtlnk-styan2632w7jxb3ojtngbdbsvu.us-east-1.es.amazonaws.com', opensearch_secretid='OpenSearchSecret-rag22', opensearch_index_name='llm_apps_workshop_embeddings', aws_region='us-east-1', embeddings_model_endpoint_name='gpt-j-6b-endpoint-27e32820', chunk_size_for_doc_split=600, chunk_overlap_for_doc_split=20, input_data_dir='/opt/ml/processing/input_data', process_count=2, create_index_hint_file='_create_index_hint')
2023-05-05 00:55:34,810,load_data_into_opensearch,MainProcess,INFO,there are 70 files to process in the /opt/ml/processing/input_data folder
 _ = BeautifulSoup(
2023-05-05 00:55:35,020,load_data_into_opensearch,MainProcess,INFO,Loading documents ...
 soup = BeautifulSoup(data, **self.bs_kwargs)
2023-05-05 00:55:34,672,load_data_into_opensearch,MainProcess

2023-05-05 01:04:50,470,2195981164,MainProcess,INFO,processing job completed, total time taken=784.9402143955231s
2023-05-05 01:04:50,499,2195981164,MainProcess,INFO,{'ProcessingInputs': [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-015469603702/llm-apps-blogs/sagemaker.readthedocs.io', 'LocalPath': '/opt/ml/processing/input_data', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-015469603702/llm-apps-blogs-job-2023-05-05-00-51-45-530/input/code/load_data_into_opensearch.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingJobName': 'llm-apps-blogs-job-2023-05-05-00-51-45-530', 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 3, 'Inst

## Step 4: Do a similarity search for user input to documents (embeddings) in OpenSearch

In [72]:
from container.credentials import get_credentials
from langchain.vectorstores import OpenSearchVectorSearch
from container.sm_helper import create_sagemaker_embeddings_from_js_model

creds = get_credentials(os_creds_secretid_in_secrets_manager, aws_region)
http_auth = (creds['username'], creds['password'])
docsearch = OpenSearchVectorSearch(index_name=opensearch_index,
 embedding_function=create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name,
 aws_region),
 opensearch_url=opensearch_domain_endpoint,
 http_auth=http_auth)
q = "Which XGBoost versions does SageMaker support?"
docs = docsearch.similarity_search(q, k=3) #, search_type="script_scoring", space_type="cosinesimil"
for doc in docs:
 logger.info("----------")
 logger.info(f"content=\"{doc.page_content}\",\nmetadata=\"{doc.metadata}\"")
 

2023-05-05 02:12:53,333,credentials,MainProcess,INFO,Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
2023-05-05 02:12:54,084,base,MainProcess,INFO,POST https://search-opensearchservi-xesatndxtlnk-styan2632w7jxb3ojtngbdbsvu.us-east-1.es.amazonaws.com:443/llm_apps_workshop_embeddings/_search [status:200 request:0.544s]
2023-05-05 02:12:54,092,336566357,MainProcess,INFO,----------
2023-05-05 02:12:54,093,336566357,MainProcess,INFO,content="an expanded set of metrics than the original versions. It provides an XGBoost estimator that executes a training script in a managed XGBoost environment. The current release of SageMaker XGBoost is based on the original XGBoost versions 1.0, 1.2, 1.3, and 1.5.
The following table outlines a variety of sample notebooks that address different use cases of Amazon SageMaker XGBoost algorithm.
Notebook Title
Description
How to Create a Custom XGBoost container?
This notebook shows you how to build a custom XGBoost Container with Amazon S
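Conceptually, a similarity search embeds the question, compares that vector with the stored document vectors, and returns the `k` closest documents. The sketch below shows this idea with an exact, brute-force search over a toy in-memory index. The 3-dimensional vectors, the document ids, and the choice of cosine similarity are assumptions for illustration. The OpenSearch index in this notebook stores 4096-dimensional vectors and uses approximate k-NN search, so it does not scan every document this way.

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query: List[float], index: Dict[str, List[float]], k: int) -> List[Tuple[str, float]]:
    """Score every document against the query and keep the k best matches."""
    scored = [(doc_id, cosine_similarity(query, vec)) for doc_id, vec in index.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Toy embeddings (hypothetical); real ones come from the embeddings model endpoint.
toy_index = {
    "xgboost-versions": [0.9, 0.1, 0.0],
    "processing-jobs": [0.1, 0.9, 0.1],
    "pytorch-estimator": [0.2, 0.2, 0.9],
}
query_vec = [1.0, 0.2, 0.0]

results = top_k(query_vec, toy_index, k=2)
for doc_id, score in results:
    print(f"{doc_id}: {score:.3f}")
```

Brute-force scoring grows linearly with the number of documents, which is the reason a vector database with an approximate nearest-neighbor index is used for large corpora.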

In [74]:
opensearch_domain_endpoint

'https://search-opensearchservi-xesatndxtlnk-styan2632w7jxb3ojtngbdbsvu.us-east-1.es.amazonaws.com'

---

## Cleanup

To avoid incurring future charges, delete the resources. You can do this by deleting the CloudFormation stack used to create the IAM role and SageMaker notebook.


---

## Conclusion
In this notebook we saw how to use an LLM deployed on a SageMaker Endpoint to generate embeddings, ingest those embeddings into OpenSearch, and then run a similarity search for user input against the documents (embeddings) stored in OpenSearch. We used langchain as an abstraction layer to talk to both the SageMaker Endpoint and OpenSearch.