{ "cells": [ { "cell_type": "markdown", "id": "f83cb1ee", "metadata": {}, "source": [ "This notebook is build to run with `PySpark(SparkMagic)` kernel." ] }, { "cell_type": "markdown", "id": "18be2c43", "metadata": {}, "source": [ "# Data Preperation and Feature Engineering" ] }, { "cell_type": "markdown", "id": "330edb16", "metadata": {}, "source": [ "## Configuration and Setup\n", "### Connect to cluster in Amazon SageMaker Studio\n", "This notebook is designed to be run in SageMaker Studio. Please refer to https://docs.aws.amazon.com/sagemaker/latest/dg/studio-notebooks-emr-cluster-connect.html for general instruction on how to connect the notebook to a running EMR cluster. \n", "\n", "After following the steps, it will generate an active cell to connect to Amazon EMR cluster. The cell's content look like this:\n", "\n", "``\n", "%load_ext sagemaker_studio_analytics_extension.magics\n", "%sm_analytics emr connect --cluster-id j-XXXXXXXXXXX-auth-type None \n", "``\n", "\n", "Running the cell sets up the connection to EMR. You can reuse this code as long as you still use the same cluster.\n", "\n", "If you create a new cluster, you should remove this cell and connect the notebook with the cluster again. It will generate a new cell with new connection code.\n" ] }, { "cell_type": "markdown", "id": "70cb2a38", "metadata": {}, "source": [ "In the following we enable virtualenv in the Spark session to be later able to install packages to the already running EMR cluster. See also https://aws.amazon.com/blogs/big-data/install-python-libraries-on-a-running-cluster-with-emr-notebooks/." ] }, { "cell_type": "code", "execution_count": null, "id": "49403cd7", "metadata": {}, "outputs": [], "source": [ "%%configure -f\n", "{\n", " \"conf\": {\n", " \"spark.jars.packages\":\"ml.combust.mleap:mleap-spark_2.12:0.19.0,ml.combust.mleap:mleap-spark-base_2.12:0.19.0,ml.combust.mleap:mleap-spark_2.12:0.19.0\",\n", " \"spark.jars.excludes\":\"net.sourceforge.f2j:arpack_combined_all\",\n", " \"spark.pyspark.python\": \"python3\",\n", " \"spark.pyspark.virtualenv.enabled\": \"true\",\n", " \"spark.pyspark.virtualenv.type\":\"native\",\n", " \"spark.pyspark.virtualenv.bin.path\":\"/usr/bin/virtualenv\"\n", " }\n", "}\n" ] }, { "cell_type": "markdown", "id": "f1ea7186", "metadata": {}, "source": [ "### Debug Configuration\n", "This notebook contains code which is useful to better understand the actual flow or debug the code if you apply your own changes. Some of those addtional steps have a lengthy runtime. Hence they are guarded with a flag:" ] }, { "cell_type": "code", "execution_count": null, "id": "85b0c397", "metadata": {}, "outputs": [], "source": [ "DEBUG = False # set to True to have detailed output" ] }, { "cell_type": "markdown", "id": "362ff130", "metadata": {}, "source": [ "### Set input and output pathes" ] }, { "cell_type": "code", "execution_count": null, "id": "a12945f3", "metadata": {}, "outputs": [], "source": [ "import os\n", "import boto3\n", "import json\n", "import subprocess\n", "\n", "emr_instance_info = subprocess.run([\"curl\",\"-s\",\"169.254.169.254/latest/dynamic/instance-identity/document\"], \\\n", " capture_output=True).stdout.decode('UTF-8')\n", "emr_region = json.loads(emr_instance_info)[\"region\"]\n", "\n", "session = boto3.Session(region_name=emr_region) \n", "ssm = session.client('ssm')\n", "s3_client = boto3.client(\"s3\")\n", "\n", "bucket = ssm.get_parameter(Name=\"/aik/data-bucket\")[\"Parameter\"][\"Value\"]\n", "\n", "bid_source = ssm.get_parameter(Name=\"/aik/bid_source\")[\"Parameter\"][\"Value\"]\n", "imp_source = ssm.get_parameter(Name=\"/aik/imp_source\")[\"Parameter\"][\"Value\"]\n", "\n", "output_train = ssm.get_parameter(Name=\"/aik/output_train\")[\"Parameter\"][\"Value\"]\n", "output_test = ssm.get_parameter(Name=\"/aik/output_test\")[\"Parameter\"][\"Value\"]\n", "output_verify = ssm.get_parameter(Name=\"/aik/output_verify\")[\"Parameter\"][\"Value\"] \n", "output_transformed = ssm.get_parameter(Name=\"/aik/output_verify\")[\"Parameter\"][\"Value\"] \n", "inference_data = ssm.get_parameter(Name=\"/aik/inference_data\")[\"Parameter\"][\"Value\"] \n", "pipelineModelArtifactPath = ssm.get_parameter(Name=\"/aik/pipelineModelArtifactPath\")[\"Parameter\"][\"Value\"] " ] }, { "cell_type": "code", "execution_count": null, "id": "7e400dec", "metadata": {}, "outputs": [], "source": [ "print(f'bucket={bucket}')\n", "print(f'bid_source={bid_source}')\n", "print(f'imp_source={imp_source}')\n", "print(f'output_train={output_train}')\n", "print(f'output_verify={output_verify}')\n", "print(f'output_test={output_test}')\n", "print(f'inference_data={inference_data}')\n", "print(f'pipelineModelArtifactPath={pipelineModelArtifactPath}')" ] }, { "cell_type": "markdown", "id": "d32f642f", "metadata": {}, "source": [ "## Prepare Bidding Data" ] }, { "cell_type": "markdown", "id": "7ceba5fc", "metadata": {}, "source": [ "### Schema" ] }, { "cell_type": "code", "execution_count": null, "id": "00f391e3", "metadata": {}, "outputs": [], "source": [ "bid_columns = [\n", " \"BidID\", \n", " \"Timestamp\", \n", " \"iPinYouID\",\n", " \"UserAgent\",\n", " \"IP\", \n", " \"RegionID\", \n", " \"CityID\", \n", " \"AdExchange\",\n", " \"Domain\", \n", " \"URL\",\n", " \"AnonymousURL\",\n", " \"AdSlotID\",\n", " \"AdSlotWidth\",\n", " \"AdSlotHeight\",\n", " \"AdSlotVisibility\",\n", " \"AdSlotFormat\",\n", " \"AdSlotFloorPrice\",\n", " \"CreativeID\",\n", " \"BiddingPrice\",\n", " \"AdvertiserID\", # V\n", " \"UserProfileIDs\"\n", "]\n", "\n", "bid_schema = \"\"\n", "for col in bid_columns:\n", " if bid_schema != \"\":\n", " bid_schema += \", \"\n", " if col == 'Region ID':\n", " bid_schema += f\"`{col}` long\"\n", " elif col == 'City ID':\n", " bid_schema += f\"`{col}` long\"\n", " else:\n", " bid_schema += f\"`{col}` string\"" ] }, { "cell_type": "markdown", "id": "29ef4e7d", "metadata": {}, "source": [ "### Read data" ] }, { "cell_type": "code", "execution_count": null, "id": "8620f950", "metadata": {}, "outputs": [], "source": [ "print(bid_source)\n", "bid_df = spark.read.option(\"delimiter\", \"\\t\").format(\"csv\").load(\n", " bid_source,\n", " inferSchema=False,\n", " header=False,\n", " schema=bid_schema)\n", "\n", "bid_df.show(2)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "5d827969", "metadata": {}, "outputs": [], "source": [ "if DEBUG:\n", " bid_df.select(\"AdvertiserID\").show(3)" ] }, { "cell_type": "markdown", "id": "8b9e58c0", "metadata": {}, "source": [ "### Transform dataframe\n", "We are only going to use a subset of the available data. Also we are not going to work with a timestamp, but want to break it down into day of the week and hour of the day as this will provide better insights into the underlying patterns. Therefore we transform the data frame now." ] }, { "cell_type": "code", "execution_count": null, "id": "eef91a1d", "metadata": {}, "outputs": [], "source": [ "bid_df.createOrReplaceTempView(\"bid_table\")\n", "df1 = spark.sql(\"SELECT \\\n", " BidID, dayofweek(concat(substring(Timestamp, 1, 4), '-', substring(Timestamp, 5, 2), '-', substring(Timestamp, 7, 2))) AS dow, \\\n", " substring(Timestamp, 9, 2) AS hour, RegionID, CityID, Domain, AdvertiserID \\\n", " FROM bid_table GROUP BY BidID, Timestamp, RegionID, CityID, Domain, AdvertiserID\")\n", "df1.show(3)" ] }, { "cell_type": "markdown", "id": "57e8ee59", "metadata": {}, "source": [ "## Prepare Impression Data" ] }, { "cell_type": "markdown", "id": "d8ffa256", "metadata": {}, "source": [ "### Schema" ] }, { "cell_type": "code", "execution_count": null, "id": "972a42e2", "metadata": {}, "outputs": [], "source": [ "imp_columns = [\"BidID\", # V\n", "\"Timestamp\",\n", "\"LogType\", \n", "\"iPinYouID\",\n", "\"UserAgent\",\n", "\"IP\",\n", "\"RegionID\", \n", "\"CityID\", \n", "\"AdExchange\",\n", "\"Domain\",\n", "\"URL\",\n", "\"AnonymousURL\",\n", "\"AdSlotID\",\n", "\"AdSlotWidth\",\n", "\"AdSlotHeight\",\n", "\"AdSlotVisibility\",\n", "\"AdSlotFormat\",\n", "\"AdSlotFloorPrice\",\n", "\"CreativeID\",\n", "\"BiddingPrice\",\n", "\"PayingPrice\",\n", "\"LandingPageURL\",\n", "\"AdvertiserID\",\n", "\"UserProfileIDs\"]\n", "\n", "imp_schema = \"\"\n", "for col in imp_columns:\n", " if imp_schema != \"\":\n", " imp_schema += \", \"\n", " if col == 'BiddingPrice':\n", " imp_schema += f\"`{col}` long\"\n", " elif col == 'PayingPrice':\n", " imp_schema += f\"`{col}` long\"\n", " else:\n", " imp_schema += f\"`{col}` string\"" ] }, { "cell_type": "markdown", "id": "fb54680f", "metadata": {}, "source": [ "### Read Data" ] }, { "cell_type": "code", "execution_count": null, "id": "b1e00b16", "metadata": {}, "outputs": [], "source": [ "imp_df = spark.read.option(\"delimiter\", \"\\t\").format(\"csv\").load(\n", " imp_source,\n", " inferSchema=False,\n", " header=False,\n", " schema=imp_schema)\n", "\n", "imp_df.show(2)" ] }, { "cell_type": "markdown", "id": "737a6f2e", "metadata": {}, "source": [ "### Transform DataFrame\n", "\n", "Again we are transforming the data so that we can use it easier. At this time we only drop columns we are not going to use further." ] }, { "cell_type": "code", "execution_count": null, "id": "61482257", "metadata": {}, "outputs": [], "source": [ "imp_df.createOrReplaceTempView(\"imp_table\")\n", "df2 = spark.sql(\"SELECT \\\n", " BidID, BiddingPrice, PayingPrice, UserAgent \\\n", " FROM imp_table GROUP BY BidID, BiddingPrice, PayingPrice, UserAgent\")\n", "\n", "df2.show(3)" ] }, { "cell_type": "markdown", "id": "37be231e", "metadata": {}, "source": [ "## Merge Data Frames\n", "\n", "So far we have been working with the bidding and impression data in individual data frames. Now we are going to join them into one single data frame." ] }, { "cell_type": "markdown", "id": "144bce85", "metadata": {}, "source": [ "Let's have a look into the schema of the bidding data:" ] }, { "cell_type": "code", "execution_count": null, "id": "3b893ff7", "metadata": {}, "outputs": [], "source": [ "if DEBUG:\n", " df1.show(5)\n", " df1.printSchema()" ] }, { "cell_type": "markdown", "id": "cd92bdb1", "metadata": {}, "source": [ "Let's have a look into the schema of the impression data:" ] }, { "cell_type": "code", "execution_count": null, "id": "e04e52f2", "metadata": {}, "outputs": [], "source": [ "if DEBUG:\n", " df2.show(5)\n", " df2.printSchema()" ] }, { "cell_type": "markdown", "id": "925cbdb4", "metadata": {}, "source": [ "We can see that both dataframes have a common column, which is the BidID. Therefore we are going to use it to joing the dataframes. Later we will use the impression data to identify successful bids. This is an assumption as in theory bidrequests could have been successful but not lead to an impression. However we have no data to drill this down. You might want to keep this in mind as your own data might cover this better." ] }, { "cell_type": "code", "execution_count": null, "id": "82b8b99f", "metadata": {}, "outputs": [], "source": [ "df = df1.join(df2,\"BidID\" ,\"left\")" ] }, { "cell_type": "markdown", "id": "61654bc6", "metadata": {}, "source": [ "Let's have a look into the resulting dataframe: " ] }, { "cell_type": "code", "execution_count": null, "id": "a084e324", "metadata": {}, "outputs": [], "source": [ "if DEBUG:df.show(5)\n" ] }, { "cell_type": "markdown", "id": "3225fe78", "metadata": {}, "source": [ "## Encode Features\n", "\n", "In the following we will encode our features so that we can use them for the model training. Here we are using [mleap](https://combust.github.io/mleap-docs/) to searialize feature transformation logic. We are exporting the features transformation Pipeline to a MLeap bundle to run the required encoding steps. We will need the pipeline to apply the same feature encoding for the inference part of the solution.\n", "\n", "We also have some more complex steps to be executed. Those we will not implement in the same approach, but use a dedicated library for them. This will be explained later in the notebook." ] }, { "cell_type": "code", "execution_count": null, "id": "d27c42c8", "metadata": {}, "outputs": [], "source": [ "from pyspark.ml import Pipeline\n", "from pyspark.ml.feature import StringIndexer\n", "\n", "categoricalCols = [\"AdvertiserID\", \"Domain\", \"RegionID\", \"CityID\"]\n", "\n", "stringindexers = [StringIndexer(\n", " inputCol=col,\n", " outputCol=\"Index\" + col) for col in categoricalCols]\n", "\n", "pipeline = Pipeline(stages = stringindexers)" ] }, { "cell_type": "code", "execution_count": null, "id": "f5fd33e5", "metadata": {}, "outputs": [], "source": [ "for indexer in stringindexers:\n", " indexer.setHandleInvalid(\"keep\")" ] }, { "cell_type": "code", "execution_count": null, "id": "53edef77", "metadata": {}, "outputs": [], "source": [ "df.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "id": "a71c1c34", "metadata": {}, "outputs": [], "source": [ "pipelineModel = pipeline.fit(df)\n", "\n", "\n", "df.write.parquet(output_transformed, mode=\"overwrite\")\n", "df.write.json(inference_data, mode=\"overwrite\")\n", "#transform the dataframe\n", "\n", "df = pipelineModel.transform(df)\n", "df.printSchema()\n", "df.show(3)\n" ] }, { "cell_type": "markdown", "id": "b857e3c2", "metadata": {}, "source": [ "Finally we store the Pipeline Model locally." ] }, { "cell_type": "code", "execution_count": null, "id": "1fbff2a8", "metadata": {}, "outputs": [], "source": [ "import mleap.pyspark\n", "\n", "from mleap.pyspark.spark_support import SimpleSparkSerializer\n", "pipelineModel.serializeToBundle(\"jar:file:/tmp/pipelineModel.zip\", df)" ] }, { "cell_type": "markdown", "id": "28c4eda1", "metadata": {}, "source": [ "Upload pipeline model to the S3 bucket" ] }, { "cell_type": "code", "execution_count": null, "id": "39759410", "metadata": {}, "outputs": [], "source": [ "from urllib.parse import urlparse\n", "\n", "parsed_pipeline_model_path = urlparse(pipelineModelArtifactPath, allow_fragments=False)\n", "s3_client.upload_file(\"/tmp/pipelineModel.zip\", parsed_pipeline_model_path.netloc, parsed_pipeline_model_path.path.lstrip('/'))\n" ] }, { "cell_type": "markdown", "id": "071e1d3f", "metadata": {}, "source": [ "## Engineer features from useragent" ] }, { "cell_type": "markdown", "id": "d07406b7", "metadata": {}, "source": [ "Now as we have engineered the main features, we are taking on to understand what features we can derive from the useragent field we have in the raw data. Actually it turns out that the type of the device a user is using is important to determine if a bid request is likely to be successful. Actually we can derive the device type from the user agent information. Instead of building this on our own, we are using one of the multiple libraries which are actually delevering this functionality. For the purpose of this kit, we have choosen Project https://github.com/woothee/woothee. One of the drivers is the availabilty for different programming langugages, including python, which we are using for the model training and java, which we are using for the inference." ] }, { "cell_type": "code", "execution_count": null, "id": "60c43145", "metadata": {}, "outputs": [], "source": [ "import woothee\n", "import numpy as np\n", "from random import random\n", "\n", "def parse_ua_to_device_type(user_agent_str):\n", " ua = woothee.parse(user_agent_str)\n", " category = ua['category']\n", " if category =='smartphone':\n", " return 0\n", " if category ==\"mobilephone\":\n", " return 1\n", " if category ==\"appliance\":\n", " return 2\n", " if category ==\"pc\":\n", " return 3\n", " if category ==\"crawler\":\n", " return 4\n", " if category ==\"misc\":\n", " return 5\n", " return int(random()*10)%6 # missing value imputed with random device_type_id" ] }, { "cell_type": "code", "execution_count": null, "id": "1265d87a", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import udf\n", "from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType\n", "from pyspark.sql.functions import when\n", "from pyspark.sql.functions import col" ] }, { "cell_type": "code", "execution_count": null, "id": "e9a666b0", "metadata": {}, "outputs": [], "source": [ "cols_parse_ua_to_device_type = udf(parse_ua_to_device_type, IntegerType())" ] }, { "cell_type": "code", "execution_count": null, "id": "626cae4d", "metadata": {}, "outputs": [], "source": [ "df_new = df.select([\"dow\", \"hour\", \"UserAgent\", \"PayingPrice\", \"IndexAdvertiserID\", \"IndexDomain\", \"IndexRegionID\", \"IndexCityID\"])\\\n", " .withColumn('UserAgent', when(col('UserAgent').isNull(), \"\").otherwise(col('UserAgent')))\\\n", " .withColumn(\"hour\", df[\"hour\"].cast(IntegerType()))\\\n", " .withColumn(\"device_type_id\", cols_parse_ua_to_device_type(\"UserAgent\"))\\\n", " .withColumn('label', when(col('PayingPrice').isNull(), 0).otherwise(1))\\\n", " .drop(\"UserAgent\").drop(\"PayingPrice\")" ] }, { "cell_type": "code", "execution_count": null, "id": "a2060f68", "metadata": {}, "outputs": [], "source": [ "df_new.show(5)" ] }, { "cell_type": "code", "execution_count": null, "id": "e4547ce8", "metadata": {}, "outputs": [], "source": [ "df.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "id": "7771bebf", "metadata": {}, "outputs": [], "source": [ "if DEBUG:\n", " df_new.groupBy('device_type_id').count().show()" ] }, { "cell_type": "markdown", "id": "2569d36e", "metadata": {}, "source": [ "### Sanity checks for null values\n", "\n", "We quickly check if there are null values within the `device_type_id`column as those would generate problems later on." ] }, { "cell_type": "code", "execution_count": null, "id": "3c072086", "metadata": {}, "outputs": [], "source": [ "if DEBUG:\n", " df_new.filter(df_new['device_type_id'].isNull()).count()" ] }, { "cell_type": "markdown", "id": "43476a54", "metadata": {}, "source": [ "### Generate training, validation and test data sets." ] }, { "cell_type": "markdown", "id": "a0293125", "metadata": {}, "source": [ "First of all we are making sure that the label column is the first column as this is expected for the training later on." ] }, { "cell_type": "code", "execution_count": null, "id": "467435c0", "metadata": {}, "outputs": [], "source": [ "# reorder columns label as first column\n", "#df_new = df_new.select([\"label\", \"bid_table.BidID\", \"dow\", \"hour\", \"IndexAdvertiserID\", \"IndexDomain\", \"IndexRegionID\", \"IndexCityID\", \"device_type_id\"])\n", "df_new = df_new.select([\"label\", \"dow\", \"hour\", \"IndexAdvertiserID\", \"IndexDomain\", \"IndexRegionID\", \"IndexCityID\", \"device_type_id\"])" ] }, { "cell_type": "code", "execution_count": null, "id": "18f76a6d", "metadata": {}, "outputs": [], "source": [ "df_new.show(1)" ] }, { "cell_type": "markdown", "id": "2c502eda", "metadata": {}, "source": [ "Now we are going to shuffle the dataset prior to splitting the data in the three different sets." ] }, { "cell_type": "code", "execution_count": null, "id": "e2e622e4", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import col, rand\n", "df_new_shuffled = df_new.orderBy(rand())" ] }, { "cell_type": "code", "execution_count": null, "id": "8ce2f50b", "metadata": {}, "outputs": [], "source": [ "df_new_shuffled.show(5)" ] }, { "cell_type": "code", "execution_count": null, "id": "81aee1cc", "metadata": {}, "outputs": [], "source": [ "if DEBUG:\n", " df_new.count()\n", " df_new_shuffled.groupBy('label').count().show()\n", " df.groupBy('PayingPrice').count().show()" ] }, { "cell_type": "markdown", "id": "1a8fd311", "metadata": {}, "source": [ "Finally we are going to split the dataset into 80% training, 10% validation and 10% testing." ] }, { "cell_type": "code", "execution_count": null, "id": "ccb0e6e2", "metadata": {}, "outputs": [], "source": [ "splits = df_new_shuffled.randomSplit([0.8, 0.1, 0.1], 42)" ] }, { "cell_type": "code", "execution_count": null, "id": "b03e5cbc", "metadata": {}, "outputs": [], "source": [ "# Check the counts\n", "if DEBUG:\n", " splits[0].count(), splits[1].count(), splits[2].count()" ] }, { "cell_type": "markdown", "id": "bec9b0ad", "metadata": {}, "source": [ "### Save to S3 in parquet format" ] }, { "cell_type": "code", "execution_count": null, "id": "5bab28ae", "metadata": {}, "outputs": [], "source": [ "splits[0].write.parquet(output_train, mode=\"overwrite\")" ] }, { "cell_type": "code", "execution_count": null, "id": "c00cfa54", "metadata": {}, "outputs": [], "source": [ "splits[1].write.parquet(output_verify, mode=\"overwrite\")" ] }, { "cell_type": "code", "execution_count": null, "id": "3d4db57a", "metadata": {}, "outputs": [], "source": [ "splits[2].write.parquet(output_test, mode=\"overwrite\")" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "PySpark (SparkMagic)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:eu-west-1:470317259841:image/sagemaker-sparkmagic" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }