{ "cells": [ { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "# Tutorial: CTR Modeling on EKS" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "In this tutorial, we will use the classic [Multilayer Perceptron (MLP)](https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier) model to illustrate the workflow of training a deep learning model with Spark on Kubernetes. We'll see how to create a Jupyter notebook server Pod in an EKS cluster and how to connect to the notebook server from a local browser. Then, inside the notebook, we will complete end-to-end feature engineering and deep learning model training using [Spark MLlib](https://spark.apache.org/mllib/)." ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Get current namespace" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:44:44.329657Z", "iopub.status.busy": "2021-01-13T06:44:44.329351Z", "iopub.status.idle": "2021-01-13T06:44:45.037958Z", "shell.execute_reply": "2021-01-13T06:44:45.037042Z", "shell.execute_reply.started": "2021-01-13T06:44:44.329620Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "notebook" ] } ], "source": [ "cat /var/run/secrets/kubernetes.io/serviceaccount/namespace" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Spark Docker\n", "Please refer to the ```README``` on how to build the 
```docker/spark/Dockerfile``` available in the GitHub repository.\n", "
NOTE: Replace the Docker image in the ```spark.kubernetes.container.image``` setting below.
" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Start Spark Session" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "Starting a Spark cluster in EKS is surprisingly fast: you can start hundreds of executors in a few minutes. This is a great time-saving feature that enables rapid data analysis. To make Spark available in your Jupyter notebook, you can use [findspark](https://pypi.org/project/findspark/) to initialize PySpark so that it can be imported like a regular library. We'll use the following helper function to create the Spark session with settings such as CPU, memory, shuffle partitions, etc. The Kubernetes scheduler assigns the Spark executor containers to run on the EC2 instances in the EKS cluster." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "button": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:51:45.136677Z", "iopub.status.busy": "2021-01-13T06:51:45.136320Z", "iopub.status.idle": "2021-01-13T06:51:45.428770Z", "shell.execute_reply": "2021-01-13T06:51:45.427882Z", "shell.execute_reply.started": "2021-01-13T06:51:45.136600Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "import findspark\n", "findspark.init()\n", "\n", "import pyspark\n", "from pyspark import SparkConf\n", "from pyspark.sql import SparkSession\n", "\n", "import socket\n", "host_name = socket.gethostbyname(socket.gethostname())\n", "\n", "def init_spark_session(num_executors):\n", " conf = SparkConf().setAppName('spark-on-k8s')\n", " conf.setMaster('k8s://https://kubernetes.default.svc:443')\n", " conf.set(\"spark.submit.deployMode\", \"client\")\n", " conf.set(\"spark.kubernetes.allocation.batch.size\", \"40\")\n", " conf.set(\"spark.executor.instances\", num_executors)\n", " conf.set(\"spark.executor.cores\", 
\"4\")\n", " conf.set(\"spark.executor.memory\", \"20G\")\n", " conf.set(\"spark.driver.host\", host_name)\n", " conf.set(\"spark.driver.memory\", \"20G\")\n", " conf.set(\"spark.driver.maxResultSize\", \"18G\")\n", " conf.set(\"spark.kubernetes.namespace\", \"notebook\")\n", " conf.set(\"spark.kubernetes.container.image\", \"\")\n", " conf.set(\"spark.kubernetes.container.image.pullPolicy\", \"IfNotPresent\")\n", " conf.set(\"spark.kubernetes.pyspark.pythonVersion\", \"3\")\n", " conf.set(\"spark.kubernetes.authenticate.driver.serviceAccountName\", \"spark\")\n", "\n", " print(\"Starting spark session with num_executors: {}\".format(num_executors))\n", " spark = SparkSession.builder.config(conf=conf).getOrCreate()\n", " return spark" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Let's start with a Spark session with a few executors" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:53:11.421202Z", "iopub.status.busy": "2021-01-13T06:53:11.420905Z", "iopub.status.idle": "2021-01-13T06:53:34.031463Z", "shell.execute_reply": "2021-01-13T06:53:34.030535Z", "shell.execute_reply.started": "2021-01-13T06:53:11.421166Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting spark session with num_executors: 2\n" ] } ], "source": [ "spark = init_spark_session(num_executors=2)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:53:34.033317Z", "iopub.status.busy": "2021-01-13T06:53:34.033053Z", "iopub.status.idle": "2021-01-13T06:53:34.049862Z", "shell.execute_reply": 
"2021-01-13T06:53:34.048989Z", "shell.execute_reply.started": "2021-01-13T06:53:34.033281Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "data": { "text/html": [ "\n", "
SparkSession - in-memory\n", "SparkContext\n", "Spark UI\n", "Version: v2.4.4\n", "Master: k8s://https://kubernetes.default.svc:443\n", "AppName: spark-on-k8s
\n", " " ], "text/plain": [ "" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Synthetic Data Generation" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "At Amazon, we deal with petabyte-scale datasets. For the purpose of this tutorial, we synthesize two sample datasets to illustrate the workflow rather than the scale. We will first use a small dataset with 100K rows for feature engineering, model training, etc. Later we'll use a larger dataset with around 1B rows for end-to-end execution." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "button": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:53:34.051531Z", "iopub.status.busy": "2021-01-13T06:53:34.051290Z", "iopub.status.idle": "2021-01-13T06:53:34.087878Z", "shell.execute_reply": "2021-01-13T06:53:34.087114Z", "shell.execute_reply.started": "2021-01-13T06:53:34.051499Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "import random\n", "import pyspark.mllib.random as rnd\n", "from pyspark.sql.functions import udf, array\n", "from pyspark.sql.types import DoubleType\n", "\n", "def get_random_udf(l):\n", "    # UDF that picks a random category value for each row\n", "    return udf(lambda: random.choice(l))\n", "\n", "def get_click(numeric_features, coeff):\n", "    col = len(coeff)\n", "    products = sum([numeric_features[i] * (coeff[i] + random.random()) for i in range(col)])\n", "\n", "    # Label the row as a click when the noisy weighted sum exceeds a random threshold\n", "    if products > col * random.random():\n", "        return 1.0\n", "    return 0.0\n", "\n", "def get_click_udf(coeff):\n", "    return udf(lambda features: get_click(features, coeff), DoubleType())\n", "\n", "def get_synthetic_data(sc, num_rows=10, num_cols=10, num_partitions=1, seed=1):\n", "    numeric_feature_names = ['numeric_{}'.format(i) for i in range(num_cols)]\n", "    numeric_features_rdd = rnd.RandomRDDs.normalVectorRDD(sc, num_rows, num_cols, num_partitions, seed)\n", "    numeric_features = numeric_features_rdd.map(lambda a: a.tolist()).toDF(numeric_feature_names)\n", "\n", "    category_values = ['A'] * random.randint(1, 100) + ['B'] * random.randint(1, 100)\n", "    features = numeric_features.withColumn('categoric_0', get_random_udf(category_values)())\n", "\n", "    coeff = rnd.RandomRDDs.uniformRDD(sc, num_cols).collect()\n", "    data = features.withColumn('click', get_click_udf(coeff)(array(numeric_feature_names)))\n", "\n", "    return data" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:53:44.358057Z", "iopub.status.busy": "2021-01-13T06:53:44.357723Z", "iopub.status.idle": "2021-01-13T06:53:48.578515Z", "shell.execute_reply": "2021-01-13T06:53:48.577697Z", "shell.execute_reply.started": "2021-01-13T06:53:44.358023Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "data": { "text/plain": [ "DataFrame[numeric_0: double, numeric_1: double, numeric_2: double, numeric_3: double, numeric_4: double, numeric_5: double, numeric_6: double, numeric_7: double, numeric_8: double, numeric_9: double, categoric_0: string, click: double]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "(num_rows, num_cols, num_partitions) = (100000, 10, 1)\n", "df = get_synthetic_data(spark.sparkContext, num_rows, num_cols, num_partitions)\n", "df.cache()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:53:48.580115Z", "iopub.status.busy": "2021-01-13T06:53:48.579851Z", "iopub.status.idle": 
"2021-01-13T06:53:48.585346Z", "shell.execute_reply": "2021-01-13T06:53:48.584491Z", "shell.execute_reply.started": "2021-01-13T06:53:48.580080Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- numeric_0: double (nullable = true)\n", " |-- numeric_1: double (nullable = true)\n", " |-- numeric_2: double (nullable = true)\n", " |-- numeric_3: double (nullable = true)\n", " |-- numeric_4: double (nullable = true)\n", " |-- numeric_5: double (nullable = true)\n", " |-- numeric_6: double (nullable = true)\n", " |-- numeric_7: double (nullable = true)\n", " |-- numeric_8: double (nullable = true)\n", " |-- numeric_9: double (nullable = true)\n", " |-- categoric_0: string (nullable = true)\n", " |-- click: double (nullable = true)\n", "\n" ] } ], "source": [ "df.printSchema()" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Feature Engineering" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "Machine learning models expect numeric inputs in order to make predictions, but raw datasets usually arrive in an unstructured format. Because the right numeric representation of features is crucial to the success of a machine learning model, we usually spend a lot of time understanding the data and applying appropriate transformations to the raw data to produce high-quality features. This process is called feature engineering. We will cover a few common feature types and their associated feature engineering APIs in PySpark.\n", "\n", "There are ```num_cols``` numerical features and 1 categorical feature in this raw dataset, and ```click``` is the label column. The numerical features are named ```numeric_n``` to keep this tutorial simple. 
\n", "\n", "A quick exploratory data analysis (EDA) shows the ratio of click to non-click impressions in this dataset." ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:55:42.011301Z", "iopub.status.busy": "2021-01-13T06:55:42.010990Z", "iopub.status.idle": "2021-01-13T06:55:42.276465Z", "shell.execute_reply": "2021-01-13T06:55:42.275413Z", "shell.execute_reply.started": "2021-01-13T06:55:42.011265Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "click: 13171\n", "non-click: 86829\n" ] } ], "source": [ "print('click: {}'.format(df.filter(df.click == 1).count()))\n", "print('non-click: {}'.format(df.filter(df.click != 1).count()))" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "#### Categorical Features" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "One of the most common types of data is categorical data, whose values are discrete and form a finite set. A typical example is shirt size, with values such as S, M, L, and XL. Feature engineering for categorical data applies transformations to the discrete values to produce distinct numeric values. Below is a simple example of how to achieve that with StringIndexer from PySpark." 
] }, { "cell_type": "code", "execution_count": 13, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:57:03.739953Z", "iopub.status.busy": "2021-01-13T06:57:03.739640Z", "iopub.status.idle": "2021-01-13T06:57:04.431706Z", "shell.execute_reply": "2021-01-13T06:57:04.430990Z", "shell.execute_reply.started": "2021-01-13T06:57:03.739916Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" }, { "data": { "text/plain": [ "[Figure: bar chart of categoric_0 value counts]
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "import pandas\n", "pdf_categorical_feature = df.select('categoric_0').toPandas()\n", "\n", "# Distribution of Categorical feature\n", "pdf_categorical_feature['categoric_0'].value_counts().plot(kind='bar')" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:57:05.516211Z", "iopub.status.busy": "2021-01-13T06:57:05.515902Z", "iopub.status.idle": "2021-01-13T06:57:05.527352Z", "shell.execute_reply": "2021-01-13T06:57:05.526656Z", "shell.execute_reply.started": "2021-01-13T06:57:05.516175Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "data": { "text/html": [ "
categoric_0\n", "0 A\n", "1 B\n", "2 A\n", "3 B\n", "4 B\n", "... ...\n", "99995 B\n", "99996 A\n", "99997 B\n", "99998 B\n", "99999 B\n", "100000 rows × 1 columns
\n" ], "text/plain": [ " categoric_0\n", "0 A\n", "1 B\n", "2 A\n", "3 B\n", "4 B\n", "... ...\n", "99995 B\n", "99996 A\n", "99997 B\n", "99998 B\n", "99999 B\n", "\n", "[100000 rows x 1 columns]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf_categorical_feature" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:57:11.579038Z", "iopub.status.busy": "2021-01-13T06:57:11.578735Z", "iopub.status.idle": "2021-01-13T06:57:14.161352Z", "shell.execute_reply": "2021-01-13T06:57:14.160456Z", "shell.execute_reply.started": "2021-01-13T06:57:11.579003Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+----------+\n", "|categoric_0|numeric_10|\n", "+-----------+----------+\n", "| A| 1.0|\n", "| B| 0.0|\n", "| A| 1.0|\n", "| B| 0.0|\n", "| B| 0.0|\n", "+-----------+----------+\n", "\n" ] } ], "source": [ "# Convert categorical feature into numerical feature\n", "from pyspark.ml.feature import StringIndexer\n", "\n", "indexer = StringIndexer().setInputCol('categoric_0').setOutputCol('numeric_10')\n", "# Fit the indexer once and reuse the transformed DataFrame\n", "indexed = indexer.fit(df).transform(df)\n", "indexed.limit(5).select('categoric_0', 'numeric_10').show()\n", "\n", "numerical_features = indexed.drop('categoric_0').na.fill(0)" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "NOTE: As the output shows, StringIndexer maps A in column categoric_0 to 1.0 in column numeric_10, and B to 0.0. By default, StringIndexer assigns indices in descending order of label frequency, so the most frequent value gets index 0.0." ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "#### Vectorization" ] }, { "cell_type": "markdown", "metadata": { 
"button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "We can then merge all the transformed values into a single vector column, which can be normalized and then processed easily by the learning algorithm. PySpark provides the VectorAssembler API for this." ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:58:08.620404Z", "iopub.status.busy": "2021-01-13T06:58:08.620086Z", "iopub.status.idle": "2021-01-13T06:58:08.626989Z", "shell.execute_reply": "2021-01-13T06:58:08.626170Z", "shell.execute_reply.started": "2021-01-13T06:58:08.620367Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "data": { "text/plain": [ "DataFrame[numeric_0: double, numeric_1: double, numeric_2: double, numeric_3: double, numeric_4: double, numeric_5: double, numeric_6: double, numeric_7: double, numeric_8: double, numeric_9: double, click: double, numeric_10: double]" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "numerical_features" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "button": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:58:10.826895Z", "iopub.status.busy": "2021-01-13T06:58:10.826585Z", "iopub.status.idle": "2021-01-13T06:58:10.898897Z", "shell.execute_reply": "2021-01-13T06:58:10.898043Z", "shell.execute_reply.started": "2021-01-13T06:58:10.826858Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "# Feature Vectorization\n", "from pyspark.ml.feature import VectorAssembler\n", "feature_cols = numerical_features.columns\n", "feature_cols.remove('click')\n", "\n", "assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')\n", "feature_vector = 
assembler.transform(numerical_features).drop(*feature_cols).withColumnRenamed('click', 'label')" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:58:46.645392Z", "iopub.status.busy": "2021-01-13T06:58:46.645065Z", "iopub.status.idle": "2021-01-13T06:58:46.782792Z", "shell.execute_reply": "2021-01-13T06:58:46.781939Z", "shell.execute_reply.started": "2021-01-13T06:58:46.645355Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n", "|label|features |\n", "+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n", "|0.0 |[-0.7364417928926634,1.1537268112805235,0.46316657909752246,1.7794324881133858,0.35038251708418316,-1.2078422905234971,0.182557706990586,-0.28115411118907574,0.1794811259957898,-1.406603860191494,1.0] |\n", "|0.0 |[1.3494846050334237,-0.6572059487488587,-2.338727150900283,-0.6966792390268524,1.6356047759173402,-1.2537356749907436,0.057684896184203235,1.8656701226653227,0.7499573797384258,0.3306751398492047,0.0] |\n", "|1.0 |[0.610120301334282,1.3666264850339138,0.9410896976666331,1.1508546050085344,0.0056280906383625795,-1.8544898325192476,1.875104770438199,-0.23226179403790756,0.4589199344712018,1.0405810391616797,1.0] |\n", "|0.0 |[-0.3242868108863616,0.2637593042852379,-1.2368719808439692,0.2645598197228282,-0.4846847375836658,0.5152933279665263,0.3314800422258143,-2.511775551231611,1.1859658260888708,0.7150415153348041,0.0] |\n", "|0.0 
|[-1.1996283342765224,0.6974332628034251,-0.914149295721909,-0.19639824963005964,-0.09587925493895781,-0.7430745644138209,0.9694158938254155,-0.11188608079122866,1.0698435464482654,0.6218661513443944,0.0]|\n", "+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "# Show the assembled feature vectors (displayed here in dense format)\n", "feature_vector.show(5, False)" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "#### Normalization" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "When dealing with numeric data, each column may have a very different range of values. The normalization step standardizes the data, which helps the subsequent learning step. We will show how to perform normalization with StandardScaler from PySpark. With its default settings, StandardScaler scales each feature to unit standard deviation without centering it on the mean." 
] }, { "cell_type": "code", "execution_count": 20, "metadata": { "button": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:58:50.706745Z", "iopub.status.busy": "2021-01-13T06:58:50.706398Z", "iopub.status.idle": "2021-01-13T06:58:51.590341Z", "shell.execute_reply": "2021-01-13T06:58:51.589455Z", "shell.execute_reply.started": "2021-01-13T06:58:50.706709Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "# Feature vector normalization\n", "from pyspark.ml.feature import StandardScaler\n", "scalerizer = StandardScaler().setInputCol('features').setOutputCol('normalized_features')\n", "normalized_fv = scalerizer.fit(feature_vector).transform(feature_vector).drop('features').\\\n", " withColumnRenamed('normalized_features', 'features')" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T06:58:53.279126Z", "iopub.status.busy": "2021-01-13T06:58:53.278774Z", "iopub.status.idle": "2021-01-13T06:58:53.447221Z", "shell.execute_reply": "2021-01-13T06:58:53.446335Z", "shell.execute_reply.started": "2021-01-13T06:58:53.279088Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n", "|label|features |\n", "+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n", "|0.0 
|[-0.7362297602360367,1.1544578505037622,0.46225025038035605,1.7779990368819572,0.34794922147788515,-1.2069284034249674,0.18266651921793614,-0.2800539360944794,0.17976939613089873,-1.4072350282685373,2.1396805206549403]|\n", "|0.0 |[1.3490960681407005,-0.6576223760361366,-2.334100213321664,-0.6961180175589041,1.6242460187852785,-1.252787063680037,0.0577192788577246,1.8583696290143212,0.7511619092619826,0.3308235196440079,0.0] |\n", "|1.0 |[0.609944638532955,1.3674924244870914,0.9392278458959771,1.1499275150729709,0.0055890053314353386,-1.8530866738903515,1.8762224133465688,-0.23135293789439942,0.459657019837589,1.041047966313014,2.1396805206549403] |\n", "|0.0 |[-0.3241934438413557,0.26392643084851214,-1.2344249534316845,0.26434669919035425,-0.48131875559285014,0.5149034426908725,0.3316776185555749,-2.5019468032435634,1.1878706421358196,0.7153623671332475,0.0] |\n", "|0.0 |[-1.1992829432556216,0.6978751794389021,-0.9123407428399639,-0.19624003777618867,-0.0952134038807498,-0.7425123335141486,0.9699937072980684,-0.11144826297310073,1.0715618549443204,0.6221451937059509,0.0] |\n", "+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "normalized_fv.show(5, False)" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Training Data Preparation" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "After feature engineering, you normally need to split the dataset to three non-overlapped sets for training, calibration and testing. 
You can either train at full scale on all impressions, or down-sample the training dataset to adjust the ratio of clicks to non-clicks for more efficient training. You can also apply calibration after training to correct the overall bias. In this tutorial, because the dataset is small, we simply split it randomly into training and testing sets and skip the calibration step." ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "button": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:02:10.287382Z", "iopub.status.busy": "2021-01-13T07:02:10.287065Z", "iopub.status.idle": "2021-01-13T07:02:10.306259Z", "shell.execute_reply": "2021-01-13T07:02:10.305435Z", "shell.execute_reply.started": "2021-01-13T07:02:10.287344Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "# Split into training and testing dataset\n", "seed = 1234\n", "splits = normalized_fv.randomSplit([0.7, 0.3], seed)\n", "training_data = splits[0]\n", "testing_data = splits[1]" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Model Training" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "[Figure: Model]" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "A basic deep-learning CTR model follows the embedding and Multilayer Perceptron (MLP) paradigm, in which the network layers are fed by an encoding layer. The encoding layer encodes historical features (CTR, COEC, etc.), text features (e.g., Shopping Query) and standard categorical features (e.g., Page Layout). 
First, the large-scale sparse feature inputs are mapped into low-dimensional embedding vectors. Then the embedding vectors are transformed into fixed-length vectors via average pooling. Finally, the fixed-length feature vectors are concatenated and fed into the MLP. In this tutorial, we build a basic MLP model for CTR prediction with MultilayerPerceptronClassifier from the Spark ML library. The raw feature set has 10 numerical features and 1 categorical feature, with around 90K rows, so we define a single hidden layer with 25 nodes. The layer sizes of the MLP are 11, 25 and 2; nodes in the hidden layer use the sigmoid function and nodes in the output layer use the softmax function." ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "button": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:02:17.844071Z", "iopub.status.busy": "2021-01-13T07:02:17.843726Z", "iopub.status.idle": "2021-01-13T07:02:24.543936Z", "shell.execute_reply": "2021-01-13T07:02:24.543020Z", "shell.execute_reply.started": "2021-01-13T07:02:17.844034Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "# MLP based CTR model\n", "from pyspark.ml.classification import MultilayerPerceptronClassifier\n", "\n", "# Layer sizes: 11 input features, one hidden layer of 25 nodes, 2 output classes\n", "layers = [11, 25, 2]\n", "trainer = MultilayerPerceptronClassifier(maxIter=10, layers=layers, blockSize=128, seed=seed)\n", "\n", "mlp_model = trainer.fit(training_data)\n", "\n", "result = mlp_model.transform(testing_data)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:04:01.216714Z", "iopub.status.busy": "2021-01-13T07:04:01.216350Z", "iopub.status.idle": "2021-01-13T07:04:01.734420Z", "shell.execute_reply": "2021-01-13T07:04:01.733537Z", "shell.execute_reply.started": "2021-01-13T07:04:01.216674Z" }, "jupyter": { "outputs_hidden": false }, 
"new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+--------------------+--------------------+--------------------+----------+\n", "|label| features| rawPrediction| probability|prediction|\n", "+-----+--------------------+--------------------+--------------------+----------+\n", "| 0.0|[-4.8084804095445...|[2.41533275797333...|[0.99486591490757...| 0.0|\n", "| 0.0|[-4.4206894066630...|[3.79828745987186...|[0.99950563650799...| 0.0|\n", "| 0.0|[-4.2755452027141...|[3.39402550539004...|[0.99923093120135...| 0.0|\n", "| 0.0|[-3.6498486759062...|[3.36304212645181...|[0.99827470856746...| 0.0|\n", "| 0.0|[-3.4651340845658...|[1.21855065082685...|[0.94745955128086...| 0.0|\n", "| 0.0|[-3.4618340266007...|[3.30358497922916...|[0.99896201545287...| 0.0|\n", "| 0.0|[-3.4337473953628...|[2.1587826820064,...|[0.99277012251603...| 0.0|\n", "| 0.0|[-3.4073895913747...|[1.69691098614294...|[0.98731895525694...| 0.0|\n", "| 0.0|[-3.3998364836530...|[2.75175577104954...|[0.99789743752986...| 0.0|\n", "| 0.0|[-3.3761150797093...|[1.68050548577400...|[0.98407566936413...| 0.0|\n", "| 0.0|[-3.3620634109504...|[1.49977404408218...|[0.97385090695523...| 0.0|\n", "| 0.0|[-3.3587359695215...|[2.90819413203107...|[0.99884606091741...| 0.0|\n", "| 0.0|[-3.3428976472243...|[3.21124240300963...|[0.99872074710896...| 0.0|\n", "| 0.0|[-3.3182394402098...|[2.69503782224675...|[0.99760960658261...| 0.0|\n", "| 0.0|[-3.2526203137299...|[3.34745236431401...|[0.99948742700186...| 0.0|\n", "| 0.0|[-3.2273547058667...|[1.25103524467307...|[0.96463130492536...| 0.0|\n", "| 0.0|[-3.1866554035633...|[2.40446868656227...|[0.99760841669399...| 0.0|\n", "| 0.0|[-3.1637103104246...|[2.54717342291299...|[0.99570776881562...| 0.0|\n", "| 0.0|[-3.1631355802612...|[2.19228499336290...|[0.99371746747302...| 0.0|\n", "| 0.0|[-3.1610742973931...|[3.03738021121340...|[0.99787947055468...| 0.0|\n", 
"+-----+--------------------+--------------------+--------------------+----------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "result.show()" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Model Evaluation" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "We will use log loss and AUC to evaluate the quality of model. Log loss is a standard evaluation criterion for events such as click-through rate prediction. AUC is short for Area Under ROC curve, where the ROC curve shows the ratio of false positive rate (FPR) and true positive rate (TPR) when we liberalize the threshold to give positive prediction." ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "button": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:04:37.048551Z", "iopub.status.busy": "2021-01-13T07:04:37.048209Z", "iopub.status.idle": "2021-01-13T07:04:37.366339Z", "shell.execute_reply": "2021-01-13T07:04:37.365455Z", "shell.execute_reply.started": "2021-01-13T07:04:37.048514Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "from pyspark.sql.functions import udf, col\n", "from pyspark.sql.types import DoubleType\n", "\n", "get_probability_udf = udf(lambda l: float(l[1]), DoubleType())\n", "probability_label = result.select(\n", " get_probability_udf('probability').alias('probability'), 'label').withColumn('label', col('label').cast('double'))\n", "\n", "from pyspark.mllib.evaluation import BinaryClassificationMetrics\n", "\n", "metrics = BinaryClassificationMetrics(probability_label.rdd.map(tuple))" ] }, { "cell_type": "code", "execution_count": 28, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:05:22.165946Z", 
"iopub.status.busy": "2021-01-13T07:05:22.165605Z", "iopub.status.idle": "2021-01-13T07:05:22.170761Z", "shell.execute_reply": "2021-01-13T07:05:22.170059Z", "shell.execute_reply.started": "2021-01-13T07:05:22.165909Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 28, "metadata": {}, "output_type": "execute_result" } ], "source": [ "metrics" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "#### Area Under ROC\n", "An ROC curve (receiver operating characteristic curve) is a graph showing the performance of a classification model at all classification thresholds. This curve plots two parameters: True Positive Rate (TPR), and False Positive Rate (FPR)." ] }, { "cell_type": "code", "execution_count": 34, "metadata": { "execution": { "iopub.execute_input": "2021-01-13T07:09:12.314943Z", "iopub.status.busy": "2021-01-13T07:09:12.314631Z", "iopub.status.idle": "2021-01-13T07:09:20.410099Z", "shell.execute_reply": "2021-01-13T07:09:20.409318Z", "shell.execute_reply.started": "2021-01-13T07:09:12.314907Z" } }, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "rdd = getattr(metrics._java_model, 'roc')().toJavaRDD()\n", "\n", "points = []\n", "for row in rdd.collect():\n", " points += [(float(row._1()), float(row._2()))]" ] }, { "cell_type": "code", "execution_count": 35, "metadata": { "execution": { "iopub.execute_input": "2021-01-13T07:09:20.411681Z", "iopub.status.busy": "2021-01-13T07:09:20.411430Z", "iopub.status.idle": "2021-01-13T07:09:20.594578Z", "shell.execute_reply": "2021-01-13T07:09:20.593799Z", "shell.execute_reply.started": "2021-01-13T07:09:20.411648Z" } }, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", 
"text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "plt.figure()\n", "x_val = [x[0] for x in points]\n", "y_val = [x[1] for x in points]\n", "plt.title('ROC')\n", "plt.xlabel('FPR')\n", "plt.xlabel('TPR')\n", "plt.plot(x_val, y_val)" ] }, { "cell_type": "code", "execution_count": 36, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:09:24.974746Z", "iopub.status.busy": "2021-01-13T07:09:24.974444Z", "iopub.status.idle": "2021-01-13T07:09:25.057039Z", "shell.execute_reply": "2021-01-13T07:09:25.056254Z", "shell.execute_reply.started": "2021-01-13T07:09:24.974711Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Area under ROC = 0.8894075220608592\n" ] } ], "source": [ "print(f'Area under ROC = %s' % metrics.areaUnderROC)" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "#### Log Loss\n", "Log Loss is the most important classification metric based on probabilities. For a given problem, a lower log-loss value means better predictions." 
] }, { "cell_type": "code", "execution_count": 37, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:10:08.371246Z", "iopub.status.busy": "2021-01-13T07:10:08.370929Z", "iopub.status.idle": "2021-01-13T07:10:10.232964Z", "shell.execute_reply": "2021-01-13T07:10:10.232040Z", "shell.execute_reply.started": "2021-01-13T07:10:08.371211Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Log Loss = 0.2588519640114422\n" ] } ], "source": [ "from pyspark.sql.functions import when, log, mean\n", "\n", "log_loss_df = probability_label.select(when(probability_label['label'] == 1, -log(probability_label['probability'])).\\\n", " otherwise(-log(1 - probability_label['probability'])).alias('log_loss'))\n", "\n", "log_loss = log_loss_df.agg(mean('log_loss').alias('l_l')).collect()[0]['l_l']\n", "print(f'Log Loss = %s' % log_loss)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### CTR Distribution" ] }, { "cell_type": "code", "execution_count": 41, "metadata": { "execution": { "iopub.execute_input": "2021-01-13T07:12:52.401679Z", "iopub.status.busy": "2021-01-13T07:12:52.401364Z", "iopub.status.idle": "2021-01-13T07:12:54.134831Z", "shell.execute_reply": "2021-01-13T07:12:54.134032Z", "shell.execute_reply.started": "2021-01-13T07:12:52.401642Z" } }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 41, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "probabilities = probability_label.select('probability').toPandas()\n", "probabilities.plot.hist(bins=50, alpha=0.5, density=1)" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "execution": { "iopub.execute_input": "2020-11-20T23:24:41.145095Z", "iopub.status.busy": "2020-11-20T23:24:41.144776Z", "iopub.status.idle": "2020-11-20T23:24:41.148415Z", "shell.execute_reply": "2020-11-20T23:24:41.147636Z", "shell.execute_reply.started": "2020-11-20T23:24:41.145059Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Now let's increase the executors to work with big data" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "
NOTE: Having explored the small dataset, we are ready to repeat the same feature engineering process on the 1B-row dataset and prepare the data for model training. The same solution works well on data of this size.
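
As a minimal, Spark-free sketch of what the feature engineering stages compute before we rerun them at scale, the plain-Python snippet below mimics `StringIndexer` (the most frequent category gets index 0), `VectorAssembler` (column concatenation) and `StandardScaler` with its default settings (divide by the sample standard deviation, no centering) on a few toy rows. The rows and the helper names `string_index`, `assemble` and `scale_to_unit_std` are hypothetical and only for illustration; the notebook itself uses the Spark ML stages.

```python
# Plain-Python illustration of the three feature engineering stages.
# Toy data and helper names are assumptions, not part of the tutorial code.
from collections import Counter
from statistics import stdev

# Hypothetical toy rows: (numeric_0, numeric_1, categoric_0, click)
rows = [
    (0.5, 2.0, 'a', 0.0),
    (1.5, 4.0, 'b', 1.0),
    (2.5, 6.0, 'a', 0.0),
    (3.5, 8.0, 'a', 1.0),
]

def string_index(values):
    # Like StringIndexer's default ordering: most frequent category -> 0.0
    ordered = [v for v, _ in Counter(values).most_common()]
    mapping = {v: float(i) for i, v in enumerate(ordered)}
    return [mapping[v] for v in values]

def assemble(rows, cat_index):
    # Like VectorAssembler: numeric columns followed by the indexed category
    return [[r[0], r[1], idx] for r, idx in zip(rows, cat_index)]

def scale_to_unit_std(vectors):
    # Like StandardScaler defaults (withStd=True, withMean=False):
    # divide each column by its sample standard deviation, do not center
    stds = [stdev(col) for col in zip(*vectors)]
    return [[x / s if s else 0.0 for x, s in zip(v, stds)] for v in vectors]

cat_index = string_index([r[2] for r in rows])
features = scale_to_unit_std(assemble(rows, cat_index))
labels = [r[3] for r in rows]

for label, vec in zip(labels, features):
    print(label, [round(x, 4) for x in vec])
```

Because the scaler does not center by default, an indexed category value of 0.0 stays 0.0 after scaling, which is consistent with the zeros in the last position of the normalized feature vectors shown earlier.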
" ] }, { "cell_type": "code", "execution_count": 42, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:14:28.497689Z", "iopub.status.busy": "2021-01-13T07:14:28.497382Z", "iopub.status.idle": "2021-01-13T07:14:46.463986Z", "shell.execute_reply": "2021-01-13T07:14:46.463109Z", "shell.execute_reply.started": "2021-01-13T07:14:28.497653Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting spark session with num_executors: 20\n" ] } ], "source": [ "spark.stop()\n", "spark = init_spark_session(num_executors=20)" ] }, { "cell_type": "code", "execution_count": 44, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:15:06.396208Z", "iopub.status.busy": "2021-01-13T07:15:06.395891Z", "iopub.status.idle": "2021-01-13T07:15:06.402652Z", "shell.execute_reply": "2021-01-13T07:15:06.401559Z", "shell.execute_reply.started": "2021-01-13T07:15:06.396171Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "data": { "text/html": [ "\n", "
SparkSession - in-memory\n", "SparkContext (Spark UI)\n", "Version: v2.4.4\n", "Master: k8s://https://kubernetes.default.svc:443\n", "AppName: spark-on-k8s
\n", " " ], "text/plain": [ "" ] }, "execution_count": 44, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Feature Engineering on 1B Data" ] }, { "cell_type": "code", "execution_count": 45, "metadata": { "button": false, "collapsed": false, "deletable": true, "execution": { "iopub.execute_input": "2021-01-13T07:15:11.354857Z", "iopub.status.busy": "2021-01-13T07:15:11.354539Z", "iopub.status.idle": "2021-01-13T07:17:58.134173Z", "shell.execute_reply": "2021-01-13T07:17:58.133313Z", "shell.execute_reply.started": "2021-01-13T07:15:11.354822Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- numeric_0: double (nullable = true)\n", " |-- numeric_1: double (nullable = true)\n", " |-- numeric_2: double (nullable = true)\n", " |-- numeric_3: double (nullable = true)\n", " |-- numeric_4: double (nullable = true)\n", " |-- numeric_5: double (nullable = true)\n", " |-- numeric_6: double (nullable = true)\n", " |-- numeric_7: double (nullable = true)\n", " |-- numeric_8: double (nullable = true)\n", " |-- numeric_9: double (nullable = true)\n", " |-- categoric_0: string (nullable = true)\n", " |-- click: double (nullable = true)\n", "\n", "click: 132186312\n", "non-click: 867813688\n" ] } ], "source": [ "(num_rows, num_cols, num_partitions) = (1000000000, 10, 1000)\n", "df = get_synthetic_data(spark.sparkContext, num_rows, num_cols, num_partitions)\n", "df.printSchema()\n", "df.cache()\n", "\n", "print('click: {}'.format(df.filter(df.click == 1).count()))\n", "print('non-click: {}'.format(df.filter(df.click != 1).count()))\n", "\n", "# Convert categorical feature into numerical feature\n", "from pyspark.ml.feature import StringIndexer\n", "indexer = 
StringIndexer().setInputCol('categoric_0').setOutputCol('numeric_10')\n", "numerical_features = indexer.fit(df).transform(df).drop('categoric_0').na.fill(0)\n", "\n", "# Feature Vectorization\n", "from pyspark.ml.feature import VectorAssembler\n", "feature_cols = numerical_features.columns\n", "feature_cols.remove('click')\n", "\n", "assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')\n", "feature_vector = assembler.transform(numerical_features).drop(*feature_cols).withColumnRenamed('click', 'label')\n", "\n", "# Feature vector normalization\n", "from pyspark.ml.feature import StandardScaler\n", "scalerizer = StandardScaler().setInputCol('features').setOutputCol('normalized_features')\n", "normalized_fv = scalerizer.fit(feature_vector).transform(feature_vector).drop('features').withColumnRenamed('normalized_features', 'features')" ] }, { "cell_type": "code", "execution_count": 46, "metadata": { "button": false, "execution": { "iopub.execute_input": "2021-01-13T07:19:58.067477Z", "iopub.status.busy": "2021-01-13T07:19:58.067159Z", "iopub.status.idle": "2021-01-13T07:19:58.078116Z", "shell.execute_reply": "2021-01-13T07:19:58.077335Z", "shell.execute_reply.started": "2021-01-13T07:19:58.067441Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "# Split into training and testing dataset\n", "seed = 1234\n", "splits = normalized_fv.randomSplit([0.7, 0.3], seed)\n", "training_data = splits[0]\n", "testing_data = splits[1]" ] }, { "cell_type": "markdown", "metadata": { "button": false, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Model Training" ] }, { "cell_type": "code", "execution_count": 47, "metadata": { "button": false, "execution": { "iopub.execute_input": "2021-01-13T07:20:00.518905Z", "iopub.status.busy": "2021-01-13T07:20:00.518599Z", "iopub.status.idle": "2021-01-13T07:27:35.272153Z", "shell.execute_reply": "2021-01-13T07:27:35.271296Z", 
"shell.execute_reply.started": "2021-01-13T07:20:00.518869Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "# MLP based CTR model\n", "from pyspark.ml.classification import MultilayerPerceptronClassifier\n", "\n", "layers = [11, 25, 2]\n", "trainer = MultilayerPerceptronClassifier(maxIter=10, layers=layers, blockSize=128, seed=seed)\n", "\n", "mlp_model = trainer.fit(training_data)\n", "\n", "result = mlp_model.transform(testing_data)" ] }, { "cell_type": "code", "execution_count": 48, "metadata": { "button": false, "collapsed": false, "execution": { "iopub.execute_input": "2021-01-13T07:27:35.273830Z", "iopub.status.busy": "2021-01-13T07:27:35.273563Z", "iopub.status.idle": "2021-01-13T07:27:39.269324Z", "shell.execute_reply": "2021-01-13T07:27:39.268456Z", "shell.execute_reply.started": "2021-01-13T07:27:35.273794Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+--------------------+--------------------+--------------------+----------+\n", "|label| features| rawPrediction| probability|prediction|\n", "+-----+--------------------+--------------------+--------------------+----------+\n", "| 0.0|[-5.2054159344268...|[3.90495310895609...|[0.99942834311940...| 0.0|\n", "| 0.0|[-4.8098127287259...|[2.78039617778141...|[0.99734271399049...| 0.0|\n", "| 0.0|[-4.7243289547889...|[2.95249354799626...|[0.99764477084954...| 0.0|\n", "| 0.0|[-4.2566361281857...|[1.96090998483072...|[0.98984829677926...| 0.0|\n", "| 0.0|[-4.0726733061780...|[3.82729458190679...|[0.99979734687489...| 0.0|\n", "| 0.0|[-4.0575591987422...|[3.17354130022613...|[0.99889681137230...| 0.0|\n", "| 0.0|[-4.0254455078853...|[3.3412912065155,...|[0.99950807331617...| 0.0|\n", "| 0.0|[-4.0202463359206...|[1.55861504449304...|[0.98948487537028...| 0.0|\n", "| 0.0|[-3.9806532284056...|[2.73937382792825...|[0.99853558976682...| 
0.0|\n", "| 0.0|[-3.9567036409879...|[3.04429301465895...|[0.99904148286796...| 0.0|\n", "| 0.0|[-3.9286819734839...|[1.69954997438725...|[0.98964060304855...| 0.0|\n", "| 0.0|[-3.9239296013819...|[2.51074982510198...|[0.99631774122402...| 0.0|\n", "| 0.0|[-3.9207883329654...|[1.67884814435537...|[0.98613506980551...| 0.0|\n", "| 0.0|[-3.9042730735632...|[2.69458270134423...|[0.99826754198763...| 0.0|\n", "| 0.0|[-3.8528839522069...|[2.74368220743337...|[0.99619528684143...| 0.0|\n", "| 0.0|[-3.8388790402551...|[3.14866033591390...|[0.99893012911406...| 0.0|\n", "| 0.0|[-3.8087141108052...|[1.95419675132077...|[0.98062178655329...| 0.0|\n", "| 0.0|[-3.7942313414529...|[3.11759708884587...|[0.99859745800453...| 0.0|\n", "| 0.0|[-3.7934139764804...|[2.32874916693439...|[0.99352683103870...| 0.0|\n", "| 0.0|[-3.7826897314031...|[2.12890055704172...|[0.98980428152276...| 0.0|\n", "+-----+--------------------+--------------------+--------------------+----------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "result.show()" ] }, { "cell_type": "markdown", "metadata": { "button": false, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Model Evaluation" ] }, { "cell_type": "code", "execution_count": 49, "metadata": { "button": false, "execution": { "iopub.execute_input": "2021-01-13T07:27:39.271201Z", "iopub.status.busy": "2021-01-13T07:27:39.270947Z", "iopub.status.idle": "2021-01-13T07:27:39.415700Z", "shell.execute_reply": "2021-01-13T07:27:39.414800Z", "shell.execute_reply.started": "2021-01-13T07:27:39.271166Z" }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "from pyspark.sql.functions import udf, col\n", "from pyspark.sql.types import DoubleType\n", "\n", "get_probability_udf = udf(lambda l: float(l[1]), DoubleType())\n", "probability_label = result.select(\n", " get_probability_udf('probability').alias('probability'), 'label').withColumn('label', col('label').cast('double'))\n", 
"\n", "from pyspark.mllib.evaluation import BinaryClassificationMetrics\n", "\n", "metrics = BinaryClassificationMetrics(probability_label.rdd.map(tuple))" ] }, { "cell_type": "code", "execution_count": 50, "metadata": { "button": false, "collapsed": false, "execution": { "iopub.execute_input": "2021-01-13T07:27:39.417316Z", "iopub.status.busy": "2021-01-13T07:27:39.417062Z", "iopub.status.idle": "2021-01-13T07:27:39.422064Z", "shell.execute_reply": "2021-01-13T07:27:39.421364Z", "shell.execute_reply.started": "2021-01-13T07:27:39.417283Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" } ], "source": [ "metrics" ] }, { "cell_type": "markdown", "metadata": { "button": false, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "#### Area Under ROC\n", "An ROC curve (receiver operating characteristic curve) is a graph showing the performance of a classification model at all classification thresholds. This curve plots two parameters: True Positive Rate (TPR), and False Positive Rate (FPR)." 
] }, { "cell_type": "code", "execution_count": 51, "metadata": { "button": false, "collapsed": false, "execution": { "iopub.execute_input": "2021-01-13T07:27:39.423360Z", "iopub.status.busy": "2021-01-13T07:27:39.423094Z", "iopub.status.idle": "2021-01-13T07:31:09.267444Z", "shell.execute_reply": "2021-01-13T07:31:09.266612Z", "shell.execute_reply.started": "2021-01-13T07:27:39.423328Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Area under ROC = 0.891779317357863\n" ] } ], "source": [ "print(f'Area under ROC = %s' % metrics.areaUnderROC)" ] }, { "cell_type": "markdown", "metadata": { "button": false, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "#### Log Loss\n", "Log Loss is the most important classification metric based on probabilities. For a given problem, a lower log-loss value means better predictions." ] }, { "cell_type": "code", "execution_count": 52, "metadata": { "button": false, "collapsed": false, "execution": { "iopub.execute_input": "2021-01-13T07:31:09.268926Z", "iopub.status.busy": "2021-01-13T07:31:09.268670Z", "iopub.status.idle": "2021-01-13T07:33:47.941770Z", "shell.execute_reply": "2021-01-13T07:33:47.940919Z", "shell.execute_reply.started": "2021-01-13T07:31:09.268892Z" }, "jupyter": { "outputs_hidden": false }, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Log Loss = 0.2587065412441612\n" ] } ], "source": [ "from pyspark.sql.functions import when, log, mean\n", "\n", "log_loss_df = probability_label.select(when(probability_label['label'] == 1, -log(probability_label['probability'])).\\\n", " otherwise(-log(1 - probability_label['probability'])).alias('log_loss'))\n", "\n", "log_loss = log_loss_df.agg(mean('log_loss').alias('l_l')).collect()[0]['l_l']\n", "print(f'Log Loss = %s' % log_loss)" ] }, { 
"cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "### Cleanup Resource" ] }, { "cell_type": "markdown", "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "source": [ "After this tutorial, you can release Spark executors and give resource back to Kubernetes by just calling Spark's stop API." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "button": false, "deletable": true, "new_sheet": false, "run_control": { "read_only": false } }, "outputs": [], "source": [ "spark.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 4 }