{ "cells": [ { "cell_type": "markdown", "id": "a0091589-d964-40c6-a4b7-09ecf597197e", "metadata": { "tags": [] }, "source": [ "# Ingest massive amounts of data to a Vector DB (Amazon OpenSearch)\n", "**_Use of Amazon OpenSearch as a vector database for storing embeddings_**\n", "\n", "This notebook works well with the `conda_python3` kernel on a SageMaker Notebook `ml.t3.xlarge` instance.\n", "\n", "---\n", "---\n", "\n", "## Contents\n", "\n", "1. [Objective](#Objective)\n", "1. [Background](#Background-(Problem-Description-and-Approach))\n", "1. [Overall Workflow](#Overall-Workflow)\n", "1. [Create scripts for ingesting data into OpenSearch](#Create-scripts-for-ingesting-data-into-OpenSearch)\n", "1. [Download the data from the web and upload to S3](#Download-the-data-from-the-web-and-upload-to-S3)\n", "1. [Load the data in a OpenSearch index (Local mode)](#Load-the-data-in-a-OpenSearch-index-(Local-mode))\n", "1. [Load the data in a OpenSearch index via SageMaker Processing Job (Distributed mode)](#Load-the-data-in-a-OpenSearch-index-via-SageMaker-Processing-Job-(Distributed-mode))\n", "1. [Conclusion](#Conclusion)" ] }, { "cell_type": "markdown", "id": "420c0dd9-bcb5-409f-8a4a-adab5ef47e42", "metadata": {}, "source": [ "---\n", "\n", "## Objective\n", "\n", "This notebook illustrates how to use [`langchain`](https://python.langchain.com/en/latest/index.html) Amazon Sagemaker Endpoints and Amazon Sagemaker Processing Job to convert large amount of data into embeddings and ingest the text data along with its embeddings into an Amazon OpenSearch index.\n", "\n", "We use the documents from [sagemaker.readthedocs.io/en/stable](sagemaker.readthedocs.io/en/stable) as the dataset to convert into embeddings. The [`gpt-j-6b`](https://huggingface.co/EleutherAI/gpt-j-6b) large language model (LLM) is to generate the embeddings. \n", "\n", "To understand the code, you might also find it useful to refer to:\n", "\n", "- *[The langchain OpenSearch documentation](https://python.langchain.com/en/latest/ecosystem/opensearch.html)*\n", "- *[Amazon OpenSearch service documentation](https://docs.aws.amazon.com/opensearch-service/index.html)*\n", "- *[SageMaker Processing Job](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html)*\n", "---\n", "\n", "## Background (Problem Description and Approach)\n", "\n", "- **Problem statement**: \n", "\n", "Using LLMs for information retrieval tasks (such as question-answering) requires converting the knowledge corpus as well as user questions into vector embeddings. We want to generate these vector embeddings using an LLM hosted as a Amazon Sagemaker Endpoint and store it in a vector database of choice such as Amazon OpenSearch. For converting large amounts of data (TBs or PBs) we need a scalable system which can accomplish both converting the documents into embeddings, storing them in a vector database and provide low latency similarity search\n", "\n", "- **Our approach**: \n", "\n", "1. Host the LLM use to generate the embeddings as a SageMaker Endpoint with `instance_count` set to > 1 (the exact number depends upon time taken to generate the embeddings for the amount of data we have and the dollar amount we want to spend on it; more instances would mean greater cost but also lesser time taken).\n", "\n", "1. Place the data to be corpus of data in S3 (each document is a file stored as an object in S3).\n", "\n", "1. Use a Python script that uses [langchain](https://python.langchain.com/en/latest/index.html) and [Opensearch-py](https://pypi.org/project/opensearch-py/) to ingest the data into OpenSearch. Run the script locally on this notebook for testing.\n", "\n", "1. Create a Sagemaker Processing job with `instance_count` set to > 1 (usually matching the `instance_count` for the Sagemaker Endpoint). \n", "\n", " Each instance of the SageMaker Processing Job runs a script that does the following:\n", " - Processes a subset of files from S3.\n", " - Uses langchain to read the files from the local filesystem and convert it into chunks.\n", " - Creates a langchain `OpenSearchVectorSearch` object and provides it a `SagemakerJumpstartEmbeddings` object that enables it to talk to our Sagemaker Endpoint.\n", " - Uses the langchain `OpenSearchVectorSearch` to create or get an existing Opensearch index and then ingests documents into the index which contain the original `text`, `embeddings` and `metadata`.\n", " - Does this using Pytohn multiprocessing to achieve parallelization even within a single processing job instance and ensure maximum use of the Sagemaker Endpoint instance's GPU.\n", " > **The advantage to using langchain as a wrapper for interfacing with a vector database is that it provides a generic pattern that can be used with any LLM and any vector store. Langchain automatically uses the OpenSearch bulk ingestion API endpoint for ingesting data rather than ingesting data one record at a time. Furthermore, langchain also provides an opinionated JSON structure that includes text and metadata alongwith the embeddings itself for storing embeddings in an OpenSearch index specifically for information retrieval use-cases**.\n", "\n", "- **Our tools**: [Amazon SageMaker SDK](https://sagemaker.readthedocs.io/en/stable/), [langchain](https://python.langchain.com/en/latest/index.html) and [Opensearch-py](https://pypi.org/project/opensearch-py/).\n" ] }, { "cell_type": "markdown", "id": "e0a32334-88aa-431a-a306-f9fc3030f6cb", "metadata": {}, "source": [ "---\n", "\n", "## Overall Workflow\n", "\n", "**Prerequisite**\n", "\n", "The following are prerequisites that needs to be accomplised by running [this cloud formation template](./template.yaml) before running this notebook.\n", "- A Sagemaker Endpoint for generating embeddings.\n", "- An Amazon OpenSearch cluster for storing embeddings.\n", " - Opensearch cluster's access credentials (username and password) stored in AWS Secrets Mananger by following steps described [here](https://docs.aws.amazon.com/secretsmanager/latest/userguide/managing-secrets.html).\n", "\n", "The overall workflow for this notebook is as follows:\n", "1. Install the required Python packages and store session information in local variables.\n", "1. Download data from source and upload to S3.\n", "1. Run the Python script locally to ingest a subset of data into an OpenSearch index for testing.\n", "1. Run Sagemaker Processing Job which reads all data from S3 and runs the same Python script as above to ingest data into OpenSearch.\n", " - As part of this step we also create a custom container to package langchain and opensearch Python packages.\n", "1. Do a similarity search with embeddings stored in the OpenSearch index for an input query." ] }, { "cell_type": "markdown", "id": "81acd36d-a1c9-41b0-b412-7af957b2580f", "metadata": {}, "source": [ "---\n", "\n", "## Step 1: Setup\n", "Install the required packages." ] }, { "cell_type": "code", "execution_count": null, "id": "d24ca243-fb69-4422-97ba-5a41c38a4b0d", "metadata": { "tags": [] }, "outputs": [], "source": [ "!pip install --upgrade sagemaker --quiet\n", "!pip install ipywidgets==7.0.0 langchain==0.0.149 opensearch-py==2.2.0 faiss_cpu==1.7.4 --quiet" ] }, { "cell_type": "code", "execution_count": null, "id": "822a0889-6149-4216-9633-ff66d201181c", "metadata": { "tags": [] }, "outputs": [], "source": [ "import os\n", "import sys\n", "import time\n", "import json\n", "import logging\n", "import numpy as np\n", "from typing import List\n", "import sagemaker, boto3, json\n", "from sagemaker.session import Session\n", "from sagemaker.processing import ProcessingInput\n", "from langchain.document_loaders import ReadTheDocsLoader\n", "from langchain.vectorstores import OpenSearchVectorSearch\n", "from langchain.embeddings import SagemakerEndpointEmbeddings\n", "from langchain.llms.sagemaker_endpoint import ContentHandlerBase\n", "from langchain.text_splitter import RecursiveCharacterTextSplitter\n", "from sagemaker.processing import ScriptProcessor, FrameworkProcessor" ] }, { "cell_type": "markdown", "id": "e4647260-5368-4793-b1dc-2853b453218c", "metadata": {}, "source": [ "Change the parameters if you would like to scrape a different website for data, customize chunk size etc." ] }, { "cell_type": "code", "execution_count": null, "id": "9d2fa335-c7a2-46ec-a14b-3e898f899975", "metadata": { "tags": [] }, "outputs": [], "source": [ "# global constants\n", "WEBSITE=\"https://sagemaker.readthedocs.io/en/stable/\"\n", "DOMAIN=\"sagemaker.readthedocs.io\"\n", "DATA_DIR = \"docs\"\n", "MAX_OS_DOCS_PER_PUT = 500\n", "IMAGE = \"load-data-opensearch-custom\"\n", "IMAGE_TAG = \"latest\"\n", "CHUNK_SIZE_FOR_DOC_SPLIT = 600\n", "CHUNK_OVERLAP_FOR_DOC_SPLIT = 20\n", "CREATE_OS_INDEX_HINT_FILE = \"_create_index_hint\"\n", "FAISS_INDEX_DIR = \"faiss_index\"" ] }, { "cell_type": "code", "execution_count": null, "id": "42612999-1cf9-46c7-a978-95f5991f1ce9", "metadata": { "tags": [] }, "outputs": [], "source": [ "logger = logging.getLogger()\n", "logging.basicConfig(format='%(asctime)s,%(module)s,%(processName)s,%(levelname)s,%(message)s', level=logging.INFO, stream=sys.stderr)" ] }, { "cell_type": "markdown", "id": "cb0453eb-9781-4ac7-b320-271620f7d18f", "metadata": {}, "source": [ "### Read parameters from Cloud Formation stack\n", "\n", "Some of the resources needed for this notebook such as the Embeddings LLM model endpoint, the Amazon OpenSearch cluster are created outside of this notebook, typically through a cloud formation template. We now read the outputs and parameters of the cloud formation stack created from that template to get the value of these parameters. \n", "\n", "The stack name here should match the stack name you used when creating the cloud formation stack." ] }, { "cell_type": "code", "execution_count": null, "id": "f3c3dc1e-65a1-482f-9027-46eaea80742f", "metadata": { "tags": [] }, "outputs": [], "source": [ "# if used a different name while creating the cloud formation stack then change this to match the name you used\n", "CFN_STACK_NAME = \"llm-apps-blog-rag\"" ] }, { "cell_type": "markdown", "id": "5814ff8c-b8f9-43fb-bb2b-da690a95bf68", "metadata": { "tags": [] }, "source": [ "**If you did not use a cloud formation template for creating these resources then set the names of these resources manually in the code below.**" ] }, { "cell_type": "code", "execution_count": null, "id": "fde8bcc6-d0ed-4813-9595-8d7c2e720027", "metadata": { "tags": [] }, "outputs": [], "source": [ "#boto3.client('cloudformation').describe_stacks(StackName=\"ssome\")\n", "stacks = boto3.client('cloudformation').list_stacks()\n", "stack_found = CFN_STACK_NAME in [stack['StackName'] for stack in stacks['StackSummaries']]" ] }, { "cell_type": "code", "execution_count": null, "id": "d4849d39-c2ca-4e21-a505-61b6a4970615", "metadata": { "tags": [] }, "outputs": [], "source": [ "def get_cfn_outputs(stackname: str) -> List:\n", " cfn = boto3.client('cloudformation')\n", " outputs = {}\n", " for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:\n", " outputs[output['OutputKey']] = output['OutputValue']\n", " return outputs\n", "\n", "def get_cfn_parameters(stackname: str) -> List:\n", " cfn = boto3.client('cloudformation')\n", " params = {}\n", " for param in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Parameters']:\n", " params[param['ParameterKey']] = param['ParameterValue']\n", " return params\n", "\n", "if stack_found is True:\n", " outputs = get_cfn_outputs(CFN_STACK_NAME)\n", " params = get_cfn_parameters(CFN_STACK_NAME)\n", " logger.info(f\"cfn outputs={outputs}\\nparams={params}\")\n", "\n", " embeddings_model_endpoint_name = outputs['EmbeddingEndpointName']\n", " opensearch_domain_endpoint = f\"https://{outputs['OpenSearchDomainEndpoint']}\"\n", " opensearch_index = params['OpenSearchIndexName']\n", " app_name = params['AppName']\n", " # ARN of the secret is of the following format arn:aws:secretsmanager:region:account_id:secret:my_path/my_secret_name-autoid\n", " os_creds_secretid_in_secrets_manager = \"-\".join(outputs['OpenSearchSecret'].split(\":\")[-1].split('-')[:-1])\n", "else:\n", " logger.info(f\"cloud formation stack {CFN_STACK_NAME} not found, set parameters manually here\")\n", " # REPLACE THE \"placeholder\" WITH ACTUAL VALUES IF YOU CREATED THESE RESOURCES WITHOUT USING A CLOUD FORMATION TEMPLATE\n", " embeddings_model_endpoint_name = \"placeholder\"\n", " opensearch_domain_endpoint = \"placeholder\"\n", " opensearch_index = \"placeholder\"\n", " os_creds_secretid_in_secrets_manager = \"placeholder\"\n", " app_name = \"llm-apps-blogs\"" ] }, { "cell_type": "markdown", "id": "062d37de-df4c-409e-b4ca-d05b8cbca56b", "metadata": {}, "source": [ "The embeddings model endpoint name, OpenSearch domain endpoint and the identifier for the OpenSearch credentials stored in the Secrets Mananger are all available as `Outputs` from the cloud formation stack." ] }, { "cell_type": "code", "execution_count": null, "id": "54fbde3d-cc6c-49ef-bf82-ecaf69034c69", "metadata": { "tags": [] }, "outputs": [], "source": [ "logger.info(f\"embeddings_model_endpoint_name={embeddings_model_endpoint_name},\\nopensearch_domain_endpoint={opensearch_domain_endpoint},\\n\"\n", " f\"os_creds_secretid_in_secrets_manager={os_creds_secretid_in_secrets_manager},opensearch_index={opensearch_index}\")" ] }, { "cell_type": "code", "execution_count": null, "id": "25fc00c6-d5e3-4776-93a3-5d5ed6a3e729", "metadata": { "tags": [] }, "outputs": [], "source": [ "sagemaker_session = Session()\n", "aws_role = sagemaker_session.get_caller_identity_arn()\n", "aws_region = boto3.Session().region_name\n", "bucket = sagemaker_session.default_bucket()\n", "logger.info(f\"aws_role={aws_role}, aws_region={aws_region}, bucket={bucket}\")" ] }, { "cell_type": "markdown", "id": "fb2f4b53-23fe-4efd-9d5a-9211d7c2fe79", "metadata": {}, "source": [ "---\n", "\n", "## Step 2: Download the data from the web and upload to S3\n", "\n", "In this step we use `wget` to crawl a Python documentation style website data. All files other than `html`, `txt` and `md` are removed. **This data download would take a few minutes**." ] }, { "cell_type": "code", "execution_count": null, "id": "9f240968-f4fa-4fcb-b517-685f34de836f", "metadata": { "tags": [] }, "outputs": [], "source": [ "!mkdir -p scripts" ] }, { "cell_type": "code", "execution_count": null, "id": "42dbecc5-9b1f-4a86-b905-c51dcdf926ea", "metadata": { "tags": [] }, "outputs": [], "source": [ "DOWNLOAD_DATA = \"yes\"" ] }, { "cell_type": "code", "execution_count": null, "id": "f5d2acc1-198b-4f24-adb0-6ac8254d6cbc", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%writefile scripts/get_data.sh\n", "# This scripts uses wget to crawl the input website and \n", "# save the downloaded files in a given directory.\n", "echo \"input args=\"\n", "echo $@\n", "if [[ \"$1\" == \"yes\" ]];\n", "then\n", " WEBSITE=$2\n", " DOMAIN=$3\n", " KB_DIR=$4 \n", " # delete any existing folder for this data\n", " rm -rf ${DOMAIN} ${KB_DIR}\n", " mkdir -p ${KB_DIR}\n", " \n", " # download the data, this may take a few minutes or more depending upon the amount of content, network speed etc.\n", " wget -e robots=off --recursive --no-clobber --page-requisites --html-extension --convert-links --restrict-file-names=windows --domains ${DOMAIN} --no-parent ${WEBSITE}\n", " \n", " # we only want to keep the html files\n", " # and copy them into a new directory with their\n", " # full path name flattened into a single file\n", " # so /path/to/a/file becomes path_to_a_file, this\n", " # is done so that we can upload all files to a single \n", " # prefix in S3 which allows the Sagemaker Processing Job\n", " # to easily split the files between instances \n", " for i in `find ${DOMAIN} -name \"*.html\"`\n", " do\n", " flat_i=`echo \"${i//\\//_}\"`\n", " echo going to copy $i to ${KB_DIR}/$flat_i\n", " cp $i ${KB_DIR}/$flat_i \n", " done\n", " \n", " file_count=`ls | wc -l`\n", " echo there are $file_count files in ${DOMAIN} directory\n", "else\n", " echo DOWNLOAD_DATA=$1, not downloading new data\n", "fi" ] }, { "cell_type": "code", "execution_count": null, "id": "4729828a-3f07-412b-9ca5-836bf05c0214", "metadata": { "tags": [] }, "outputs": [], "source": [ "!chmod +x scripts/get_data.sh\n", "!./scripts/get_data.sh $DOWNLOAD_DATA $WEBSITE $DOMAIN $DATA_DIR" ] }, { "cell_type": "code", "execution_count": null, "id": "895293b2-edec-4a70-98be-81feab16a4c9", "metadata": { "tags": [] }, "outputs": [], "source": [ "# create a dummy file called _create_index to provide a hint for opensearch index creation\n", "# this is needed for Sagemaker Processing Job when there are multiple instance nodes\n", "# all running the same code for data ingestion but only one node needs to create the index\n", "!touch $DATA_DIR/$CREATE_OS_INDEX_HINT_FILE\n", "\n", "# upload this data to S3, to be used when we run the Sagemaker Processing Job\n", "!aws s3 cp --recursive $DATA_DIR/ s3://$bucket/$app_name/$DOMAIN" ] }, { "cell_type": "markdown", "id": "95833d54-d6bb-4d4c-97b6-9157cd9bcabf", "metadata": { "tags": [] }, "source": [ "---\n", "\n", "## Step 3: Load data into `OpenSearch`\n", "\n", "We are now ready to create scripts which will read data from the local directory, use langchain to create embeddings and then upload the embeddings into OpenSearch." ] }, { "cell_type": "code", "execution_count": null, "id": "bdb00176-452a-4fe2-999e-322da380a6e7", "metadata": { "tags": [] }, "outputs": [], "source": [ "\"\"\"\n", "Create directories for storing scripts and Dockerfile\n", "\"\"\"\n", "!mkdir -p src\n", "!mkdir -p scripts\n", "!mkdir -p container" ] }, { "cell_type": "markdown", "id": "c6a0545c-b593-4319-b355-4dacee393416", "metadata": {}, "source": [ "### Read credentials from AWS Secrets Manager\n", "\n", "The credentials for the OpenSearch cluster are store in AWS Secrets Mananger, our code reads the credentials from there and provides them to the opensearch-py package (through langchain API)." ] }, { "cell_type": "code", "execution_count": null, "id": "99538cbf-aa7b-4dd4-b01e-8d4932a088d6", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%writefile container/credentials.py\n", "\n", "\"\"\"\n", "Retrieve credentials password for given username from AWS SecretsManager\n", "\"\"\"\n", "import json\n", "import boto3\n", "\n", "def get_credentials(secret_id: str, region_name: str) -> str:\n", " \n", " client = boto3.client('secretsmanager', region_name=region_name)\n", " response = client.get_secret_value(SecretId=secret_id)\n", " secrets_value = json.loads(response['SecretString']) \n", " \n", " return secrets_value" ] }, { "cell_type": "markdown", "id": "de107289-6b61-445e-ad53-310db4f08946", "metadata": {}, "source": [ "### SageMaker embeddings for langchain\n", "\n", "langchain provides the [`SagemakerEndpointEmbeddings`]() class which is a wrapper around a functionality to talk to a Sagemaker Endpoint to generate embeddings. We will override the `embed_documents` function to define our own batching strategy for sending requests to the model (multiple requests are sent in one model invocation). Similarly, we extend the `ContentHandlerBase` class to provide implementation for two abstract methods which define how to process (encode/decode) the input data sent to the model and the output received from the model.\n", "\n", "We finally create a `SagemakerEndpointEmbeddingsJumpStart` object that puts all this together and can now be used by langchain to talk to an LLM deployed as a Sagemaker Endpoint to generate embeddings." ] }, { "cell_type": "code", "execution_count": null, "id": "3385847f-094f-4eaa-9e3c-9fde1b262bd6", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%writefile container/sm_helper.py\n", "\n", "\"\"\"\n", "Helper functions for using Samgemaker Endpoint via langchain\n", "\"\"\"\n", "import time\n", "import json\n", "import logging\n", "from typing import List\n", "from langchain.embeddings import SagemakerEndpointEmbeddings\n", "from langchain.embeddings.sagemaker_endpoint import EmbeddingsContentHandler\n", "\n", "logger = logging.getLogger(__name__)\n", "\n", "# extend the SagemakerEndpointEmbeddings class from langchain to provide a custom embedding function\n", "class SagemakerEndpointEmbeddingsJumpStart(SagemakerEndpointEmbeddings):\n", " def embed_documents(\n", " self, texts: List[str], chunk_size: int = 5\n", " ) -> List[List[float]]:\n", " \"\"\"Compute doc embeddings using a SageMaker Inference Endpoint.\n", "\n", " Args:\n", " texts: The list of texts to embed.\n", " chunk_size: The chunk size defines how many input texts will\n", " be grouped together as request. If None, will use the\n", " chunk size specified by the class.\n", "\n", " Returns:\n", " List of embeddings, one for each text.\n", " \"\"\"\n", " results = []\n", " _chunk_size = len(texts) if chunk_size > len(texts) else chunk_size\n", " st = time.time()\n", " for i in range(0, len(texts), _chunk_size):\n", " response = self._embedding_func(texts[i:i + _chunk_size])\n", " results.extend(response)\n", " time_taken = time.time() - st\n", " logger.info(f\"got results for {len(texts)} in {time_taken}s, length of embeddings list is {len(results)}\")\n", " return results\n", "\n", "\n", "# class for serializing/deserializing requests/responses to/from the embeddings model\n", "class ContentHandler(EmbeddingsContentHandler):\n", " content_type = \"application/json\"\n", " accepts = \"application/json\"\n", "\n", " def transform_input(self, prompt: str, model_kwargs={}) -> bytes:\n", "\n", " input_str = json.dumps({\"text_inputs\": prompt, **model_kwargs})\n", " return input_str.encode('utf-8') \n", "\n", " def transform_output(self, output: bytes) -> str:\n", "\n", " response_json = json.loads(output.read().decode(\"utf-8\"))\n", " embeddings = response_json[\"embedding\"]\n", " if len(embeddings) == 1:\n", " return [embeddings[0]]\n", " return embeddings\n", " \n", "\n", "def create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name: str, aws_region: str) -> SagemakerEndpointEmbeddingsJumpStart:\n", " # all set to create the objects for the ContentHandler and \n", " # SagemakerEndpointEmbeddingsJumpStart classes\n", " content_handler = ContentHandler()\n", "\n", " # note the name of the LLM Sagemaker endpoint, this is the model that we would\n", " # be using for generating the embeddings\n", " embeddings = SagemakerEndpointEmbeddingsJumpStart( \n", " endpoint_name=embeddings_model_endpoint_name,\n", " region_name=aws_region, \n", " content_handler=content_handler\n", " )\n", " return embeddings" ] }, { "cell_type": "markdown", "id": "2e31e78c-f65c-4662-8bb6-f98ed880d307", "metadata": {}, "source": [ "### Script to load data into OpenSearch\n", "\n", "This script puts everything together, it divides the documents into chunks, then uses the langchain package to create embeddings (through `SagemakerEndpointEmbeddingsJumpStart`) and then ingests the data into OpenSearch using `OpenSearchVectorSearch`. \n", "\n", "To keep things simple the chunks size is set to a fixed length of 500 tokens, with an overlap of 30 tokens. The langchain `OpenSearchVectorSearch` provides a wrapper over the `opensearch-py` package. It uses the `/_bulk` API endpoint for ingesting multiple records in a single PUT request." ] }, { "cell_type": "code", "execution_count": null, "id": "2d1e2e17-538b-45e6-bbe3-a29fe8a9e221", "metadata": {}, "outputs": [], "source": [ "%%writefile container/load_data_into_opensearch.py\n", "\n", "import os\n", "import sys\n", "\n", "# this is needed because the credentials.py and sm_helper.py\n", "# are in /code directory of the custom container we are going \n", "# to create for Sagemaker Processing Job\n", "sys.path.insert(1, '/code')\n", "\n", "import glob\n", "import time\n", "import json\n", "import logging\n", "import argparse\n", "import numpy as np\n", "import multiprocessing as mp\n", "from itertools import repeat\n", "from functools import partial\n", "import sagemaker, boto3, json\n", "from typing import List, Tuple\n", "from sagemaker.session import Session\n", "from credentials import get_credentials\n", "from opensearchpy.client import OpenSearch\n", "from langchain.document_loaders import ReadTheDocsLoader\n", "from langchain.vectorstores import OpenSearchVectorSearch\n", "from langchain.embeddings import SagemakerEndpointEmbeddings\n", "from sm_helper import create_sagemaker_embeddings_from_js_model\n", "from langchain.llms.sagemaker_endpoint import ContentHandlerBase\n", "from langchain.text_splitter import RecursiveCharacterTextSplitter\n", "from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth\n", "\n", "# global constants\n", "MAX_OS_DOCS_PER_PUT = 500\n", "TOTAL_INDEX_CREATION_WAIT_TIME = 60\n", "PER_ITER_SLEEP_TIME = 5\n", "logger = logging.getLogger()\n", "logging.basicConfig(format='%(asctime)s,%(module)s,%(processName)s,%(levelname)s,%(message)s', level=logging.INFO, stream=sys.stderr)\n", "\n", "def check_if_index_exists(index_name: str, region: str, host: str, http_auth: Tuple[str, str]) -> OpenSearch:\n", " #update the region if you're working other than us-east-1\n", "\n", " aos_client = OpenSearch(\n", " hosts = [{'host': host.replace(\"https://\", \"\"), 'port': 443}],\n", " http_auth = http_auth,\n", " use_ssl = True,\n", " verify_certs = True,\n", " connection_class = RequestsHttpConnection\n", " )\n", " exists = aos_client.indices.exists(index_name)\n", " logger.info(f\"index_name={index_name}, exists={exists}\")\n", " return exists\n", "\n", " \n", "def process_shard(shard, embeddings_model_endpoint_name, aws_region, os_index_name, os_domain_ep, os_http_auth) -> int: \n", " logger.info(f'Starting process_shard of {len(shard)} chunks.')\n", " st = time.time()\n", " embeddings = create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name, aws_region)\n", " docsearch = OpenSearchVectorSearch(index_name=os_index_name,\n", " embedding_function=embeddings,\n", " opensearch_url=os_domain_ep,\n", " http_auth=os_http_auth) \n", " docsearch.add_documents(documents=shard)\n", " et = time.time() - st\n", " logger.info(f'Shard completed in {et} seconds.')\n", " return 0\n", "\n", "if __name__ == \"__main__\":\n", " parser = argparse.ArgumentParser()\n", " parser.add_argument(\"--opensearch-cluster-domain\", type=str, default=None)\n", " parser.add_argument(\"--opensearch-secretid\", type=str, default=None)\n", " parser.add_argument(\"--opensearch-index-name\", type=str, default=None)\n", " parser.add_argument(\"--aws-region\", type=str, default=\"us-east-1\")\n", " parser.add_argument(\"--embeddings-model-endpoint-name\", type=str, default=None)\n", " parser.add_argument(\"--chunk-size-for-doc-split\", type=int, default=500)\n", " parser.add_argument(\"--chunk-overlap-for-doc-split\", type=int, default=30)\n", " parser.add_argument(\"--input-data-dir\", type=str, default=\"/opt/ml/processing/input_data\")\n", " parser.add_argument(\"--process-count\", type=int, default=1)\n", " parser.add_argument(\"--create-index-hint-file\", type=str, default=\"_create_index_hint\")\n", " args, _ = parser.parse_known_args()\n", "\n", " logger.info(\"Received arguments {}\".format(args))\n", " # list all the files\n", " files = glob.glob(os.path.join(args.input_data_dir, \"*.*\"))\n", " logger.info(f\"there are {len(files)} files to process in the {args.input_data_dir} folder\")\n", " \n", " # retrieve secret to talk to opensearch\n", " creds = get_credentials(args.opensearch_secretid, args.aws_region)\n", " http_auth = (creds['username'], creds['password'])\n", " \n", " \n", " loader = ReadTheDocsLoader(args.input_data_dir)\n", " text_splitter = RecursiveCharacterTextSplitter(\n", " # Set a really small chunk size, just to show.\n", " chunk_size=args.chunk_size_for_doc_split,\n", " chunk_overlap=args.chunk_overlap_for_doc_split,\n", " length_function=len,\n", " )\n", " \n", " # Stage one: read all the docs, split them into chunks. \n", " st = time.time() \n", " logger.info('Loading documents ...')\n", " docs = loader.load()\n", " \n", " # add a custom metadata field, such as timestamp\n", " for doc in docs:\n", " doc.metadata['timestamp'] = time.time()\n", " doc.metadata['embeddings_model'] = args.embeddings_model_endpoint_name\n", " chunks = text_splitter.create_documents([doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs])\n", " et = time.time() - st\n", " logger.info(f'Time taken: {et} seconds. {len(chunks)} chunks generated') \n", " \n", " \n", " db_shards = (len(chunks) // MAX_OS_DOCS_PER_PUT) + 1\n", " print(f'Loading chunks into vector store ... using {db_shards} shards') \n", " st = time.time()\n", " shards = np.array_split(chunks, db_shards)\n", " \n", " t1 = time.time()\n", " \n", " # first check if index exists, if it does then call the add_documents function\n", " # otherwise call the from_documents function which would first create the index\n", " # and then do a bulk add. Both add_documents and from_documents do a bulk add\n", " # but it is important to call from_documents first so that the index is created\n", " # correctly for K-NN\n", " index_exists = check_if_index_exists(args.opensearch_index_name,\n", " args.aws_region,\n", " args.opensearch_cluster_domain,\n", " http_auth)\n", " \n", " embeddings = create_sagemaker_embeddings_from_js_model(args.embeddings_model_endpoint_name, args.aws_region)\n", " \n", " if index_exists is False:\n", " # create an index if the create index hint file exists\n", " path = os.path.join(args.input_data_dir, args.create_index_hint_file)\n", " if os.path.isfile(path) is True:\n", " logger.info(f\"index {args.opensearch_index_name} does not exist but {path} file is present so will create index\")\n", " # by default langchain would create a k-NN index and the embeddings would be ingested as a k-NN vector type\n", " docsearch = OpenSearchVectorSearch.from_documents(index_name=args.opensearch_index_name,\n", " documents=shards[0],\n", " embedding=embeddings,\n", " opensearch_url=args.opensearch_cluster_domain,\n", " http_auth=http_auth)\n", " # we now need to start the loop below for the second shard\n", " shard_start_index = 1 \n", " else:\n", " logger.info(f\"index {args.opensearch_index_name} does not exist and {path} file is not present, \"\n", " f\"will wait for some other node to create the index\")\n", " shard_start_index = 0\n", " # start a loop to wait for index creation by another node\n", " time_slept = 0\n", " while True:\n", " logger.info(f\"index {args.opensearch_index_name} still does not exist, sleeping...\")\n", " time.sleep(PER_ITER_SLEEP_TIME)\n", " index_exists = check_if_index_exists(args.opensearch_index_name,\n", " args.aws_region,\n", " args.opensearch_cluster_domain,\n", " http_auth)\n", " if index_exists is True:\n", " logger.info(f\"index {args.opensearch_index_name} now exists\")\n", " break\n", " time_slept += PER_ITER_SLEEP_TIME\n", " if time_slept >= TOTAL_INDEX_CREATION_WAIT_TIME:\n", " logger.error(f\"time_slept={time_slept} >= {TOTAL_INDEX_CREATION_WAIT_TIME}, not waiting anymore for index creation\")\n", " break\n", " \n", " else:\n", " logger.info(f\"index={args.opensearch_index_name} does exists, going to call add_documents\")\n", " shard_start_index = 0\n", " \n", " with mp.Pool(processes = args.process_count) as pool:\n", " results = pool.map(partial(process_shard,\n", " embeddings_model_endpoint_name=args.embeddings_model_endpoint_name,\n", " aws_region=args.aws_region,\n", " os_index_name=args.opensearch_index_name,\n", " os_domain_ep=args.opensearch_cluster_domain,\n", " os_http_auth=http_auth),\n", " shards[shard_start_index:])\n", " \n", " t2 = time.time()\n", " logger.info(f'run time in seconds: {t2-t1:.2f}')\n", " logger.info(\"all done\")" ] }, { "cell_type": "markdown", "id": "b2cdc58d-b6b2-42c5-8c45-bd73623051ce", "metadata": { "tags": [] }, "source": [ "---\n", "\n", "## Load the data in a [FAISS](https://github.com/facebookresearch/faiss) index (Local mode)\n", "\n", "We now create a FAISS index to store the embeddings. This is an alternative to OpenSearch for storing embeddings in-memory. We write the FAISS index locally and then upoad the files to an S3 bucket. A Lambda function can then download these files from S3 and load the FAISS index in memory to perform a similarity search." ] }, { "cell_type": "code", "execution_count": null, "id": "6fd41570-e61c-4c75-9c27-7cfd3c827ef2", "metadata": { "tags": [] }, "outputs": [], "source": [ "from langchain.vectorstores import FAISS\n", "from container.sm_helper import create_sagemaker_embeddings_from_js_model\n", "embeddings = create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name, aws_region)\n", "\n", "loader = ReadTheDocsLoader(DATA_DIR)\n", "text_splitter = RecursiveCharacterTextSplitter(\n", " # Set a really small chunk size, just to show.\n", " chunk_size=CHUNK_SIZE_FOR_DOC_SPLIT,\n", " chunk_overlap=CHUNK_OVERLAP_FOR_DOC_SPLIT,\n", " length_function=len,\n", ")\n", " \n", "# read all the docs, split them into chunks. \n", "st = time.time() \n", "logger.info('Loading documents ...')\n", "docs = loader.load()\n", "\n", "# add a custom metadata field, such as timestamp\n", "for doc in docs:\n", " doc.metadata['timestamp'] = time.time()\n", " doc.metadata['embeddings_model'] = embeddings_model_endpoint_name\n", "chunks = text_splitter.create_documents([doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs])\n", "et = time.time() - st\n", "logger.info(f'Time taken: {et} seconds. {len(chunks)} chunks generated') " ] }, { "cell_type": "markdown", "id": "08c2d20a-7f68-45ca-830d-169ae2090ad2", "metadata": {}, "source": [ "Load the chunks into FAISS, we provide the embeddings object so that langchain can first convert the text chunks into embeddings and then store those embeddings into a FAISS index." ] }, { "cell_type": "code", "execution_count": null, "id": "52670206-9631-4c1c-aea6-f8a0d93421d2", "metadata": { "tags": [] }, "outputs": [], "source": [ "vector_db = FAISS.from_documents(chunks, embeddings)" ] }, { "cell_type": "markdown", "id": "18fcd3e9-0a2c-4e6b-963f-aacb2a4106bf", "metadata": {}, "source": [ "Save to a local directory and upload to S3." ] }, { "cell_type": "code", "execution_count": null, "id": "3971a475-0ce2-4443-b66a-8c70f5b72d04", "metadata": { "tags": [] }, "outputs": [], "source": [ "vector_db_path = FAISS_INDEX_DIR\n", "vector_db.save_local(vector_db_path)" ] }, { "cell_type": "code", "execution_count": null, "id": "2f2b05bc-d8b5-42ee-84c0-0c744e84e092", "metadata": { "tags": [] }, "outputs": [], "source": [ "# upload this data to S3, to be used when we run the Sagemaker Processing Job\n", "!aws s3 cp --recursive $vector_db_path/ s3://$bucket/$app_name/$vector_db_path" ] }, { "cell_type": "markdown", "id": "9016a278-9e64-43bd-867f-1adb87136bd0", "metadata": {}, "source": [ "---\n", "\n", "## Load the data in a `OpenSearch` index via SageMaker Processing Job (Distributed mode)\n", "\n", "We now have a working script that is able to ingest data into an OpenSearch index. But for this to work for massive amounts of data we need to scale up the processing by running this code in a distributed fashion. We will do this using Sagemkaer Processing Job. This involves the following steps:\n", "\n", "1. Create a custom container in which we will install the `langchain` and `opensearch-py` packges and then upload this container image to Amazon Elastic Container Registry (ECR).\n", "2. Use the Sagemaker `ScriptProcessor` class to create a Sagemaker Processing job that will run on multiple nodes.\n", " - The data files available in S3 are automatically distributed across in the Sagemaker Processing Job instances by setting `s3_data_distribution_type='ShardedByS3Key'` as part of the `ProcessingInput` provided to the processing job.\n", " - Each node processes a subset of the files and this brings down the overall time required to ingest the data into Opensearch.\n", " - Each node also uses Python `multiprocessing` to internally also parallelize the file processing. Thus, **there are two levels of parallelization happening, one at the cluster level where individual nodes are distributing the work (files) amongst themselves and another at the node level where the files in a node are also split between multiple processes running on the node**." ] }, { "cell_type": "markdown", "id": "3e6cced6-5e83-46d3-9c6e-cb0602f1ddf3", "metadata": {}, "source": [ "### Create custom container\n", "\n", "We will now create a container locally and push the container image to ECR. **The container creation process takes about 1 minute**.\n", "\n", "1. The container include all the Python packages we need i.e. `langchain`, `opensearch-py`, `sagemaker` and `beautifulsoup4`.\n", "1. The container also includes the `credentials.py` script for retrieving credentials from Secrets Manager and `sm_helper.py` for helping to create SageMaker endpoint classes that langchain uses." ] }, { "cell_type": "code", "execution_count": null, "id": "a91204c0-0cd2-45f1-b98e-56927fa04764", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%writefile container/Dockerfile\n", "\n", "FROM python:3.9-slim-buster\n", "\n", "RUN apt-get -y update && apt-get install -y --no-install-recommends \\\n", " wget \\\n", " python3-pip \\\n", " python3-setuptools \\\n", " nginx \\\n", " ca-certificates \\\n", " && rm -rf /var/lib/apt/lists/*\n", "\n", "RUN ln -s /usr/bin/python3 /usr/bin/python\n", "RUN ln -s /usr/bin/pip3 /usr/bin/pip\n", "\n", "# pip leaves the install caches populated which uses a \n", "# significant amount of space. These optimizations save a fair \n", "# amount of space in the image, which reduces start up time.\n", "RUN pip --no-cache-dir install langchain==0.0.149 opensearch-py==2.2.0 sagemaker==2.148.0 beautifulsoup4==4.12.2\n", "\n", "# Include python script for retrieving credentials \n", "# from AWS SecretsManager and Sagemaker helper classes\n", "ADD credentials.py /code/\n", "ADD sm_helper.py /code/\n", "\n", "# Set some environment variables. PYTHONUNBUFFERED keeps Python from buffering our standard\n", "# output stream, which means that logs can be delivered to the user quickly. PYTHONDONTWRITEBYTECODE\n", "# keeps Python from writing the .pyc files which are unnecessary in this case. We also update\n", "# PATH so that the train and serve programs are found when the container is invoked.\n", "ENV PYTHONUNBUFFERED=TRUE\n", "ENV PYTHONDONTWRITEBYTECODE=TRUE" ] }, { "cell_type": "code", "execution_count": null, "id": "7f4f075b-e8c5-41d8-9073-a4ace9eec175", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%writefile scripts/build_and_push.sh\n", "\n", "#!/usr/bin/env bash\n", "# This script shows how to build the Docker image and push it to ECR to be ready for use\n", "# by SageMaker.\n", "# The argument to this script are the path to the Dockerfile, the image name and tag and the aws-region\n", "# in which the container is to be created. This will be used as the image on the local\n", "# machine and combined with the account and region to form the repository name for ECR.\n", "\n", "# override the built-in echo so that we can have a nice timestamped trace\n", "echo () {\n", " builtin echo \"$(date +'[%m-%d %H:%M:%S]'):\" \"$@\"\n", "}\n", "\n", "if [ \"$#\" -eq 4 ]; then\n", " dlc_account_id=$(aws sts get-caller-identity | jq .Account)\n", " path_to_dockerfile=$1\n", " image=$2\n", " tag=$3\n", " region=$4\n", " \n", "else\n", " echo \"missing mandatory command line arguments, see usage...\"\n", " echo \"usage: $0 $1 $2 $3 \"\n", " exit 1\n", "fi\n", "\n", "# Get the account number associated with the current IAM credentials\n", "account=$(aws sts get-caller-identity --query Account --output text)\n", "\n", "if [ $? -ne 0 ]\n", "then\n", " exit 255\n", "fi\n", "\n", "\n", "fullname=\"${account}.dkr.ecr.${region}.amazonaws.com/${image}:${tag}\"\n", "echo the full image name would be ${fullname}\n", "\n", "# If the repository doesn't exist in ECR, create it.\n", "aws ecr describe-repositories --region ${region} --repository-names \"${image}\" > /dev/null 2>&1\n", "if [ $? -ne 0 ]; then\n", " echo \"creating ECR repository : ${fullname} \"\n", " aws ecr create-repository --region ${region} --repository-name \"${image}\" > /dev/null\n", "else\n", " echo \"${image} repo already exists in ECR\"\n", "fi\n", "\n", "# move to path of dockerfile\n", "cd ${path_to_dockerfile}\n", "\n", "# get credentials to login to ECR and, build and tag the image\n", "# note the use of DOCKER_BUILDKIT=1, this is needed for some mount instructions in the Dockerfile\n", "echo \"going to start a docker build, image=${image}, using Dockerfile=${path_to_dockerfile}\"\n", "aws ecr get-login-password --region ${region} \\\n", "| docker login --username AWS --password-stdin ${account}.dkr.ecr.${region}.amazonaws.com\n", "DOCKER_BUILDKIT=1 docker build . -t ${image} --build-arg dlc_account_id=${dlc_account_id} --build-arg region=${region}\n", "docker tag ${image} ${fullname}\n", "echo ${image} created\n", "\n", "# push the image to ECR\n", "cmd=\"aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin ${account}.dkr.ecr.${region}.amazonaws.com\"\n", "echo going to run \\\"${cmd}\\\" to login to ECR\n", "${cmd}\n", "\n", "cmd=\"docker push ${fullname}\"\n", "echo going to run \\\"${cmd}\\\" to push image to ecr\n", "${cmd}\n", "if [ $? -eq 0 ]; then\n", " echo \"Amazon ECR URI: ${fullname}\"\n", "else\n", " echo \"Error: Image ${fullname} build and push failed\"\n", " exit 1\n", "fi\n", "\n", "echo \"all done\"\n" ] }, { "cell_type": "code", "execution_count": null, "id": "d0c76456-f381-44b2-ae35-70563e9cb4cc", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Run script to build docker custom containe image and push it to ECR \n", "# Set region and sagemaker URI variables \n", "session = boto3.session.Session()\n", "client = boto3.client(\"sts\")\n", "account_id = client.get_caller_identity()[\"Account\"]\n", "logger.info(f\"region={aws_region}, account_id={account_id}\")\n", "!bash scripts/build_and_push.sh $(pwd)/container $IMAGE $IMAGE_TAG $aws_region" ] }, { "cell_type": "markdown", "id": "eb0022cd-0934-45ef-8741-5a9ebd7cd6fe", "metadata": {}, "source": [ "### Create and run the Sagemaker Processing Job\n", "\n", "Now we will run the Sagemaker Processing Job to ingest the data into OpenSearch." ] }, { "cell_type": "code", "execution_count": null, "id": "621bb019-8b4c-4ff0-8f1f-0b7deaaf5c32", "metadata": {}, "outputs": [], "source": [ "# setup the parameters for the job\n", "base_job_name = f\"{app_name}-job\"\n", "tags = [{\"Key\": \"data\", \"Value\": \"embeddings-for-llm-apps\"}]\n", "\n", "# use the custom container we just created\n", "image_uri = f\"{account_id}.dkr.ecr.{aws_region}.amazonaws.com/{IMAGE}:{IMAGE_TAG}\"\n", "\n", "# instance type and count determined via trial and error: how much overall processing time\n", "# and what compute cost works best for your use-case\n", "instance_type = \"ml.m5.xlarge\"\n", "instance_count = 3\n", "logger.info(f\"base_job_name={base_job_name}, tags={tags}, image_uri={image_uri}, instance_type={instance_type}, instance_count={instance_count}\")\n", "\n", "# setup the ScriptProcessor with the above parameters\n", "processor = ScriptProcessor(base_job_name=base_job_name,\n", " image_uri=image_uri,\n", " role=aws_role,\n", " instance_type=instance_type,\n", " instance_count=instance_count,\n", " command=[\"python3\"],\n", " tags=tags)\n", "\n", "# setup input from S3, note the ShardedByS3Key, this ensures that \n", "# each instance gets a random and equal subset of the files in S3.\n", "inputs = [ProcessingInput(source=f\"s3://{bucket}/{app_name}/{DOMAIN}\",\n", " destination='/opt/ml/processing/input_data',\n", " s3_data_distribution_type='ShardedByS3Key',\n", " s3_data_type='S3Prefix')]\n", "\n", "\n", "logger.info(f\"creating an opensearch index with name={opensearch_index}\")\n", "# ready to run the processing job\n", "st = time.time()\n", "processor.run(code=\"container/load_data_into_opensearch.py\",\n", " inputs=inputs,\n", " outputs=[],\n", " arguments=[\"--opensearch-cluster-domain\", opensearch_domain_endpoint,\n", " \"--opensearch-secretid\", os_creds_secretid_in_secrets_manager,\n", " \"--opensearch-index-name\", opensearch_index,\n", " \"--aws-region\", aws_region,\n", " \"--embeddings-model-endpoint-name\", embeddings_model_endpoint_name,\n", " \"--chunk-size-for-doc-split\", str(CHUNK_SIZE_FOR_DOC_SPLIT),\n", " \"--chunk-overlap-for-doc-split\", str(CHUNK_OVERLAP_FOR_DOC_SPLIT),\n", " \"--input-data-dir\", \"/opt/ml/processing/input_data\",\n", " \"--create-index-hint-file\", CREATE_OS_INDEX_HINT_FILE,\n", " \"--process-count\", \"2\"])\n", "time_taken = time.time() - st\n", "logger.info(f\"processing job completed, total time taken={time_taken}s\")\n", "preprocessing_job_description = processor.jobs[-1].describe()\n", "logger.info(preprocessing_job_description)" ] }, { "cell_type": "markdown", "id": "087307b9-ab77-4956-9308-7856b515583e", "metadata": {}, "source": [ "## Step 4: Do a similarity search for for user input to documents (embeddings) in OpenSearch" ] }, { "cell_type": "code", "execution_count": null, "id": "a19dc18b-42ba-4236-85db-63998effeec8", "metadata": { "tags": [] }, "outputs": [], "source": [ "from container.credentials import get_credentials\n", "from langchain.vectorstores import OpenSearchVectorSearch\n", "from container.sm_helper import create_sagemaker_embeddings_from_js_model\n", "\n", "creds = get_credentials(os_creds_secretid_in_secrets_manager, aws_region)\n", "http_auth = (creds['username'], creds['password'])\n", "docsearch = OpenSearchVectorSearch(index_name=opensearch_index,\n", " embedding_function=create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name,\n", " aws_region),\n", " opensearch_url=opensearch_domain_endpoint,\n", " http_auth=http_auth)\n", "q = \"Which XGBoost versions does SageMaker support?\"\n", "docs = docsearch.similarity_search(q, k=3) #, search_type=\"script_scoring\", space_type=\"cosinesimil\"\n", "for doc in docs:\n", " logger.info(\"----------\")\n", " logger.info(f\"content=\\\"{doc.page_content}\\\",\\nmetadata=\\\"{doc.metadata}\\\"\")\n", " " ] }, { "cell_type": "code", "execution_count": null, "id": "22cc4506-c4dc-4b79-8c59-e387f3354ed0", "metadata": { "tags": [] }, "outputs": [], "source": [ "opensearch_domain_endpoint" ] }, { "cell_type": "markdown", "id": "7d5a43f6-ca23-484d-a3c1-c84292c83112", "metadata": {}, "source": [ "---\n", "\n", "## Cleanup\n", "\n", "To avoid incurring future charges, delete the resources. You can do this by deleting the CloudFormation template used to create the IAM role and SageMaker notebook.\n" ] }, { "cell_type": "markdown", "id": "7b0613c5-9567-4767-a260-e6d050349fb7", "metadata": {}, "source": [ "---\n", "\n", "## Conclusion\n", "In this notebook we were able to see how to use LLMs deployed on a SageMaker Endpoint to generate embeddings and then ingest those embeddings into OpenSearch and finally do a similarity search for user input to the documents (embeddings) stored in OpenSearch. We used langchain as an abstraction layer to talk to both the SageMaker Endpoint as well as OpenSearch." ] }, { "cell_type": "code", "execution_count": null, "id": "dd6df6f4-e5b6-45d4-8f2a-5637a8ca2cbe", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "availableInstances": [ { "_defaultOrder": 0, "_isFastLaunch": true, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 4, "name": "ml.t3.medium", "vcpuNum": 2 }, { "_defaultOrder": 1, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 8, "name": "ml.t3.large", "vcpuNum": 2 }, { "_defaultOrder": 2, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 16, "name": "ml.t3.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 3, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 32, "name": "ml.t3.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 4, "_isFastLaunch": true, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 8, "name": "ml.m5.large", "vcpuNum": 2 }, { "_defaultOrder": 5, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 16, "name": "ml.m5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 6, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 32, "name": "ml.m5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 7, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 64, "name": "ml.m5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 8, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 128, "name": "ml.m5.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 9, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 192, "name": "ml.m5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 10, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 256, "name": "ml.m5.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 11, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 384, "name": "ml.m5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 12, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 8, "name": "ml.m5d.large", "vcpuNum": 2 }, { "_defaultOrder": 13, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 16, "name": "ml.m5d.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 14, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 32, "name": "ml.m5d.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 15, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 64, "name": "ml.m5d.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 16, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 128, "name": "ml.m5d.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 17, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 192, "name": "ml.m5d.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 18, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 256, "name": "ml.m5d.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 19, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 384, "name": "ml.m5d.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 20, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "hideHardwareSpecs": true, "memoryGiB": 0, "name": "ml.geospatial.interactive", "supportedImageNames": [ "sagemaker-geospatial-v1-0" ], "vcpuNum": 0 }, { "_defaultOrder": 21, "_isFastLaunch": true, "category": "Compute optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 4, "name": "ml.c5.large", "vcpuNum": 2 }, { "_defaultOrder": 22, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 8, "name": "ml.c5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 23, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 16, "name": "ml.c5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 24, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 32, "name": "ml.c5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 25, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 72, "name": "ml.c5.9xlarge", "vcpuNum": 36 }, { "_defaultOrder": 26, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 96, "name": "ml.c5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 27, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 144, "name": "ml.c5.18xlarge", "vcpuNum": 72 }, { "_defaultOrder": 28, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 192, "name": "ml.c5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 29, "_isFastLaunch": true, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 16, "name": "ml.g4dn.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 30, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 32, "name": "ml.g4dn.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 31, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 64, "name": "ml.g4dn.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 32, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 128, "name": "ml.g4dn.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 33, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "hideHardwareSpecs": false, "memoryGiB": 192, "name": "ml.g4dn.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 34, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 256, "name": "ml.g4dn.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 35, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 61, "name": "ml.p3.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 36, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "hideHardwareSpecs": false, "memoryGiB": 244, "name": "ml.p3.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 37, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 8, "hideHardwareSpecs": false, "memoryGiB": 488, "name": "ml.p3.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 38, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 8, "hideHardwareSpecs": false, "memoryGiB": 768, "name": "ml.p3dn.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 39, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 16, "name": "ml.r5.large", "vcpuNum": 2 }, { "_defaultOrder": 40, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 32, "name": "ml.r5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 41, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 64, "name": "ml.r5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 42, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 128, "name": "ml.r5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 43, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 256, "name": "ml.r5.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 44, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 384, "name": "ml.r5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 45, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 512, "name": "ml.r5.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 46, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "hideHardwareSpecs": false, "memoryGiB": 768, "name": "ml.r5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 47, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 16, "name": "ml.g5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 48, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 32, "name": "ml.g5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 49, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 64, "name": "ml.g5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 50, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 128, "name": "ml.g5.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 51, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "hideHardwareSpecs": false, "memoryGiB": 256, "name": "ml.g5.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 52, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "hideHardwareSpecs": false, "memoryGiB": 192, "name": "ml.g5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 53, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "hideHardwareSpecs": false, "memoryGiB": 384, "name": "ml.g5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 54, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 8, "hideHardwareSpecs": false, "memoryGiB": 768, "name": "ml.g5.48xlarge", "vcpuNum": 192 } ], "instance_type": "ml.m5.large", "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.8" } }, "nbformat": 4, "nbformat_minor": 5 }