{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Lab 1 : Data Preparation and Process" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Loading stored variables\n", "If you ran this notebook before, you may want to reuse the resources you already created with AWS. Run the cell below to load any previously created variables. You should see a printout of the existing variables. If nothing is printed, this is probably the first time you are running the notebook.\n", "\n", "* Please make sure to run the prerequisites.ipynb in the notebook folder before running this notebook" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store -r\n", "%store" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: You must have run the previous sequential notebooks to retrieve variables using the StoreMagic command." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Preparation\n", "\n", "\n", "Let's start by specifying:\n", "\n", "- The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.\n", "- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these. Note: if more than one role is required for notebook instances, training, and/or hosting, replace the `get_execution_role()` call with the appropriate full IAM role ARN string(s)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# cell 01\n", "import sagemaker\n", "bucket=sagemaker.Session().default_bucket()\n", "prefix = 'sagemaker/DEMO-xgboost-tripfare'\n", " \n", "# Define IAM role\n", "import boto3\n", "import re\n", "import os\n", "from sagemaker import get_execution_role\n", "\n", "role = get_execution_role()\n", "sagemaker_session = sagemaker.Session()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# cell 02\n", "import numpy as np # For matrix operations and numerical processing\n", "import pandas as pd # For munging tabular data\n", "import matplotlib.pyplot as plt # For charts and visualizations\n", "from IPython.display import Image # For displaying images in the notebook\n", "from IPython.display import display # For displaying outputs in the notebook\n", "from time import gmtime, strftime # For labeling SageMaker models, endpoints, etc.\n", "import sys # For writing outputs to notebook\n", "import math # For ceiling function\n", "import json # For parsing hosting outputs\n", "import os # For manipulating filepath names\n", "import sagemaker \n", "import zipfile # Amazon SageMaker's Python SDK provides many helper functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if 'input_source' not in locals():\n", " \n", " input_source = f's3://{bucket}/{prefix}/input/'\n", " %store input_source\n", " \n", "else:\n", " print(f'input source is available: {input_source}')\n", "\n", "input_data = input_source + 'data/green/'\n", "%store input_data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# cell 05\n", "from sagemaker import Session\n", "\n", "sess = Session()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create Feature Groups" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook, you will 
learn how to create a feature group in the SageMaker Feature Store. You will then learn how to ingest the feature \n", "columns into the created feature group using the SageMaker Python SDK. You will also see how to get an ingested feature record from the Online store. Finally, you will list all the feature groups created within the Feature Store." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "column_schemas = [\n", "    {\n", "        \"name\": \"fare_amount\",\n", "        \"type\": \"float\"\n", "    },\n", "    {\n", "        \"name\": \"passenger_count\",\n", "        \"type\": \"long\"\n", "    },\n", "    {\n", "        \"name\": \"pickup_latitude\",\n", "        \"type\": \"float\"\n", "    },\n", "    {\n", "        \"name\": \"pickup_longitude\",\n", "        \"type\": \"float\"\n", "    },\n", "    {\n", "        \"name\": \"dropoff_latitude\",\n", "        \"type\": \"float\"\n", "    },\n", "    {\n", "        \"name\": \"dropoff_longitude\",\n", "        \"type\": \"float\"\n", "    },\n", "    {\n", "        \"name\": \"geo_distance\",\n", "        \"type\": \"float\"\n", "    },\n", "    {\n", "        \"name\": \"hour\",\n", "        \"type\": \"int\"\n", "    },\n", "    {\n", "        \"name\": \"weekday\",\n", "        \"type\": \"int\"\n", "    },\n", "    {\n", "        \"name\": \"month\",\n", "        \"type\": \"int\"\n", "    },\n", "    {\n", "        \"name\": \"FS_ID\",\n", "        \"type\": \"long\"\n", "    },\n", "    {\n", "        \"name\": \"FS_time\",\n", "        \"type\": \"float\"\n", "    },\n", "]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.feature_store.feature_definition import FeatureDefinition\n", "from sagemaker.feature_store.feature_definition import FeatureTypeEnum\n", "\n", "# Any schema type missing from the mapping falls back to STRING.\n", "default_feature_type = FeatureTypeEnum.STRING\n", "column_to_feature_type_mapping = {\n", "    \"float\": FeatureTypeEnum.FRACTIONAL,\n", "    \"long\": FeatureTypeEnum.INTEGRAL,\n", "    \"int\": FeatureTypeEnum.INTEGRAL  # hour, weekday and month are integers, not strings\n", "}\n", "\n", "feature_definitions = [\n", "    FeatureDefinition(\n", "        feature_name=column_schema['name'], \n", "        
feature_type=column_to_feature_type_mapping.get(column_schema['type'], default_feature_type)\n", "    ) for column_schema in column_schemas\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Initialize & Create Feature Group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "region = boto3.Session().region_name\n", "boto_session = boto3.Session(region_name=region)\n", "\n", "sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)\n", "featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)\n", "\n", "feature_store_session = sagemaker.session.Session(\n", "    boto_session=boto_session,\n", "    sagemaker_client=sagemaker_client,\n", "    sagemaker_featurestore_runtime_client=featurestore_runtime\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "current_timestamp = strftime('%m-%d-%H-%M', gmtime())\n", "\n", "# prefix to track all the feature groups created as part of feature store champions workshop (fscw)\n", "fs_prefix = 'fscw-'\n", "\n", "tripfare_feature_group_name = f'{fs_prefix}tripfare-{current_timestamp}'\n", "%store tripfare_feature_group_name\n", "\n", "print(f'Trip fare feature group name = {tripfare_feature_group_name}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The feature group is initialized and created below." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.feature_store.feature_group import FeatureGroup\n", "\n", "enable_online_store = True\n", "feature_store_offline_s3_uri = 's3://' + bucket\n", "\n", "record_identifier_feature_name = 'FS_ID'\n", "event_time_feature_name = 'FS_time'\n", "\n", "feature_group = FeatureGroup(\n", "    name=tripfare_feature_group_name, sagemaker_session=feature_store_session, feature_definitions=feature_definitions)\n", "\n", "feature_group.create(\n", "    
s3_uri=feature_store_offline_s3_uri,\n", "    record_identifier_name=record_identifier_feature_name,\n", "    event_time_feature_name=event_time_feature_name,\n", "    role_arn=role,\n", "    enable_online_store=enable_online_store\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "def wait_for_feature_group_creation_complete(feature_group):\n", "    \"\"\"Helper function to wait for feature group creation to complete.\"\"\"\n", "    status = feature_group.describe().get(\"FeatureGroupStatus\")\n", "    # Poll every 5 seconds while the feature group is still being created\n", "    while status == \"Creating\":\n", "        print(\"Waiting for Feature Group Creation\")\n", "        time.sleep(5)\n", "        status = feature_group.describe().get(\"FeatureGroupStatus\")\n", "    if status != \"Created\":\n", "        raise SystemExit(f\"Failed to create feature group {feature_group.name}: {status}\")\n", "    print(f\"FeatureGroup {feature_group.name} successfully created.\")\n", "\n", "wait_for_feature_group_creation_complete(feature_group=feature_group)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Feature Engineering with Amazon SageMaker Processing\n", "\n", "Amazon SageMaker Processing allows you to run steps for data pre- or post-processing, feature engineering, data validation, or model evaluation workloads on Amazon SageMaker. Processing jobs accept data from Amazon S3 as input and store data into Amazon S3 as output.\n", "\n", "![processing](https://sagemaker.readthedocs.io/en/stable/_images/amazon_sagemaker_processing_image1.png)\n", "\n", "Here, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. 
In a typical SageMaker workflow, notebooks are only used for prototyping and can be run on relatively inexpensive and less powerful instances, while processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances. SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks. \n", "\n", "To use SageMaker Processing, simply supply a Python data preprocessing script as shown below. For this example, we're using a SageMaker prebuilt Scikit-learn container, which includes many common functions for processing data. There are few limitations on what kinds of code and operations you can run, and only a minimal contract: input and output data must be placed in specified directories. If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Sklearn SageMaker Processing script" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile preprocess.py\n", "\n", "import glob\n", "import logging\n", "import os\n", "import subprocess\n", "import sys\n", "\n", "subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker'])\n", "\n", "from zipfile import ZipFile\n", "# from time import gmtime, strftime\n", "import socket\n", "import shutil\n", "import json\n", "import time\n", "import argparse\n", "import boto3\n", "import uuid\n", "\n", "n_cores = os.cpu_count()\n", "# host_name = socket.gethostname()\n", "# print(host_name)\n", "# print(os.environ)\n", "\n", "# Install geopandas dependency before including pandas\n", "subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"geopandas==0.9.0\"])\n", "\n", "import pandas as pd  # noqa: E402\n", "import geopandas as gpd  # noqa: E402\n", "from sklearn.model_selection import train_test_split  # noqa: E402\n", "\n", "import sagemaker\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "\n", "def get_session(region, default_bucket):\n", "    \"\"\"Gets the sagemaker session based on the region.\n", "    \"\"\"\n", "\n", "    boto_session = boto3.Session(region_name=region)\n", "\n", "    sagemaker_client = boto_session.client(\"sagemaker\")\n", "    return sagemaker.session.Session(\n", "        boto_session=boto_session,\n", "        sagemaker_client=sagemaker_client,\n", "        default_bucket=default_bucket,\n", "    )\n", "\n", "\n", "logger = logging.getLogger()\n", "logger.setLevel(logging.INFO)\n", "logger.addHandler(logging.StreamHandler())\n", "\n", "def parse_args() -> argparse.Namespace:\n", "    parser = argparse.ArgumentParser()\n", "    parser.add_argument('--ingest_featuregroup_name', type=str, default=None)\n", "    parser.add_argument('--region', type=str)\n", "    parser.add_argument('--bucket', type=str)\n", "    parser.add_argument('--base_dir', type=str, default=\"/opt/ml/processing\")\n", "    args, _ = parser.parse_known_args()\n", "    return args\n", "\n", "def extract_zones(zones_file: str, zones_dir: str):\n", "    logger.info(f\"Extracting zone file: {zones_file}\")\n", "    with ZipFile(zones_file, \"r\") as zf:\n", "        zf.extractall(zones_dir)\n", "\n", "\n", "def load_zones(zones_dir: str):\n", "    logger.info(f\"Loading zones from {zones_dir}\")\n", "    # Load the shape file and get the geometry and lat/lon\n", "    zone_df = gpd.read_file(os.path.join(zones_dir, \"taxi_zones.shp\"))\n", "    # Get centroids as EPSG code of 3310 to measure distance\n", "    zone_df[\"centroid\"] = zone_df.geometry.centroid.to_crs(epsg=3310)\n", "    # Convert coordinates to the WGS84 lat/long CRS, which has an EPSG code of 4326.\n", "    # In EPSG:4326 point geometries, y is the latitude and x is the longitude.\n", "    zone_df[\"latitude\"] = zone_df.centroid.to_crs(epsg=4326).y\n", "    zone_df[\"longitude\"] = zone_df.centroid.to_crs(epsg=4326).x\n", "    return zone_df\n", "\n", "\n", "def load_data(file_list: list):\n", "    # Define dates, and columns to use\n", "    use_cols = [\n", "        \"fare_amount\",\n", "        \"lpep_pickup_datetime\",\n", "        \"lpep_dropoff_datetime\",\n", "        \"passenger_count\",\n", "        \"PULocationID\",\n", "        \"DOLocationID\",\n", "    ]\n", "    # Concat input files with select columns\n", "    dfs = []\n", "    for file in file_list:\n", "        df = pd.read_parquet(file, engine='pyarrow', columns=use_cols)\n", "        df = df.fillna(0)\n", "        df['passenger_count'] = df['passenger_count'].astype('int64')\n", "        dfs.append(df)\n", "    return pd.concat(dfs, ignore_index=True)\n", "\n", "\n", "def enrich_data(trip_df: pd.DataFrame, zone_df: pd.DataFrame):\n", "    # Join trip DF to zones for both pickup and drop off locations\n", "    trip_df = gpd.GeoDataFrame(\n", "        trip_df.join(zone_df, on=\"PULocationID\").join(\n", "            zone_df, on=\"DOLocationID\", rsuffix=\"_DO\", lsuffix=\"_PU\"\n", "        )\n", "    )\n", "    # Centroid-to-centroid distance, divided by 1000 to convert meters to kilometers\n", "    trip_df[\"geo_distance\"] = (\n", "        trip_df[\"centroid_PU\"].distance(trip_df[\"centroid_DO\"]) / 1000\n", "    )\n", "\n", "    # Add date parts\n", "    trip_df[\"lpep_pickup_datetime\"] = pd.to_datetime(trip_df[\"lpep_pickup_datetime\"])\n", "    trip_df[\"hour\"] = trip_df[\"lpep_pickup_datetime\"].dt.hour\n", "    trip_df[\"weekday\"] = trip_df[\"lpep_pickup_datetime\"].dt.weekday\n", "    trip_df[\"month\"] = trip_df[\"lpep_pickup_datetime\"].dt.month\n", "\n", "    # Get calculated duration in minutes. total_seconds() keeps the sign and the day\n", "    # component, so negative or multi-day durations are caught by clean_data.\n", "    trip_df[\"lpep_dropoff_datetime\"] = pd.to_datetime(trip_df[\"lpep_dropoff_datetime\"])\n", "    trip_df[\"duration_minutes\"] = (\n", "        trip_df[\"lpep_dropoff_datetime\"] - trip_df[\"lpep_pickup_datetime\"]\n", "    ).dt.total_seconds() / 60\n", "\n", "    # Rename and filter cols\n", "    trip_df = trip_df.rename(\n", "        columns={\n", "            \"latitude_PU\": \"pickup_latitude\",\n", "            \"longitude_PU\": \"pickup_longitude\",\n", "            \"latitude_DO\": \"dropoff_latitude\",\n", "            \"longitude_DO\": \"dropoff_longitude\",\n", "        }\n", "    )\n", "\n", "    # Record identifier and event time required by the feature group\n", "    trip_df['FS_ID'] = trip_df.index + 1000\n", "    current_time_sec = int(round(time.time()))\n", "    trip_df[\"FS_time\"] = pd.Series([current_time_sec]*len(trip_df), dtype=\"float64\")\n", "    return trip_df\n", "\n", "\n", "def clean_data(trip_df: pd.DataFrame):\n", "    # Remove outliers\n", "    trip_df = trip_df[\n", "        (trip_df.fare_amount > 0)\n", "        & (trip_df.fare_amount < 200)\n", "        & (trip_df.passenger_count > 0)\n", "        & (trip_df.duration_minutes > 0)\n", "        & (trip_df.duration_minutes < 120)\n", "        & (trip_df.geo_distance > 0)\n", "        & (trip_df.geo_distance < 121)\n", "    ].dropna()\n", "\n", "    # Columns written to the train/validation/test CSV files\n", "    cols = [\n", "        \"fare_amount\",\n", "        \"passenger_count\",\n", "        \"pickup_latitude\",\n", "        \"pickup_longitude\",\n", "        \"dropoff_latitude\",\n", "        \"dropoff_longitude\",\n", "        \"geo_distance\",\n", "        \"hour\",\n", "        \"weekday\",\n", "        \"month\",\n", "    ]\n", "\n", "    # Columns ingested into the feature group (adds record ID and event time)\n", "    cols_fg = [\n", "        \"fare_amount\",\n", "        \"passenger_count\",\n", "        \"pickup_latitude\",\n", "        \"pickup_longitude\",\n", "        \"dropoff_latitude\",\n", "        \"dropoff_longitude\",\n", "        \"geo_distance\",\n", "        \"hour\",\n", "        \"weekday\",\n", "        \"month\",\n", "        \"FS_ID\",\n", "        \"FS_time\"\n", "    ]\n", "    return trip_df[cols], trip_df[cols_fg]\n", "\n", "def ingest_data(data_fg: pd.DataFrame, fg_name: str, sagemaker_session) -> None:\n", "\n", "    # 4 threads per python process\n", "    num_workers = 4\n", "    num_processes = n_cores\n", "    logger.info(f'Ingesting into feature group [{fg_name}] using {num_processes} processes and {num_workers} workers')\n", "    fg = FeatureGroup(name=fg_name, sagemaker_session=sagemaker_session)\n", "    response = fg.ingest(data_frame=data_fg, max_processes=num_processes, max_workers=num_workers, wait=True)\n", "    \"\"\"\n", "    The ingest call above returns an IngestionManagerPandas instance as a response. Zero based indices of rows \n", "    that failed to be ingested are captured via failed_rows in this response. By asserting this count to be 0,\n", "    we validate that all rows were successfully ingested without a failure.\n", "    \"\"\"\n", "    assert len(response.failed_rows) == 0\n", "\n", "\n", "def save_files(base_dir: str, data_df: pd.DataFrame, data_fg: pd.DataFrame, fg_name: str, \n", "               val_size=0.2, test_size=0.05, current_host=None, sagemaker_session=None):\n", "\n", "    logger.info(f\"Splitting {len(data_df)} rows of data into train, val, test.\")\n", "\n", "    # The test set is carved out of the validation split, not out of the full dataset\n", "    train_df, val_df = train_test_split(data_df, test_size=val_size, random_state=42)\n", "    val_df, test_df = train_test_split(val_df, test_size=test_size, random_state=42)\n", "\n", "    logger.info(f\"Writing out datasets to {base_dir}\")\n", "    tmp_id = uuid.uuid4().hex[:8]\n", "    train_df.to_csv(f\"{base_dir}/train/train_{current_host}_{tmp_id}.csv\", header=False, index=False)\n", "    val_df.to_csv(f\"{base_dir}/validation/validation_{current_host}_{tmp_id}.csv\", header=False, index=False)\n", "\n", "    # Save test data without header\n", "    test_df.to_csv(f\"{base_dir}/test/test_{current_host}_{tmp_id}.csv\", header=False, index=False)\n", "\n", "\n", "    if fg_name:\n", "        # batch ingestion to the feature group of all the data\n", "        ingest_data(data_fg, fg_name, sagemaker_session)\n", "\n", "    return\n", "\n", "def _read_json(path):  # type: (str) -> dict\n", "    \"\"\"Read a JSON file.\n", "    Args:\n", "        path (str): Path to the file.\n", "    Returns:\n", "        (dict[object, object]): A dictionary representation of the JSON file.\n", "    \"\"\"\n", "    with open(path, \"r\") as f:\n", "        return json.load(f)\n", "\n", "def main(base_dir: str, args: argparse.Namespace):\n", "    # Input data files\n", "    input_dir = os.path.join(base_dir, \"input/data\")\n", "    input_file_list = glob.glob(f\"{input_dir}/*.parquet\")\n", "    logger.info(f\"Input file list: {input_file_list}\")\n", "\n", "    if len(input_file_list) == 0:\n", "        raise Exception(f\"No input files found in {input_dir}\")\n", "\n", "\n", "    # Identify the current host so each instance writes uniquely named output files\n", "    hosts = _read_json(\"/opt/ml/config/resourceconfig.json\")\n", "    logger.info(hosts)\n", "    current_host = hosts[\"current_host\"]\n", "    logger.info(current_host)\n", "\n", "\n", "    # Input zones file\n", "    zones_dir = os.path.join(base_dir, \"input/zones\")\n", "    zones_file = os.path.join(zones_dir, \"taxi_zones.zip\")\n", "    if not os.path.exists(zones_file):\n", "        raise Exception(f\"Zones file {zones_file} does not exist\")\n", "\n", "    # Extract and load taxi zones geopandas dataframe\n", "    extract_zones(zones_file, zones_dir)\n", "    zone_df = load_zones(zones_dir)\n", "\n", "    # Load input files\n", "    data_df = load_data(input_file_list)\n", "    data_df = enrich_data(data_df, zone_df)\n", "    data_df, data_fg = clean_data(data_df)\n", "\n", "    fg_name = args.ingest_featuregroup_name\n", "\n", "    sagemaker_session = get_session(args.region, args.bucket)\n", "\n", "    return save_files(base_dir, data_df, data_fg, fg_name, current_host=current_host, sagemaker_session=sagemaker_session)\n", "\n", "\n", "if __name__ == \"__main__\":\n", "    logger.info(\"Starting preprocessing.\")\n", "    args = parse_args()\n", "    base_dir = args.base_dir\n", "    main(base_dir, args)\n", "    logger.info(\"Done\")\n" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "process_script = os.getcwd() + '/preprocess.py'\n", "%store process_script" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before starting the SageMaker Processing job, we instantiate a `SKLearnProcessor` object. This object allows you to specify the instance type to use in the job, as well as how many instances." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.utils import name_from_base\n", "\n", "base_job_name = 'sm-tripfare-processing'\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=\"0.23-1\",\n", " role=get_execution_role(),\n", " instance_type=\"ml.m5.4xlarge\",\n", " instance_count=2, \n", " base_job_name=base_job_name,\n", ")\n", "\n", "processing_job_name = name_from_base(base_job_name)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_path = f\"s3://{bucket}/{prefix}/train/{processing_job_name}\"\n", "validation_path = f\"s3://{bucket}/{prefix}/validation/{processing_job_name}\"\n", "test_path = f\"s3://{bucket}/{prefix}/test/{processing_job_name}\"\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "\n", "sklearn_processor.run(\n", " code='preprocess.py',\n", " arguments = [\n", " '--ingest_featuregroup_name', tripfare_feature_group_name,\n", " '--region', region,\n", " '--bucket', bucket,\n", " ],\n", " inputs=[\n", " ProcessingInput(\n", " source=input_data,\n", " destination=\"/opt/ml/processing/input/data\",\n", " s3_data_distribution_type=\"ShardedByS3Key\",\n", " ),\n", " ProcessingInput(\n", " source=input_zones,\n", " destination=\"/opt/ml/processing/input/zones\",\n", " 
s3_data_distribution_type=\"FullyReplicated\",\n", "        ),\n", "    ],\n", "    outputs=[\n", "        ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\", destination=train_path),\n", "        ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\", destination=validation_path),\n", "        ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\", destination=test_path),\n", "    ],\n", "    job_name=processing_job_name,\n", "\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store train_path\n", "%store validation_path\n", "%store test_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Get feature record from the Online feature store" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "featurestore_runtime_client = sagemaker_session.boto_session.client('sagemaker-featurestore-runtime', region_name=region)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Retrieve a record from the trip fare feature group. Record identifiers (`FS_ID`) start at 1000; if the randomly chosen ID belongs to a row that was filtered out during cleaning, the response contains no `Record` key." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from random import randint\n", "feature_id = f'{randint(1000, 10000)}'\n", "print(f'feature_id={feature_id}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_record = featurestore_runtime_client.get_record(FeatureGroupName=tripfare_feature_group_name, \n", "                                                        RecordIdentifierValueAsString=feature_id)\n", "feature_record" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# List feature groups" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_client = sagemaker_session.boto_session.client('sagemaker', region_name=region)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sagemaker_client.list_feature_groups()\n", "for fg in 
response['FeatureGroupSummaries']:\n", " fg_name = fg['FeatureGroupName']\n", " print(f'Found feature group: {fg_name}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## End of Lab 1" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }