{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Module 5: Batch ingestion via SageMaker Processing job (Sklearn)\n", "\n", "---\n", "**Note:** Please set kernel to `Python 3 (Data Science)` and select instance to `ml.t3.medium`\n", "\n", "## Contents\n", "\n", "1. [Setup](#Setup)\n", "1. [Create Sklearn SageMaker Processing script](#Create-Sklearn-SageMaker-Processing-script)\n", "1. [Run batch ingestion processing job](#Run-batch-ingestion-processing-job)\n", "1. [Verify processing job results](#Verify-processing-job-results)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Imports" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker import get_execution_role\n", "from random import randint\n", "import sagemaker\n", "import logging\n", "import json" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger('__name__')\n", "logger.setLevel(logging.DEBUG)\n", "logger.addHandler(logging.StreamHandler())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger.info(f'Using SageMaker version: {sagemaker.__version__}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Essentials" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "role = get_execution_role()\n", "logger.info(f'Role = {role}')\n", "sagemaker_session = sagemaker.Session()\n", "region = sagemaker_session.boto_region_name\n", "featurestore_runtime_client = sagemaker_session.boto_session.client('sagemaker-featurestore-runtime', region_name=region)\n", "default_bucket = sagemaker_session.default_bucket()\n", "logger.info(f'Default bucket = {default_bucket}')\n", "prefix = 'sagemaker-feature-store'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create Sklearn SageMaker Processing script" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile ./scripts/batch_ingest_sm_sklearn.py\n", "import subprocess\n", "import sys\n", "subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker'])\n", "\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "from sklearn.preprocessing import MinMaxScaler, LabelEncoder\n", "from datetime import datetime, timezone, date\n", "import pandas as pd\n", "import sagemaker\n", "import argparse\n", "import logging\n", "import time\n", "import os\n", "\n", "\n", "sagemaker_session = sagemaker.Session()\n", "\n", "logger = logging.getLogger('__name__')\n", "logger.setLevel(logging.INFO)\n", "logger.addHandler(logging.StreamHandler())\n", "\n", "\n", "label_encoder = LabelEncoder()\n", "min_max_scaler = MinMaxScaler()\n", "n_cores = os.cpu_count()\n", "\n", "\n", "def get_file_paths(directory):\n", " file_paths = [] \n", " for root, directories, files in os.walk(directory):\n", " for file_name in files:\n", " if file_name.endswith('.csv'):\n", " file_path = os.path.join(root, file_name)\n", " file_paths.append(file_path) \n", " return file_paths\n", "\n", "\n", "def get_delta_in_days(date_time) -> int:\n", " today = date.today()\n", " delta = today - date_time.date()\n", " return delta.days\n", "\n", "\n", "def apply_transforms(df: pd.DataFrame) -> pd.DataFrame:\n", " df['is_reordered'] = df['is_reordered'].astype(int)\n", " df['purchased_on'] = pd.to_datetime(df['purchased_on'], format='%Y-%m-%d %H:%M:%S')\n", " df['n_days_since_last_purchase'] = df['purchased_on'].apply(lambda x: get_delta_in_days(x))\n", " df['n_days_since_last_purchase'] = min_max_scaler.fit_transform(df[['n_days_since_last_purchase']])\n", " df.drop('purchased_on', axis=1, inplace=True)\n", " return df\n", "\n", "\n", "def ingest_data(args: argparse.Namespace) -> None:\n", " files = get_file_paths('/opt/ml/processing/input/')\n", " logger.info(f'Files: {files}')\n", " df = pd.concat([pd.read_csv(file) for file in files], ignore_index=True)\n", " df = apply_transforms(df)\n", " logger.info(f'Ingesting a total of [{df.shape[0]}] rows from {len(files)} files')\n", " logger.info(f'Ingesting into feature group [{args.feature_group_name}] using {args.num_processes} processes and {args.num_workers} workers')\n", " fg = FeatureGroup(name=args.feature_group_name, sagemaker_session=sagemaker_session)\n", " response = fg.ingest(data_frame=df, max_processes=args.num_processes, max_workers=args.num_workers, wait=True)\n", " \"\"\"\n", " The ingest call above returns an IngestionManagerPandas instance as a response. Zero based indices of rows \n", " that failed to be ingested are captured via failed_rows in this response. By asserting this count to be 0,\n", " we validated that all rows were successfully ingested without a failure.\n", " \"\"\"\n", " assert len(response.failed_rows) == 0\n", " \n", " \n", "def parse_args() -> None:\n", " parser = argparse.ArgumentParser()\n", " # if num_processes is not set, we set it to the number of vCPUs by default\n", " parser.add_argument('--num_processes', type=int, default=n_cores)\n", " # if num_workers is not set, we default it to 4 i.e., 4 threads per python process\n", " parser.add_argument('--num_workers', type=int, default=4)\n", " parser.add_argument('--feature_group_name', type=str)\n", " args, _ = parser.parse_known_args()\n", " return args\n", "\n", "\n", "if __name__ == '__main__':\n", " logger.info('BATCH INGESTION - STARTED')\n", " args = parse_args()\n", " ingest_data(args)\n", " logger.info('BATCH INGESTION - COMPLETED')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Run batch ingestion processing job" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store -r orders_feature_group_name\n", "\n", "s3_uri_prefix = f's3://{default_bucket}/{prefix}/partitions/'\n", "# REUSE orders feature group name from module 1\n", "feature_group_name = orders_feature_group_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "sklearn_processor = SKLearnProcessor(framework_version='0.20.0',\n", " role=role,\n", " instance_type='ml.m5.xlarge',\n", " instance_count=2,\n", " base_job_name='fscw-sm-processing-sklearn-fs-ingestion', \n", " env={'AWS_DEFAULT_REGION': region})\n", "\n", "\"\"\"\n", "Note: It is recommended to set the num_processes argument below to the total number of cores (vCPUs)in your \n", "processing node and set the num_workers to 4. num_workers here denotes number of threads per python process.\n", "In this example, since we are using instance_type=m1.m5.xlarge (set above) for our processing node, we have \n", "set num_processes=4 (an m5.xlarge instance has 4 cores) and num_workers=4 below.\n", "\"\"\"\n", "\n", "sklearn_processor.run(code='./scripts/batch_ingest_sm_sklearn.py', \n", " arguments = ['--num_processes', '4', \n", " '--num_workers', '4',\n", " '--feature_group_name', feature_group_name],\n", " inputs=[ProcessingInput(s3_data_type='S3Prefix', \n", " source=s3_uri_prefix, \n", " s3_data_distribution_type='ShardedByS3Key', \n", " destination='/opt/ml/processing/input')],\n", " logs=False) # set logs=True to enable logging" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Verify processing job results" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "order_id = f'O{randint(1, 100000)}'\n", "logger.info(f'order_id={order_id}') \n", "\n", "feature_record = featurestore_runtime_client.get_record(FeatureGroupName=feature_group_name, \n", " RecordIdentifierValueAsString=order_id)\n", "print(json.dumps(feature_record, indent=2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }