{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Running Spark queries against your Data Lake\n", "\n", "We will now use EMR with the Glue Data Catalog to query the same yellow taxi data we used in Athena and Redshift. The first step is to create a Spark session so that we can run Spark SQL queries against the catalog tables." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "import datetime\n", "\n", "# Get the active Spark session, or create one if none exists\n", "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Spark Taxi demo\") \\\n", " .getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run the same query with Spark on EMR against the CSV-formatted data to count the yellow taxi rides from January 1 to January 10, 2017." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Unoptimized query: reads the CSV-formatted taxi.yellow table\n", "currentDT1 = datetime.datetime.now()\n", "\n", "sql = 'SELECT count(yellow.vendorid) FROM taxi.yellow '\\\n", " 'INNER JOIN taxi.paymenttype ON yellow.payment_type = paymenttype.id '\\\n", " 'INNER JOIN taxi.ratecode ON yellow.ratecodeid = ratecode.id '\\\n", " 'INNER JOIN taxi.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid = pu_taxizone.locationid '\\\n", " 'INNER JOIN taxi.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid = do_taxizone.locationid '\\\n", " 'WHERE month(to_date(tpep_pickup_datetime)) = 1 '\\\n", " 'AND year(to_date(tpep_pickup_datetime)) = 2017 AND dayofmonth(to_date(tpep_pickup_datetime)) BETWEEN 1 AND 10'\n", "\n", "sqlDF = spark.sql(sql)\n", "\n", "# show() is an action, so the query actually executes here\n", "sqlDF.show()\n", "\n", "# Print the elapsed wall-clock time\n", "currentDT2 = datetime.datetime.now()\n", "print(currentDT2 - currentDT1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run the same query with Spark on EMR against the Parquet-formatted data to count the yellow taxi rides from January 1 to January 10, 2017."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optimized query: reads the Parquet-formatted taxi.yellow_parquet table\n", "currentDT1 = datetime.datetime.now()\n", "\n", "sql = 'SELECT count(yellow.vendorid) FROM taxi.yellow_parquet AS yellow '\\\n", " 'INNER JOIN taxi.paymenttype ON yellow.payment_type = paymenttype.id '\\\n", " 'INNER JOIN taxi.ratecode ON yellow.ratecodeid = ratecode.id '\\\n", " 'INNER JOIN taxi.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid = pu_taxizone.locationid '\\\n", " 'INNER JOIN taxi.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid = do_taxizone.locationid '\\\n", " 'WHERE month(to_date(tpep_pickup_datetime)) = 1 '\\\n", " 'AND year(to_date(tpep_pickup_datetime)) = 2017 AND dayofmonth(to_date(tpep_pickup_datetime)) BETWEEN 1 AND 10'\n", "\n", "sqlDF = spark.sql(sql)\n", "\n", "# show() is an action, so the query actually executes here\n", "sqlDF.show()\n", "\n", "# Print the elapsed wall-clock time\n", "currentDT2 = datetime.datetime.now()\n", "print(currentDT2 - currentDT1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 2 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python2" } }, "nbformat": 4, "nbformat_minor": 2 }