In [None]:
#Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#SPDX-License-Identifier: MIT-0

In [None]:
!pip install requests_aws4auth
!pip install elasticsearch
!pip install nltk
!pip install jsonlines
!pip install pandarallel
!pip install --upgrade grpcio 
!pip install --upgrade s3fs

In [None]:
import os 
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import logging
import json
import ast
import numpy as np
import pandas as pd

import boto3
import sagemaker
import nltk

from search_utils import helpers, search_preprocessing

In [None]:
# Notebook configuration: logging, AWS clients and the names of the resources
# used by the cells below. The empty strings are placeholders that must be
# filled in before running the rest of the notebook.

# Only warnings and errors are emitted; logger.info() calls are silenced at this level
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger()
logger.setLevel(logging.WARNING)

# S3 client used to download the data file that gets loaded into Elasticsearch
s3_client = boto3.client('s3')

# Create a SageMaker session
sagemaker_session = sagemaker.Session()

# We'll be using the SageMaker default bucket.
# Feel free to change this to another bucket name, but make sure it's the same across all four notebooks
bucket_name = sagemaker_session.default_bucket()

# Copy the glove_job_name here; it was generated automatically in step 3 of the training notebook
glove_job_name = ""

# Copy the training_job_name here; it was generated automatically in step 4 of the training notebook
training_job_name = ""

# The region in which you deployed the Elasticsearch cluster
region = ""

# The host name of the Elasticsearch cluster you deployed
host = ""

# Name of the Elasticsearch index; feel free to change it
es_index = "knn-test"

# Name of the SageMaker endpoint; keep this value unless you changed endpoint_name when deploying
endpoint_name = "object2vec-embeddings"

In [None]:
# Connect to Elasticsearch.
# AWS4Auth is built from the credentials of the current boto3 session for the
# 'es' service in `region`, and is passed to the client as its HTTP auth.
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
 region, service, session_token=credentials.token)

# NOTE(review): `headers` is not referenced by any other cell visible in this notebook
headers = {"Content-Type": "application/json"}

# HTTPS client on port 443 with certificate verification and a 60 s timeout
es = Elasticsearch(
 hosts=[{'host': host, 'port': 443}],
 http_auth=awsauth,
 use_ssl=True,
 verify_certs=True,
 connection_class=RequestsHttpConnection,
 timeout=60
)

# 1. Create an Elasticsearch index and load data into it

In [None]:
def create_index(index, dimension=512):
    """
    Create a k-NN enabled Elasticsearch index unless it already exists.

    Args:
        index: Name of the index to create.
        dimension: Value written to the ``dimension`` setting of the
            ``embeddings`` knn_vector field. Defaults to 512, the value that
            used to be hard-coded here.

    Returns:
        None. A message describing the outcome is printed.
    """
    if es.indices.exists(index=index):
        print("elasticsearch index already exists")
        return

    index_settings = {
        "settings": {
            "index.knn": True,
            "index.mapping.total_fields.limit": "2000"
        },
        "mappings": {
            "properties": {
                "embeddings": {
                    "type": "knn_vector",
                    "dimension": dimension
                }
            }
        }
    }

    es.indices.create(index=index, body=json.dumps(index_settings))
    print("Created the elasticsearch index successfully")


In [None]:
#Create the index using knn settings
create_index(es_index)

In [None]:
# You can check if the index is created within your es cluster
es.indices.get_alias("*")

In [None]:
def ingest_data_into_es(event):
    """
    Read a JSON file of records from S3 and index each record into Elasticsearch.

    Args:
        event: dict with the keys ``bucket`` and ``key`` locating the S3 object.
            The object body is decoded as UTF-8 JSON and iterated record by
            record; each record's ``id`` is used as the Elasticsearch document ID.

    Returns:
        dict with ``statusCode`` 200 and a JSON-encoded ``body`` stating how
        many records were indexed. Records that fail are logged and counted
        as lost; they do not stop the ingestion.
    """
    obj = s3_client.get_object(Bucket=event['bucket'], Key=event['key'])
    records = json.loads(obj['Body'].read().decode('utf-8'))

    count = 0
    lost_records = 0

    for record in records:
        try:
            # Looked up inside the try so a record without an id is counted
            # as lost instead of aborting the whole ingestion.
            record_id = record['id']

            # Embeddings stored as a string literal are parsed into a list;
            # values that are already parsed are indexed unchanged.
            if 'embeddings' in record and isinstance(record['embeddings'], str):
                record['embeddings'] = ast.literal_eval(record['embeddings'])

            es.index(index=es_index, id=record_id, doc_type='_doc', body=record)

            count += 1
        except Exception as error:
            logger.error(f"An error {error} for record {record}")
            lost_records += 1

    logger.info(
        f'{lost_records} out of {len(records)} are lost records')

    logger.info(
        f'{count} out of {len(records)} records has been processed')

    return {
        'statusCode': 200,
        'body': json.dumps(str(count) + ' records processed.')
    }


In [None]:
event={'bucket':bucket_name,'key':'search_knn_blog/data/enriched_data/data.json'}

In [None]:
#This will take 1-2 minutes
response = ingest_data_into_es(event)

In [None]:
#Check that data is indeed in ES
res = es.search(index=es_index, body={
 "query": {
 "match_all": {}
 }},
 size=10)

In [None]:
res

# 2. Generate embeddings from the query

Each time a user makes a query, we will create an embedding of that query using the SageMaker endpoint. Using that embedding, we will make a search API call that uses the k-NN functionality to get the most relevant results.

In [None]:
# Build a predictor for the embeddings endpoint that sends and receives JSON.
if hasattr(sagemaker.predictor, "Predictor"):
    # SageMaker Python SDK v2+: RealTimePredictor was renamed to Predictor and the
    # json_serializer / json_deserializer helpers were replaced by classes.
    from sagemaker.serializers import JSONSerializer
    from sagemaker.deserializers import JSONDeserializer

    predictor = sagemaker.predictor.Predictor(
        endpoint_name,
        serializer=JSONSerializer(),
        deserializer=JSONDeserializer(),
    )
else:
    # SageMaker Python SDK v1
    from sagemaker.predictor import json_serializer, json_deserializer

    predictor = sagemaker.predictor.RealTimePredictor(endpoint_name)
    predictor.content_type = 'application/json'
    predictor.serializer = json_serializer
    predictor.deserializer = json_deserializer

tokenizer = nltk.tokenize.TreebankWordTokenizer()

# Vocabulary written by the GloVe job; presumably maps each word to its integer id
# (verify against search_utils.helpers / the training notebook)
word_to_id = helpers.read_json_from_s3(
    bucket_name,
    f'search_knn_blog/sagemaker-runs/{glove_job_name}/vocab.json')

In [None]:
query = "office"
page_size = 50

In [None]:
# Turn the query text into the integer sequence that is sent to the endpoint
enc_description = search_preprocessing.sentence_to_integers(query, tokenizer, word_to_id)

# Fail loudly on an empty encoding: otherwise query_embeddings would be left
# undefined (NameError below) or, worse, keep a stale value from an earlier run.
if len(enc_description) == 0:
    raise ValueError(
        f"The query {query!r} produced an empty encoding, so no embedding can be computed")

payload = {"instances": [{"in0": enc_description}]}
result = predictor.predict(payload)

# Scale the embedding returned by the endpoint to unit L2 norm
query_embeddings = np.asarray(result["predictions"][0]["embeddings"], dtype=float)
norm = np.linalg.norm(query_embeddings)
query_embeddings = query_embeddings / norm

print(query_embeddings.shape)

In [None]:
# A function to parse elasticsearch results and transform to pandas dataframe
def res_to_df(res):
    """
    Convert an Elasticsearch search response into a pandas DataFrame.

    Each hit under ``res["hits"]["hits"]`` contributes one row, built from the
    hit's ``_source`` document.
    """
    sources = [hit["_source"] for hit in res["hits"]["hits"]]
    return pd.DataFrame(sources)

# 3. Make a simple search query

In [None]:
# Keyword search: fuzzy multi_match of the query text against the
# product_title and product_category fields
es_query ={
 "query": {
 "multi_match": {
 "query": query,
 "fuzziness": "auto",
 "fields": ['product_title','product_category']
 }
 }}

In [None]:
# Run the keyword (multi_match) query, requesting up to page_size hits
res = es.search(index=es_index, body=es_query, size=page_size)

In [None]:
res_to_df(res)[["id","product_category","product_title"]]

# 4. Make a k-nn based query 

In [None]:
# k-NN search on the `embeddings` field using the query embedding as the vector.
# NOTE(review): k is fixed at 5 here, while the search call requests size=page_size
# and the combined query in section 5 uses k=page_size; confirm that 5 is intended.
es_query ={
 "query": {
 "knn": {
 "embeddings": {
 "vector": query_embeddings,
 "k": 5
 }
 }
 }

}

In [None]:
res = es.search(index=es_index, body=es_query, size=page_size)

In [None]:
res_to_df(res)[["id","product_category","product_title"]]

# 5. Combine both approaches and experiment

In [None]:
# Combine the keyword query and the k-NN query as two `should` clauses of a bool
# query. Each clause is wrapped in a function_score with its own boost
# (0.1 for the keyword match, 2 for the k-NN match).
# NOTE(review): `size` is set in this body and is also passed as size=page_size
# to es.search in the next cell.
es_query = {
 "query": {
 "bool": {
 "should": [
 {
 "function_score": {
 "query": {
 "multi_match": {
 "query": query,
 "fuzziness": "auto",
 "fields": ['product_title','product_category']
 }
 },
 "boost": 0.1
 }
 },
 {
 "function_score": {
 "query":{
 "knn":{
 "embeddings" :{
 "vector": query_embeddings,
 "k": page_size
 }
 }
 },
 "boost": 2
 }
 }
 ]
 }
 },
 "size": page_size
}

In [None]:
res = es.search(index=es_index, body = es_query, size=page_size)

In [None]:
res_to_df(res)[["id","product_category","product_title"]]

Please note that the relevance of the results will depend on the query you use as well as on other parameters (boost values, k size, etc.). The above examples serve merely as guidance on how to query Elasticsearch using keyword search, k-NN search, or a mix of both.

# 6. Cleaning up

Make sure you remove any resources you no longer need. This can include the SageMaker endpoints, the Elasticsearch cluster, etc.

# 7. Conclusion

Throughout these four notebooks, we saw how to process data and train an Object2Vec model using Amazon SageMaker. We then created a live endpoint to perform predictions and generate embeddings for all catalog products. Finally, we loaded this information into an Elasticsearch index and explored multiple ways to perform queries.

