This notebook is built to run with the `PySpark(SparkMagic)` kernel.

# Data Preparation and Feature Engineering

## Configuration and Setup
### Connect to cluster in Amazon SageMaker Studio
This notebook is designed to run in SageMaker Studio. Please refer to https://docs.aws.amazon.com/sagemaker/latest/dg/studio-notebooks-emr-cluster-connect.html for general instructions on how to connect the notebook to a running EMR cluster.

After you follow these steps, Studio generates a cell that connects the notebook to the Amazon EMR cluster. The cell's content looks like this:

```
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --cluster-id j-XXXXXXXXXXX --auth-type None
```

Running the cell sets up the connection to EMR. You can reuse this code as long as you keep using the same cluster.

If you create a new cluster, remove this cell and connect the notebook to the new cluster. This generates a new cell with new connection code.


In the following cell, we enable virtualenv in the Spark session so that we can later install packages on the already running EMR cluster. See also https://aws.amazon.com/blogs/big-data/install-python-libraries-on-a-running-cluster-with-emr-notebooks/.

In [None]:
%%configure -f
{
  "conf": {
    "spark.jars.packages":"ml.combust.mleap:mleap-spark_2.12:0.19.0,ml.combust.mleap:mleap-spark-base_2.12:0.19.0",
    "spark.jars.excludes":"net.sourceforge.f2j:arpack_combined_all",
    "spark.pyspark.python": "python3",
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.type":"native",
    "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
  }
}


### Debug Configuration
This notebook contains code that helps you understand the actual flow, or debug the code if you apply your own changes. Some of these additional steps take a long time to run, so they are guarded by a flag:

In [None]:
DEBUG = False # set to True to have detailed output

### Set input and output paths

In [None]:
import json
import subprocess

import boto3

# Look up the region of the EMR node from the EC2 instance metadata service.
emr_instance_info = subprocess.run(
    ["curl", "-s", "169.254.169.254/latest/dynamic/instance-identity/document"],
    capture_output=True,
).stdout.decode("UTF-8")
emr_region = json.loads(emr_instance_info)["region"]

session = boto3.Session(region_name=emr_region)
ssm = session.client("ssm")
s3_client = session.client("s3")


def get_param(name):
    """Read a string value from SSM Parameter Store."""
    return ssm.get_parameter(Name=name)["Parameter"]["Value"]


bucket = get_param("/aik/data-bucket")

bid_source = get_param("/aik/bid_source")
imp_source = get_param("/aik/imp_source")

output_train = get_param("/aik/output_train")
output_test = get_param("/aik/output_test")
output_verify = get_param("/aik/output_verify")
# output_transformed reads the same parameter as output_verify, so the
# validation split written at the end overwrites the transformed data.
output_transformed = get_param("/aik/output_verify")
inference_data = get_param("/aik/inference_data")
pipelineModelArtifactPath = get_param("/aik/pipelineModelArtifactPath")

In [None]:
print(f'bucket={bucket}')
print(f'bid_source={bid_source}')
print(f'imp_source={imp_source}')
print(f'output_train={output_train}')
print(f'output_verify={output_verify}')
print(f'output_test={output_test}')
print(f'inference_data={inference_data}')
print(f'pipelineModelArtifactPath={pipelineModelArtifactPath}')

## Prepare Bidding Data

### Schema

In [None]:
bid_columns = [
 "BidID", 
 "Timestamp", 
 "iPinYouID",
 "UserAgent",
 "IP", 
 "RegionID", 
 "CityID", 
 "AdExchange",
 "Domain", 
 "URL",
 "AnonymousURL",
 "AdSlotID",
 "AdSlotWidth",
 "AdSlotHeight",
 "AdSlotVisibility",
 "AdSlotFormat",
 "AdSlotFloorPrice",
 "CreativeID",
 "BiddingPrice",
 "AdvertiserID", # V
 "UserProfileIDs"
]

# Build the Spark DDL schema string, e.g. "`BidID` string, `Timestamp` string, ...".
# All bid columns are read as strings; the categorical ID columns (AdvertiserID,
# Domain, RegionID, CityID) are encoded with StringIndexer later on.
bid_schema = ", ".join(f"`{col}` string" for col in bid_columns)

### Read data

In [None]:
print(bid_source)
bid_df = spark.read.option("delimiter", "\t").format("csv").load(
 bid_source,
 inferSchema=False,
 header=False,
 schema=bid_schema)

bid_df.show(2)


In [None]:
if DEBUG:
 bid_df.select("AdvertiserID").show(3)

### Transform dataframe
We only use a subset of the available data. Also, instead of working with the raw timestamp, we break it down into the day of the week and the hour of the day, as this provides better insight into the underlying patterns. We do this transformation now.

In [None]:
bid_df.createOrReplaceTempView("bid_table")
df1 = spark.sql("SELECT \
 BidID, dayofweek(concat(substring(Timestamp, 1, 4), '-', substring(Timestamp, 5, 2), '-', substring(Timestamp, 7, 2))) AS dow, \
 substring(Timestamp, 9, 2) AS hour, RegionID, CityID, Domain, AdvertiserID \
 FROM bid_table GROUP BY BidID, Timestamp, RegionID, CityID, Domain, AdvertiserID")
df1.show(3)
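To make the SQL above easier to follow, here is a small pure-Python sketch of the same two derivations. It assumes, as the SQL does, that `Timestamp` starts with `yyyyMMddHH`; the sample timestamp is hypothetical. The main pitfall is the weekday convention: Spark's `dayofweek` counts from 1 (Sunday) to 7 (Saturday), unlike Python's `isoweekday()`.

```python
from datetime import datetime


def spark_dayofweek(ts: str) -> int:
    """Mimic dayofweek() on the yyyyMMdd prefix of the Timestamp column.

    Spark numbers the days from 1 (Sunday) to 7 (Saturday), while Python's
    isoweekday() numbers them from 1 (Monday) to 7 (Sunday).
    """
    day = datetime.strptime(ts[0:8], "%Y%m%d")
    return day.isoweekday() % 7 + 1


def hour_of_day(ts: str) -> str:
    # Equivalent to substring(Timestamp, 9, 2): SQL substring is 1-based,
    # Python slicing is 0-based. The result is still a string, as in the SQL.
    return ts[8:10]


# Hypothetical timestamp in the yyyyMMddHH... layout the SQL assumes.
ts = "20130606143512345"
print(spark_dayofweek(ts), hour_of_day(ts))  # 2013-06-06 is a Thursday: prints 5 14
```

Because the hour comes out of `substring` as a string, the feature step further down casts `hour` to an integer.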

## Prepare Impression Data

### Schema

In [None]:
imp_columns = ["BidID", # V
"Timestamp",
"LogType", 
"iPinYouID",
"UserAgent",
"IP",
"RegionID", 
"CityID", 
"AdExchange",
"Domain",
"URL",
"AnonymousURL",
"AdSlotID",
"AdSlotWidth",
"AdSlotHeight",
"AdSlotVisibility",
"AdSlotFormat",
"AdSlotFloorPrice",
"CreativeID",
"BiddingPrice",
"PayingPrice",
"LandingPageURL",
"AdvertiserID",
"UserProfileIDs"]

# Build the Spark DDL schema string: BiddingPrice and PayingPrice are read as
# long, all other columns as strings.
long_columns = {"BiddingPrice", "PayingPrice"}
imp_schema = ", ".join(
    f"`{col}` {'long' if col in long_columns else 'string'}" for col in imp_columns
)

### Read Data

In [None]:
imp_df = spark.read.option("delimiter", "\t").format("csv").load(
 imp_source,
 inferSchema=False,
 header=False,
 schema=imp_schema)

imp_df.show(2)

### Transform DataFrame

Again, we transform the data so that it is easier to work with. This time we only drop the columns we are not going to use further and keep `BidID`, `BiddingPrice`, `PayingPrice` and `UserAgent`.

In [None]:
imp_df.createOrReplaceTempView("imp_table")
df2 = spark.sql("SELECT \
 BidID, BiddingPrice, PayingPrice, UserAgent \
 FROM imp_table GROUP BY BidID, BiddingPrice, PayingPrice, UserAgent")

df2.show(3)

## Merge Data Frames

So far we have been working with the bidding and impression data in separate data frames. Now we join them into a single data frame.

Let's have a look at the schema of the bidding data:

In [None]:
if DEBUG:
 df1.show(5)
 df1.printSchema()

Let's have a look at the schema of the impression data:

In [None]:
if DEBUG:
 df2.show(5)
 df2.printSchema()

We can see that both dataframes share a common column, `BidID`, so we use it to join them. Because we use a left join, every bid is kept, and bids without a matching impression get null values in the impression columns. Later we use the impression data to identify successful bids. This is an assumption: in theory, a bid request could have been successful without leading to an impression, but we have no data to examine this further. Keep this in mind, as your own data might cover this case better.
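A minimal pure-Python sketch of what the left join and the later labeling rule do together. The rows are hypothetical toy data, and the sketch assumes at most one impression per `BidID` (in Spark, several matching impression rows would duplicate the bid).

```python
# Hypothetical toy data: bids keyed by BidID, impressions carrying a PayingPrice.
bids = [
    {"BidID": "b1", "AdvertiserID": "a1"},
    {"BidID": "b2", "AdvertiserID": "a2"},
    {"BidID": "b3", "AdvertiserID": "a1"},
]
impressions = {"b1": 80, "b3": 120}  # BidID -> PayingPrice


def left_join_with_label(bids, impressions):
    rows = []
    for bid in bids:
        # A left join keeps every bid; bids without an impression get None (null).
        paying_price = impressions.get(bid["BidID"])
        row = dict(bid, PayingPrice=paying_price)
        # Same rule as when(col('PayingPrice').isNull(), 0).otherwise(1) later on.
        row["label"] = 0 if paying_price is None else 1
        rows.append(row)
    return rows


for row in left_join_with_label(bids, impressions):
    print(row["BidID"], row["PayingPrice"], row["label"])
```

An inner join would silently drop `b2`, and with it every negative example, which is why the join has to be a left join.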

In [None]:
df = df1.join(df2, "BidID", "left")

Let's have a look at the resulting dataframe:

In [None]:
if DEBUG:
    df.show(5)


## Encode Features

In the following, we encode our features so that we can use them for model training. We use [MLeap](https://combust.github.io/mleap-docs/) to serialize the feature transformation logic: we export the feature transformation pipeline to an MLeap bundle that runs the required encoding steps. We need this pipeline to apply the same feature encoding in the inference part of the solution.

We also have some more complex steps to execute. We do not implement those with the same approach; instead, we use a dedicated library for them, as explained later in the notebook.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

categoricalCols = ["AdvertiserID", "Domain", "RegionID", "CityID"]

stringindexers = [StringIndexer(
 inputCol=col,
 outputCol="Index" + col) for col in categoricalCols]

pipeline = Pipeline(stages = stringindexers)

In [None]:
for indexer in stringindexers:
 indexer.setHandleInvalid("keep")
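For intuition, the sketch below mimics how a fitted `StringIndexer` maps values with its default ordering plus `handleInvalid="keep"`. It is a simplified pure-Python model, not Spark or MLeap code: labels are ordered by descending frequency (ties broken alphabetically, as the Spark docs describe for `frequencyDesc`), and values not seen during fitting go to the extra index `numLabels`. With the default `handleInvalid="error"`, such unseen values (for example a new domain at inference time) would raise an error instead.

```python
from collections import Counter


def fit_string_indexer(values):
    """Return a label -> index mapping ordered like stringOrderType="frequencyDesc":
    most frequent label first, ties sorted alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


def transform(mapping, value):
    # handleInvalid="keep": labels not seen during fit go to index numLabels.
    return mapping.get(value, float(len(mapping)))


mapping = fit_string_indexer(["b", "a", "b", "c", "a", "b"])
print(mapping)                  # {'b': 0.0, 'a': 1.0, 'c': 2.0}
print(transform(mapping, "z"))  # 3.0
```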

In [None]:
df.printSchema()

In [None]:
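# Fit the StringIndexers on the joined data.
pipelineModel = pipeline.fit(df)

# Persist the joined data before encoding: as Parquet, and as JSON for the
# inference part of the solution.
df.write.parquet(output_transformed, mode="overwrite")
df.write.json(inference_data, mode="overwrite")

# Transform the dataframe: this adds the Index<column> feature columns.
df = pipelineModel.transform(df)
df.printSchema()
df.show(3)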


Next, we serialize the fitted pipeline model to an MLeap bundle on the local disk.

In [None]:
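# Importing mleap.pyspark and SimpleSparkSerializer adds serializeToBundle()
# to the Spark ML models; the imported names themselves are not used directly.
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# Serialize the fitted pipeline to a local MLeap bundle (zip file).
pipelineModel.serializeToBundle("jar:file:/tmp/pipelineModel.zip", df)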

Upload the pipeline model to the S3 bucket:

In [None]:
from urllib.parse import urlparse

parsed_pipeline_model_path = urlparse(pipelineModelArtifactPath, allow_fragments=False)
s3_client.upload_file("/tmp/pipelineModel.zip", parsed_pipeline_model_path.netloc, parsed_pipeline_model_path.path.lstrip('/'))
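The upload cell splits the S3 URI into the bucket and key arguments that `upload_file` expects. The small helper below (a hypothetical name, not part of the notebook) isolates that step so it can be checked on its own; the URI in the example is made up. `allow_fragments=False` keeps a `#` inside the key from being treated as a URL fragment.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3://bucket/key URI into (bucket, key) for boto3's upload_file."""
    parsed = urlparse(uri, allow_fragments=False)
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical URI; the notebook reads the real one from /aik/pipelineModelArtifactPath.
print(split_s3_uri("s3://my-bucket/models/pipelineModel.zip"))
# ('my-bucket', 'models/pipelineModel.zip')
```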


## Engineer Features from the User Agent

Now that we have engineered the main features, we look at which features we can derive from the `UserAgent` field in the raw data. It turns out that the type of device a user is using is important to determine whether a bid request is likely to be successful, and we can derive the device type from the user agent information. Instead of building this on our own, we use one of the many libraries that already deliver this functionality. For the purpose of this kit, we have chosen the [woothee](https://github.com/woothee/woothee) project. One of the reasons is its availability for different programming languages, including Python, which we use for the model training, and Java, which we use for the inference.

In [None]:
import woothee
from random import randrange

# woothee category -> numeric device type id
DEVICE_TYPE_IDS = {
    "smartphone": 0,
    "mobilephone": 1,
    "appliance": 2,
    "pc": 3,
    "crawler": 4,
    "misc": 5,
}


def parse_ua_to_device_type(user_agent_str):
    category = woothee.parse(user_agent_str)["category"]
    if category in DEVICE_TYPE_IDS:
        return DEVICE_TYPE_IDS[category]
    # Missing/unknown value imputed with a uniformly random device_type_id (0-5).
    return randrange(6)

In [None]:
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import IntegerType

In [None]:
cols_parse_ua_to_device_type = udf(parse_ua_to_device_type, IntegerType())

In [None]:
# Select the features, derive device_type_id from the user agent and the label
# from PayingPrice: bids without an impression (PayingPrice is null) get label 0.
df_new = (
    df.select(["dow", "hour", "UserAgent", "PayingPrice", "IndexAdvertiserID",
               "IndexDomain", "IndexRegionID", "IndexCityID"])
    .withColumn("UserAgent", when(col("UserAgent").isNull(), "").otherwise(col("UserAgent")))
    .withColumn("hour", col("hour").cast(IntegerType()))
    .withColumn("device_type_id", cols_parse_ua_to_device_type("UserAgent"))
    .withColumn("label", when(col("PayingPrice").isNull(), 0).otherwise(1))
    .drop("UserAgent", "PayingPrice")
)

In [None]:
df_new.show(5)

In [None]:
df.printSchema()

In [None]:
if DEBUG:
 df_new.groupBy('device_type_id').count().show()

### Sanity checks for null values

We quickly check whether there are null values in the `device_type_id` column, as those would cause problems later on.

In [None]:
if DEBUG:
    print(df_new.filter(df_new['device_type_id'].isNull()).count())

### Generate training, validation and test data sets.

First, we make sure that the label column is the first column, as the training later on expects this.

In [None]:
# reorder columns label as first column
#df_new = df_new.select(["label", "bid_table.BidID", "dow", "hour", "IndexAdvertiserID", "IndexDomain", "IndexRegionID", "IndexCityID", "device_type_id"])
df_new = df_new.select(["label", "dow", "hour", "IndexAdvertiserID", "IndexDomain", "IndexRegionID", "IndexCityID", "device_type_id"])

In [None]:
df_new.show(1)

Now we shuffle the dataset before splitting it into the three different sets.

In [None]:
from pyspark.sql.functions import col, rand
df_new_shuffled = df_new.orderBy(rand())

In [None]:
df_new_shuffled.show(5)

In [None]:
if DEBUG:
    print(df_new.count())
    df_new_shuffled.groupBy('label').count().show()
    df.groupBy('PayingPrice').count().show()

Finally, we split the dataset into 80% training, 10% validation and 10% test data. `randomSplit` assigns rows randomly, so the resulting proportions are approximate rather than exact.

In [None]:
splits = df_new_shuffled.randomSplit([0.8, 0.1, 0.1], 42)
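To illustrate why the split sizes are only approximately 80/10/10, here is a simplified pure-Python model of the idea behind `randomSplit`: weights are normalized to cumulative bounds, and each row falls into the split whose interval contains its uniform random draw. This is a sketch under that assumption, not Spark's actual per-partition sampling code, and the function name is hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by comparing a uniform draw with the
    cumulative normalized weights (a simplified model of randomSplit)."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches draws lost to floating-point rounding.
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, validation, test = random_split(range(10_000), [0.8, 0.1, 0.1], seed=42)
print(len(train), len(validation), len(test))  # close to 8000 / 1000 / 1000
```

Every row lands in exactly one split, but the split sizes fluctuate around the target proportions.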

In [None]:
# Check the counts
if DEBUG:
 splits[0].count(), splits[1].count(), splits[2].count()

### Save to S3 in Parquet format

In [None]:
splits[0].write.parquet(output_train, mode="overwrite")

In [None]:
splits[1].write.parquet(output_verify, mode="overwrite")

In [None]:
splits[2].write.parquet(output_test, mode="overwrite")