from typing import Callable, Dict, List, Optional, Union, Tuple
from pathlib import Path
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer
from opensearchpy import OpenSearch, RequestsHttpConnection
from haystack.nodes.retriever import EmbeddingRetriever
from haystack.document_stores import OpenSearchDocumentStore
from urllib.parse import urlparse, unquote
from os.path import splitext, basename
import requests
import io
import gzip
import tarfile
import zipfile
import argparse
import json
import sys
import os
from rfc3986_validator import validate_rfc3986
from haystack.nodes.file_converter import (
    BaseConverter,
    DocxToTextConverter,
    PDFToTextConverter,
    TextConverter,
    MarkdownConverter,
)
from haystack.schema import Document
from haystack.utils import clean_wiki_text
import logging

DEFAULT_DOCS_DIR = "awsdocs/data"

logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser(
    prog='AWS Semantic Search Ingestion',
    description='Ingests documents into an Amazon OpenSearch index.',
    epilog='Made with ❤️ at AWS')
parser.add_argument('--src', type=str, help='Directory or URL where documents are located', default=DEFAULT_DOCS_DIR)
parser.add_argument('--index_name', type=str, help='Amazon OpenSearch index name', default="awsdocs")


# Adds Markdown conversion to the original implementation.
# Licensed under the Apache-2.0 license, from deepset-ai haystack:
# https://github.com/deepset-ai/haystack/blob/ba30971d8d77827da9d2c81d82f7d02bf1917d8c/haystack/utils/preprocessing.py
def convert_files_to_docs(
    dir_path: str,
    clean_func: Optional[Callable] = None,
    split_paragraphs: bool = False,
    encoding: Optional[str] = None,
    id_hash_keys: Optional[List[str]] = None,
) -> List[Document]:
    """
    Convert all files (.txt, .pdf, .docx, .md) in the sub-directories of the given path to Documents that can be
    written to a Document Store.

    :param dir_path: The path of the directory containing the files.
    :param clean_func: A custom cleaning function that gets applied to each Document (input: str, output: str).
    :param split_paragraphs: Whether to split text by paragraph.
    :param encoding: Character encoding to use when converting PDF documents.
    :param id_hash_keys: A list of Document attribute names from which the Document ID should be hashed.
        Useful for generating unique IDs even if the Document contents are identical.
        To avoid duplicate Documents in your Document Store when texts are not unique, you can modify the metadata
        and pass ["content", "meta"] to this field. The Document ID will then be generated from the content and
        the defined metadata.
    """
    file_paths = [p for p in Path(dir_path).glob("**/*")]
    allowed_suffixes = [".pdf", ".txt", ".docx", ".md"]
    suffix2converter: Dict[str, BaseConverter] = {}

    suffix2paths: Dict[str, List[Path]] = {}
    for path in file_paths:
        file_suffix = path.suffix.lower()
        if file_suffix in allowed_suffixes:
            if file_suffix not in suffix2paths:
                suffix2paths[file_suffix] = []
            suffix2paths[file_suffix].append(path)
        elif not path.is_dir():
            logger.warning(
                "Skipped file {0} as type {1} is not supported here. "
" "See haystack.file_converter for support of more file types".format(path, file_suffix) ) # No need to initialize converter if file type not present for file_suffix in suffix2paths.keys(): if file_suffix == ".pdf": suffix2converter[file_suffix] = PDFToTextConverter() if file_suffix == ".txt": suffix2converter[file_suffix] = TextConverter() if file_suffix == ".docx": suffix2converter[file_suffix] = DocxToTextConverter() if file_suffix == ".md": suffix2converter[file_suffix] = MarkdownConverter() documents = [] for suffix, paths in suffix2paths.items(): for path in paths: logger.info("Converting {}".format(path)) # PDFToTextConverter, TextConverter, and DocxToTextConverter return a list containing a single Document document = suffix2converter[suffix].convert( file_path=path, meta=None, encoding=encoding, id_hash_keys=id_hash_keys )[0] text = document.content if clean_func: text = clean_func(text) if split_paragraphs: for para in text.split("\n\n"): if not para.strip(): # skip empty paragraphs continue documents.append(Document(content=para, meta={"name": path.name}, id_hash_keys=id_hash_keys)) else: documents.append(Document(content=text, meta={"name": path.name}, id_hash_keys=id_hash_keys)) return documents # Enable downloading archive from Amazon S3 presigned url and other urls that contain data such as query parameters after the file extension. # Licensed under Apache-2.0 license from deepset-ai haystack # https://github.com/deepset-ai/haystack/blob/ba30971d8d77827da9d2c81d82f7d02bf1917d8c/haystack/utils/import_utils.py def fetch_archive_from_http( url: str, output_dir: str, proxies: Optional[Dict[str, str]] = None, timeout: Union[float, Tuple[float, float]] = 10.0, ) -> bool: """ Fetch an archive (zip, gz or tar.gz) from a url via http and extract content to an output directory. :param url: http address :param output_dir: local path :param proxies: proxies details as required by requests library :param timeout: How many seconds to wait for the server to send data before giving up, as a float, or a :ref:`(connect timeout, read timeout) ` tuple. Defaults to 10 seconds. :return: if anything got fetched """ # verify & prepare local directory path = Path(output_dir) if not path.exists(): path.mkdir(parents=True) is_not_empty = len(list(Path(path).rglob("*"))) > 0 if is_not_empty: logger.info("Found data stored in '%s'. Delete this first if you really want to fetch new data.", output_dir) return False else: logger.info("Fetching from %s to '%s'", url, output_dir) parsed = urlparse(url) root, extension = splitext(parsed.path) archive_extension = extension[1:] request_data = requests.get(url, proxies=proxies, timeout=timeout) if archive_extension == "zip": zip_archive = zipfile.ZipFile(io.BytesIO(request_data.content)) zip_archive.extractall(output_dir) elif archive_extension == "gz" and not "tar.gz" in url: gzip_archive = gzip.GzipFile(fileobj=io.BytesIO(request_data.content)) file_content = gzip_archive.read() file_name = unquote(basename(root[1:])) with open(f"{output_dir}/{file_name}", "wb") as file: file.write(file_content) elif archive_extension in ["gz", "bz2", "xz"]: tar_archive = tarfile.open(fileobj=io.BytesIO(request_data.content), mode="r|*") tar_archive.extractall(output_dir) else: logger.warning( "Skipped url %s as file type is not supported here. 
" "See haystack documentation for support of more file types", url, ) return True host = os.environ['OPENSEARCH_HOST'] password = os.environ['OPENSEARCH_PASSWORD'] args = parser.parse_args() docs_src = args.src index_name = args.index_name #if len(sys.argv)>1: # doc_dir_aws = sys.argv[1] #try: # is_url = validators.url(docs_src) #except validators.ValidationFailure: # is_url = False if validate_rfc3986(docs_src): fetch_archive_from_http(url=docs_src, output_dir=DEFAULT_DOCS_DIR) doc_dir_aws = DEFAULT_DOCS_DIR else: doc_dir_aws = docs_src print(f"doc_dir_aws {doc_dir_aws}") print(f"index_name {index_name}") document_store = OpenSearchDocumentStore( host = host, port = 443, username = 'admin', password = password, scheme = 'https', verify_certs = False, similarity='cosine' ) dicts_aws = convert_files_to_docs(dir_path=doc_dir_aws, clean_func=clean_wiki_text, split_paragraphs=True) path = Path(doc_dir_aws) # Let's have a look at the first 3 entries: print("First 3 documents to be ingested") print(dicts_aws[:3]) print(f"Starting Ingestion, Documents: {len(dicts_aws)}") # Now, let's write the dicts containing documents to our DB. document_store.write_documents(dicts_aws, index=index_name) print(f"Finished Ingestion, Documents: {len(dicts_aws)}") print(f"Started Update Embeddings, Documents: {len(dicts_aws)}") # Calculate and store a dense embedding for each document retriever = EmbeddingRetriever( document_store=document_store, model_format = "sentence_transformers", embedding_model = "sentence-transformers/all-mpnet-base-v2" ) document_store.update_embeddings( retriever=retriever, index=index_name ) print(f"Finished Update Embeddings, Documents: {len(dicts_aws)}")