# Declare a table with HiveQL from data stored in Amazon S3

#### Topics covered in this example
* Installing a custom JAR from Amazon S3.

***

## Prerequisites
<div class="alert alert-block alert-info">
<b>NOTE:</b> To run this notebook as is, ensure that the following prerequisites are met.</div>

* The EMR cluster attached to this notebook should have the `Spark` and `Hive` applications installed.
* This example uses a public dataset, so the EMR cluster attached to this notebook must have internet connectivity.
* This notebook uses the `Spark` kernel.
***

## Introduction
Ad serving machines produce two types of log files: `impression logs` and `click logs`. Every time we display an advertisement to a customer, we add an entry to the impression log. Every time a customer clicks on an advertisement, we add an entry to the click log.
This example demonstrates how to combine the click and impression logs into a single table that records, for each impression, whether the ad was clicked.

The log data is stored in Amazon S3 in the `elasticmapreduce` bucket under `s3://elasticmapreduce/samples/hive-ads/`, which includes the subdirectories `tables/impressions` and `tables/clicks`. These directories contain further subdirectories named so that Hive can read the data as a partitioned table. The naming syntax is `[Partition column]=[Partition value]`, for example `dt=2009-04-13-08-05`.
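
The naming convention can be illustrated with a minimal Python sketch that extracts partition columns from a path. The function name `parse_partitions` and the file name `part-0000` are hypothetical and used only for illustration; this is not how Hive itself parses paths.

```python
def parse_partitions(path):
    """Return the partition columns encoded in a Hive-style path as a dict."""
    partitions = {}
    for segment in path.strip("/").split("/"):
        # Only segments of the form column=value describe a partition.
        if "=" in segment:
            column, value = segment.split("=", 1)
            partitions[column] = value
    return partitions


# Hypothetical object path: the file name "part-0000" is an assumption.
example = (
    "s3://elasticmapreduce/samples/hive-ads/tables/impressions/"
    "dt=2009-04-13-08-05/part-0000"
)
print(parse_partitions(example))
```

A table partitioned by more than one column simply nests directories, so a path such as `day=2009-04-13/hour=08/` would yield two entries.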

The post: <a href="https://aws.amazon.com/articles/contextual-advertising-using-apache-hive-and-amazon-emr/" target="_blank">Contextual Advertising using Apache Hive and Amazon EMR</a> provides detailed information.
***

## Example

The impressions and clicks data is stored in JSON format, so we need a custom `SerDe` (serializer-deserializer) to read it. A SerDe enables Hive to read data stored in a custom format. Our SerDe is packaged in a JAR file located in Amazon S3, and we make it available to Spark by setting `spark.jars` in the following configuration cell.

In [None]:
%%configure -f
{
    "conf": {
        "spark.jars": "s3://elasticmapreduce/samples/hive-ads/libs/jsonserde.jar"
    }
}

Now that the SerDe JAR is available, we can tell Hive about our data by creating external tables, starting with impressions.

The data for this table resides in Amazon S3. Creating the table is a quick operation because we’re just telling Hive about the existence of the data, not copying it. When we query this table, Hive reads the data using Hadoop.

In [None]:
%%sql
CREATE EXTERNAL TABLE impressions (
    requestBeginTime string, adId string, impressionId string, referrer string, 
    userAgent string, userCookie string, ip string
  )
  PARTITIONED BY (dt string)
  ROW FORMAT
    SERDE "com.amazon.elasticmapreduce.JsonSerde"
    WITH SERDEPROPERTIES ( "paths"="requestBeginTime, adId, impressionId, referrer, userAgent, userCookie, ip" )
  LOCATION "s3://elasticmapreduce/samples/hive-ads/tables/impressions"

The table is partitioned based on time. As yet, Hive doesn’t know which partitions exist in the table. We can tell Hive about the existence of a single partition using the following statement.

In [None]:
%%sql
ALTER TABLE impressions ADD PARTITION (dt="2009-04-13-08-05")

If we were to query the table at this point, the results would contain data from just this partition. We can instruct Hive to recover all partitions by inspecting the data stored in Amazon S3 using the `RECOVER PARTITIONS` statement.

In [None]:
%%sql
ALTER TABLE impressions RECOVER PARTITIONS
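
Conceptually, recovering partitions amounts to scanning the table location for `dt=...` directories and registering each distinct value. The Python sketch below illustrates that idea only; it is not how Hive or Spark implements the statement, and the function name `discover_partitions` and the object keys are hypothetical.

```python
def discover_partitions(keys, column="dt"):
    """Collect the distinct values of one partition column from object keys."""
    prefix = column + "="
    found = set()
    for key in keys:
        for segment in key.split("/"):
            if segment.startswith(prefix):
                found.add(segment[len(prefix):])
    return sorted(found)


# Hypothetical listing: real keys would come from listing the table location.
keys = [
    "tables/impressions/dt=2009-04-13-08-05/file-a",
    "tables/impressions/dt=2009-04-13-08-05/file-b",
    "tables/impressions/dt=2009-04-13-08-10/file-a",
    "tables/impressions/README",
]
print(discover_partitions(keys))
```

Several files under the same directory map to a single partition, and keys without a `dt=` segment are ignored.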

We follow the same process for clicks: create an external table, then recover its partitions.

In [None]:
%%sql
CREATE EXTERNAL TABLE clicks (
    impressionId string
  )
  PARTITIONED BY (dt string)
  ROW FORMAT 
    SERDE "com.amazon.elasticmapreduce.JsonSerde"
    WITH SERDEPROPERTIES ( "paths"="impressionId" )
  LOCATION "s3://elasticmapreduce/samples/hive-ads/tables/clicks"

In [None]:
%%sql
ALTER TABLE clicks RECOVER PARTITIONS

### Combining the Clicks and Impressions Tables
We want to combine the clicks and impressions tables so that we have a record of whether or not each impression resulted in a click.

First, we create a table called `joined_impressions`.

In [None]:
%%sql
CREATE TABLE joined_impressions (
    requestBeginTime string, adId string, impressionId string, referrer string,
    userAgent string, userCookie string, ip string, clicked boolean
  )
  PARTITIONED BY (day string, hour string)
  STORED AS SEQUENCEFILE

This table is partitioned as well. An advantage of partitioning tables stored in Amazon S3 is that if Hive needs only some of the partitions to answer the query then only the data from these partitions will be downloaded from Amazon S3.

The `joined_impressions` table is stored in `SEQUENCEFILE` format, a native Hadoop binary file format that is typically more compact and faster to read than JSON text files.

Next, we create temporary tables in the cluster’s local HDFS to store intermediate impression and click data.

In [None]:
%%sql
CREATE TABLE tmp_impressions (
    requestBeginTime string, adId string, impressionId string, referrer string, 
    userAgent string, userCookie string, ip string
  )
  STORED AS SEQUENCEFILE

We insert data from the impressions table for the time period we’re interested in. Because the impressions table is partitioned on `dt`, only the relevant partitions are read.

In [None]:
%%sql
INSERT OVERWRITE TABLE tmp_impressions 
    SELECT 
      from_unixtime(cast((cast(i.requestBeginTime as bigint) / 1000) as int)) requestBeginTime, 
      i.adId, i.impressionId, i.referrer, i.userAgent, i.userCookie, i.ip
    FROM 
      impressions i
    WHERE 
      i.dt >= "2009-04-13-08-00" AND i.dt < "2009-04-13-09-00"
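
The `from_unixtime(...)` expression above divides `requestBeginTime` by 1000, which suggests the raw value is an epoch timestamp in milliseconds. The following Python sketch shows the equivalent conversion under that assumption. It formats in UTC, whereas `from_unixtime` uses the session time zone, and the function name `from_unixtime_ms` is hypothetical.

```python
from datetime import datetime, timezone


def from_unixtime_ms(millis_text):
    """Convert an epoch-milliseconds string to 'YYYY-MM-DD HH:MM:SS' in UTC."""
    seconds = int(millis_text) // 1000  # drop the millisecond part
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


print(from_unixtime_ms("0"))         # 1970-01-01 00:00:00
print(from_unixtime_ms("86400500"))  # 1970-01-02 00:00:00
```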

For clicks, we extend the time period over which we join by 20 minutes, which means we accept a click that occurred up to 20 minutes after the impression window ends.

In [None]:
%%sql
CREATE TABLE tmp_clicks (
    impressionId string
  ) STORED AS SEQUENCEFILE

In [None]:
%%sql
INSERT OVERWRITE TABLE tmp_clicks 
    SELECT 
      impressionId
    FROM 
      clicks c  
    WHERE 
      c.dt >= "2009-04-13-08-00" AND c.dt < "2009-04-13-09-20"
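
The range filters on `dt` compare strings, not timestamps. This works because the partition values are zero-padded with the most significant field first, so lexicographic order matches chronological order. A small Python sketch of the two windows, using hypothetical partition values in the notebook's `dt` format:

```python
def in_window(dt, start, end):
    """Mirror the WHERE clause: start <= dt < end, compared as strings."""
    return start <= dt < end


# Hypothetical partition values: only the format comes from this notebook.
partitions = [
    "2009-04-13-07-55", "2009-04-13-08-00", "2009-04-13-08-55",
    "2009-04-13-09-00", "2009-04-13-09-15", "2009-04-13-09-20",
]

impression_parts = [
    p for p in partitions if in_window(p, "2009-04-13-08-00", "2009-04-13-09-00")
]
click_parts = [
    p for p in partitions if in_window(p, "2009-04-13-08-00", "2009-04-13-09-20")
]
print(impression_parts)
print(click_parts)
```

The click window includes the two partitions between 09:00 and 09:20 that the impression window leaves out.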

Now we combine the impressions and clicks tables using a left outer join, so that impressions that did not result in a click are preserved. Because `tmp_clicks` covers an extra 20 minutes, the join can also match clicks that occurred shortly after the impression window. Clicks that did not originate from an impression in the selected time period are excluded.

In [None]:
%%sql
INSERT OVERWRITE TABLE joined_impressions PARTITION (day="2009-04-13", hour="08")
  SELECT 
    i.requestBeginTime, i.adId, i.impressionId, i.referrer, i.userAgent, i.userCookie, 
    i.ip, (c.impressionId is not null) clicked
  FROM 
    tmp_impressions i LEFT OUTER JOIN tmp_clicks c ON i.impressionId = c.impressionId
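
The semantics of the join and the `clicked` flag can be sketched in plain Python. This is an illustration with hypothetical identifiers (`left_outer_join`, `imp-1`, and so on), not the execution plan Spark or Hive uses.

```python
from collections import defaultdict


def left_outer_join(impressions, clicks):
    """Yield (impression_id, clicked) rows the way a left outer join would."""
    clicks_by_id = defaultdict(list)
    for click_id in clicks:
        clicks_by_id[click_id].append(click_id)
    for impression_id in impressions:
        matches = clicks_by_id.get(impression_id, [])
        if not matches:
            # No matching click: the impression is kept with clicked = False.
            yield (impression_id, False)
        else:
            # One output row per matching click, as in SQL join semantics.
            for _ in matches:
                yield (impression_id, True)


impressions = ["imp-1", "imp-2", "imp-3"]
clicks = ["imp-2", "imp-9"]  # imp-9 has no impression in the window
print(list(left_outer_join(impressions, clicks)))
```

Every impression appears in the result, while the click for `imp-9` is dropped because it has no matching impression.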

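Note that the `CREATE TABLE joined_impressions` statement above does not specify a `LOCATION`, so the table is stored in the cluster’s default warehouse directory. To make this data available to other clusters, create the table with a `LOCATION` that points to an Amazon S3 path you own.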

Check the results.

In [None]:
%%sql
SELECT requestBeginTime, adId, impressionId, referrer FROM joined_impressions LIMIT 5

***
## Cleanup

Delete the tables.

In [None]:
%%sql
DROP TABLE IF EXISTS impressions

In [None]:
%%sql
DROP TABLE IF EXISTS clicks

In [None]:
%%sql
DROP TABLE IF EXISTS tmp_impressions

In [None]:
%%sql
DROP TABLE IF EXISTS tmp_clicks

In [None]:
%%sql
DROP TABLE IF EXISTS joined_impressions