{ "cells": [ { "cell_type": "markdown", "id": "italic-invalid", "metadata": {}, "source": [ "# Spark Machine Learning using linear regression\n", "\n", "\n", "#### Topics covered in this example\n", "* `VectorAssembler`, `LinearRegression` and `RegressionEvaluator` from `pyspark.ml`." ] }, { "cell_type": "markdown", "id": "moderate-dominant", "metadata": {}, "source": [ "***\n", "\n", "## Prerequisites\n", "
\n", "NOTE : In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.
\n", "\n", "* The EMR cluster attached to this notebook should have the `Spark` application installed.\n", "* This example uses a public dataset, hence the EMR cluster attached to this notebook must have internet connectivity.\n", "* This notebook uses the `PySpark` kernel.\n", "***" ] }, { "cell_type": "markdown", "id": "northern-violence", "metadata": {}, "source": [ "## Introduction\n", "In this example we use pyspark to predict the total cost of a trip using New York City Taxi and Limousine Commission (TLC) Trip Record Data from Registry of Open Data on AWS.\n", "\n", "***" ] }, { "cell_type": "markdown", "id": "difficult-enough", "metadata": {}, "source": [ "## Example\n", "Load the data set for trips into a Spark DataFrame." ] }, { "cell_type": "code", "execution_count": null, "id": "human-probe", "metadata": { "tags": [] }, "outputs": [], "source": [ "df = spark.read.format(\"parquet\") \\\n", ".load(\"s3://nyc-tlc/trip data/yellow_tripdata_2022-12.parquet\", \n", " inferSchema = True, \n", " header = True)" ] }, { "cell_type": "markdown", "id": "professional-panama", "metadata": {}, "source": [ "Mark the dataFrame for caching in memory and display the schema to check the data-types using the `printSchema` method." ] }, { "cell_type": "code", "execution_count": null, "id": "distinct-dollar", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Mark the dataFrame for caching in memory\n", "df.cache()\n", "\n", "# Print the scehma\n", "df.printSchema()\n", "\n", "# Get the dimensions of the data\n", "df.count() , len(df.columns)" ] }, { "cell_type": "code", "execution_count": null, "id": "descending-messaging", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Get the summary of the columns\n", "df.select(\"total_amount\", \"tip_amount\")\\\n", ".describe()\\\n", ".show()\n", "\n", "# Value counts of VendorID column\n", "df.groupBy(\"VendorID\")\\\n", ".count()\\\n", ".show()" ] }, { "cell_type": "markdown", "id": "compressed-maintenance", "metadata": {}, "source": [ "### Use VectorAssembler to transform input columns into vectors\n", "pyspark.ml provides dataFrame-based machine learning APIs to let users quickly assemble and configure practical machine learning pipelines. \n", "A `VectorAssembler` combines a given list of columns into a single vector column. In the below cell we combine the columns to a single vector cloumn `features`." ] }, { "cell_type": "code", "execution_count": null, "id": "magnetic-anaheim", "metadata": { "tags": [] }, "outputs": [], "source": [ "from pyspark.ml.feature import VectorAssembler\n", "\n", "# Specify the input and output columns of the vector assembler\n", "vectorAssembler = VectorAssembler(\n", " inputCols = [\n", " \"trip_distance\",\n", " \"PULocationID\",\n", " \"DOLocationID\",\n", " \"fare_amount\",\n", " \"mta_tax\",\n", " \"tip_amount\", \n", " \"tolls_amount\",\n", " \"improvement_surcharge\", \n", " \"congestion_surcharge\"\n", " ], \n", " outputCol = \"features\")\n", "\n", "# Transform the data\n", "v_df = vectorAssembler.setHandleInvalid(\"skip\").transform(df)\n", "\n", "# View the transformed data\n", "v_df = v_df.select([\"features\", \"total_amount\"])\n", "v_df.show(3)" ] }, { "cell_type": "markdown", "id": "baking-surge", "metadata": {}, "source": [ "Divide input dataset into training set and test set" ] }, { "cell_type": "code", "execution_count": null, "id": "monthly-ranch", "metadata": { "tags": [] }, "outputs": [], "source": [ "splits = v_df.randomSplit([0.7, 0.3])\n", "train_df = splits[0]\n", "test_df = splits[1]" ] }, { "cell_type": "markdown", "id": "august-terror", "metadata": {}, "source": [ "### Train the model using LinearRegression against training set" ] }, { "cell_type": "code", "execution_count": null, "id": "golden-industry", "metadata": { "tags": [] }, "outputs": [], "source": [ "from pyspark.ml.regression import LinearRegression\n", "\n", "lr = LinearRegression(featuresCol = \"features\", \\\n", " labelCol = \"total_amount\", \\\n", " maxIter = 100, \\\n", " regParam = 0.3, \\\n", " elasticNetParam = 0.8)\n", "lr_model = lr.fit(train_df)\n", "print(\"Coefficients: \" + str(lr_model.coefficients))\n", "print(\"Intercept: \" + str(lr_model.intercept))" ] }, { "cell_type": "markdown", "id": "congressional-buddy", "metadata": {}, "source": [ "Report the trained model performance on the training set" ] }, { "cell_type": "code", "execution_count": null, "id": "waiting-healthcare", "metadata": { "tags": [] }, "outputs": [], "source": [ "training_summary = lr_model.summary\n", "print(\"RMSE: %f\" % training_summary.rootMeanSquaredError)\n", "print(\"R squred (R2): %f\" % training_summary.r2)" ] }, { "cell_type": "markdown", "id": "personal-serve", "metadata": {}, "source": [ "Predict the result using test set and report accuracy" ] }, { "cell_type": "code", "execution_count": null, "id": "arabic-corpus", "metadata": { "tags": [] }, "outputs": [], "source": [ "predictions = lr_model.transform(test_df)\n", "\n", "from pyspark.sql.functions import col\n", "predictions.filter(predictions.total_amount > 10.0)\\\n", ".select(\"prediction\", \"total_amount\")\\\n", ".withColumn(\"diff\", col(\"prediction\") - col(\"total_amount\"))\\\n", ".withColumn(\"diff%\", (col(\"diff\") / col(\"total_amount\")) * 100)\\\n", ".show()" ] }, { "cell_type": "markdown", "id": "copyrighted-kazakhstan", "metadata": {}, "source": [ "### Report performance on the test set using RegressionEvaluator" ] }, { "cell_type": "code", "execution_count": null, "id": "thirty-beaver", "metadata": { "tags": [] }, "outputs": [], "source": [ "from pyspark.ml.evaluation import RegressionEvaluator\n", "\n", "lr_evaluator = RegressionEvaluator(predictionCol = \"prediction\", \\\n", " labelCol = \"total_amount\", \\\n", " metricName = \"r2\")\n", "print(\"R Squared (R2) on test data = %g\" % lr_evaluator.evaluate(predictions))" ] }, { "cell_type": "code", "execution_count": null, "id": "26f8ecab-a88c-4840-9550-b3a99c177226", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }