import boto3
import requests
import html2text
from typing import List
import re
import logging
import json
import traceback
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def find_http_urls_in_parentheses(s: str, prefix: str = None):
    pattern = r'\((https?://[^)]+)\)'
    urls = re.findall(pattern, s)
    matched = []
    if prefix is not None:
        for url in urls:
            if str(url).startswith(prefix):
                matched.append(url)
    else:
        matched = urls
    return list(set(matched))  # remove duplicates by converting to a set, then back to a list
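# Note: html2text renders hyperlinks in Markdown form, e.g. "[FAQ](https://www.zappos.com/c/faq)"
# (hypothetical link, shown only for illustration), so the regex above pulls each URL out of the
# parentheses. With prefix="https://www.zappos.com/", only links under that site are kept.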
class EZWebLoader:

    def __init__(self, default_header: dict = None):
        self._html_to_text_parser = html2text
        if default_header is None:
            self._default_header = {"User-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.80 Safari/537.36"}
        else:
            self._default_header = default_header
    def load_data(self,
                  urls: List[str],
                  num_levels: int = 0,
                  level_prefix: str = None,
                  headers: dict = None) -> List[str]:
        logger.info(f"Number of urls: {len(urls)}.")
        if headers is None:
            headers = self._default_header
        documents = []
        visited = {}
        for url in urls:
            q = [url]
            depth = num_levels
            for page in q:
                if page not in visited:  # prevent cycles by checking whether we already crawled this link
                    logger.info(f"Crawling {page}")
                    visited[page] = True  # mark the page as visited so it is not re-crawled
                    response = requests.get(page, headers=headers, timeout=30).text  # timeout keeps a hung request from stalling the Lambda
                    response = self._html_to_text_parser.html2text(response)  # reduce the HTML to plain text
                    documents.append(response)
                    if depth > 0:
                        # crawl linked pages
                        ingest_urls = find_http_urls_in_parentheses(response, level_prefix)
                        logger.info(f"Found {len(ingest_urls)} pages to crawl.")
                        q.extend(ingest_urls)
                        depth -= 1  # reduce the depth counter so we go only num_levels deep in the crawl
                else:
                    logger.info(f"Skipping {page} as it has already been crawled")
        logger.info(f"Number of documents: {len(documents)}.")
        return documents
ACCOUNT_ID = boto3.client('sts').get_caller_identity().get('Account')
S3_BUCKET = "lexgenaistack-source-materials-bucket-" + ACCOUNT_ID
FILE_NAME = 'web-crawl-results.txt'
def handler(event, context):
    url = "http://www.zappos.com/general-questions"
    depth = 1
    level_prefix = "https://www.zappos.com/"
    if event is not None:
        if "url" in event:
            url = event["url"]
        if "depth" in event:
            depth = int(event["depth"])
        if "level_prefix" in event:
            level_prefix = event["level_prefix"]

    # crawl the website
    try:
        logger.info(f"Crawling {url} to a depth of {depth}...")
        loader = EZWebLoader()
        documents = loader.load_data([url], depth, level_prefix)
        doc_string = json.dumps(documents, indent=1)
        logger.info(f"Crawling {url} to a depth of {depth} succeeded")
    except Exception:
        # If the crawl fails, log the traceback and return an error response
        logger.error(f"An error occurred during the crawl of {url}.")
        exception_traceback = traceback.format_exc()
        logger.error(exception_traceback)
        return {
            "status": 500,
            "message": exception_traceback
        }

    # save the results for indexing
    try:
        # Use the S3 client to write the string to S3
        s3 = boto3.client('s3')
        s3.put_object(Body=doc_string, Bucket=S3_BUCKET, Key=FILE_NAME)
        success_msg = f'Successfully put {FILE_NAME} to {S3_BUCKET}'
        logger.info(success_msg)
        return {
            "status": 200,
            "message": success_msg
        }
    except Exception:
        # If the upload fails, log the traceback and return an error response
        exception_traceback = traceback.format_exc()
        logger.error(exception_traceback)
        return {
            "status": 500,
            "message": exception_traceback
        }