{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Module 5: Offline Batch ingestion via SageMaker Processing job using Feature Store Spark Connector\n", "\n", "---\n", "\n", "**Note:** Please set kernel to `Python 3 (Data Science)` and select instance to `ml.t3.medium`\n", "\n", "Please be aware, that you need to run through the notebook [m5_nb0_partition_data](https://github.com/aws-samples/amazon-sagemaker-feature-store-end-to-end-workshop/blob/main/05-module-scalable-batch-ingestion/m5_nb0_partition_data.ipynb) in this section of the workshop, to setup the needed data. \n", "\n", "## Contents\n", "\n", "1. [Setup](#Setup)\n", "1. [Create PySpark SageMaker Processing script](#Create-PySpark-SageMaker-Processing-script)\n", "1. [Run batch ingestion job](#Run-batch-ingestion-job)\n", "1. [Verify processing job results](#Verify-processing-job-results)\n", "\n", "\n", "\n", "In this example, an alternative route through the batch ingestion via PySpark Processing containers will be explored to ingest data direclty into the Offline Store. We will use the `.ingest_data()` api instead of the `.put_record()` api. \n", "\n", "This notebook will display how to use the the [Sagemaker Feature Store Manager](https://pypi.org/project/sagemaker-feature-store-pyspark/). \n", "\n", "\n", "To achieve this, the package [Sagemaker Feature Store PySpark](https://pypi.org/project/sagemaker-feature-store-pyspark/) is needed. If you want to use other means of Spark, please see the [Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-ingestion-spark-connector-setup.html) for further guidance.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "#### Imports " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from sagemaker.spark.processing import PySparkProcessor\n", "from sagemaker import get_execution_role\n", "from random import randint\n", "import sagemaker\n", "import logging\n", "import boto3\n", "import json" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "logging.basicConfig(format='SMFS BATCH INGEST PYSPARK - %(levelname)s - %(message)s')\n", "\n", "logger = logging.getLogger(__name__)\n", "logger.setLevel(logging.DEBUG)\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "logger.info(f'Using SageMaker version: {sagemaker.__version__}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Essentials" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "try:\n", " role = get_execution_role()\n", "except:\n", " # for local dev, please set your sagemaker role here\n", " role = #'arn:aws:iam::XXXXXXXX:role/service-role/role-name'\n", "logger.info(f'Role = {role}')\n", "sagemaker_session = sagemaker.Session()\n", "region = sagemaker_session.boto_region_name\n", "featurestore_runtime_client = sagemaker_session.boto_session.client('sagemaker-featurestore-runtime', region_name=region)\n", "default_bucket = sagemaker_session.default_bucket()\n", "logger.info(f'Default bucket = {default_bucket}')\n", "prefix = 'sagemaker-feature-store'\n", "\n", "spark_version = '3.1'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Getting the packages needed\n", "To use the feature store manager directly, we only need two things. 
\n", "- the right `jar` that we can get from [maven](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk) or from installing of the `sagemaker-feature-store-pyspark` packages. \n", "- the right python packages that we can get from [PyPi](https://pypi.org/project/sagemaker-feature-store-pyspark/). From PyPi You can download the latest package and copy the `feature-store-manager.py` and `wrapper.py` to your local file system. This has already been done for you in the folder `feature-store-pyspark`. For the packages, please look into the feature_store_pyspark folder. We will need to provide both to the pyspark processor. \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can either get the jars for needed directly from [maven](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk), or simply install the feature-store-manager via pip, which will get us the jars without the hassle of copying them. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Install a pip package in the current Jupyter kernel\n", "import sys\n", "!{sys.executable} -m pip install sagemaker-feature-store-pyspark-{spark_version}\n", "!{sys.executable} -m pip install pyathena" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "jar_path = !feature-store-pyspark-dependency-jars\n", "jar_path = jar_path[0]\n", "jar_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have all the needed packages, the PySpark processing script can be created. \n", "Not only can we ingest into the feature store direclty, but combine it with data cleaning and feature engineering. 
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " ## Create PySpark Processing script\n", "The following script\n", "- Creates a data schema for the spark dataframe\n", "- Ordinally encodes a column\n", "- Scales the data \n", "- Transforms the column `purchased_on` to the ML ready feature `n_days_since_last_purchase`\n", "- Repartitions the data \n", "- Ingests into the offline feature store\n", "\n", "If you are only interested in the batch ingestion, please take a look at the `batch_ingest_to_feature_store` method.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%writefile ./scripts/batch_ingest_sm_pyspark.py\n", "from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler\n", "from feature_store_manager import FeatureStoreManager\n", "from pyspark.sql.functions import udf, datediff, to_date, lit, col,isnan, when, count\n", "from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField, StringType, FloatType\n", "from pyspark.sql import SparkSession, DataFrame\n", "from argparse import Namespace, ArgumentParser\n", "from pyspark.ml.linalg import Vector\n", "from pyspark.ml import Pipeline\n", "from datetime import datetime\n", "import argparse\n", "import ast\n", "import logging\n", "import boto3\n", "import time\n", "import os\n", "\n", "\n", "logger = logging.getLogger('__name__')\n", "logger.setLevel(logging.INFO)\n", "logger.addHandler(logging.StreamHandler())\n", "\n", "\n", "def parse_args() -> None:\n", " parser = argparse.ArgumentParser()\n", " parser.add_argument('--num_processes', type=int, default=1)\n", " parser.add_argument('--num_workers', type=int, default=1)\n", " parser.add_argument('--feature_group_name', type=str)\n", " parser.add_argument('--feature_group_arn', type=str)\n", " parser.add_argument('--target_feature_store_list', type=str)\n", " parser.add_argument('--s3_uri_prefix', type=str)\n", " \n", " args, _ = parser.parse_known_args()\n", " return args\n", "\n", "def transform_row(row) -> list:\n", " columns = list(row.asDict())\n", " record = []\n", " for column in columns:\n", " feature = {'FeatureName': column, 'ValueAsString': str(row[column])}\n", " record.append(feature)\n", " return record\n", "\n", "def batch_ingest_to_feature_store(args: argparse.Namespace, df: DataFrame) -> None:\n", " feature_group_name = args.feature_group_name\n", " logger.info(f'Feature Group name supplied is: {feature_group_name}')\n", " session = boto3.session.Session()\n", "\n", " logger.info(f'Instantiating FeatureStoreManger!')\n", " feature_store_manager=FeatureStoreManager()\n", "\n", " logger.info(f'trying to load datatypes directly from Dataframe')\n", "\n", " # Load the feature definitions from input schema. 
"    feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df)\n", "    logger.info('Feature definitions loaded successfully!')\n", "    print(feature_definitions)\n", "    feature_group_arn = args.feature_group_arn\n", "    logger.info(f'Feature Group ARN supplied is: {feature_group_arn}')\n", "\n", "    # If only OfflineStore is selected, the connector will batch write the data to the offline store directly\n", "    args.target_feature_store_list = ast.literal_eval(args.target_feature_store_list)\n", "    logger.info(f'Ingesting into the following stores: {args.target_feature_store_list}')\n", "\n", "    feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=args.target_feature_store_list)\n", "    logger.info('Feature ingestion successful!')\n", "\n", "def scale_col(df: DataFrame, col_name: str) -> DataFrame:\n", "    # min-max scale the column col_name and replace the original column with the scaled values\n", "    unlist = udf(lambda x: round(float(list(x)[0]), 2), DoubleType())\n", "    assembler = VectorAssembler(inputCols=[col_name], outputCol=f'{col_name}_vec')\n", "    scaler = MinMaxScaler(inputCol=f'{col_name}_vec', outputCol=f'{col_name}_scaled')\n", "    pipeline = Pipeline(stages=[assembler, scaler])\n", "    df = pipeline.fit(df).transform(df).withColumn(f'{col_name}_scaled', unlist(f'{col_name}_scaled')) \\\n", "        .drop(f'{col_name}_vec')\n", "    df = df.drop(col_name)\n", "    df = df.withColumnRenamed(f'{col_name}_scaled', col_name)\n", "    return df\n", "\n", "def ordinal_encode_col(df: DataFrame, col_name: str) -> DataFrame:\n", "    indexer = StringIndexer(inputCol=col_name, outputCol=f'{col_name}_new')\n", "    df = indexer.fit(df).transform(df)\n", "    df = df.drop(col_name)\n", "    df = df.withColumnRenamed(f'{col_name}_new', col_name)\n", "    return df\n", "\n", "def run_spark_job():\n", "\n", "    args = parse_args()\n", "\n", "    spark = SparkSession.builder.getOrCreate()\n", "\n", "    # set the time parser policy to LEGACY to allow parsing dates in the format dd/MM/yyyy HH:mm:ss, which keeps backwards compatibility with Spark 2.4\n", "    spark.sql(\"set spark.sql.legacy.timeParserPolicy=LEGACY\")\n", "\n", "    logger.info(f'Using Spark version: {spark.version}')\n", "\n", "    # get the total number of cores in the Spark cluster; if developing locally, there might be no executor\n", "    try:\n", "        spark_context = spark.sparkContext\n", "        total_cores = int(spark_context._conf.get('spark.executor.instances')) * int(spark_context._conf.get('spark.executor.cores'))\n", "        logger.info(f'Total available cores in the Spark cluster = {total_cores}')\n", "    except Exception as e:\n", "        total_cores = 1\n", "        logger.error(f'Could not retrieve number of total cores. Setting total cores to 1. Error message: {str(e)}')\n", "\n", "    logger.info(f'Reading input file from S3. S3 uri is {args.s3_uri_prefix}')\n",
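"    # The trailing wildcard in s3_uri_prefix matches every partitioned CSV file written by m5_nb0,\n", "    # so Spark reads all partition files into a single DataFrame.\n",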
"\n", "    # define the schema of the input data\n", "    csvSchema = StructType([\n", "        StructField(\"order_id\", StringType(), True),\n", "        StructField(\"customer_id\", StringType(), False),\n", "        StructField(\"product_id\", StringType(), False),\n", "        StructField(\"purchase_amount\", FloatType(), False),\n", "        StructField(\"is_reordered\", IntegerType(), False),\n", "        StructField(\"purchased_on\", StringType(), False),\n", "        StructField(\"event_time\", StringType(), False)])\n", "\n", "    # read the CSV files into a Spark dataframe using the schema\n", "    df = spark.read.option(\"header\", \"true\").schema(csvSchema).csv(args.s3_uri_prefix)\n", "\n", "    # transform 1 - ordinally encode the boolean `is_reordered` to int\n", "    df = ordinal_encode_col(df, 'is_reordered')\n", "    df = df.withColumn('is_reordered', df['is_reordered'].cast(IntegerType()))\n", "\n", "    # transform 2 - min-max scale `purchase_amount`\n", "    df = df.withColumn('purchase_amount', df['purchase_amount'].cast(DoubleType()))\n", "    df = scale_col(df, 'purchase_amount')\n", "\n", "    # transform 3 - derive the `n_days_since_last_purchase` column from the `purchased_on` column\n", "    current_date = datetime.today().strftime('%Y-%m-%d')\n", "    df = df.withColumn('n_days_since_last_purchase', datediff(to_date(lit(current_date)), to_date('purchased_on', 'yyyy-MM-dd')))\n", "    df = df.drop('purchased_on')\n", "    df = scale_col(df, 'n_days_since_last_purchase')\n", "\n", "    logger.info(f'Number of partitions = {df.rdd.getNumPartitions()}')\n", "    # rule-of-thumb heuristic: take the product of #executors and #executor.cores, then multiply it by 3 or 4\n", "    df = df.repartition(total_cores * 3)\n", "    logger.info(f'Number of partitions after re-partitioning = {df.rdd.getNumPartitions()}')\n", "    logger.info(f'Feature Store ingestion start: {datetime.now().strftime(\"%m/%d/%Y, %H:%M:%S\")}')\n", "    batch_ingest_to_feature_store(args, df)\n", "    logger.info(f'Feature Store ingestion complete: {datetime.now().strftime(\"%m/%d/%Y, %H:%M:%S\")}')\n", "\n", "if __name__ == '__main__':\n", "    logger.info('BATCH INGESTION - STARTED')\n", "    run_spark_job()\n", "    logger.info('BATCH INGESTION - COMPLETED')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, check that the feature group name, the feature definitions and our arguments are in line with our expectations."
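, "\n", "\n", "One way to sanity-check this is sketched below (illustrative only; it assumes `feature_group_description` has been retrieved as in the cells that follow, and the expected column list mirrors the transforms in the script above):\n", "\n", "```python\n", "expected_columns = ['order_id', 'customer_id', 'product_id', 'purchase_amount',\n", "                    'is_reordered', 'n_days_since_last_purchase', 'event_time']\n", "defined_features = {d['FeatureName'] for d in feature_group_description['FeatureDefinitions']}\n", "missing = [c for c in expected_columns if c not in defined_features]\n", "print('Columns missing from the feature group:', missing or 'none')\n", "```"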
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%store -r orders_feature_group_name\n", "s3_uri_prefix = f's3://{default_bucket}/{prefix}/partitions/*'\n", "\n", "# REUSE orders feature group name from module 1\n", "try:\n", " feature_group_name = orders_feature_group_name \n", " logger.info(f\"{feature_group_name} is used for the feature group name.\")\n", "except NameError:\n", " logger.info(f\"Feature group name could not be retrieved, please specify manually.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Specify the feature group name manually \n", "feature_group_name = \"fscw-orders-01-19-17-06\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "sm_client=boto3.client('sagemaker')\n", "\n", "feature_group_description = sm_client.describe_feature_group(FeatureGroupName=feature_group_name)\n", "feature_group_arn = feature_group_description['FeatureGroupArn']\n", "\n", "# please specify what target stores you want to ingest into -> PySpark does not accept list as a parameter\n", "target_feature_store_list = \"['OfflineStore']\" # ['OfflineStore', 'OnlineStore'] for both\n", "\n", "feature_group_name, feature_group_arn, target_feature_store_list, s3_uri_prefix" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "feature_group_description[\"FeatureDefinitions\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Run batch ingestion job\n", "You can check what image URI for the PySpark Processor is the right one for your region via [aws github](https://github.com/aws/sagemaker-spark-container/releases).\n", "\n", "If so, please specify the image URI via the `image_uri` attribute when instantiating the PySparkProcessor. \n", "\n", "Alternatively, you can specify the framework version that you would like to use via the `framework_version` attribute. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from sagemaker.spark.processing import PySparkProcessor\n", "\n", "pyspark_processor = PySparkProcessor(\n", " base_job_name=\"spark-preprocessor\",\n", " # image_uri = image_uri,\n", " framework_version=spark_version,\n", " role=role,\n", " instance_count=1,\n", " instance_type=\"ml.m5.xlarge\",\n", " max_runtime_in_seconds=1200,\n", ")\n", "\n", "pyspark_processor.run(submit_app='./scripts/batch_ingest_sm_pyspark.py', \n", " arguments = ['--feature_group_name', feature_group_name, \n", " '--s3_uri_prefix', s3_uri_prefix,\n", " '--feature_group_arn', feature_group_arn,\n", " '--target_feature_store_list', target_feature_store_list],\n", " submit_jars=[jar_path],\n", " submit_py_files=[\n", " './feature_store_pyspark/feature_store_manager.py',\n", " './feature_store_pyspark/wrapper.py'\n", " ],\n", " spark_event_logs_s3_uri=f's3://{default_bucket}/spark-logs', \n", " logs=False,\n", " wait=True) # set logs=True to disable logging" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Verify processing job results" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To verify the processing results, we are going to fetch an ingested order, by its `order_id` that we created. The same methodology can be used to get any record from the feature store. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "order_id = f'O{randint(1, 100000)}'\n", "logger.info(f'order_id={order_id}') \n", "print(feature_group_name)\n", "feature_record = featurestore_runtime_client.get_record(FeatureGroupName=feature_group_name, \n", " RecordIdentifierValueAsString=order_id)\n", "print(json.dumps(feature_record, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alternatively, we can use pyathena to comfortably query the underlying table for the latest inserted orders. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import pyathena as pa\n", "import pandas as pd\n", "\n", "# getting the latest fetaure group description\n", "feature_group_description = sm_client.describe_feature_group(FeatureGroupName=feature_group_name)\n", "\n", "# Opening a connection to Athena\n", "conn = pa.connect(s3_staging_dir=f's3://{default_bucket}/athena-staging',\n", " region_name=region)\n", "\n", "# Getting the table name from the feature group description\n", "table_name = feature_group_description['OfflineStoreConfig']['DataCatalogConfig']['TableName']\n", "\n", "# Querying the table\n", "query = f\"\"\"SELECT * FROM \\\"sagemaker_featurestore\\\".\\\"{table_name}\\\" \n", " ORDER BY \"write_time\" DESC\n", " LIMIT 1000;\"\"\"\n", "\n", "df = pd.read_sql(query, conn)\n", "df" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" }, "vscode": { "interpreter": { "hash": "d795c0e75ad6f709987bd9b084eaf6e68fce5ba2deef3e876790e85fe4f0e0c1" } } }, "nbformat": 4, "nbformat_minor": 4 }