{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Feature Creation\n", "_Author @Shaheer Mansoor_
\n", "__Kernel__: pyspark\n", "\n", "\n", "## Notebook outline\n", "\n", "In this notebook we will load data from S3, create features and store them in Delta Lake (on S3) and in Keyspaces.\n", "\n", "This notebook is intended to be run as an EMR Notebook\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Setup depenencies\n", "\n", "In order to use Delta Lake and spark-cassandra connector in our spark session we need to define these dependencies in our configuration. Later when we initialize a spark session, these dependncies will be loaded in session.\n", "\n", "- To read more about the spark-cassandra connector visit their [github project](https://github.com/datastax/spark-cassandra-connector)\n", "- To read more about Delta Lake project visit their [Project page](https://docs.delta.io/latest/delta-intro.html)\n", "\n", "***" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Current session configs: {'conf': {'spark.jars.packages': 'com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0,io.delta:delta-core_2.12:0.7.0', 'spark.sql.extensions': 'com.datastax.spark.connector.CassandraSparkExtensions,io.delta.sql.DeltaSparkSessionExtension', 'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.delta.catalog.DeltaCatalog', 'spark.cassandra.connection.config.profile.path': 'file:/home/hadoop/app.config'}, 'kind': 'pyspark'}
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "\n", "
ID: 5 | YARN Application ID: application_1630411267894_0007 | Kind: pyspark | State: killed | Spark UI: Link | Driver log: Link
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%%configure -f\n", "{ \"conf\":{\n", " \"spark.jars.packages\": \"com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0,io.delta:delta-core_2.12:0.7.0\",\n", " \"spark.sql.extensions\": \"com.datastax.spark.connector.CassandraSparkExtensions,io.delta.sql.DeltaSparkSessionExtension\",\n", " \"spark.sql.catalog.spark_catalog\": \"org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n", " \"spark.cassandra.connection.config.profile.path\": \"file:/home/hadoop/app.config\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 2. Load UK energy data from S3\n", "\n", "We will load the UK energy data into a spark data frame. Update the value of __s3_uri__ to point to the bucket where your data resides. We will also cache the data so spark keeps the dataset in memory while we process it.\n", "***" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "b9bc715a9bd84e7ca1b0d80ff801cf1e", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
ID: 7 | YARN Application ID: application_1630411267894_0009 | Kind: pyspark | State: idle | Spark UI: Link | Driver log: Link
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Loading Data\n", "Records Read: 3,510,433" ] } ], "source": [ "import traceback\n", "import os\n", "import sys\n", "import json\n", "import socket\n", "import logging\n", "from time import perf_counter\n", "from pyspark.context import SparkContext\n", "from pyspark.sql.session import SparkSession\n", "from datetime import datetime, date, time, timedelta\n", "from pyspark.sql.types import DoubleType\n", "from pyspark.sql import Window, DataFrame\n", "from pyspark.sql.functions import *\n", "import pyspark.sql.functions as F\n", "from pyspark.sql.functions import lit, col\n", "from pyspark.ml.feature import Imputer \n", "from datetime import date\n", "from dateutil.relativedelta import relativedelta\n", "\n", "s3_uri = \"s3://YOUR-BUCKET-NAME-HERE/Dataset/daily_dataset.csv\"\n", "\n", "print(\"Loading Data\")\n", "df = (\n", " spark\n", " .read.format(\"com.databricks.spark.csv\")\n", " .option(\"header\", \"true\")\n", " .option(\"inferschema\", \"true\")\n", " .load(s3_uri)\n", " )\n", "\n", "df.cache()\n", "print(\"Records Read: {0:,}\".format( df.count() ))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 3. Adjust Date column schema\n", "\n", "The day column in the dataset is loaded as a string so we we will create a new column \"day_date\" of type date so we can use it to calulate features over different time windows. 
\n", "\n", "Run the cells below to view how the schema of the loaded data looks like and how to convert it to date type.\n", "***" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "4e57be1881af44ed94f14dfbd563a100", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- household_id: string (nullable = true)\n", " |-- day: string (nullable = true)\n", " |-- energy_median: double (nullable = true)\n", " |-- energy_mean: double (nullable = true)\n", " |-- energy_max: double (nullable = true)\n", " |-- energy_count: integer (nullable = true)\n", " |-- energy_std: double (nullable = true)\n", " |-- energy_sum: double (nullable = true)\n", " |-- energy_min: double (nullable = true)" ] } ], "source": [ "df.printSchema()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "610285d841d14c52b11d81a11dc9ee9a", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- household_id: string (nullable = true)\n", " |-- day: string (nullable = true)\n", " |-- energy_median: double (nullable = true)\n", " |-- energy_mean: double (nullable = true)\n", " |-- energy_max: double (nullable = true)\n", " |-- energy_count: integer (nullable = true)\n", " |-- energy_std: double (nullable = true)\n", " |-- energy_sum: double (nullable = true)\n", " |-- energy_min: double (nullable = true)\n", " |-- day_date: date (nullable = true)" ] } ], "source": [ "df = df.withColumn('day_date', F.to_date('day', 'yyyy-MM-dd'))\n", "df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 4. Create Features\n", "\n", "We create features on our dataset over different time periods. 
The features we create are defined as follows; each is computed per household (the window is partitioned by household_id) over a trailing period that ends at the fixed cutoff date 2014-02-28 used in the code, and days outside the period contribute 0 to the aggregate:\n", "\n", "- __energy_sum_3months__: sum of the daily energy_sum values over the trailing 3 months\n", "- __energy_sum_6months__: sum of the daily energy_sum values over the trailing 6 months\n", "- __energy_sum_1yr__: sum of the daily energy_sum values over the trailing 12 months\n", "- __energy_count_3months__: sum of the daily energy_count values (number of readings) over the trailing 3 months\n", "- __energy_count_6months__: sum of the daily energy_count values over the trailing 6 months\n", "- __energy_count_1yr__: sum of the daily energy_count values over the trailing 12 months\n", "- __energy_max_3months__: maximum of the daily energy_max values over the trailing 3 months\n", "- __energy_max_6months__: maximum of the daily energy_max values over the trailing 6 months\n", "- __energy_max_1yr__: maximum of the daily energy_max values over the trailing 12 months\n", "- __energy_mean_3months__: average of the daily energy_mean values over the trailing 3 months\n", "- __energy_mean_6months__: average of the daily energy_mean values over the trailing 6 months\n", "- __energy_mean_1yr__: average of the daily energy_mean values over the trailing 12 months\n", "- __energy_stddev_3months__: standard deviation of the daily energy_sum values over the trailing 3 months\n", "- __energy_stddev_6months__: standard deviation of the daily energy_sum values over the trailing 6 months\n", "- __energy_stddev_1yr__: standard deviation of the daily energy_sum values over the trailing 12 months" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "2eee14b76a944313a0b0a3ee60a99c7a", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- household_id: string (nullable = true)\n", " |-- day: string (nullable = true)\n", " |-- energy_median: double (nullable = true)\n", " |-- energy_mean: double (nullable = true)\n", " |-- energy_max: double (nullable = true)\n", " |-- energy_count: integer (nullable = true)\n", " |-- energy_std: double (nullable = true)\n", " |-- energy_sum: double (nullable = true)\n", " |-- energy_min: double (nullable = true)\n", " |-- day_date: date (nullable = true)\n", " |-- energy_sum_3months: double (nullable = true)\n", " |-- energy_sum_6months: double (nullable = true)\n", " |-- energy_sum_1yr: double (nullable = true)\n", " |-- energy_count_3months: long (nullable = true)\n", " |-- energy_count_6months: long (nullable = true)\n", " |-- energy_count_1yr: long (nullable = true)\n", " |-- energy_max_3months: double (nullable = true)\n", " |-- energy_max_6months: double (nullable = true)\n", " |-- energy_max_1yr: double (nullable = true)\n", " |-- energy_mean_3months: double (nullable = true)\n", " |-- energy_mean_6months: double (nullable = true)\n", " |-- energy_mean_1yr: double (nullable = true)\n", " |-- energy_stddev_3months: double (nullable = true)\n", " |-- energy_stddev_6months: double (nullable = true)\n", " |-- energy_stddev_1yr: double (nullable = true)" ] } ], "source": [ "window = Window.partitionBy(\"household_id\").orderBy(F.col('day_date').desc())\n", "\n", "df = df.withColumn(\"energy_sum_3months\", sum(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=3))\\\n", " , col(\"energy_sum\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_sum_6months\", sum(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=6))\\\n", " , col(\"energy_sum\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_sum_1yr\", sum(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=12))\\\n", " , col(\"energy_sum\")).otherwise(0))\\\n", " .over(window))\n", "#------\n", "# Count\n", "#------\n", "\n", "df = df.withColumn(\"energy_count_3months\", sum(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=3))\\\n", " , col(\"energy_count\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_count_6months\", sum(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=6))\\\n", " , 
col(\"energy_count\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_count_1yr\", sum(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=12))\\\n", " , col(\"energy_count\")).otherwise(0))\\\n", " .over(window))\n", "\n", "#------\n", "# Max\n", "#------\n", "\n", "df = df.withColumn(\"energy_max_3months\", max(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=3))\\\n", " , col(\"energy_max\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_max_6months\", max(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=6))\\\n", " , col(\"energy_max\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_max_1yr\", max(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=12))\\\n", " , col(\"energy_max\")).otherwise(0))\\\n", " .over(window))\n", "\n", "#------\n", "# Mean\n", "#------\n", "\n", "df = df.withColumn(\"energy_mean_3months\", avg(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=3))\\\n", " , col(\"energy_mean\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_mean_6months\", avg(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=6))\\\n", " , col(\"energy_mean\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_mean_1yr\", avg(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=12))\\\n", " , col(\"energy_mean\")).otherwise(0))\\\n", " .over(window))\n", "\n", "\n", "#------\n", "# Stddev\n", "#------\n", "\n", "df = df.withColumn(\"energy_stddev_3months\", stddev(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=3))\\\n", " , col(\"energy_sum\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_stddev_6months\", stddev(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=6))\\\n", " , col(\"energy_sum\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df = df.withColumn(\"energy_stddev_1yr\", stddev(when(df.day_date \\\n", " >= (date(2014,2,28) - relativedelta(months=12))\\\n", " , col(\"energy_sum\")).otherwise(0))\\\n", " .over(window))\n", "\n", "df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 5. Update column name and order, replace null values\n", "\n", "- We will update the column name __household_id__ to __id__ for simplicity.\n", "- Next We need to order the columns in the dataframe in the same order Keyspaces table expects them to arrive in. This can be done by running a __selectExpr__ function. 
\n", "- Spark Null is not compatible with Keyspaces Null type so we use the __fillna__ function to replace all null values in the dataframe with 0.\n", "\n", "***" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "8e6b9152d8454993aee2ec45ac26b920", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Records in Feature Dataset: 3,510,433" ] } ], "source": [ "df = df.selectExpr('household_id as id','day_date','energy_median','energy_mean','energy_max','energy_count','energy_std',\\\n", " 'energy_sum','energy_min','energy_sum_3months','energy_sum_6months','energy_sum_1yr',\\\n", " 'energy_count_3months','energy_count_6months','energy_count_1yr','energy_max_3months',\\\n", " 'energy_max_6months','energy_max_1yr','energy_mean_3months','energy_mean_6months','energy_mean_1yr',\\\n", " 'energy_stddev_3months','energy_stddev_6months','energy_stddev_1yr').fillna(0)\n", "\n", "print(\"Records in Feature Dataset: {0:,}\".format(df.count()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 6. Write Feature Data to Delta Lake\n", "\n", "Next we will write the features to Delta Lake on an S3 location. You should set the variable __s3_delta_lake_uri__ the location where you want to write the Delta lake table\n", "\n", "***" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "58e4ceaf79ea4958b11674352859bc92", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "s3_delta_lake_uri = \"s3://{your-bucket-name-here}/delta_table/uk_energy_features\"\n", "\n", "df.write.format(\"delta\")\\\n", " .mode(\"overwrite\")\\\n", " .partitionBy('day_date')\\\n", " .save(s3_delta_lake_uri)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 7. Write Feature Data to Keyspaces\n", "\n", "Now we will write the dataframe to Keyspaces. Spark writes individual partitions to Keyspaces.\n", "When starting with a new table capacity mode set to on-demand which is a flexible option\n", "capable of serving thousands of requests per second without capacity planning. Keyspaces on-\n", "demand offers pay-per-request pricing for read and write requests so that you pay only for what\n", "you use. Keyspaces tables using on-demand capacity mode automatically adapt to your\n", "application's traffic volume. However, tables using the on-demand mode might still throttle. You\n", "might experience throttling if you exceed double your previous traffic peak within 30 minutes.\n", "It's a best practice to spread your traffic growth over at least 30 minutes before exceeding double\n", "your previous traffic peak. 
To overcome this, we halve the number of partitions in our dataframe if\n", "a write job fails. We continue doing that until we have 1 partition left.\n", "Another solution for writing more partitions at once is to change the capacity mode for the table\n", "from on-demand to provisioned. You can switch capacity modes in order to optimize cost and\n", "performance.\n", "\n", "\n", "Additionally, we created the energy_data_features table with a compound primary key that we\n", "can use to query and return sorted results: id as the partition key and the day_date column WITH\n", "CLUSTERING ORDER BY in descending order.\n", "\n", "***" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "2127f19ab8314853bc8f1fb74d70d0ea", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "def save_dataset(\n", " df: DataFrame, \n", " keyspace_name: str = 'feature_store', \n", " table_name: str = 'energy_data_features'\n", "):\n", "\n", " num_partitions = 2\n", " while num_partitions >= 1:\n", " \n", " print(\"Current Partitions: {0:,}\".format(num_partitions)) \n", "\n", " try:\n", "\n", " df.coalesce(num_partitions).write.format(\"org.apache.spark.sql.cassandra\")\\\n", " .mode(\"append\")\\\n", " .option(\"keyspace\", keyspace_name)\\\n", " .option(\"table\", table_name)\\\n", " .save()\n", " print(\"Dataframe saved in Keyspaces\")\n", " return\n", " except Exception as e:\n", " print(\n", " f\"Throttled saving {keyspace_name}.{table_name} with {num_partitions} partitions\",\n", " e,\n", " )\n", "\n", " num_partitions //= 2\n", " \n", " print(\n", " f\"Unable to save to {keyspace_name}.{table_name} despite repartitioning, \"\n", " )\n", " raise Exception(\n", " f\"Unable to save to {keyspace_name}.{table_name} despite repartitioning\"\n", " ) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we execute the function. This can take __1-2 minutes__ to finish." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "ba727a1bc5424ab9b1f5b0abb4d45954", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "1d1732c181ad44bdbdacc45284f10626", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "save_dataset(df)" ] } ], "metadata": { "interpreter": { "hash": "aee8b7b246df8f9039afb4144a1f6fd8d2ca17a180786b69acc140d282b71a49" }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "python", "version": 2 }, "mimetype": "text/x-python", "name": "python", "pygments_lexer": "python2", "version": "3" } }, "nbformat": 4, "nbformat_minor": 5 }