{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\n", "#SPDX-License-Identifier: MIT-0" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install requests_aws4auth\n", "!pip install elasticsearch\n", "!pip install nltk\n", "!pip install jsonlines\n", "!pip install pandarallel\n", "!pip install --upgrade grpcio \n", "!pip install --upgrade s3fs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os \n", "from requests_aws4auth import AWS4Auth\n", "from elasticsearch import Elasticsearch, RequestsHttpConnection\n", "import logging\n", "import json\n", "import ast\n", "import numpy as np\n", "import pandas as pd\n", "\n", "import boto3\n", "import sagemaker\n", "import nltk\n", "\n", "from search_utils import helpers, search_preprocessing" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set up a few parameters\n", "logging.basicConfig(level=logging.WARNING)\n", "logger = logging.getLogger()\n", "logger.setLevel(logging.WARNING)\n", "\n", "s3_client = boto3.client('s3')\n", "\n", "#Creating a sagemaker session\n", "sagemaker_session = sagemaker.Session()\n", "\n", "#We'll be using the sagemaker default bucket\n", "#Feel free to change this to another bucket name and make sure it's the same across all four notebooks\n", "bucket_name = sagemaker_session.default_bucket()\n", "\n", "#Copy the glove_job_name, this was generated automatically in step 3 of the training notebook\n", "glove_job_name = \"\"\n", "\n", "#Copy the training_job_name, this was generated automatically in step 4 of the training notebook\n", "training_job_name = \"\"\n", "\n", "#This is the region in which you deployed the elasticsearch cluster\n", "region = \"\"\n", "\n", "#This is host name of the elasticsearch cluster you deployed\n", "host = \"\"\n", "\n", "#Feel free to change index name\n", "es_index = \"knn-test\"\n", "\n", "#If you didn't change the endpoint_name this will be the same value\n", "endpoint_name = \"object2vec-embeddings\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Connect to Elasticsearch\n", "service = 'es'\n", "credentials = boto3.Session().get_credentials()\n", "awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,\n", " region, service, session_token=credentials.token)\n", "\n", "headers = {\"Content-Type\": \"application/json\"}\n", "\n", "es = Elasticsearch(\n", " hosts=[{'host': host, 'port': 443}],\n", " http_auth=awsauth,\n", " use_ssl=True,\n", " verify_certs=True,\n", " connection_class=RequestsHttpConnection,\n", " timeout=60\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 1.Create and load data to Elasticsearch index " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_index(index):\n", " \"\"\"\n", " This function will create an index using knn settings\n", " \"\"\"\n", " if not es.indices.exists(index=index):\n", " index_settings = {\n", " \"settings\": {\n", " \"index.knn\": True,\n", " \"index.mapping.total_fields.limit\": \"2000\"\n", " },\n", " \"mappings\": {\n", " \"properties\": {\n", " \"embeddings\": {\n", " \"type\": \"knn_vector\",\n", " \"dimension\": 512\n", " }\n", " }\n", " }\n", " }\n", "\n", " es.indices.create(index=index, 
body=json.dumps(index_settings))\n", "        print(\"Created the Elasticsearch index successfully\")\n", "    else:\n", "        print(\"The Elasticsearch index already exists\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Create the index using knn settings\n", "create_index(es_index)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# You can check that the index was created within your ES cluster\n", "es.indices.get_alias(\"*\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def ingest_data_into_es(event):\n", "    \"\"\"\n", "    Read the enriched records from S3 and index them one by one into Elasticsearch\n", "    \"\"\"\n", "    loaded_keys = []\n", "\n", "    bucket = event['bucket']\n", "    key = event['key']\n", "\n", "    loaded_keys += [key]\n", "\n", "    obj = s3_client.get_object(Bucket=bucket, Key=key)\n", "\n", "    records = json.loads(obj['Body'].read().decode('utf-8'))\n", "\n", "    count = 0\n", "    lost_records = 0\n", "\n", "    for record in records:\n", "        # Get the primary key for use as the Elasticsearch ID\n", "        record_id = record['id']\n", "\n", "        try:\n", "            if 'embeddings' in record:\n", "                record['embeddings'] = ast.literal_eval(record['embeddings'])\n", "\n", "            es.index(index=es_index, id=record_id, doc_type='_doc', body=record)\n", "\n", "            count += 1\n", "        except Exception as error:\n", "            logger.error(f\"An error {error} for record {record}\")\n", "            lost_records += 1\n", "\n", "    logger.info(\n", "        f'{lost_records} out of {len(records)} records were lost')\n", "\n", "    logger.info(\n", "        f'{count} out of {len(records)} records have been processed')\n", "\n", "    return {\n", "        'statusCode': 200,\n", "        'body': json.dumps(str(count) + ' records processed.')\n", "    }\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "event = {'bucket': bucket_name, 'key': 'search_knn_blog/data/enriched_data/data.json'}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#This will take 1-2 minutes\n", "response = ingest_data_into_es(event)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Check that the data is indeed in ES\n", "res = es.search(index=es_index, body={\n", "    \"query\": {\n", "        \"match_all\": {}\n", "    }},\n", "    size=10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "res" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 2. Generate embeddings from the query" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Each time a user makes a query, we will create embeddings for the query using the SageMaker endpoint. Using that embedding, we will make a search API call with the k-NN functionality to get the most relevant results."
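, "\n", "The cells below walk through this flow step by step. For reference, here is a minimal sketch of how the two steps could be wrapped into helpers; it assumes the `es` client created above and the `predictor`, `tokenizer`, and `word_to_id` objects created in the following cells:\n", "\n", "```python\n", "# Illustrative sketch only: embed the query with the SageMaker endpoint, then run a k-NN search\n", "def embed_query(query):\n", "    tokens = search_preprocessing.sentence_to_integers(query, tokenizer, word_to_id)\n", "    result = predictor.predict({\"instances\": [{\"in0\": tokens}]})\n", "    embeddings = np.array(result[\"predictions\"][0][\"embeddings\"])\n", "    return embeddings / np.sqrt(np.sum(np.square(embeddings)))  # L2-normalize, as done below\n", "\n", "def knn_search(query, k=5):\n", "    body = {\"query\": {\"knn\": {\"embeddings\": {\"vector\": embed_query(query).tolist(), \"k\": k}}}}\n", "    return es.search(index=es_index, body=body, size=k)\n", "```"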
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.predictor import json_serializer, json_deserializer\n", "\n", "predictor = sagemaker.predictor.RealTimePredictor(endpoint_name)\n", "predictor.content_type = 'application/json'\n", "predictor.serializer = json_serializer\n", "predictor.deserializer = json_deserializer\n", "tokenizer = nltk.tokenize.TreebankWordTokenizer()\n", "\n", "word_to_id = helpers.read_json_from_s3(bucket_name,\\\n", " f'search_knn_blog/sagemaker-runs/{glove_job_name}/vocab.json')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = \"office\"\n", "page_size = 50" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "enc_description = search_preprocessing.sentence_to_integers(query, tokenizer, word_to_id)\n", "\n", "if len(enc_description) != 0:\n", " payload = {\"instances\" : [{\"in0\": enc_description}]}\n", " result = predictor.predict(payload)\n", " query_embeddings = result[\"predictions\"][0][\"embeddings\"]\n", " norm = np.sqrt(np.sum(np.square(query_embeddings)))\n", " query_embeddings = query_embeddings/norm\n", " \n", "print(query_embeddings.shape)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# A function to parse elasticsearch results and transform to pandas dataframe\n", "def res_to_df(res):\n", " list_results = []\n", " for hit in res[\"hits\"][\"hits\"]:\n", " list_results.append(hit[\"_source\"])\n", " df_results = pd.DataFrame(list_results)\n", " \n", " return df_results" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 3. Make a simple search query" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "es_query ={\n", " \"query\": {\n", " \"multi_match\": {\n", " \"query\": query,\n", " \"fuzziness\": \"auto\",\n", " \"fields\": ['product_title','product_category']\n", " }\n", " }}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Check that data is indeed in ES\n", "res = es.search(index=es_index, body=es_query, size=page_size)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "res_to_df(res)[[\"id\",\"product_category\",\"product_title\"]]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 4. Make a k-nn based query " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "es_query ={\n", " \"query\": {\n", " \"knn\": {\n", " \"embeddings\": {\n", " \"vector\": query_embeddings,\n", " \"k\": 5\n", " }\n", " }\n", " }\n", "\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "res = es.search(index=es_index, body=es_query, size=page_size)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "res_to_df(res)[[\"id\",\"product_category\",\"product_title\"]]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 5. 
Combine both approaches and experiment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "es_query = {\n", "    \"query\": {\n", "        \"bool\": {\n", "            \"should\": [\n", "                {\n", "                    \"function_score\": {\n", "                        \"query\": {\n", "                            \"multi_match\": {\n", "                                \"query\": query,\n", "                                \"fuzziness\": \"auto\",\n", "                                \"fields\": ['product_title', 'product_category']\n", "                            }\n", "                        },\n", "                        \"boost\": 0.1\n", "                    }\n", "                },\n", "                {\n", "                    \"function_score\": {\n", "                        \"query\": {\n", "                            \"knn\": {\n", "                                \"embeddings\": {\n", "                                    \"vector\": query_embeddings.tolist(),\n", "                                    \"k\": page_size\n", "                                }\n", "                            }\n", "                        },\n", "                        \"boost\": 2\n", "                    }\n", "                }\n", "            ]\n", "        }\n", "    },\n", "    \"size\": page_size\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "res = es.search(index=es_index, body=es_query, size=page_size)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "res_to_df(res)[[\"id\",\"product_category\",\"product_title\"]]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please note that the relevance of the results will depend on the query you use as well as on other parameters (boost values, the value of k, etc.). The examples above serve merely as guidance on how to query Elasticsearch using keyword search, k-NN search, or a mix of both." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 6. Cleaning up" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Make sure you remove any resources you no longer need; these can include SageMaker endpoints, the Elasticsearch cluster, etc." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 7. Conclusion\n", "\n", "Throughout these four notebooks, we saw how to process data and train an Object2Vec model using Amazon SageMaker. We then created a live endpoint to perform predictions and generate embeddings for all catalog products. Finally, we loaded this information into an Elasticsearch index and explored multiple ways to perform queries.\n", "\n" ] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }