# Spark Machine Learning using linear regression


#### Topics covered in this example
* `VectorAssembler`, `LinearRegression` and `RegressionEvaluator` from `pyspark.ml`.

***

## Prerequisites
<div class="alert alert-block alert-info">
<b>NOTE:</b> To run this notebook successfully as is, make sure the following prerequisites are met.</div>

* The EMR cluster attached to this notebook should have the `Spark` application installed.
* This example uses a public dataset, hence the EMR cluster attached to this notebook must have internet connectivity.
* This notebook uses the `PySpark` kernel.
***

## Introduction
In this example, we use PySpark to predict the total cost of a taxi trip (`total_amount`) using <a href="https://registry.opendata.aws/nyc-tlc-trip-records-pds/" target="_blank">New York City Taxi and Limousine Commission (TLC) Trip Record Data</a> from the <a href="https://registry.opendata.aws/" target="_blank">Registry of Open Data on AWS</a>.

***

## Example
Load the yellow taxi trip records for December 2022 into a Spark DataFrame.

In [None]:
# Parquet files embed their own schema, so CSV options such as inferSchema and header are not needed
df = spark.read.parquet("s3://nyc-tlc/trip data/yellow_tripdata_2022-12.parquet")

Mark the DataFrame for caching in memory, display the schema with the `printSchema` method to check the data types, and get the dimensions of the data.

In [None]:
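# Mark the DataFrame for caching in memory (caching is lazy; the first action materializes it)
df.cache()

# Print the schema
df.printSchema()

# Get the dimensions of the data as (rows, columns); count() is an action, so it also fills the cache
df.count(), len(df.columns)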

In [None]:
# Get the summary of the columns
df.select("total_amount", "tip_amount")\
.describe()\
.show()

# Value counts of VendorID column
df.groupBy("VendorID")\
.count()\
.show()
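
As a rough illustration of what the two summaries above report, the sketch below recomputes them in plain Python on a few hypothetical values (not taken from the TLC dataset). The helper name `describe` is ours, not Spark's; it assumes, as `DataFrame.describe()` does, that nulls are ignored and that `stddev` is the sample standard deviation.

```python
import math
from collections import Counter

def describe(values):
    """Hypothetical helper: the count, mean, stddev, min and max rows of describe() for one column."""
    # Nulls are ignored, so drop None before computing statistics
    xs = [v for v in values if v is not None]
    n = len(xs)
    mean = sum(xs) / n
    # Sample standard deviation (divides by n - 1)
    stddev = math.sqrt(sum((x - mean) ** 2 for x in xs) / (n - 1))
    return {"count": n, "mean": mean, "stddev": stddev, "min": min(xs), "max": max(xs)}

# Hypothetical toy values for total_amount
print(describe([12.5, 20.0, None, 7.5, 30.0]))

# groupBy("VendorID").count() is the distributed analogue of a value count
print(Counter([1, 2, 2, 1, 2]))
```

Spark computes the same statistics in parallel across partitions and returns them as a small DataFrame rather than a dictionary.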

### Use <a href="https://spark.apache.org/docs/2.4.7/ml-features#vectorassembler" target="_blank">VectorAssembler</a> to transform input columns into vectors
<a href="https://spark.apache.org/docs/2.3.1/api/python/pyspark.ml.html" target="_blank">pyspark.ml</a> provides DataFrame-based machine learning APIs that let users quickly assemble and configure practical machine learning pipelines.    
A `VectorAssembler` combines a given list of columns into a single vector column. In the cell below, we combine the input columns into a single vector column, `features`, and skip any rows whose input columns contain null or NaN values.
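Conceptually, the assembler concatenates the chosen columns, in order, into one vector per row. The plain-Python sketch below is a hypothetical analogue (the function `assemble` and the toy rows are ours, not Spark's implementation); it models the `"skip"` mode used here and the default `"error"` mode, treats `None` and NaN as invalid, and uses plain lists where Spark would produce `DenseVector` or `SparseVector` values.

```python
import math

def assemble(rows, input_cols, handle_invalid="skip"):
    """Hypothetical plain-Python analogue of VectorAssembler (not Spark's implementation)."""
    assembled = []
    for row in rows:
        values = [row.get(c) for c in input_cols]
        # A value is "invalid" if it is missing (None) or a floating-point NaN
        invalid = any(v is None or (isinstance(v, float) and math.isnan(v)) for v in values)
        if invalid:
            if handle_invalid == "skip":
                continue  # "skip" silently drops the row
            raise ValueError(f"invalid input in row: {row}")  # behaves like the "error" mode
        # Concatenate the selected columns, in the given order, into one feature vector
        assembled.append({**row, "features": [float(v) for v in values]})
    return assembled

# Hypothetical toy rows: the second has a null, the third a NaN
rows = [
    {"trip_distance": 1.2, "fare_amount": 8.0, "total_amount": 12.3},
    {"trip_distance": None, "fare_amount": 5.0, "total_amount": 7.0},
    {"trip_distance": 3.0, "fare_amount": float("nan"), "total_amount": 15.0},
]
print(assemble(rows, ["trip_distance", "fare_amount"]))
```

Only the first row survives, which is why the number of rows after assembly can be smaller than `df.count()`.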

In [None]:
from pyspark.ml.feature import VectorAssembler

# Specify the input and output columns of the vector assembler
vectorAssembler = VectorAssembler(
    inputCols = [
        "trip_distance",
        "PULocationID",
        "DOLocationID",
        "fare_amount",
        "mta_tax",
        "tip_amount", 
        "tolls_amount",
        "improvement_surcharge", 
        "congestion_surcharge"
    ], 
    outputCol = "features")

# Drop rows with null or NaN inputs, then assemble the features column
v_df = vectorAssembler.setHandleInvalid("skip").transform(df)

# Keep only the feature vector and the label, then view the first three rows
v_df = v_df.select(["features", "total_amount"])
v_df.show(3)

Divide the input dataset into a training set (70%) and a test set (30%).

In [None]:
# Split roughly 70/30; passing a seed makes the split reproducible across runs
train_df, test_df = v_df.randomSplit([0.7, 0.3], seed = 42)
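
`randomSplit` normalizes the weights and assigns each row to exactly one split at random, so the resulting sizes are only approximately 70/30. The sketch below illustrates that idea in plain Python; `random_split` is a hypothetical helper, and it is not how Spark samples partitions internally.

```python
import random

def random_split(rows, weights, seed=None):
    """Hypothetical sketch of weighted random splitting: each row lands in exactly one split."""
    total = sum(weights)
    # Normalize the weights into cumulative upper bounds, e.g. [0.7, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point round-off
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 7000 and 3000, but not exact
```

With the same seed the assignment repeats; without one, each run produces a different split.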

### Train the model on the training set using <a href="https://spark.apache.org/docs/2.4.7/ml-classification-regression.html#linear-regression" target="_blank">LinearRegression</a>

In [None]:
from pyspark.ml.regression import LinearRegression

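# Elastic-net regularized linear regression:
# regParam sets the overall penalty strength, and elasticNetParam = 0.8 mixes
# the L1 (lasso) and L2 (ridge) penalties, leaning toward L1
lr = LinearRegression(featuresCol = "features",
                      labelCol = "total_amount",
                      maxIter = 100,
                      regParam = 0.3,
                      elasticNetParam = 0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))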
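
To make `regParam` and `elasticNetParam` concrete, the sketch below evaluates the elastic-net objective in the form given in the Spark documentation: (1/2n) * sum((w·x + b - y)^2) + regParam * (alpha * ||w||_1 + (1 - alpha)/2 * ||w||_2^2), where alpha is `elasticNetParam` and the intercept b is not penalized. The function `elastic_net_objective` is hypothetical and only evaluates the objective; it does not fit a model. It also ignores the fact that Spark standardizes features by default (`standardization=True`) before applying the penalty.

```python
def elastic_net_objective(X, y, w, b, reg_param, elastic_net_param):
    """Hypothetical helper: penalized least-squares objective for given coefficients w and intercept b."""
    n = len(y)
    # Half the mean squared residual: (1 / 2n) * sum((w.x + b - y)^2)
    sq_loss = sum(
        (sum(wj * xj for wj, xj in zip(w, xi)) + b - yi) ** 2 for xi, yi in zip(X, y)
    ) / (2 * n)
    l1 = sum(abs(wj) for wj in w)    # ||w||_1
    l2_sq = sum(wj * wj for wj in w)  # ||w||_2^2
    # Mix of L1 and L2 penalties; the intercept b is not penalized
    penalty = reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_sq)
    return sq_loss + penalty

# Toy data that the line y = 2x + 1 fits exactly, so only the penalty remains
X, y = [[1.0], [2.0]], [3.0, 5.0]
print(elastic_net_objective(X, y, [2.0], 1.0, reg_param=0.3, elastic_net_param=0.8))  # about 0.6
```

Because `elasticNetParam = 0.8` weights the L1 term heavily, the fitted model may set some coefficients exactly to zero, which you can check in the printed `Coefficients` vector.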

Report the trained model's performance on the training set.

In [None]:
training_summary = lr_model.summary
print("RMSE: %f" % training_summary.rootMeanSquaredError)
print("R squared (R2): %f" % training_summary.r2)
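
For reference, the sketch below computes RMSE and R2 with their standard definitions: RMSE is the square root of the mean squared residual (dividing by n), and R2 is 1 - SS_res / SS_tot. We assume these match what the training summary and `RegressionEvaluator` report; the helpers `rmse` and `r2` and the toy values are hypothetical.

```python
import math

def rmse(labels, preds):
    """Root mean squared error: sqrt(mean((prediction - label)^2))."""
    n = len(labels)
    return math.sqrt(sum((p - y) ** 2 for y, p in zip(labels, preds)) / n)

def r2(labels, preds):
    """Coefficient of determination: 1 - SS_res / SS_tot (undefined if all labels are equal)."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

# Hypothetical labels and predictions
labels, preds = [10.0, 20.0, 30.0], [12.0, 18.0, 30.0]
print(rmse(labels, preds), r2(labels, preds))  # about 1.633 and 0.96
```

RMSE is in the same units as `total_amount`, while R2 is unitless, so the two metrics complement each other.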

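Generate predictions on the test set and compare them with the actual `total_amount` for trips with a `total_amount` above 10, showing the signed difference (`diff`) and the percentage difference (`diff%`).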

In [None]:
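from pyspark.sql.functions import col

# Score the test set; transform() appends a "prediction" column
predictions = lr_model.transform(test_df)

# Compare predictions with actual amounts; small totals are excluded because they inflate the percentage error
predictions.filter(predictions.total_amount > 10.0)\
.select("prediction", "total_amount")\
.withColumn("diff", col("prediction") - col("total_amount"))\
.withColumn("diff%", (col("diff") / col("total_amount")) * 100)\
.show()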
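
The filter and the two `withColumn` calls can be mirrored in plain Python to show why small totals are excluded. In this hypothetical sketch (`error_table` and the sample pairs are ours), a prediction of 3.0 for a trip that cost 1.0 would report a 200% error even though it is off by only 2.0.

```python
def error_table(rows, min_total=10.0):
    """Hypothetical analogue of the filter + withColumn chain: (prediction, total, diff, diff%)."""
    table = []
    for prediction, total_amount in rows:
        # Keep only trips strictly above min_total, like total_amount > 10.0
        if total_amount <= min_total:
            continue
        diff = prediction - total_amount
        table.append((prediction, total_amount, diff, diff / total_amount * 100))
    return table

# Hypothetical (prediction, total_amount) pairs
rows = [(11.0, 10.0), (22.0, 20.0), (3.0, 1.0)]
for r in error_table(rows):
    print(r)
```

Only the (22.0, 20.0) pair passes the filter, with a diff of 2.0 and a diff% of about 10.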

### Report performance on the test set using <a href="https://spark.apache.org/docs/2.4.7/api/java/org/apache/spark/ml/evaluation/RegressionEvaluator.html" target="_blank">RegressionEvaluator</a>

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# R2 on held-out data; compare it with the training-set R2 to check for overfitting
lr_evaluator = RegressionEvaluator(predictionCol = "prediction",
                                   labelCol = "total_amount",
                                   metricName = "r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predictions))