"- We rename __household_id__ to __id__ using the __selectExpr__ function. \n",
"- Spark Null is not compatible with the Keyspaces Null type, so we use the __fillna__ function to replace all null values in the dataframe with 0.\n",
"\n",
"***"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "8e6b9152d8454993aee2ec45ac26b920",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Records in Feature Dataset: 3,510,433"
]
}
],
"source": [
"df = df.selectExpr('household_id as id','day_date','energy_median','energy_mean','energy_max','energy_count','energy_std',\\\n",
" 'energy_sum','energy_min','energy_sum_3months','energy_sum_6months','energy_sum_1yr',\\\n",
" 'energy_count_3months','energy_count_6months','energy_count_1yr','energy_max_3months',\\\n",
" 'energy_max_6months','energy_max_1yr','energy_mean_3months','energy_mean_6months','energy_mean_1yr',\\\n",
" 'energy_stddev_3months','energy_stddev_6months','energy_stddev_1yr').fillna(0)\n",
"\n",
"print(\"Records in Feature Dataset: {0:,}\".format(df.count()))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 6. Write Feature Data to Delta Lake\n",
"\n",
"Next we will write the features to Delta Lake on an S3 location. You should set the variable __s3_delta_lake_uri__ the location where you want to write the Delta lake table\n",
"\n",
"***"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "58e4ceaf79ea4958b11674352859bc92",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"s3_delta_lake_uri = \"s3://{your-bucket-name-here}/delta_table/uk_energy_features\"\n",
"\n",
"df.write.format(\"delta\")\\\n",
" .mode(\"overwrite\")\\\n",
" .partitionBy('day_date')\\\n",
" .save(s3_delta_lake_uri)"
]
},
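{
"cell_type": "markdown",
"metadata": {},
"source": [
"To sanity-check the write, you can read the Delta table back. A minimal sketch, assuming the same __s3_delta_lake_uri__ and Spark session as above:\n",
"\n",
"```python\n",
"# Load the Delta table from S3 and confirm the row count matches the write\n",
"delta_df = spark.read.format(\"delta\").load(s3_delta_lake_uri)\n",
"print(\"Records in Delta table: {0:,}\".format(delta_df.count()))\n",
"```"
]
},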
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 7. Write Feature Data to Keyspaces\n",
"\n",
"Now we will write the dataframe to Keyspaces. Spark writes individual partitions to Keyspaces.\n",
"When starting with a new table capacity mode set to on-demand which is a flexible option\n",
"capable of serving thousands of requests per second without capacity planning. Keyspaces on-\n",
"demand offers pay-per-request pricing for read and write requests so that you pay only for what\n",
"you use. Keyspaces tables using on-demand capacity mode automatically adapt to your\n",
"application's traffic volume. However, tables using the on-demand mode might still throttle. You\n",
"might experience throttling if you exceed double your previous traffic peak within 30 minutes.\n",
"It's a best practice to spread your traffic growth over at least 30 minutes before exceeding double\n",
"your previous traffic peak. To overcome this we half the number of partitions in our dataframe if\n",
"a write jobs fails. We continue doing that till we have 1 partition left.\n",
"Another solution to writing more partitions at once is to change the capacity mode for the table\n",
"from on-demand to provisioned. You can Switch Capacity Modes in order to optimize cost and\n",
"performance\n",
"\n",
"\n",
"Additionally, we created energy_data_features table with compound primary key, that we\n",
"can use to query and return sorted results. id as partition key and day_date column WITH\n",
"CLUSTERING ORDER BY in descending order.\n",
"\n",
"***"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "2127f19ab8314853bc8f1fb74d70d0ea",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"def save_dataset(\n",
" df: DataFrame, \n",
" keyspace_name: str = 'feature_store', \n",
" table_name: str = 'energy_data_features'\n",
"):\n",
"\n",
" num_partitions = 2\n",
" while num_partitions >= 1:\n",
" \n",
" print(\"Current Partitions: {0:,}\".format(num_partitions)) \n",
"\n",
" try:\n",
"\n",
" df.coalesce(num_partitions).write.format(\"org.apache.spark.sql.cassandra\")\\\n",
" .mode(\"append\")\\\n",
" .option(\"keyspace\", keyspace_name)\\\n",
" .option(\"table\", table_name)\\\n",
" .save()\n",
" print(\"Dataframe saved in Keyspaces\")\n",
" return\n",
" except Exception as e:\n",
" print(\n",
" f\"Throttled saving {keyspace_name}.{table_name} with {num_partitions} partitions\",\n",
" e,\n",
" )\n",
"\n",
" num_partitions //= 2\n",
" \n",
" print(\n",
" f\"Unable to save to {keyspace_name}.{table_name} despite repartitioning, \"\n",
" )\n",
" raise Exception(\n",
" f\"Unable to save to {keyspace_name}.{table_name} despite repartitioning\"\n",
" ) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we execute the function. This can take betweem __1-2 minutes__ to finish"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "ba727a1bc5424ab9b1f5b0abb4d45954",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "1d1732c181ad44bdbdacc45284f10626",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"save_dataset(df)"
]
}
],
"metadata": {
"interpreter": {
"hash": "aee8b7b246df8f9039afb4144a1f6fd8d2ca17a180786b69acc140d282b71a49"
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 2
},
"mimetype": "text/x-python",
"name": "python",
"pygments_lexer": "python2",
"version": "3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}