{ "cells": [ { "cell_type": "markdown", "id": "16f15664-c5c8-4aa9-9ecf-4ad1935825e0", "metadata": { "tags": [] }, "source": [ "# Serverless Data Prep with Glue Interactive Sessions (from `SageMaker Studio`)\n", "\n", "This notebook demonstrates the ability to use Apache Spark using AWS Glue to do data prep with two different datasets in order to build an urban air quality predictor with Amazon SageMaker.\n", "\n", "Data engineers and data scientists can now interactively prepare data at scale using their Studio notebook’s built-in integration with serverless Spark sessions managed by AWS Glue. Starting in seconds and automatically stopping compute when idle, AWS Glue interactive sessions provide an on-demand, highly-scalable, serverless Spark backend to achieve scalable data preparation within Studio. Notable benefits of using AWS Glue interactive sessions on Studio notebooks include:\n", "\n", "- No clusters to provision or manage\n", "- No idle clusters to pay for\n", "- No up-front configuration required\n", "- No resource contention for the same development environment\n", "- The exact same serverless Spark runtime and platform as AWS Glue extract, transform, and load (ETL) jobs\n", "\n", "\n" ] }, { "cell_type": "markdown", "id": "91500df2-118c-402a-a356-059c33b43e48", "metadata": { "tags": [] }, "source": [ "# Introduction" ] }, { "cell_type": "markdown", "id": "30fbffff-9ca1-49ce-a5cb-f4d0d53e31f4", "metadata": {}, "source": [ "While some organizations see data science, data engineering, and data analytics as separate siloed functions, we're increasingly seeing with many of our customers that data prep and analytics are foundational components of ML workflows.\n", "\n", "For example, although organizations have data engineering teams to clean and prepare data for analytics and ML, the specific data that a data scientist may need for training a specific model may not be available in the repository of data that a data engineering team may have prepared. But now they can perform their respective task from the same unified interface using **SageMaker Studio with Glue Intractive Session**\n", "\n", "" ] }, { "cell_type": "markdown", "id": "a5f80972-6fa0-4fc3-bb70-ef8042f2dfd3", "metadata": { "tags": [] }, "source": [ "# Problem Statement" ] }, { "cell_type": "markdown", "id": "04dfd3da-6bde-4a15-9fbf-e411913b0ba1", "metadata": {}, "source": [ "Lets take a problem and try to solve it. 
"\n" ] }, { "cell_type": "markdown", "id": "54f05e1a-2bf4-4a80-b7d2-aed06ea8acad", "metadata": { "tags": [] }, "source": [ "# Dataset" ] }, { "cell_type": "markdown", "id": "84732ed9-55b1-4d28-bdc5-c3ebc40b9baf", "metadata": { "tags": [] }, "source": [ "For this demo we will use the following datasets:\n", "\n", "- `First dataset` **NO2 Air Quality** \n", " - [(OpenAQ physical air quality data)](https://registry.opendata.aws/openaq/) : Global, aggregated physical air quality data from public data sources provided by government, research-grade, and other sources.\n", " About 42 GB of data.\n", "\n", "- `Second dataset` **Weather** \n", " - [(NOAA Global Surface Summary of Day)](https://registry.opendata.aws/noaa-gsod/) : Global summary-of-day data for 18 surface meteorological elements, derived from the synoptic/hourly observations contained in USAF DATSAV3 Surface data and the Federal Climate Complex Integrated Surface Hourly (ISH) dataset." ] }, { "cell_type": "markdown", "id": "ec12e7d8-0e72-4fa5-a456-21ee04e6476c", "metadata": { "tags": [] }, "source": [ "# Let's get started\n", "\n", "You can check on your AWS Glue interactive sessions from the [**Glue console**](https://us-east-2.console.aws.amazon.com/glue/home?region=us-east-2#/v2/getting-started)" ] }, { "cell_type": "code", "execution_count": 3, "id": "a15fcb8f-e83e-40bc-8d69-e73d83fcfd13", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Welcome to the Glue Interactive Sessions Kernel\n", "For more information on available magic commands, please type %help in any new cell.\n", "\n", "Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html\n", "It looks like there is a newer version of the kernel available. The latest version is 0.38.1 and you have 0.37.4 installed.\n", "Please run `pip install --upgrade aws-glue-sessions` to upgrade your kernel\n", "There is no current session.\n" ] } ], "source": [ "%stop_session" ] }, { "cell_type": "code", "execution_count": 5, "id": "b85b99a7-d86f-47c5-b2f5-f10e9cd89b50", "metadata": { "tags": [] }, "outputs": [ { "data": { "text/markdown": [ "\n", "# Available Magic Commands\n", "\n", "## Sessions Magic\n", "\n", "----\n", " %help Return a list of descriptions and input types for all magic commands. \n", " %profile String Specify a profile in your aws configuration to use as the credentials provider.\n", " %region String Specify the AWS region in which to initialize a session. \n", " Default from ~/.aws/config on Linux or macOS, \n", " or C:\\Users\\ USERNAME \\.aws\\config\" on Windows.\n", " %idle_timeout Int The number of minutes of inactivity after which a session will timeout. 
\n", " Default: 2880 minutes (48 hours).\n", " %session_id_prefix String Define a String that will precede all session IDs in the format \n", " [session_id_prefix]-[session_id]. If a session ID is not provided,\n", " a random UUID will be generated.\n", " %status Returns the status of the current Glue session including its duration, \n", " configuration and executing user / role.\n", " %session_id Returns the session ID for the running session. \n", " %list_sessions Lists all currently running sessions by ID.\n", " %stop_session Stops the current session.\n", " %glue_version String The version of Glue to be used by this session. \n", " Currently, the only valid options are 2.0 and 3.0. \n", " Default: 2.0.\n", "----\n", "\n", "## Selecting Job Types\n", "\n", "----\n", " %streaming String Sets the session type to Glue Streaming.\n", " %etl String Sets the session type to Glue ETL.\n", " %glue_ray String Sets the session type to Glue Ray.\n", "----\n", "\n", "## Glue Config Magic \n", "*(common across all job types)*\n", "\n", "----\n", "\n", " %%configure Dictionary A json-formatted dictionary consisting of all configuration parameters for \n", " a session. Each parameter can be specified here or through individual magics.\n", " %iam_role String Specify an IAM role ARN to execute your session with.\n", " Default from ~/.aws/config on Linux or macOS, \n", " or C:\\Users\\%USERNAME%\\.aws\\config` on Windows.\n", " %number_of_workers int The number of workers of a defined worker_type that are allocated \n", " when a session runs.\n", " Default: 5.\n", " %additional_python_modules List Comma separated list of additional Python modules to include in your cluster \n", " (can be from Pypi or S3).\n", "----\n", "\n", " \n", "## Magic for Spark Jobs (ETL & Streaming)\n", "\n", "----\n", " %worker_type String Set the type of instances the session will use as workers. \n", " ETL and Streaming support G.1X, G.2X, G.4X and G.8X. \n", " Default: G.1X.\n", " %connections List Specify a comma separated list of connections to use in the session.\n", " %extra_py_files List Comma separated list of additional Python files From S3.\n", " %extra_jars List Comma separated list of additional Jars to include in the cluster.\n", " %spark_conf String Specify custom spark configurations for your session. \n", " E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer\n", "----\n", " \n", "## Magic for Ray Job\n", "\n", "----\n", " %min_workers Int The minimum number of workers that are allocated to a Ray job. \n", " Default: 1.\n", " %object_memory_head Int The percentage of free memory on the instance head node after a warm start. \n", " Minimum: 0. Maximum: 100.\n", " %object_memory_worker Int The percentage of free memory on the instance worker nodes after a warm start. \n", " Minimum: 0. Maximum: 100.\n", "----\n", "\n", "## Action Magic\n", "\n", "----\n", "\n", " %%sql String Run SQL code. All lines after the initial %%sql magic will be passed\n", " as part of the SQL code. 
\n", "----\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%help" ] }, { "cell_type": "code", "execution_count": 11, "id": "c1673d5c-9297-448e-8e31-2a6d42809496", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Setting session ID prefix to my-smgluedemo\n", "Setting Glue version to: 3.0\n", "Previous number of workers: 5\n", "Setting new number of workers to: 50\n", "Current idle_timeout is 2880 minutes.\n", "idle_timeout has been set to 600 minutes.\n", "Additional python modules to be included:\n", "sagemaker\n" ] } ], "source": [ "%session_id_prefix my-smgluedemo\n", "%glue_version 3.0\n", "%number_of_workers 50\n", "%idle_timeout 600\n", "%additional_python_modules sagemaker" ] }, { "cell_type": "code", "execution_count": 1, "id": "e4780304-97ff-459d-ac2b-029b83c0257e", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::507922848584:role/service-role/SageMaker-myRoleforSMStudio\n", "Trying to create a Glue session for the kernel.\n", "Worker Type: G.1X\n", "Number of Workers: 50\n", "Session ID: my-smgluedemo-31f96e60-2cdb-401b-9d00-52a77c55c6c9\n", "Job Type: glueetl\n", "Applying the following default arguments:\n", "--glue_kernel_version 0.37.4\n", "--enable-glue-datacatalog true\n", "--additional-python-modules sagemaker\n", "Waiting for session my-smgluedemo-31f96e60-2cdb-401b-9d00-52a77c55c6c9 to get into ready status...\n", "Session my-smgluedemo-31f96e60-2cdb-401b-9d00-52a77c55c6c9 has been created.\n", "3.1.1-amzn-0\n" ] } ], "source": [ "print(spark.version)" ] }, { "cell_type": "markdown", "id": "e6bdcc48-8304-4661-a6a9-f6f03f618cd3", "metadata": { "tags": [] }, "source": [ "# Part 1: Data preparation and cleaning using Spark " ] }, { "cell_type": "markdown", "id": "f5b9b01e-1bdc-4537-875f-1dda545827fc", "metadata": {}, "source": [ "## 1.1 Data preparation and cleaning of the `first` dataset (NO2 Air Quality)\n", "\n", "In the cells below, we're going to perform the following operations:\n", "\n", "- Use Spark to read our data from the `OpenAQ` S3 Bucket.\n", "- Filter the available data to `Boston` and `NO2 readings` (indicative of air quality).\n", "- Group the readings by day." 
] }, { "cell_type": "code", "execution_count": 2, "id": "b722be6a-f815-4d32-954c-631629e7e104", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+---------------+------------------+--------------------+-------+--------------------+--------+------+---------+-----------+----------+-----+-----+\n", "| attribution|averagingPeriod| city| coordinates|country| date|location|mobile|parameter| sourceName|sourceType| unit|value|\n", "+--------------------+---------------+------------------+--------------------+-------+--------------------+--------+------+---------+-----------+----------+-----+-----+\n", "|[{EEA, http://www...| {hours, 1.0}|Escaldes-Engordany|{42.5096939994651...| AD|{2023-02-21T02:00...| AD0942A| false| pm10|EEA Andorra|government|µg/m³| 18.0|\n", "|[{EEA, http://www...| {hours, 1.0}|Escaldes-Engordany|{42.5096939994651...| AD|{2023-02-21T03:00...| AD0942A| false| pm10|EEA Andorra|government|µg/m³| 20.0|\n", "+--------------------+---------------+------------------+--------------------+-------+--------------------+--------+------+---------+-----------+----------+-----+-----+\n", "only showing top 2 rows\n" ] } ], "source": [ "df = spark.read.json(\"s3://openaq-fetches/realtime-gzipped/2022-01-05/1641409725.ndjson.gz\")\n", "df2 = spark.read.schema(df.schema).json(\"s3://openaq-fetches/realtime-gzipped/202*\")\n", "\n", "df2.show(2, truncate=True)" ] }, { "cell_type": "code", "execution_count": 3, "id": "51fb24ce-0949-40e5-8e70-32fb28649e21", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+---------------+--------------------+--------------------+-------+--------------------+--------------------+------+---------+----------+----------+----+------+----+\n", "| attribution|averagingPeriod| city| coordinates|country| date| location|mobile|parameter|sourceName|sourceType|unit| value|year|\n", "+--------------------+---------------+--------------------+--------------------+-------+--------------------+--------------------+------+---------+----------+----------+----+------+----+\n", "|[{US EPA AirNow, ...| {hours, 1.0}|Boston-Cambridge-...|{42.474701, -70.9...| US|{2023-04-05T17:00...| LYNN| false| no2| AirNow|government| ppm|0.0024|2023|\n", "|[{US EPA AirNow, ...| {hours, 1.0}|Boston-Cambridge-...|{42.2117, -71.114...| US|{2023-04-05T17:00...|E. 
Milton - Blue Hil| false| no2| AirNow|government| ppm| 0.002|2023|\n", "|[{US EPA AirNow, ...| {hours, 1.0}|Boston-Cambridge-...|{42.3489, -71.097...| US|{2023-04-05T17:00...| BOSTON-KENMORE| false| no2| AirNow|government| ppm| 0.016|2023|\n", "|[{US EPA AirNow, ...| {hours, 1.0}|Boston-Cambridge-...|{42.329399, -71.0...| US|{2023-04-05T17:00...| Boston - Roxbury| false| no2| AirNow|government| ppm|0.0068|2023|\n", "|[{US EPA AirNow, ...| {hours, 1.0}|Boston-Cambridge-...|{42.474701, -70.9...| US|{2023-04-07T16:00...| LYNN| false| no2| AirNow|government| ppm|9.0E-4|2023|\n", "+--------------------+---------------+--------------------+--------------------+-------+--------------------+--------------------+------+---------+----------+----------+----+------+----+\n", "only showing top 5 rows\n" ] } ], "source": [ "import pyspark.sql.functions as F\n", "\n", "'''\n", "Filtering Data ONLY for \n", " City : Boston\n", " Parameter : no2\n", "\n", "Adding a new Column 'YEAR' \n", "'''\n", "dfBos = df2.filter(F.lower((df2.city)) \\\n", " .contains('boston')) \\\n", " .filter(df2.parameter == \"no2\") \\\n", " .withColumn(\"year\", F.substring(df2.date.utc, 1, 4)) \\\n", " .cache()\n", "\n", "dfBos.show(5, truncate=True)" ] }, { "cell_type": "code", "execution_count": 4, "id": "7953d5d5-86e3-4bff-9ebf-ac905b58d49f", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------------------+\n", "| ymd| no2_avg|\n", "+----------+--------------------+\n", "|2023-02-23|0.009197023809523808|\n", "|2020-12-26|0.003088235294117...|\n", "|2022-12-04|0.003643782837127...|\n", "|2021-11-10|0.010848101265822781|\n", "|2021-11-23|0.004562500000000002|\n", "|2022-05-31|0.005007751937984496|\n", "|2021-12-13| 0.01046226415094339|\n", "|2020-07-14|0.005851851851851853|\n", "|2020-04-12|0.004528089887640451|\n", "|2020-01-16|0.014249999999999997|\n", "+----------+--------------------+\n", "only showing top 10 rows\n" ] } ], "source": [ "'''\n", "Aggregating the data day wise by taking the average of `no2` value across each day. \n", "Reducing the no. 
of data points from ~2.5M to ~2K\n", "'''\n", "\n", "dfNoAvg = (dfBos.withColumn(\"ymd\", F.to_date(dfBos.date.utc)) \n", " .groupBy(\"ymd\") \n", " .avg(\"value\") \n", " .withColumnRenamed(\"avg(value)\", \"no2_avg\")\n", " )\n", "\n", "dfNoAvg.show(10, truncate=True)" ] }, { "cell_type": "code", "execution_count": 5, "id": "45eeb4a8-63d8-481d-817c-bf4ec73082ed", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(2019, 2023)\n" ] } ], "source": [ "import pyspark.sql.functions as F\n", "\n", "year_min = dfNoAvg.select(F.year(\"ymd\").alias(\"year\")).agg({'year':'min'}).first()[0]\n", "year_max = dfNoAvg.select(F.year(\"ymd\").alias(\"year\")).agg({'year':'max'}).first()[0]\n", "\n", "(year_min, year_max)" ] }, { "cell_type": "markdown", "id": "9c82f762-3183-404c-bdbf-737c0897845c", "metadata": { "tags": [] }, "source": [ "## 1.2 Data preparation and cleaning of the `second` dataset (Weather)\n", "\n", "\n", "Now that our first dataset looks good, we'll use the **year_min** and **year_max** variables to limit how much data we read from the second dataset.\n" ] }, { "cell_type": "markdown", "id": "8ebedd47-fee9-4375-987b-c2523780a45a", "metadata": {}, "source": [ "### And now the weather dataset" ] }, { "cell_type": "code", "execution_count": 6, "id": "3520ed79-7174-43f8-8c38-f52bfbed0e03", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "from pyspark.sql.types import DoubleType\n", "from pyspark.sql import functions as F\n", "\n", "# For reading the schema \n", "dfSchema = spark.read.csv(\"s3://noaa-gsod-pds/2022/32509099999.csv\", header=True, inferSchema=True)\n", "\n", "# Boston, MA, USA\n", "longLeft, latBottom, longRight, latTop = [-71.191162,42.227087,-70.986022,42.397057]\n", "\n", "\n", "# We read our first year, then union the rest of the years :)\n", "def read_year(year):\n", " return spark.read.csv(f\"s3://noaa-gsod-pds/{year}/\", header=True, schema=dfSchema.schema)\n", "\n", "year_range = range(int(year_min), int(year_max)+1)\n", "df = read_year(year_range[0])\n", "for year in year_range[1:]:\n", " df = df.union(read_year(year))\n", "\n", "df = df \\\n", " .withColumn('LATITUDE', df.LATITUDE.cast(DoubleType())) \\\n", " .withColumn('LONGITUDE', df.LONGITUDE.cast(DoubleType()))\n", "\n", "bostondf = df \\\n", " .filter(df.LATITUDE >= latBottom) \\\n", " .filter(df.LATITUDE <= latTop) \\\n", " .filter(df.LONGITUDE >= longLeft) \\\n", " .filter(df.LONGITUDE <= longRight)\n", "\n", "# Rename columns so they're easier to read\n", "bostonfeatures = bostondf.selectExpr(\"Date as date\", \"MAX as temp_max\", \"MIN as temp_min\", \"WDSP as wind_avg\", \"SLP as pressure_sea_level\", \"STP as pressure_station\")\n", "\n", "# Remove invalid readings\n", "no_data_mappings = [\n", " [\"temp_max\", 9999.9],\n", " [\"temp_min\", 9999.9],\n", " [\"wind_avg\", 999.9],\n", " [\"pressure_sea_level\", 9999.9],\n", " [\"pressure_station\", 9999.9]\n", "]\n", "\n", "for [name, val] in no_data_mappings:\n", " bostonfeatures = bostonfeatures \\\n", " .withColumn(name, F.when(F.col(name)==val, None) \\\n", " .otherwise(F.col(name)))\n", " \n", "# Now average each reading per day\n", "bostonfeatures = bostonfeatures \\\n", " .groupBy(\"date\") \\\n", " .agg(*[F.mean(c).alias(c) for c in bostonfeatures.columns[1:]]) \\\n", " .cache()\n", "\n", "bostonfeatures = bostonfeatures.withColumn(\"date\", F.to_date(F.col(\"date\"), \"yyyy-MM-dd\"))" ] },
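{ "cell_type": "markdown", "id": "f3a9c2d1-5b6e-4c7a-9d8e-0a1b2c3d4e5f", "metadata": {}, "source": [ "As an optional sanity check on the cleaned weather features (a sketch, not executed as part of this walkthrough), you can count how many days of data survived the bounding-box filter and whether any daily averages are still null after the GSOD \"no data\" sentinel values were dropped:\n", "\n", "```python\n", "# How many days of Boston weather features do we have?\n", "print(bostonfeatures.count())\n", "\n", "# Any daily averages left null because every reading that day was a sentinel value?\n", "bostonfeatures.select(\n", "    [F.count(F.when(F.col(c).isNull(), c)).alias(c + \"_nulls\") for c in bostonfeatures.columns[1:]]\n", ").show()\n", "```\n" ] },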
"04b2e17b-696a-4c1d-b294-b263b27dcea2", "metadata": {}, "source": [ "## 1.3. Marry the `two processed data` and make it ready for the `ML training`\n", "\n", "Now that we've taken a quick look at our data and done some initial exploration for both the dataset, let's merge the two datasets." ] }, { "cell_type": "code", "execution_count": 7, "id": "969544e0-5a15-4e2e-bb24-e93814dae7a8", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------------------+\n", "| ymd| no2_avg|\n", "+----------+--------------------+\n", "|2021-11-10|0.010848101265822783|\n", "|2023-02-23| 0.00919702380952381|\n", "|2022-12-04|0.003643782837127...|\n", "|2022-05-31|0.005007751937984495|\n", "|2021-12-13|0.010462264150943394|\n", "+----------+--------------------+\n", "only showing top 5 rows\n" ] } ], "source": [ "# First Dataset - NO2 Dataset\n", "dfNoAvg.show(5)" ] }, { "cell_type": "code", "execution_count": 8, "id": "e22f9965-4e52-415f-a4fd-c2c5afaad7e0", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+------------------+------------------+--------+------------------+----------------+\n", "| date| temp_max| temp_min|wind_avg|pressure_sea_level|pressure_station|\n", "+----------+------------------+------------------+--------+------------------+----------------+\n", "|2022-03-18| 68.1|42.349999999999994| 4.7| 1013.3| 506.15|\n", "|2023-04-18| 57.85| 48.8| 10.3| 1003.55| 501.3|\n", "|2022-06-02| 63.0| 54.0| 7.1| 1009.2| 8.2|\n", "|2020-01-07| 41.35| 30.6| 4.3| 1015.0| 507.25|\n", "|2020-04-13|61.150000000000006| 43.55| 8.85| 1013.05| 505.95|\n", "+----------+------------------+------------------+--------+------------------+----------------+\n", "only showing top 5 rows\n" ] } ], "source": [ "# Second Dataset - Weather Dataset \n", "bostonfeatures.show(5)" ] }, { "cell_type": "code", "execution_count": 9, "id": "9c7c5af5-3e55-4c78-a19c-d9eb07622521", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+----------+------------------+--------+--------+------------------+------------------+\n", "| no2_avg| date| temp_max|temp_min|wind_avg|pressure_sea_level| pressure_station|\n", "+--------------------+----------+------------------+--------+--------+------------------+------------------+\n", "| 0.011|2019-12-31|43.150000000000006| 36.2| 7.3|1004.8499999999999| 502.45|\n", "|0.005319148936170215|2020-01-01| 42.55| 36.0| 6.05| 1001.2| 500.3|\n", "|0.009977011494252877|2020-01-02| 47.65| 34.15| 5.85|1009.6500000000001| 504.55|\n", "| 0.01679166666666667|2020-01-03| 50.8| 38.95| 3.9|1007.8499999999999| 503.7|\n", "| 0.01897872340425532|2020-01-04| 50.35| 42.8| 2.0|1005.5999999999999| 502.5|\n", "|0.003416666666666...|2020-01-05| 50.1| 35.0| 7.1| 1001.45| 500.25|\n", "| 0.01388541666666667|2020-01-06| 38.65| 27.6| 2.85|1012.1500000000001| 505.8|\n", "|0.015556962025316456|2020-01-07| 41.35| 30.6| 4.3| 1015.0| 507.25|\n", "| 0.011425|2020-01-08| 44.25| 33.35| 4.65| 1009.45| 504.45|\n", "|0.005458333333333335|2020-01-09| 40.4| 22.1| 7.45|1029.8000000000002| 514.6|\n", "|0.016882978723404257|2020-01-10| 51.0| 24.9| 5.3| 1034.6| 517.05|\n", "|0.006465909090909092|2020-01-11| 67.85| 49.25| 7.25| 1023.6|511.59999999999997|\n", "|0.001406250000000...|2020-01-12| 71.95| 51.25| 9.85| 1012.05|505.59999999999997|\n", "| 0.00792553191489362|2020-01-13| 50.75| 35.95| 4.5| 1030.1| 514.7|\n", "|0.012370370370370374|2020-01-14| 41.35| 35.55| 2.5| 
1026.65| 513.05|\n", "| 0.01078125|2020-01-15| 50.1| 38.2| 3.6| 1018.75|509.09999999999997|\n", "|0.014249999999999999|2020-01-16| 48.75| 36.85| 5.1| 1011.65| 505.4|\n", "|0.004300000000000002|2020-01-17| 41.05| 16.65| 9.0| 1025.5| 512.5|\n", "|0.009635416666666669|2020-01-18| 26.15| 13.65| 4.45| 1032.55| 516.0|\n", "|0.014961538461538462|2020-01-19| 44.0| 17.35| 5.0| 1005.75| 502.7|\n", "+--------------------+----------+------------------+--------+--------+------------------+------------------+\n", "only showing top 20 rows\n" ] } ], "source": [ "# Merging the two dataset \n", "merged_df = dfNoAvg.join(bostonfeatures, dfNoAvg['ymd'] == bostonfeatures['date'], 'inner') \\\n", " .drop('ymd').orderBy('date', ascending=True)\n", "merged_df.show()" ] }, { "cell_type": "markdown", "id": "073effb6-de26-4e14-8cac-fd6c471997bc", "metadata": {}, "source": [ "## 1.4. Save the data to S3\n" ] }, { "cell_type": "code", "execution_count": 10, "id": "db3e64bf-5413-419d-bf29-072b2a73d960", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "bucket = 'fcc-spark-example'\n", "merged_df.coalesce(1).write.parquet(f\"s3://{bucket}/subset-boston-data.parquet\")" ] }, { "cell_type": "markdown", "id": "2638a7ad-46b1-4c61-bb67-e9cb4bac4c0d", "metadata": { "tags": [] }, "source": [ "