{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Declare a table with HiveQL from data stored in Amazon S3\n", "\n", "#### Topics covered in this example\n", "* Installing a custom JAR from Amazon S3." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "\n", "## Prerequisites\n", "
\n", "NOTE: To run this notebook successfully as is, make sure the following prerequisites are met.
\n", "\n", "* The EMR cluster attached to this notebook should have the `Spark` and `Hive` applications installed.\n", "* This example uses a public dataset, so the EMR cluster attached to this notebook must have internet connectivity.\n", "* This notebook uses the `Spark` kernel.\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\n", "Ad-serving machines produce two types of log files: `impression logs` and `click logs`. Every time we display an advertisement to a customer, we add an entry to the impression log. Every time a customer clicks on an advertisement, we add an entry to the click log.\n", "This example demonstrates how to combine the click and impression logs into a single table that specifies whether there was a click for a specific ad, along with information about that click.\n", "\n", "The log data is stored in Amazon S3 in the `elasticmapreduce` bucket under `s3://elasticmapreduce/samples/hive-ads/`, which includes subdirectories called `tables/impressions` and `tables/clicks`. These directories contain additional directories named so that we can access the data as a partitioned table within Hive. The naming syntax is `[Partition column]` = `[Partition value]`. For example: `dt=2009-04-13-05`.\n", "\n", "The post *Contextual Advertising using Apache Hive and Amazon EMR* provides more detailed information.\n", "****" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example\n", "\n", "We need a custom SerDe (serializer/deserializer) to read the impressions and clicks data, which is stored in JSON format. A SerDe enables Hive to read data stored in a custom format. Our SerDe is packaged in a JAR file located in Amazon S3, and we tell Spark about it with the following configuration."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%configure -f\n", "{\n", " \"conf\": {\n", " \"spark.jars\": \"s3://elasticmapreduce/samples/hive-ads/libs/jsonserde.jar\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that the SerDe is available, we can tell Hive about our clicks and impressions data by creating an external table.\n", "\n", "The data for this table resides in Amazon S3. Creating the table is a quick operation because we’re just telling Hive about the existence of the data, not copying it. When we query this table, Hive will read the table using Hadoop." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "CREATE EXTERNAL TABLE impressions (\n", " requestBeginTime string, adId string, impressionId string, referrer string, \n", " userAgent string, userCookie string, ip string\n", " )\n", " PARTITIONED BY (dt string)\n", " ROW FORMAT \n", " SERDE \"com.amazon.elasticmapreduce.JsonSerde\"\n", " WITH SERDEPROPERTIES ( \"paths\"=\"requestBeginTime, adId, impressionId, referrer, userAgent, userCookie, ip\" )\n", " LOCATION \"s3://elasticmapreduce/samples/hive-ads/tables/impressions\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The table is partitioned based on time. As yet, Hive doesn’t know which partitions exist in the table. We can tell Hive about the existence of a single partition using the following statement." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "ALTER TABLE impressions ADD PARTITION (dt=\"2009-04-13-08-05\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If we were to query the table at this point, the results would contain data from just this partition. We can instruct Hive to recover all partitions by inspecting the data stored in Amazon S3 using the `RECOVER PARTITIONS` statement."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "ALTER TABLE impressions RECOVER PARTITIONS" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We follow the same process to declare the clicks table and recover its partitions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "CREATE EXTERNAL TABLE clicks (\n", " impressionId string\n", " )\n", " PARTITIONED BY (dt string)\n", " ROW FORMAT \n", " SERDE \"com.amazon.elasticmapreduce.JsonSerde\"\n", " WITH SERDEPROPERTIES ( \"paths\"=\"impressionId\" )\n", " LOCATION \"s3://elasticmapreduce/samples/hive-ads/tables/clicks\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "ALTER TABLE clicks RECOVER PARTITIONS" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Combining the Clicks and Impressions Tables\n", "We want to combine the clicks and impressions tables so that we have a record of whether or not each impression resulted in a click." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we create a table called `joined_impressions`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "CREATE TABLE joined_impressions (\n", " requestBeginTime string, adId string, impressionId string, referrer string, \n", " userAgent string, userCookie string, ip string, clicked Boolean\n", " )\n", " PARTITIONED BY (day string, hour string)\n", " STORED AS SEQUENCEFILE" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This table is partitioned as well. 
An advantage of partitioning tables stored in Amazon S3 is that if Hive needs only some of the partitions to answer a query, then only the data from those partitions is downloaded from Amazon S3.\n", "\n", "The `joined_impressions` table is stored in `SEQUENCEFILE` format, a native Hadoop file format that is more compressed and has better performance than JSON files.\n", "\n", "Next, we create some temporary tables in the job flow’s local HDFS partition to store intermediate impression and click data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "CREATE TABLE tmp_impressions (\n", " requestBeginTime string, adId string, impressionId string, referrer string, \n", " userAgent string, userCookie string, ip string\n", " )\n", " STORED AS SEQUENCEFILE" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We insert data from the impressions table for the time period we’re interested in. Note that because the impressions table is partitioned, only the relevant partitions will be read." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "INSERT OVERWRITE TABLE tmp_impressions \n", " SELECT \n", " from_unixtime(cast((cast(i.requestBeginTime as bigint) / 1000) as int)) requestBeginTime, \n", " i.adId, i.impressionId, i.referrer, i.userAgent, i.userCookie, i.ip\n", " FROM \n", " impressions i\n", " WHERE \n", " i.dt >= \"2009-04-13-08-00\" and i.dt < \"2009-04-13-09-00\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For clicks, we extend the period of time over which we join by 20 minutes, meaning that we accept a click that occurred up to 20 minutes after the impression."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "CREATE TABLE tmp_clicks (\n", " impressionId string\n", " ) STORED AS SEQUENCEFILE" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "INSERT OVERWRITE TABLE tmp_clicks \n", " SELECT \n", " impressionId\n", " FROM \n", " clicks c \n", " WHERE \n", " c.dt >= \"2009-04-13-08-00\" AND c.dt < \"2009-04-13-09-20\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we combine the impressions and clicks tables using a left outer join. This way, any impressions that did not result in a click are preserved. Because `tmp_clicks` covers an extra 20 minutes, the join also picks up clicks that occurred after the end of the impression time period. The query also excludes any clicks that did not originate from an impression in the selected time period." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "INSERT OVERWRITE TABLE joined_impressions PARTITION (day=\"2009-04-13\", hour=\"08\")\n", " SELECT \n", " i.requestBeginTime, i.adId, i.impressionId, i.referrer, i.userAgent, i.userCookie, \n", " i.ip, (c.impressionId is not null) clicked\n", " FROM \n", " tmp_impressions i LEFT OUTER JOIN tmp_clicks c ON i.impressionId = c.impressionId" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because the `joined_impressions` table is located in Amazon S3, this data is now available for other job flows to use.\n", "\n", "Check the results." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "select requestBeginTime, adId, impressionId, referrer from joined_impressions limit 5" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Cleanup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Delete the tables."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "DROP TABLE IF EXISTS impressions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "DROP TABLE IF EXISTS clicks" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "DROP TABLE IF EXISTS tmp_impressions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "DROP TABLE IF EXISTS tmp_clicks" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "DROP TABLE IF EXISTS joined_impressions" ] } ], "metadata": { "kernelspec": { "display_name": "Spark", "language": "", "name": "sparkkernel" }, "language_info": { "codemirror_mode": "text/x-scala", "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala" } }, "nbformat": 4, "nbformat_minor": 4 }