{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Using AWS Lambda and PyWren to find keywords in Common Crawl dataset\n", "\n", "The [Common Crawl](https://aws.amazon.com/public-datasets/common-crawl/) corpus includes web crawl data collected over 8 years. Common Crawl offers the largest, most comprehensive, open repository of web crawl data on the cloud. In this notebook, we are going to use the power of AWS Lambda and pywren to search and compare the popularity of items on the internet.\n", "\n", "### Credits\n", "- [PyWren](https://github.com/pywren/pywren) - Project by BCCI and riselab. Makes it easy to executive massive parallel map queries across [AWS Lambda](https://aws.amazon.com/lambda/)\n", "- [Warcio](https://github.com/webrecorder/warcio) - Streaming WARC/ARC library for fast web archive IO \n", "- [Common Crawl Foundation](http://commoncrawl.org/) - Builds and maintains an open repository of web crawl data that can be accessed and analyzed by anyone." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step by Step instructions\n", "\n", "### Setup Logging (optional)\n", "Only activate the below lines if you want to see all debug messages from PyWren. _Note: The output will be rather chatty and lengthy._" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import logging\n", "logger = logging.getLogger()\n", "logger.setLevel(logging.INFO)\n", "%env PYWREN_LOGLEVEL=INFO" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's setup all the necessary libraries:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3, botocore, time\n", "import numpy as np\n", "from IPython.display import HTML, display, Image, IFrame\n", "import matplotlib.pyplot as plt\n", "import pywren\n", "import warc_search" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we want to identify certain recent crawls datapoints which we want to send to PyWren for further analysis. The Common Crawl dataset is split up into different key naming schemes in an Amazon S3 bucket. More information can be found on the [Getting Started](http://commoncrawl.org/the-data/get-started/) page of Common Crawl. Let's identify some of the folder structure first by using the AWS CLI to list some folders in the Amazon S3 bucket:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 ls s3://commoncrawl/crawl-data/CC-MAIN-2017" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ok, let's drill into some more specific crawls now:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3 = boto3.client('s3','us-east-1')\n", "items = s3.list_objects(Bucket = 'commoncrawl', Prefix = 'crawl-data/CC-MAIN-2017-39/segments/1505818685129.23/wet/')\n", "keys = items['Contents']\n", "display(HTML('Amount of WARC files available: ' + str(len(keys)) + ''))\n", "\n", "html = 'Sample links:'\n", "html += ''\n", "for i in keys[:10]:\n", " html += ''\n", " html += ''\n", " html += ''\n", "html += '
' + i['Key'] + ''\n", " html += '' + str(round(i['Size']/1024/1024,2)) + ' MB
'\n", "display(HTML(html))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's use PyWren to run through various Web Archive format (WARC) files of recent crawls and look for specific keywords. Given the larger file size of the archives (>100MB) per crawl, we can benefit from the proximity of AWS Lambda and Amazon S3 to achieve a faster processing speed.\n", "\n", "If you want to understand the exact details, explore [warc_search.py](/edit/Lab-2-Common-Crawl/warc_search.py). Here are the relevant code snippets.\n", "```python\n", "dynamo_tbl = boto3.resource('dynamodb').Table('pywren-workshop-common-crawl')\n", "resp = requests.get('https://commoncrawl.s3.amazonaws.com/' + key, stream = True)\n", "for record in ArchiveIterator(resp.raw, arc2warc=True):\n", " if record.content_type == 'text/plain':\n", " webpage_text = record.content_stream().read()\n", " date = record.rec_headers.get_header('WARC-Date')\n", " for search_str in search_array:\n", " if re.search(search_str,webpage_text):\n", " result[search_str]['count'] += 1\n", "for search_str in search_array:\n", " if result[search_str]['count'] > 0:\n", " record={}\n", " record['warc_file']=key\n", " record['search_str']=search_str\n", " record['occurrence']=result[search_str]['count']\n", " response=dynamo_tbl.put_item(Item=record)\n", "```\n", "\n", "Let's first do one crawl on a single WebARC file first: (feel free to change the keywords to your liking)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "keywords = 'Amazon, AWS, Python, Java'\n", "wrenexec = pywren.default_executor()\n", "future = wrenexec.call_async(warc_search.keyword_search, keys[:1][0]['Key'], extra_env = {'KEYWORDS' : keywords})\n", "display(HTML('Time to complete: ' + str(round(future.result(),2)) + ' seconds'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's have a look at the DynamoDB console for the [pywren-workshop-common](https://us-west-2.console.aws.amazon.com/dynamodb/home?region=us-west-2#tables:selected=pywren-workshop-common-crawl) crawl table and click on items to find our results.\n", "\n", "Now let's load this up into our local Jupyter notebook:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "table = boto3.resource('dynamodb', 'us-west-2').Table('pywren-workshop-common-crawl')\n", "db_table = table.scan(ProjectionExpression='search_str, occurrence')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Time to plot this information out:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "occurrences = {}\n", "for item in db_table['Items']:\n", " if item['search_str'] in occurrences.keys():\n", " occurrences[item['search_str']] += item['occurrence']\n", " else:\n", " occurrences[item['search_str']] = item['occurrence']\n", " \n", "plt.figure(figsize=(10, 5))\n", "plt.title(\"Word frequency across crawl data\")\n", "plt.xlabel(\"Words\")\n", "plt.ylabel(\"Frequency\")\n", "plt.bar(range(len(occurrences)), occurrences.values(), align='center')\n", "plt.xticks(range(len(occurrences)), occurrences.keys())\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's now run the PyWren function over the first 50 Common Crawl dataset in parallel across multiple AWS Lambda functions:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iterdata = []\n", "for key in keys[:50]:\n", " iterdata.append(key['Key'])\n", " \n", "keywords = 'Amazon, AWS, Python, Java'\n", "wrenexec = pywren.default_executor()\n", "future = wrenexec.map(warc_search.keyword_search, iterdata, extra_env = {'KEYWORDS' : keywords})\n", "t1 = time.time()\n", "pywren_results = pywren.get_all_results(future)\n", "duration = time.time() - t1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's analyze how long it took us to run over all these large datasets in Amazon S3. As you will see, the total duration is around a minute or less, however our overall total aggregate computation time across all AWS Lambda functions easily exceeds over 15 minutes - this is the power of parallel processing!" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "display(HTML('Total time for the job: ' + str(round(duration,2)) + ' seconds'))\n", "display(HTML('Average time per Common Crawl file: ' + str(round(np.mean(pywren_results),2)) + ' seconds'))\n", "display(HTML('Total aggregate process time across all AWS Lambda function: ' + str(round(np.sum(pywren_results),2)) + ' seconds'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's replot our information with the newly analyzed amount of data points:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "table = boto3.resource('dynamodb', 'us-west-2').Table('pywren-workshop-common-crawl')\n", "db_table = table.scan(ProjectionExpression='warc_file, search_str, occurrence')\n", "\n", "occurrences = {}\n", "for item in db_table['Items']:\n", " if item['search_str'] in occurrences.keys():\n", " occurrences[item['search_str']] += item['occurrence']\n", " else:\n", " occurrences[item['search_str']] = item['occurrence']\n", " \n", "plt.figure(figsize=(10, 5))\n", "plt.title(\"Word frequency across crawl data\")\n", "plt.xlabel(\"Words\")\n", "plt.ylabel(\"Frequency\")\n", "plt.bar(range(len(occurrences)), occurrences.values(), align='center')\n", "plt.xticks(range(len(occurrences)), occurrences.keys())\n", "plt.show()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we will use a different function with PyWren to receive the information when a keyword was found including it's URL and send that information straight back to the Jupyter notebook here:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iterdata = []\n", "for key in keys[:10]:\n", " iterdata.append(key['Key'])\n", " \n", "keywords = 'Amazon, AWS, Python, Java'\n", "wrenexec = pywren.default_executor()\n", "future = wrenexec.map(warc_search.keyword_search_with_URL, iterdata, extra_env = {'KEYWORDS' : keywords})\n", "url_results = pywren.get_all_results(future)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's now try to analyze the different top-level domains that we found and the according keywords:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datetime import datetime\n", "import tldextract\n", "\n", "data = dict()\n", "for result in url_results:\n", " for key in result.keys():\n", " if key in data.keys():\n", " data[key].extend(result[key])\n", " else:\n", " data[key] = result[key]\n", " \n", "# convert values to time values\n", "url_data = {}\n", "for key in data.keys():\n", " url_data[key] = {}\n", " for item in data[key]:\n", " tld = tldextract.extract(item).suffix\n", " if tld in url_data[key].keys():\n", " url_data[key][tld] += 1\n", " else:\n", " url_data[key][tld] = 1\n", "\n", "# render bar charts\n", "for keyword in url_data.keys():\n", " top20 = dict(sorted(url_data[keyword].iteritems(), key=lambda (k, v): (-v, k))[:20])\n", " x = top20.keys()\n", " frequency = top20.values()\n", " x_pos = [i for i, _ in enumerate(x)]\n", " plt.figure(figsize=(10, 5))\n", " plt.barh(x_pos, frequency, color='green')\n", " plt.ylabel(\"TLD\")\n", " plt.xlabel(\"Frequency\")\n", " plt.title(\"TLD keyword frequency for \" + keyword)\n", " plt.yticks(x_pos, x)\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That's it. We managed to perform a keyword analysis across a large amount of web crawled data in a massively distributed manner and plotted the results back on our local machine." ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.13" } }, "nbformat": 4, "nbformat_minor": 2 }