{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Module 5: Online ingestion via SageMaker Processing job (PySpark)\n", "\n", "---\n", "\n", "**Note:** Please set kernel to `Python 3 (Data Science)` and select instance to `ml.t3.medium`\n", "\n", "## Contents\n", "\n", "1. [Setup](#Setup)\n", "1. [Create PySpark SageMaker Processing script](#Create-PySpark-SageMaker-Processing-script)\n", "1. [Run batch ingestion job](#Run-batch-ingestion-job)\n", "1. [Verify processing job results](#Verify-processing-job-results)\n", "\n", "This notebook uses the SageMaker Processing Job to create a scalable and repeatable ingestion into the feature store. It utilizes the `.put_record()` api of the feature store contrary to [notebook 4 - m5_nb4_sm_processing_pyspark_offline_batch](./m5_nb4_sm_processing_pyspark_offline_batch.ipynb), which uses the SageMaker Feature Store Manager to perform a spark batch ingestion. The SageMaker Feature Store Manager needs extra packages, which can be avoided using the method presented in this section. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Imports " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.spark.processing import PySparkProcessor\n", "from sagemaker import get_execution_role\n", "from random import randint\n", "import sagemaker\n", "import logging\n", "import boto3\n", "import json" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger('__name__')\n", "logger.setLevel(logging.DEBUG)\n", "logger.addHandler(logging.StreamHandler())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger.info(f'Using SageMaker version: {sagemaker.__version__}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Essentials" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "role = get_execution_role()\n", "logger.info(f'Role = {role}')\n", "sagemaker_session = sagemaker.Session()\n", "region = sagemaker_session.boto_region_name\n", "featurestore_runtime_client = sagemaker_session.boto_session.client('sagemaker-featurestore-runtime', region_name=region)\n", "default_bucket = sagemaker_session.default_bucket()\n", "logger.info(f'Default bucket = {default_bucket}')\n", "prefix = 'sagemaker-feature-store'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create PySpark SageMaker Processing script" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile ./scripts/batch_ingest_sm_pyspark.py\n", "from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler\n", "from pyspark.sql.functions import udf, datediff, to_date, lit\n", "from pyspark.sql.types import IntegerType, DoubleType\n", "from pyspark.sql import SparkSession, DataFrame\n", "from argparse import Namespace, ArgumentParser\n", "from pyspark.ml.linalg import Vector\n", "from pyspark.ml import Pipeline\n", "from datetime import datetime\n", "import argparse\n", "import logging\n", "import boto3\n", "import time\n", "import os\n", "\n", "\n", "logger = logging.getLogger('__name__')\n", "logger.setLevel(logging.INFO)\n", "logger.addHandler(logging.StreamHandler())\n", "\n", "\n", "def transform_row(row) -> list:\n", " columns = list(row.asDict())\n", " record = []\n", " for 
 { "cell_type": "markdown", "metadata": {}, "source": [ "# Create PySpark SageMaker Processing script" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "%%writefile ./scripts/batch_ingest_sm_pyspark.py\n",
  "from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler\n",
  "from pyspark.sql.functions import udf, datediff, to_date, lit\n",
  "from pyspark.sql.types import IntegerType, DoubleType\n",
  "from pyspark.sql import SparkSession, DataFrame\n",
  "from argparse import Namespace, ArgumentParser\n",
  "from pyspark.ml.linalg import Vector\n",
  "from pyspark.ml import Pipeline\n",
  "from datetime import datetime\n",
  "import argparse\n",
  "import logging\n",
  "import boto3\n",
  "import time\n",
  "import os\n",
  "\n",
  "\n",
  "logger = logging.getLogger(__name__)\n",
  "logger.setLevel(logging.INFO)\n",
  "logger.addHandler(logging.StreamHandler())\n",
  "\n",
  "\n",
  "def transform_row(row) -> list:\n",
  "    columns = list(row.asDict())\n",
  "    record = []\n",
  "    for column in columns:\n",
  "        feature = {'FeatureName': column, 'ValueAsString': str(row[column])}\n",
  "        record.append(feature)\n",
  "    return record\n",
  "\n",
  "\n",
  "def ingest_to_feature_store(args: argparse.Namespace, rows) -> None:\n",
  "    feature_group_name = args.feature_group_name\n",
  "    session = boto3.session.Session()\n",
  "    featurestore_runtime_client = session.client(service_name='sagemaker-featurestore-runtime')\n",
  "    rows = list(rows)\n",
  "    logger.info(f'Ingesting {len(rows)} rows into feature group: {feature_group_name}')\n",
  "    for row in rows:\n",
  "        record = transform_row(row)\n",
  "        response = featurestore_runtime_client.put_record(FeatureGroupName=feature_group_name, Record=record)\n",
  "        assert response['ResponseMetadata']['HTTPStatusCode'] == 200\n",
  "\n",
  "\n",
  "def parse_args() -> argparse.Namespace:\n",
  "    parser = argparse.ArgumentParser()\n",
  "    parser.add_argument('--num_processes', type=int, default=1)\n",
  "    parser.add_argument('--num_workers', type=int, default=1)\n",
  "    parser.add_argument('--feature_group_name', type=str)\n",
  "    parser.add_argument('--s3_uri_prefix', type=str)\n",
  "    args, _ = parser.parse_known_args()\n",
  "    return args\n",
  "\n",
  "\n",
  "def scale_col(df: DataFrame, col_name: str) -> DataFrame:\n",
  "    unlist = udf(lambda x: round(float(list(x)[0]), 2), DoubleType())\n",
  "    assembler = VectorAssembler(inputCols=[col_name], outputCol=f'{col_name}_vec')\n",
  "    scaler = MinMaxScaler(inputCol=f'{col_name}_vec', outputCol=f'{col_name}_scaled')\n",
  "    pipeline = Pipeline(stages=[assembler, scaler])\n",
  "    df = pipeline.fit(df).transform(df).withColumn(f'{col_name}_scaled', unlist(f'{col_name}_scaled')) \\\n",
  "           .drop(f'{col_name}_vec')\n",
  "    df = df.drop(col_name)\n",
  "    df = df.withColumnRenamed(f'{col_name}_scaled', col_name)\n",
  "    return df\n",
  "\n",
  "\n",
  "def ordinal_encode_col(df: DataFrame, col_name: str) -> DataFrame:\n",
  "    indexer = StringIndexer(inputCol=col_name, outputCol=f'{col_name}_new')\n",
  "    df = indexer.fit(df).transform(df)\n",
  "    df = df.drop(col_name)\n",
  "    df = df.withColumnRenamed(f'{col_name}_new', col_name)\n",
  "    return df\n",
  "\n",
  "\n",
  "def run_spark_job():\n",
  "    args = parse_args()\n",
  "    spark_session = SparkSession.builder.appName('PySparkJob').getOrCreate()\n",
  "    spark_context = spark_session.sparkContext\n",
  "    total_cores = int(spark_context._conf.get('spark.executor.instances')) * int(spark_context._conf.get('spark.executor.cores'))\n",
  "    logger.info(f'Total available cores in the Spark cluster = {total_cores}')\n",
  "    logger.info('Reading input file from S3')\n",
  "    df = spark_session.read.options(header=True).csv(args.s3_uri_prefix)\n",
  "\n",
  "    # transform raw features\n",
  "\n",
  "    # transform 1 - ordinal encode `is_reordered` and cast it to int\n",
  "    df = ordinal_encode_col(df, 'is_reordered')\n",
  "    df = df.withColumn('is_reordered', df['is_reordered'].cast(IntegerType()))\n",
  "\n",
  "    # transform 2 - min-max scale `purchase_amount`\n",
  "    df = df.withColumn('purchase_amount', df['purchase_amount'].cast(DoubleType()))\n",
  "    df = scale_col(df, 'purchase_amount')\n",
  "\n",
  "    # transform 3 - derive `n_days_since_last_purchase` from the `purchased_on` column\n",
  "    current_date = datetime.today().strftime('%Y-%m-%d')\n",
  "    df = df.withColumn('n_days_since_last_purchase', datediff(to_date(lit(current_date)), to_date('purchased_on', 'yyyy-MM-dd')))\n",
  "    df = df.drop('purchased_on')\n",
  "    df = scale_col(df, 'n_days_since_last_purchase')\n",
  "    df.show(5)\n",
  "\n",
  "    logger.info(f'Number of partitions = {df.rdd.getNumPartitions()}')\n",
  "    # Rule-of-thumb heuristic: repartition to roughly 3-4x the total number of executor cores\n",
  "    # (#executors * #cores per executor) so the put_record calls are spread evenly across the cluster\n",
  "    df = df.repartition(total_cores * 3)\n",
  "    logger.info(f'Number of partitions after re-partitioning = {df.rdd.getNumPartitions()}')\n",
  "    logger.info(f'Feature Store ingestion start: {datetime.now().strftime(\"%m/%d/%Y, %H:%M:%S\")}')\n",
  "    df.foreachPartition(lambda rows: ingest_to_feature_store(args, rows))\n",
  "    logger.info(f'Feature Store ingestion complete: {datetime.now().strftime(\"%m/%d/%Y, %H:%M:%S\")}')\n",
  "\n",
  "\n",
  "if __name__ == '__main__':\n",
  "    logger.info('BATCH INGESTION - STARTED')\n",
  "    run_spark_job()\n",
  "    logger.info('BATCH INGESTION - COMPLETED')" ] },
 { "cell_type": "markdown", "metadata": {}, "source": [ "# Run batch ingestion job" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "%store -r orders_feature_group_name\n",
  "\n",
  "s3_uri_prefix = f's3://{default_bucket}/{prefix}/partitions/*'\n",
  "# REUSE orders feature group name from module 1\n",
  "feature_group_name = orders_feature_group_name" ] },
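 { "cell_type": "markdown", "metadata": {}, "source": [
  "Optional sanity check before launching the job: a minimal sketch, assuming the orders feature group created in module 1 still exists in this account and region. It confirms the feature group is in `Created` status and that its online store is enabled, since the verification step below reads records back with `get_record`." ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "# Optional sanity check (assumes the module 1 feature group exists in this account/region)\n",
  "sm_client = sagemaker_session.boto_session.client('sagemaker', region_name=region)\n",
  "fg_description = sm_client.describe_feature_group(FeatureGroupName=feature_group_name)\n",
  "online_store_config = fg_description.get('OnlineStoreConfig', {})\n",
  "logger.info(f\"Feature group status: {fg_description['FeatureGroupStatus']}\")\n",
  "logger.info(f\"Online store enabled: {online_store_config.get('EnableOnlineStore', False)}\")" ] },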
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "pyspark_processor = PySparkProcessor(framework_version='2.4',  # Spark version\n",
  "                                     role=role,\n",
  "                                     instance_type='ml.m5.xlarge',\n",
  "                                     instance_count=2,\n",
  "                                     base_job_name='sm-processing-pyspark-fs-ingestion',\n",
  "                                     env={'AWS_DEFAULT_REGION': boto3.Session().region_name,\n",
  "                                          'mode': 'python'},\n",
  "                                     max_runtime_in_seconds=3600)" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "%%time\n",
  "\n",
  "pyspark_processor.run(submit_app='./scripts/batch_ingest_sm_pyspark.py',\n",
  "                      arguments=['--feature_group_name', feature_group_name,\n",
  "                                 '--s3_uri_prefix', s3_uri_prefix],\n",
  "                      spark_event_logs_s3_uri=f's3://{default_bucket}/spark-logs',\n",
  "                      logs=False)  # set logs=True to enable logging" ] },
 { "cell_type": "markdown", "metadata": {}, "source": [ "# Verify processing job results" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "order_id = f'O{randint(1, 100000)}'\n",
  "logger.info(f'order_id={order_id}')\n",
  "\n",
  "feature_record = featurestore_runtime_client.get_record(FeatureGroupName=feature_group_name,\n",
  "                                                        RecordIdentifierValueAsString=order_id)\n",
  "print(json.dumps(feature_record, indent=2))" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }
 ], "metadata": { "instance_type": "ml.m5.large", "kernelspec": { "display_name": "datascience", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.9" }, "vscode": { "interpreter": { "hash": "d795c0e75ad6f709987bd9b084eaf6e68fce5ba2deef3e876790e85fe4f0e0c1" } } }, "nbformat": 4, "nbformat_minor": 4 }