{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "trusted": true
      },
      "outputs": [],
      "source": [
        "%%spark"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "trusted": true
      },
      "outputs": [],
      "source": [
        "// Infer the column types once from the January 2010 file; the resulting\n",
        "// schema is then applied explicitly to every 2010-2014 file.\n",
        "val schema_2010_2014 = {\n",
        "  spark\n",
        "    .read\n",
        "    .format(\"csv\")\n",
        "    .option(\"header\", \"true\")\n",
        "    .option(\"inferSchema\", \"true\")\n",
        "    .load(\"s3://nyc-tlc/trip data/yellow_tripdata_2010-01.csv\")\n",
        "    .schema\n",
        "}\n",
        "\n",
        "// Load all monthly 2010-2014 files with that schema. DROPMALFORMED discards\n",
        "// rows that do not parse against it instead of failing the read.\n",
        "val trips_2010_2014 = {\n",
        "  spark\n",
        "    .read\n",
        "    .schema(schema_2010_2014)\n",
        "    .format(\"csv\")\n",
        "    .option(\"header\", \"true\")\n",
        "    .option(\"mode\", \"DROPMALFORMED\")\n",
        "    .load(\"s3://nyc-tlc/trip data/yellow_tripdata_201[0-4]-*.csv\")\n",
        "}"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "trusted": true
      },
      "outputs": [],
      "source": [
        "// Same approach for 2015-2016: infer the schema from the January 2015 file,\n",
        "// then apply it to every 2015-2016 file.\n",
        "val schema_2015_2016 = {\n",
        "  spark\n",
        "    .read\n",
        "    .format(\"csv\")\n",
        "    .option(\"header\", \"true\")\n",
        "    .option(\"inferSchema\", \"true\")\n",
        "    .load(\"s3://nyc-tlc/trip data/yellow_tripdata_2015-01.csv\")\n",
        "    .schema\n",
        "}\n",
        "\n",
        "// Rows that do not parse against the inferred schema are dropped.\n",
        "val trips_2015_2016 = {\n",
        "  spark\n",
        "    .read\n",
        "    .schema(schema_2015_2016)\n",
        "    .format(\"csv\")\n",
        "    .option(\"header\", \"true\")\n",
        "    .option(\"mode\", \"DROPMALFORMED\")\n",
        "    .load(\"s3://nyc-tlc/trip data/yellow_tripdata_201[5-6]-*.csv\")\n",
        "}"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "trusted": true
      },
      "outputs": [],
      "source": [
        "// union matches columns by position, not by name, so the result carries the\n",
        "// 2010-2014 column names. improvement_surcharge is dropped from the newer data\n",
        "// first (presumably the only column the older files lack -- verify against\n",
        "// both inferred schemas).\n",
        "val trips = trips_2010_2014.union(trips_2015_2016.drop(\"improvement_surcharge\"))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "trusted": true
      },
      "outputs": [],
      "source": [
        "// Keep trips whose dropoff time lies in [2010-01-01, 2017-01-01), sort them by\n",
        "// dropoff time, then add a trip_id and a constant type column.\n",
        "// monotonically_increasing_id gives unique, increasing ids that are not\n",
        "// guaranteed to be consecutive.\n",
        "val cleaned = {\n",
        "  trips\n",
        "    .filter(col(\"dropoff_datetime\") >= \"2010-01-01T00:00:00.000Z\")\n",
        "    .filter(col(\"dropoff_datetime\") < \"2017-01-01T00:00:00.000Z\")\n",
        "    .orderBy(\"dropoff_datetime\")\n",
        "    .withColumn(\"trip_id\", monotonically_increasing_id())\n",
        "    .withColumn(\"type\", lit(\"trip\"))\n",
        "}\n",
        "\n",
        "// cache is lazy: it only marks the frame, which is reused by the writes that follow.\n",
        "cleaned.cache\n",
        "cleaned.printSchema"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "trusted": true
      },
      "outputs": [],
      "source": [
        "// Key prefix shared by every output format written to the bucket.\n",
        "val prefix = \"yellow-trip-data/taxi-trips\""
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "trusted": true
      },
      "outputs": [],
      "source": [
        "// Add the four-digit dropoff year as a column to partition the JSON output on.\n",
        "val partitioned_year = {\n",
        "  cleaned\n",
        "    .withColumn(\"dropoff_year\", date_format(col(\"dropoff_datetime\"), \"yyyy\"))\n",
        "    .orderBy(\"trip_id\")\n",
        "}\n",
        "\n",
        "// One output directory per dropoff_year value.\n",
        "partitioned_year.write.partitionBy(\"dropoff_year\").json(s\"s3://shausma-nyc-tlc/${prefix}.json/\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "trusted": true
      },
      "outputs": [],
      "source": [
        "// Extend the year-partitioned frame with two-digit month and day columns.\n",
        "val partitioned_year_month_day = {\n",
        "  partitioned_year\n",
        "    .withColumn(\"dropoff_month\", date_format(col(\"dropoff_datetime\"), \"MM\"))\n",
        "    .withColumn(\"dropoff_day\", date_format(col(\"dropoff_datetime\"), \"dd\"))\n",
        "    .orderBy(\"trip_id\")\n",
        "}\n",
        "\n",
        "// Reused by both the Parquet and the ORC write.\n",
        "partitioned_year_month_day.cache"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "trusted": true
      },
      "outputs": [],
      "source": [
        "// Write the same frame as Parquet and as ORC, partitioned by year/month/day.\n",
        "partitioned_year_month_day.write.partitionBy(\"dropoff_year\", \"dropoff_month\", \"dropoff_day\").parquet(s\"s3://shausma-nyc-tlc/${prefix}.parquet/\")\n",
        "partitioned_year_month_day.write.partitionBy(\"dropoff_year\", \"dropoff_month\", \"dropoff_day\").orc(s\"s3://shausma-nyc-tlc/${prefix}.orc/\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Compress every JSON output object with `lz4` and upload it under the same key with each `.json` replaced by `.json.lz4`:\n",
        "\n",
        "```\n",
        "$ aws s3 ls --recursive s3://shausma-nyc-tlc/yellow-trip-data/taxi-trips.json/ | awk '{print $4}' | \\\n",
        "    parallel 'aws s3 cp s3://shausma-nyc-tlc/{} - | \\\n",
        "              lz4 | \\\n",
        "              aws s3 cp - s3://shausma-nyc-tlc/`echo {} | sed \"s/\\.json/.json.lz4/g\"`'\n",
        "```"
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Spark",
      "language": "",
      "name": "sparkkernel"
    },
    "language_info": {
      "codemirror_mode": "text/x-scala",
      "mimetype": "text/x-scala",
      "name": "scala",
      "pygments_lexer": "scala"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}