In [None]:
%%spark

In [None]:
val schema_2010_2014 = {
  // Infer the column names/types once from a single month (January 2010)
  // so the inferred schema can be handed to another reader instead of
  // being inferred again. Only the schema is kept; the sample rows are not.
  val sampleFile = "s3://nyc-tlc/trip data/yellow_tripdata_2010-01.csv"
  val inferenceOptions = Map("header" -> "true", "inferSchema" -> "true")
  spark.read.options(inferenceOptions).csv(sampleFile).schema
}

val trips_2010_2014 = {
  // Read every monthly yellow-taxi CSV for 2010 through 2014 (glob 201[0-4]-*)
  // with the pre-computed schema. DROPMALFORMED makes the CSV parser discard
  // rows that do not fit that schema rather than failing the read.
  val monthlyFiles = "s3://nyc-tlc/trip data/yellow_tripdata_201[0-4]-*.csv"
  val parseOptions = Map("header" -> "true", "mode" -> "DROPMALFORMED")
  spark.read.schema(schema_2010_2014).options(parseOptions).csv(monthlyFiles)
}

In [None]:
val schema_2015_2016 = {
  // Infer the column names/types once from a single month (January 2015)
  // so the inferred schema can be handed to another reader instead of
  // being inferred again. Only the schema is kept; the sample rows are not.
  val sampleFile = "s3://nyc-tlc/trip data/yellow_tripdata_2015-01.csv"
  val inferenceOptions = Map("header" -> "true", "inferSchema" -> "true")
  spark.read.options(inferenceOptions).csv(sampleFile).schema
}

val trips_2015_2016 = {
  // Read every monthly yellow-taxi CSV for 2015 and 2016 (glob 201[5-6]-*)
  // with the pre-computed schema. DROPMALFORMED makes the CSV parser discard
  // rows that do not fit that schema rather than failing the read.
  val monthlyFiles = "s3://nyc-tlc/trip data/yellow_tripdata_201[5-6]-*.csv"
  val parseOptions = Map("header" -> "true", "mode" -> "DROPMALFORMED")
  spark.read.schema(schema_2015_2016).options(parseOptions).csv(monthlyFiles)
}

In [None]:
val trips = {
  // Stack both periods into one DataFrame. Dataset.union pairs columns by
  // position, not by name, so the column that only the 2015-2016 frame has is
  // removed first; the result carries the 2010-2014 column names.
  // NOTE(review): assumes the remaining columns line up one-to-one — confirm.
  val recentWithoutSurcharge = trips_2015_2016.drop("improvement_surcharge")
  trips_2010_2014.union(recentWithoutSurcharge)
}

In [None]:
val cleaned = {
  // Keep trips whose drop-off time lies in [2010-01-01, 2017-01-01), sort
  // them by drop-off time, then number them in that order and tag each row
  // with a constant "type" of "trip".
  val dropoff = col("dropoff_datetime")
  val withinRange =
    dropoff >= "2010-01-01T00:00:00.000Z" && dropoff < "2017-01-01T00:00:00.000Z"

  trips
    .where(withinRange)
    .sort(dropoff)
    // IDs from monotonically_increasing_id are unique and increasing, but
    // not guaranteed to be consecutive.
    .withColumn("trip_id", monotonically_increasing_id())
    .withColumn("type", lit("trip"))
}

// Mark the result for caching (lazy: nothing is computed here), then show
// the resulting column layout.
cleaned.cache()
cleaned.printSchema()

In [None]:
// Key prefix under the output bucket; each export appends its own
// format suffix (".json", ".parquet", ".orc") to this value.
val prefix: String = "yellow-trip-data/taxi-trips"

In [None]:
val partitioned_year = {
  // Add the four-digit drop-off year as its own column so it can serve as
  // the partition key, and order the rows by trip_id.
  val dropoffYear = date_format(col("dropoff_datetime"), "yyyy")
  cleaned.withColumn("dropoff_year", dropoffYear).sort("trip_id")
}

// Export as JSON, partitioned into one directory per dropoff_year value.
partitioned_year.write.partitionBy("dropoff_year").format("json").save(s"s3://shausma-nyc-tlc/${prefix}.json/")

In [None]:
val partitioned_year_month_day = {
  // Extend the year-tagged frame with two-digit month and day columns taken
  // from the drop-off time, then order the rows by trip_id.
  val dropoff = col("dropoff_datetime")
  val datePartColumns = Seq("dropoff_month" -> "MM", "dropoff_day" -> "dd")

  datePartColumns
    .foldLeft(partitioned_year) { case (frame, (columnName, pattern)) =>
      frame.withColumn(columnName, date_format(dropoff, pattern))
    }
    .sort("trip_id")
}

// Mark for caching (lazy); the frame is written out in more than one format
// afterwards.
partitioned_year_month_day.cache()

In [None]:
// Export the year/month/day-tagged trips twice — first as Parquet, then as
// ORC — each partitioned by year, month and day under its own
// "<prefix>.<format>/" location.
Seq("parquet", "orc").foreach { outputFormat =>
  partitioned_year_month_day
    .write
    .partitionBy("dropoff_year", "dropoff_month", "dropoff_day")
    .format(outputFormat)
    .save(s"s3://shausma-nyc-tlc/${prefix}.${outputFormat}/")
}

```
# Recompress the JSON export with lz4: list every key under taxi-trips.json/,
# then (in parallel) stream each object from S3 through lz4 and upload the
# result under a key in which every ".json" becomes ".json.lz4" (the sed "g"
# flag rewrites the directory prefix as well as the file suffix).
# The backslash must be the LAST character on a continued line: a trailing
# space after it escapes the space instead of the newline and breaks the pipe.
$ aws s3 ls --recursive s3://shausma-nyc-tlc/yellow-trip-data/taxi-trips.json/ | awk '{print $4}' | \
 parallel 'aws s3 cp s3://shausma-nyc-tlc/{} - | \
 lz4 | \
 aws s3 cp - s3://shausma-nyc-tlc/`echo {} | sed "s/\.json/.json.lz4/g"`'
```