# Feature Creation
<font color='grey'>_Author @Shaheer Mansoor_</font><br>
__Kernel__: pyspark


## Notebook outline

In this notebook we load data from S3, create features, and store them both in Delta Lake (on S3) and in Amazon Keyspaces (for Apache Cassandra).

This notebook is intended to be run as an EMR Notebook.
***

## 1. Set up dependencies

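To use Delta Lake and the spark-cassandra-connector in our Spark session, we need to declare them as dependencies in the session configuration. When the Spark session is initialized, these packages are downloaded and loaded into it. The same configuration also registers the SQL extensions of both libraries, replaces the default Spark catalog with the Delta catalog, and points the connector to the driver configuration file (<code>/home/hadoop/app.config</code>) used to connect to Keyspaces.

- To read more about the spark-cassandra-connector, visit its [GitHub project](https://github.com/datastax/spark-cassandra-connector)
- To read more about the Delta Lake project, visit its [project page](https://docs.delta.io/latest/delta-intro.html)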

***

In [1]:
%%configure -f
{ "conf":{
        "spark.jars.packages": "com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0,io.delta:delta-core_2.12:0.7.0",
        "spark.sql.extensions": "com.datastax.spark.connector.CassandraSparkExtensions,io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.cassandra.connection.config.profile.path": "file:/home/hadoop/app.config"
         }
}
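Both packages in <code>spark.jars.packages</code> end in <code>_2.12</code>. That suffix is the Scala binary version the package was compiled for, and it has to match the Scala version of the cluster's Spark build. Below is a small check you can run before pasting the coordinates into <code>%%configure</code>. It parses the coordinates and confirms they agree. <code>scala_binary_versions</code> is a hypothetical helper and is not part of Spark or EMR.

```python
import re

# Maven coordinates (group:artifact:version) as used in the %%configure cell
PACKAGES = (
    "com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0,"
    "io.delta:delta-core_2.12:0.7.0"
)


def scala_binary_versions(packages: str) -> dict:
    """Map each artifact id to the Scala version in its _X.Y suffix, or None if it has none."""
    versions = {}
    for coordinate in packages.split(","):
        _group, artifact, _version = coordinate.strip().split(":")
        match = re.search(r"_(\d+\.\d+)$", artifact)
        versions[artifact] = match.group(1) if match else None
    return versions


versions = scala_binary_versions(PACKAGES)
print(versions)
# All Scala-suffixed artifacts should target one Scala version
assert len({v for v in versions.values() if v is not None}) == 1
```

If you upgrade one of the packages, re-running the check catches a mismatched Scala build before the Spark session fails to start.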



# 2. Load UK energy data from S3

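We will load the UK energy data into a Spark DataFrame. Update the value of __s3_uri__ to point to the bucket where your data resides. We also cache the data so Spark keeps the dataset in memory while we process it. Because <code>cache()</code> is lazy, the <code>count()</code> that follows is what actually reads the file and fills the cache.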
***

In [2]:
# Imports used in this notebook
# Note: this star import shadows Python's built-in sum() and max() with Spark's column functions
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, Window
from datetime import date
from dateutil.relativedelta import relativedelta

# Point this at the daily dataset in your bucket
s3_uri = "s3://YOUR-BUCKET-NAME-HERE/Dataset/daily_dataset.csv"

print("Loading Data")
df = (
    spark.read.format("csv")  # built-in CSV source; "com.databricks.spark.csv" is a legacy alias for it
    .option("header", "true")
    .option("inferSchema", "true")
    .load(s3_uri)
)

# cache() is lazy: the count() below reads the data and populates the cache
df.cache()
print("Records Read: {0:,}".format(df.count()))


SparkSession available as 'spark'.

Loading Data
Records Read: 3,510,433

# 3. Adjust Date column schema

The <code>day</code> column in the dataset is loaded as a string. We therefore create a new column <code>day_date</code> of type date, which we use to calculate features over different time windows.

Run the cells below to see what the schema of the loaded data looks like and how to convert the column to a date type.
***

In [3]:
df.printSchema()


root
 |-- household_id: string (nullable = true)
 |-- day: string (nullable = true)
 |-- energy_median: double (nullable = true)
 |-- energy_mean: double (nullable = true)
 |-- energy_max: double (nullable = true)
 |-- energy_count: integer (nullable = true)
 |-- energy_std: double (nullable = true)
 |-- energy_sum: double (nullable = true)
 |-- energy_min: double (nullable = true)

In [4]:
df = df.withColumn('day_date', F.to_date('day', 'yyyy-MM-dd'))
df.printSchema()


root
 |-- household_id: string (nullable = true)
 |-- day: string (nullable = true)
 |-- energy_median: double (nullable = true)
 |-- energy_mean: double (nullable = true)
 |-- energy_max: double (nullable = true)
 |-- energy_count: integer (nullable = true)
 |-- energy_std: double (nullable = true)
 |-- energy_sum: double (nullable = true)
 |-- energy_min: double (nullable = true)
 |-- day_date: date (nullable = true)

# 4. Create Features

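We create features for each household over three periods. Each period ends on 2014-02-28, the reference date hard-coded in the next cell:

- the last 3 months (from 2013-11-28)
- the last 6 months (from 2013-08-28)
- the last year (from 2013-02-28)

Each feature aggregates the household's daily rows whose <code>day_date</code> falls on or after the start of the period. The features are defined as:

- __energy_sum_3months__: sum of daily <code>energy_sum</code> over the last 3 months
- __energy_sum_6months__: sum of daily <code>energy_sum</code> over the last 6 months
- __energy_sum_1yr__: sum of daily <code>energy_sum</code> over the last year
- __energy_count_3months__: sum of daily <code>energy_count</code> over the last 3 months
- __energy_count_6months__: sum of daily <code>energy_count</code> over the last 6 months
- __energy_count_1yr__: sum of daily <code>energy_count</code> over the last year
- __energy_max_3months__: maximum of daily <code>energy_max</code> over the last 3 months
- __energy_max_6months__: maximum of daily <code>energy_max</code> over the last 6 months
- __energy_max_1yr__: maximum of daily <code>energy_max</code> over the last year
- __energy_mean_3months__: average of daily <code>energy_mean</code> over the last 3 months
- __energy_mean_6months__: average of daily <code>energy_mean</code> over the last 6 months
- __energy_mean_1yr__: average of daily <code>energy_mean</code> over the last year
- __energy_stddev_3months__: standard deviation of daily <code>energy_sum</code> over the last 3 months
- __energy_stddev_6months__: standard deviation of daily <code>energy_sum</code> over the last 6 months
- __energy_stddev_1yr__: standard deviation of daily <code>energy_sum</code> over the last year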
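The sketch below reproduces the features listed above in plain Python for a single household, which is handy for sanity-checking values on a few rows. It is not the notebook's Spark code. Some details to be aware of:

- <code>household_features</code> and <code>months_before</code> are hypothetical helpers.
- <code>months_before</code> re-implements the month arithmetic of <code>relativedelta(months=n)</code> using only the standard library.
- Days outside a period are skipped rather than counted as zeros.
- <code>statistics.stdev</code> is a sample standard deviation, like Spark's <code>stddev</code>.

Run it in a plain Python session. In the notebook, <code>from pyspark.sql.functions import *</code> replaces the built-in <code>sum</code> and <code>max</code>, which would break this code.

```python
import statistics
from calendar import monthrange
from datetime import date

AS_OF = date(2014, 2, 28)  # reference date hard-coded in the feature cell
PERIODS = {"3months": 3, "6months": 6, "1yr": 12}


def months_before(d: date, months: int) -> date:
    """Step back whole calendar months, clamping the day to the target month's length."""
    index = d.year * 12 + (d.month - 1) - months
    year, month0 = divmod(index, 12)
    last_day = monthrange(year, month0 + 1)[1]
    return date(year, month0 + 1, min(d.day, last_day))


def household_features(rows):
    """Compute the 15 features for ONE household from its daily rows (plain dicts)."""
    features = {}
    for suffix, months in PERIODS.items():
        cutoff = months_before(AS_OF, months)
        # Days before the cutoff are skipped entirely, not counted as zeros
        recent = [r for r in rows if r["day_date"] >= cutoff]
        daily_sums = [r["energy_sum"] for r in recent]
        features[f"energy_sum_{suffix}"] = sum(daily_sums) if recent else None
        features[f"energy_count_{suffix}"] = sum(r["energy_count"] for r in recent) if recent else None
        features[f"energy_max_{suffix}"] = max((r["energy_max"] for r in recent), default=None)
        features[f"energy_mean_{suffix}"] = (
            statistics.mean([r["energy_mean"] for r in recent]) if recent else None
        )
        # Sample standard deviation of daily energy_sum; undefined for fewer than 2 days
        features[f"energy_stddev_{suffix}"] = (
            statistics.stdev(daily_sums) if len(daily_sums) > 1 else None
        )
    return features


# Made-up rows for one household, only to exercise the function
sample = [
    {"day_date": date(2014, 2, 28), "energy_sum": 10.0, "energy_count": 48, "energy_max": 1.0, "energy_mean": 0.2},
    {"day_date": date(2013, 12, 1), "energy_sum": 6.0, "energy_count": 48, "energy_max": 0.5, "energy_mean": 0.1},
    {"day_date": date(2013, 9, 1), "energy_sum": 4.0, "energy_count": 46, "energy_max": 2.0, "energy_mean": 0.3},
]
features = household_features(sample)
print(features["energy_sum_3months"], features["energy_sum_6months"])  # 16.0 20.0
```

The 2013-09-01 row falls before the 3-month cutoff (2013-11-28), so it only contributes to the 6-month and 1-year features.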

In [5]:
# Reference date for all features; every period ends here
as_of = date(2014, 2, 28)
periods = {"3months": 3, "6months": 6, "1yr": 12}

# The frame spans every row of a household, so each row carries the same per-household aggregates.
# (With only orderBy(), Spark's default frame ends at the current row and produces running totals.)
window = Window.partitionBy("household_id").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)

# (feature prefix, aggregate function, source column); stddev is taken over daily energy_sum
aggregations = [
    ("energy_sum", F.sum, "energy_sum"),
    ("energy_count", F.sum, "energy_count"),
    ("energy_max", F.max, "energy_max"),
    ("energy_mean", F.avg, "energy_mean"),
    ("energy_stddev", F.stddev, "energy_sum"),
]

for prefix, agg_fn, source in aggregations:
    for suffix, months in periods.items():
        cutoff = as_of - relativedelta(months=months)
        # when() without otherwise() is null outside the period; aggregates skip nulls,
        # so days outside the period do not drag the mean and stddev towards 0
        in_period = F.when(F.col("day_date") >= F.lit(cutoff), F.col(source))
        df = df.withColumn(f"{prefix}_{suffix}", agg_fn(in_period).over(window))

df.printSchema()


root
 |-- household_id: string (nullable = true)
 |-- day: string (nullable = true)
 |-- energy_median: double (nullable = true)
 |-- energy_mean: double (nullable = true)
 |-- energy_max: double (nullable = true)
 |-- energy_count: integer (nullable = true)
 |-- energy_std: double (nullable = true)
 |-- energy_sum: double (nullable = true)
 |-- energy_min: double (nullable = true)
 |-- day_date: date (nullable = true)
 |-- energy_sum_3months: double (nullable = true)
 |-- energy_sum_6months: double (nullable = true)
 |-- energy_sum_1yr: double (nullable = true)
 |-- energy_count_3months: long (nullable = true)
 |-- energy_count_6months: long (nullable = true)
 |-- energy_count_1yr: long (nullable = true)
 |-- energy_max_3months: double (nullable = true)
 |-- energy_max_6months: double (nullable = true)
 |-- energy_max_1yr: double (nullable = true)
 |-- energy_mean_3months: double (nullable = true)
 |-- energy_mean_6months: double (nullable = true)
 |-- energy_mean_1yr: double (nullable = true)

# 5. Update column name and order, replace null values

- We rename the column __household_id__ to __id__ for simplicity.
- Next, we order the columns in the DataFrame the way the Keyspaces table expects them to arrive, using the __<code>selectExpr</code>__ function. Columns that are not listed, such as the original string column <code>day</code>, are dropped.
- Spark <code>null</code> values are not compatible with Keyspaces <code>null</code>, so we use the __<code>fillna</code>__ function to replace all null values in the DataFrame with 0. With a numeric value such as 0, <code>fillna</code> only touches numeric columns.
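The column list passed to <code>selectExpr</code> has a fixed structure: the base columns first, then the 15 features ordered by prefix and then by period. The sketch below rebuilds that list from the naming scheme, so it cannot drift out of sync with the feature cell. The constant and function names are illustrative and hypothetical.

```python
# Base columns in table order; household_id is renamed to id
BASE_COLUMNS = [
    "household_id as id", "day_date", "energy_median", "energy_mean", "energy_max",
    "energy_count", "energy_std", "energy_sum", "energy_min",
]
FEATURE_PREFIXES = ["energy_sum", "energy_count", "energy_max", "energy_mean", "energy_stddev"]
PERIOD_SUFFIXES = ["3months", "6months", "1yr"]


def keyspaces_column_order():
    """Return the selectExpr expressions: base columns, then every prefix/period feature."""
    features = [f"{prefix}_{suffix}" for prefix in FEATURE_PREFIXES for suffix in PERIOD_SUFFIXES]
    return BASE_COLUMNS + features


cols = keyspaces_column_order()
print(len(cols))  # 24
```

In the notebook this would be used as <code>df.selectExpr(*keyspaces_column_order()).fillna(0)</code>.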

***

In [6]:
df = df.selectExpr(
    'household_id as id', 'day_date',
    'energy_median', 'energy_mean', 'energy_max', 'energy_count',
    'energy_std', 'energy_sum', 'energy_min',
    'energy_sum_3months', 'energy_sum_6months', 'energy_sum_1yr',
    'energy_count_3months', 'energy_count_6months', 'energy_count_1yr',
    'energy_max_3months', 'energy_max_6months', 'energy_max_1yr',
    'energy_mean_3months', 'energy_mean_6months', 'energy_mean_1yr',
    'energy_stddev_3months', 'energy_stddev_6months', 'energy_stddev_1yr',
).fillna(0)  # replace nulls in numeric columns with 0

print("Records in Feature Dataset: {0:,}".format(df.count()))


Records in Feature Dataset: 3,510,433

# 6. Write Feature Data to Delta Lake

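Next we will write the features to Delta Lake on S3. Set the variable __s3_delta_lake_uri__ to the location where you want to write the Delta Lake table. The table is partitioned by <code>day_date</code>, so each day's rows are stored under their own <code>day_date=YYYY-MM-DD</code> prefix. The <code>overwrite</code> mode replaces the table contents each time the cell runs.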

***

In [7]:
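# Replace with the S3 location for the Delta Lake table
s3_delta_lake_uri = "s3://YOUR-BUCKET-NAME-HERE/delta_table/uk_energy_features"

# Overwrite the table, storing each day's rows in its own day_date partition
(
    df.write.format("delta")
    .mode("overwrite")
    .partitionBy("day_date")
    .save(s3_delta_lake_uri)
)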


# 7. Write Feature Data to Keyspaces

Now we will write the DataFrame to Keyspaces. Spark writes each DataFrame partition to Keyspaces in its own task, so the number of partitions controls how many writers run in parallel.

When starting with a new table, set its capacity mode to on-demand. This flexible option can serve thousands of requests per second without capacity planning. Keyspaces on-demand offers pay-per-request pricing for read and write requests, so you pay only for what you use, and tables in on-demand mode automatically adapt to your application's traffic volume.

However, on-demand tables can still throttle. You might experience throttling if you exceed double your previous traffic peak within 30 minutes. It's a best practice to spread your traffic growth over at least 30 minutes before exceeding double your previous traffic peak.

To cope with throttling, the function below starts with two partitions. Each time a write job fails, it halves the number of partitions in the DataFrame and tries again, until only one partition is left. Retrying is safe because writes to Keyspaces are upserts on the primary key: rows already written by a failed attempt are simply overwritten. Another way to write more partitions at once is to switch the table's capacity mode from on-demand to provisioned. You can switch capacity modes to optimize cost and performance.
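The doubling rule above can be turned into a simple ramp-up plan. <code>ramp_schedule</code> is a hypothetical helper, not an AWS API. The sketch assumes that each step is held for a full 30-minute window so that it becomes the new traffic peak. Rates are in whatever unit you track, for example writes per second.

```python
def ramp_schedule(previous_peak: float, target: float, window_minutes: int = 30):
    """Return (minute, rate) steps that never exceed double the peak of the previous window.

    Assumes each step is sustained for a full window, so it becomes the new peak.
    """
    if previous_peak <= 0:
        raise ValueError("previous_peak must be positive")
    schedule = []
    minute, peak = 0, previous_peak
    while True:
        rate = min(target, 2 * peak)  # at most double the current peak
        schedule.append((minute, rate))
        if rate >= target:
            return schedule
        peak = rate
        minute += window_minutes


print(ramp_schedule(1000, 5000))  # [(0, 2000), (30, 4000), (60, 5000)]
```

Going from a previous peak of 1,000 to 5,000 writes per second therefore takes three steps: 2,000 immediately, 4,000 after 30 minutes, and 5,000 after 60 minutes.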


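Additionally, we created the <code>energy_data_features</code> table with a compound primary key so that queries return sorted results. <code>id</code> is the partition key and <code>day_date</code> is a clustering column declared <code>WITH CLUSTERING ORDER BY</code> in descending order, so each household's rows are returned newest first.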
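The notebook does not show the table definition. The sketch below builds CQL that would match the DataFrame written here, with <code>id</code> as the partition key and <code>day_date</code> as a descending clustering column. Two things are assumptions:

- The column types are mapped from the Spark schema printed in section 4: string to <code>text</code>, date to <code>date</code>, integer to <code>int</code>, long to <code>bigint</code>, and double to <code>double</code>.
- <code>feature_table_cql</code> is a hypothetical helper.

The real table may differ.

```python
KEYSPACE = "feature_store"
TABLE = "energy_data_features"
PERIOD_SUFFIXES = ("3months", "6months", "1yr")


def feature_table_cql(keyspace: str = KEYSPACE, table: str = TABLE) -> str:
    """Build a CREATE TABLE statement matching the DataFrame columns (types are assumptions)."""
    columns = [("id", "text"), ("day_date", "date")]
    columns += [(c, "double") for c in ("energy_median", "energy_mean", "energy_max")]
    columns += [("energy_count", "int")]
    columns += [(c, "double") for c in ("energy_std", "energy_sum", "energy_min")]
    for prefix in ("energy_sum", "energy_count", "energy_max", "energy_mean", "energy_stddev"):
        for suffix in PERIOD_SUFFIXES:
            # Summing the integer energy_count yields a Spark long, which maps to CQL bigint
            cql_type = "bigint" if prefix == "energy_count" else "double"
            columns.append((f"{prefix}_{suffix}", cql_type))
    body = ",\n  ".join(f"{name} {cql_type}" for name, cql_type in columns)
    # id is the partition key; day_date is a clustering column sorted newest first
    return (
        f"CREATE TABLE IF NOT EXISTS {keyspace}.{table} (\n"
        f"  {body},\n"
        "  PRIMARY KEY ((id), day_date)\n"
        ") WITH CLUSTERING ORDER BY (day_date DESC);"
    )


# With day_date clustered in descending order, LIMIT 1 returns a household's most recent row
LATEST_ROW_CQL = f"SELECT * FROM {KEYSPACE}.{TABLE} WHERE id = ? LIMIT 1;"

print(feature_table_cql())
```

The descending clustering order is what lets <code>LATEST_ROW_CQL</code> return a household's most recent features without sorting on the client. To use it, bind a single household id to the <code>?</code> marker.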

***

In [8]:
def save_dataset(
    df: DataFrame,
    keyspace_name: str = 'feature_store',
    table_name: str = 'energy_data_features'
):
    """Append df to Keyspaces, halving the number of partitions after every failed write."""
    num_partitions = 2
    last_error = None
    while num_partitions >= 1:
        print("Current Partitions: {0:,}".format(num_partitions))
        try:
            # Each partition is written by its own task: fewer partitions, fewer concurrent writers
            (
                df.coalesce(num_partitions)
                .write.format("org.apache.spark.sql.cassandra")
                .mode("append")
                .option("keyspace", keyspace_name)
                .option("table", table_name)
                .save()
            )
            print("Dataframe saved in Keyspaces")
            return
        except Exception as e:
            # Any failure is assumed to be throttling; retry with half the partitions
            last_error = e
            print(
                f"Failed to save {keyspace_name}.{table_name} with {num_partitions} partitions "
                "(possibly throttled):",
                e,
            )
        num_partitions //= 2

    raise RuntimeError(
        f"Unable to save to {keyspace_name}.{table_name} despite repartitioning"
    ) from last_error


Now we execute the function. This can take __1-2 minutes__ to finish.

In [None]:
save_dataset(df)
