{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Music Recommender Data Preparation with SageMaker Feature Store and SageMaker Data Wrangler\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. \n", "\n", "![This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-2/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "---" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "\n", "----\n", "\n", "## Background\n", "\n", "This notebook is part of a series that walks through the ML lifecycle and shows how to build a Music Recommender System using a combination of SageMaker services and features. This notebook uses Amazon SageMaker Feature Store (Feature Store) to create feature groups, \n", "executes your Data Wrangler Flow `01_music_dataprep.flow` on the entire dataset using a SageMaker \n", "Processing Job, and ingests the processed data into Feature Store. It is the second notebook in the series. You can run this notebook by itself or in sequence with the other notebooks listed below. Please see the [README.md](README.md) for more information about the use case implemented by this sequence of notebooks. \n", "\n", "1. [Music Recommender Data Exploration](01_data_exploration.ipynb)\n", "1. [Music Recommender Data Preparation with SageMaker Feature Store and SageMaker Data Wrangler](02_export_feature_groups.ipynb) (current notebook)\n", "1. [Train, Deploy, and Monitor the Music Recommender Model using SageMaker SDK](03_train_deploy_debugger_explain_monitor_registry.ipynb)\n", "\n", "----\n", "\n", "## Contents\n", "1. 
[Prereqs: Get Data](#Prereqs:-Get-Data)\n", "1. [Update the Data Source in the .flow File](#Update-the-Data-Source-in-the-.flow-File)\n", "1. [Create Feature Group](#Create-Feature-Group)\n", "1. [Configure Feature Group](#Configure-Feature-Group)\n", "1. [Initialize & Create Feature Group](#Initialize-&-Create-Feature-Group)\n", "1. [Inputs and Outputs](#Inputs-and-Outputs)\n", "1. [Upload Flow to S3](#Upload-Flow-to-S3)\n", "1. [Run Processing Job](#Run-Processing-Job)\n", "1. [Fetch Data from Offline Feature Store](#Fetch-Data-from-Offline-Feature-Store)\n", "\n", "\n", "
💡 Quick Start \n", "To save your processed data to Feature Store, \n", " Click here to create a feature group and follow the instructions to run a SageMaker processing job.\n", "\n", "
\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import pprint\n", "\n", "sys.path.insert(1, \"./code\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# update pandas to avoid data type issues in older 1.0 version\n", "!pip install pandas --upgrade --quiet\n", "import pandas as pd\n", "\n", "print(pd.__version__)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create data folder (no-op if it already exists)\n", "!mkdir -p data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "\n", "%matplotlib inline\n", "\n", "import json\n", "import sagemaker\n", "import boto3\n", "import os\n", "from awscli.customizations.s3.utils import split_s3_bucket_key\n", "\n", "# SageMaker session\n", "sess = sagemaker.Session()\n", "# get session bucket name\n", "bucket = sess.default_bucket()\n", "# bucket prefix or the subfolder for everything we produce\n", "prefix = \"music-recommendation\"\n", "# s3 client\n", "s3_client = boto3.client(\"s3\")\n", "\n", "print(f\"this is your default SageMaker Studio bucket name: {bucket}\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Prereqs: Get Data \n", "\n", "----\n", "\n", "Here we download the music data for this demo from a public S3 bucket and upload it to your default S3 bucket, which was created for you when you first created a SageMaker Studio workspace. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from demo_helpers import get_data, get_model, update_data_sources" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# make sure the data folder exists (no-op if it was already created above)\n", "!mkdir -p data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# public S3 bucket that contains our music data\n", "s3_bucket_music_data = (\n", " f\"s3://sagemaker-example-files-prod-{sess.boto_region_name}/datasets/tabular/synthetic-music\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "new_data_paths = get_data(\n", " s3_client,\n", " [f\"{s3_bucket_music_data}/tracks.csv\", f\"{s3_bucket_music_data}/ratings.csv\"],\n", " bucket,\n", " prefix,\n", " sample_data=0.70,\n", ")\n", "print(new_data_paths)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# these are the new file paths located on your SageMaker Studio default s3 storage bucket\n", "tracks_data_source = f\"s3://{bucket}/{prefix}/tracks.csv\"\n", "ratings_data_source = f\"s3://{bucket}/{prefix}/ratings.csv\"" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Update the Data Source in the `.flow` File\n", "\n", "----\n", "The `01_music_dataprep.flow` file is a JSON file containing instructions for where to find your data sources and how to transform the data. We'll be updating the object telling Data Wrangler where to find the input data on S3. We will set this to your default S3 bucket. 
After this update, the `.flow` file points to your S3 bucket as the data source used by SageMaker Data Wrangler.\n", "\n", "Make sure the `.flow` file is closed before running this next step; otherwise the new S3 file locations won't be written to the file." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "update_data_sources(\"01_music_dataprep.flow\", tracks_data_source, ratings_data_source)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Create Feature Group\n", "----\n", "\n", "[Amazon SageMaker Feature Store](https://www.youtube.com/watch?v=pEg5c6d4etI) is a fully managed, purpose-built repository to store, update, retrieve, and share machine learning (ML) features. Features are the attributes or properties models use during training and inference to make predictions. For example, in an ML application that recommends a music playlist, features could include song ratings, which songs were listened to previously, and how long songs were listened to. The accuracy of an ML model is based on a precise set and composition of features. Often, these features are used repeatedly by multiple teams training multiple models. And whichever feature set was used to train the model needs to be available to make real-time predictions (inference). Keeping a single source of features that is consistent and up-to-date across these different access patterns is a challenge as most organizations keep two different feature stores, one for training and one for inference.\n", "\n", "Amazon SageMaker Feature Store is a purpose-built repository where you can store and access features so it’s much easier to name, organize, and reuse them across teams. SageMaker Feature Store provides a unified store for features during training and real-time inference without the need to write additional code or create manual processes to keep features consistent. 
SageMaker Feature Store keeps track of the metadata of stored features (e.g. feature name or version number) so that you can query the features for the right attributes in batches or in real time using Amazon Athena, an interactive query service. SageMaker Feature Store also keeps features updated, because as new data is generated during inference, the single repository is updated so new features are always available for models to use during training and inference.\n", "\n", "_What is a feature group_\n", "\n", "A single feature corresponds to a column in your dataset. A feature group is a predefined schema for a \n", "collection of features - each feature in the feature group has a specified data type and name. \n", "A single record in a feature group corresponds to a row in your dataframe. A feature store is a \n", "collection of feature groups. To learn more about SageMaker Feature Store, see \n", "[Amazon Feature Store Documentation](http://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html)." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Define Feature Group \n", "\n", "Select Record identifier and Event time feature name. These are required parameters for feature group\n", "creation.\n", "* **Record identifier name** is the name of the feature defined in the feature group's feature definitions \n", "whose value uniquely identifies a Record defined in the feature group's feature definitions.\n", "* **Event time feature name** is the name of the EventTime feature of a Record in FeatureGroup. An EventTime \n", "is a timestamp that represents the point in time when a new event occurs that corresponds to the creation or \n", "update of a Record in the FeatureGroup. All Records in the FeatureGroup must have a corresponding EventTime.\n", "\n", "
💡 Record identifier and Event time feature name are required \n", "for a feature group. After filling in the values, you can choose Run Selected Cell and All Below \n", "from the Run menu in the menu bar. \n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# feature group names, one per dataset produced by the flow. You can customize them\n", "feature_group_names = [\n", " \"track-features-music-rec\",\n", " \"user-5star-track-features-music-rec\",\n", " \"ratings-features-music-rec\",\n", "]\n", "print(f\"Feature Group Names: {feature_group_names}\")\n", "\n", "record_identifier_feature_names = {\n", " \"track-features-music-rec\": \"trackId\",\n", " \"user-5star-track-features-music-rec\": \"userId\",\n", " \"ratings-features-music-rec\": \"ratingEventId\",\n", "}\n", "event_time_feature_name = \"EventTime\"" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Feature Definitions\n", "The following is a list of the feature names and feature types of the final dataset that will be produced \n", "when your data flow is used to process your input dataset. These are automatically generated from the \n", "step `Custom Pyspark` from `Source: Answers.Csv`. To save from a different step, go to Data Wrangler to \n", "select a new step to export.\n", "\n", "
💡 Configurable Settings \n", "\n", "1. You can select a subset of the features. By default all columns of the result dataframe will be used as \n", "features.\n", "2. You can change the Data Wrangler data type to one of the Feature Store supported types \n", "(Integral, Fractional, or String). The default type is set to String. \n", "This means that, if a column in your dataset is not a float or long type, it will default \n", "to String in your Feature Store.\n", "\n", "For Event Time features, make sure the format follows the feature store\n", "\n", " \n", " Event Time feature format\n", " \n", "\n", "
" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The following is a list of the feature names and data types of the final dataset that will be produced when your data flow is used to process your input dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "track_column_schemas = [\n", " {\"name\": \"trackId\", \"type\": \"string\"},\n", " {\"name\": \"length\", \"type\": \"float\"},\n", " {\"name\": \"energy\", \"type\": \"float\"},\n", " {\"name\": \"acousticness\", \"type\": \"float\"},\n", " {\"name\": \"valence\", \"type\": \"float\"},\n", " {\"name\": \"speechiness\", \"type\": \"float\"},\n", " {\"name\": \"instrumentalness\", \"type\": \"float\"},\n", " {\"name\": \"liveness\", \"type\": \"float\"},\n", " {\"name\": \"tempo\", \"type\": \"float\"},\n", " {\"name\": \"genre_Folk\", \"type\": \"float\"},\n", " {\"name\": \"genre_Country\", \"type\": \"float\"},\n", " {\"name\": \"genre_Latin\", \"type\": \"float\"},\n", " {\"name\": \"genre_Jazz\", \"type\": \"float\"},\n", " {\"name\": \"genre_RnB\", \"type\": \"float\"},\n", " {\"name\": \"genre_Reggae\", \"type\": \"float\"},\n", " {\"name\": \"genre_Rap\", \"type\": \"float\"},\n", " {\"name\": \"genre_Pop_Rock\", \"type\": \"float\"},\n", " {\"name\": \"genre_Electronic\", \"type\": \"float\"},\n", " {\"name\": \"genre_Blues\", \"type\": \"float\"},\n", " {\"name\": \"danceability\", \"type\": \"float\"},\n", " {\"name\": \"EventTime\", \"type\": \"float\"},\n", "]\n", "\n", "user_column_schemas = [\n", " {\"name\": \"userId\", \"type\": \"long\"},\n", " {\"name\": \"energy_5star\", \"type\": \"float\"},\n", " {\"name\": \"acousticness_5star\", \"type\": \"float\"},\n", " {\"name\": \"valence_5star\", \"type\": \"float\"},\n", " {\"name\": \"speechiness_5star\", \"type\": \"float\"},\n", " {\"name\": \"instrumentalness_5star\", \"type\": \"float\"},\n", " {\"name\": \"liveness_5star\", \"type\": \"float\"},\n", " 
{\"name\": \"tempo_5star\", \"type\": \"float\"},\n", " {\"name\": \"danceability_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_Latin_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_Folk_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_Blues_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_Rap_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_Reggae_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_Jazz_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_RnB_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_Country_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_Electronic_5star\", \"type\": \"float\"},\n", " {\"name\": \"genre_Pop_Rock_5star\", \"type\": \"float\"},\n", " {\"name\": \"EventTime\", \"type\": \"float\"},\n", "]\n", "\n", "rating_column_schemas = [\n", " {\"name\": \"ratingEventId\", \"type\": \"string\"},\n", " {\"name\": \"ts\", \"type\": \"long\"},\n", " {\"name\": \"userId\", \"type\": \"long\"},\n", " {\"name\": \"trackId\", \"type\": \"string\"},\n", " {\"name\": \"sessionId\", \"type\": \"long\"},\n", " {\"name\": \"itemInSession\", \"type\": \"long\"},\n", " {\"name\": \"Rating\", \"type\": \"float\"},\n", " {\"name\": \"EventTime\", \"type\": \"float\"},\n", "]\n", "\n", "column_schemas = {\n", " \"track-features-music-rec\": track_column_schemas,\n", " \"user-5star-track-features-music-rec\": user_column_schemas,\n", " \"ratings-features-music-rec\": rating_column_schemas,\n", "}" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Below we create the SDK input for those feature definitions. Some schema types in Data Wrangler are not \n", "supported by Feature Store. The following code maps `float` to Fractional and `long` to Integral, and falls back to a default feature type of String (`default_feature_type`) for every other type."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.feature_store.feature_definition import FeatureDefinition\n", "from sagemaker.feature_store.feature_definition import FeatureTypeEnum\n", "\n", "default_feature_type = FeatureTypeEnum.STRING\n", "column_to_feature_type_mapping = {\n", " \"float\": FeatureTypeEnum.FRACTIONAL,\n", " \"long\": FeatureTypeEnum.INTEGRAL,\n", "}\n", "\n", "feature_definitions = {}\n", "for feature_group_name in feature_group_names:\n", " feature_definition = [\n", " FeatureDefinition(\n", " feature_name=column_schema[\"name\"],\n", " feature_type=column_to_feature_type_mapping.get(\n", " column_schema[\"type\"], default_feature_type\n", " ),\n", " )\n", " for column_schema in column_schemas[feature_group_name]\n", " ]\n", " feature_definitions[feature_group_name] = feature_definition" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Configure Feature Group\n", "\n", "----\n", "
💡 Configurable Settings \n", "\n", "1. feature_group_name: name of the feature group.\n", "1. feature_store_offline_s3_uri: SageMaker FeatureStore writes the data in the OfflineStore of a FeatureGroup to an S3 location owned by you.\n", "1. enable_online_store: controls whether the online store is enabled. Enabling the online store allows quick access to the latest value for a Record via the GetRecord API.\n", "1. iam_role: IAM role for executing the processing job.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import gmtime, strftime\n", "import uuid" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# IAM role for executing the processing job.\n", "iam_role = sagemaker.get_execution_role()\n", "\n", "# flow name and a unique ID for this export (used later as the processing job name for the export)\n", "flow_name = \"01_music_dataprep\"\n", "flow_export_id = f\"{strftime('%d-%H-%M-%S', gmtime())}-{str(uuid.uuid4())[:8]}\"\n", "flow_export_name = f\"flow-{flow_export_id}\"\n", "\n", "# SageMaker FeatureStore writes the data in the OfflineStore of a FeatureGroup to an\n", "# S3 location owned by you.\n", "feature_store_offline_s3_uri = \"s3://\" + bucket\n", "\n", "# controls whether the online store is enabled. Enabling the online store allows quick access to\n", "# the latest value for a Record via the GetRecord API.\n", "enable_online_store = True\n", "# name of the tracks feature group (first entry in feature_group_names)\n", "fg_name_tracks = feature_group_names[0]" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Initialize & Create Feature Group\n", "\n", "----" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize Boto3 session that is required to create feature group\n", "import boto3\n", "from sagemaker.session import Session\n", "\n", "region = boto3.Session().region_name\n", "boto_session = boto3.Session(region_name=region)\n", "\n", "sagemaker_client = boto_session.client(service_name=\"sagemaker\", region_name=region)\n", "featurestore_runtime = boto_session.client(\n", " service_name=\"sagemaker-featurestore-runtime\", region_name=region\n", ")\n", "\n", "feature_store_session = Session(\n", " boto_session=boto_session,\n", " sagemaker_client=sagemaker_client,\n", " sagemaker_featurestore_runtime_client=featurestore_runtime,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, 
"outputs": [], "source": [ "from botocore.exceptions import ClientError\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "import time\n", "\n", "\n", "def wait_for_feature_group_creation_complete(feature_group):\n", "    \"\"\"Helper function to wait for the completion of feature group creation\"\"\"\n", "    status = feature_group.describe().get(\"FeatureGroupStatus\")\n", "    while status == \"Creating\":\n", "        print(\"Waiting for Feature Group Creation\")\n", "        time.sleep(5)\n", "        status = feature_group.describe().get(\"FeatureGroupStatus\")\n", "    if status != \"Created\":\n", "        raise SystemExit(f\"Failed to create feature group {feature_group.name}: {status}\")\n", "    print(f\"FeatureGroup {feature_group.name} successfully created.\")\n", "\n", "\n", "def create_feature_group(feature_group_name, feature_store_session, feature_definitions):\n", "    feature_group = FeatureGroup(\n", "        name=feature_group_name,\n", "        sagemaker_session=feature_store_session,\n", "        feature_definitions=feature_definitions[feature_group_name],\n", "    )\n", "\n", "    # only create feature group if it doesn't already exist\n", "    try:\n", "        sagemaker_client.describe_feature_group(FeatureGroupName=feature_group_name)\n", "        feature_group_exists = True\n", "        print(\"Feature Group {0} already exists. Using {0}\".format(feature_group_name))\n", "    except ClientError as e:\n", "        error = e.response.get(\"Error\").get(\"Code\")\n", "        if error == \"ResourceNotFound\":\n", "            feature_group_exists = False\n", "            print(\"Creating Feature Group {}\".format(feature_group_name))\n", "            # Invoke the Feature Store API to create the feature group and wait until it is ready\n", "            feature_group.create(\n", "                s3_uri=feature_store_offline_s3_uri,\n", "                record_identifier_name=record_identifier_feature_names[feature_group_name],\n", "                event_time_feature_name=event_time_feature_name,\n", "                role_arn=iam_role,\n", "                enable_online_store=enable_online_store,\n", "            )\n", "            wait_for_feature_group_creation_complete(feature_group=feature_group)\n", "        elif error == \"ResourceInUse\":\n", "            feature_group_exists = True\n", "            print(\"Feature Group {0} already exists. Using {0}\".format(feature_group_name))\n", "        else:\n", "            # re-raise unexpected errors instead of returning an undefined value\n", "            raise\n", "\n", "    return feature_group_exists" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The feature groups are initialized and created below." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group_existence = {}\n", "for feature_group_name in feature_group_names:\n", "    feature_group_exists = create_feature_group(\n", "        feature_group_name, feature_store_session, feature_definitions\n", "    )\n", "    feature_group_existence[feature_group_name] = feature_group_exists" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Now that the feature groups are created, you will use processing jobs to process your \n", "data at scale and ingest the transformed data into these feature groups." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Inputs and Outputs\n", "\n", "----\n", "\n", "The settings below configure the inputs and outputs for the flow export.\n", "\n", "
💡 Configurable Settings \n", "\n", "In Input - Source you can configure the data sources that will be used as input by Data Wrangler.\n", "\n", "1. For S3 sources, configure the source attribute that points to the input S3 prefixes.\n", "2. For all other sources, configure attributes like query_string and database in the source's \n", "DatasetDefinition object.\n", "\n", "If you modify the inputs, the provided data must have the same schema and format as the data used in the Flow. \n", "You should also re-execute the cells in this section if you have modified the settings in any data sources.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.dataset_definition.inputs import (\n", " AthenaDatasetDefinition,\n", " DatasetDefinition,\n", " RedshiftDatasetDefinition,\n", ")\n", "\n", "data_sources = []" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Input - S3 Source: tracks.csv" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_sources.append(\n", " ProcessingInput(\n", " source=f\"{tracks_data_source}\", # You could override this to point to another dataset on S3\n", " destination=\"/opt/ml/processing/tracks.csv\",\n", " input_name=\"tracks.csv\",\n", " s3_data_type=\"S3Prefix\",\n", " s3_input_mode=\"File\",\n", " s3_data_distribution_type=\"FullyReplicated\",\n", " )\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Input - S3 Source: ratings.csv" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_sources.append(\n", " ProcessingInput(\n", " source=f\"{ratings_data_source}\", # You could override this to point to another dataset on S3\n", " destination=\"/opt/ml/processing/ratings.csv\",\n", " input_name=\"ratings.csv\",\n", " s3_data_type=\"S3Prefix\",\n", " s3_input_mode=\"File\",\n", " s3_data_distribution_type=\"FullyReplicated\",\n", " )\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Output: Feature Store \n", "\n", "Below are the inputs required by the SageMaker Python SDK to launch a processing job with feature store as an output. Notice the `output_name` variable below; this ID is found within the `.flow` file at the node point you want to capture transformations up to. 
The `.flow` file contains instructions for SageMaker Data Wrangler to know where to look for data and how to transform it. Each data transformation action is associated with a node and therefore a node ID. The node ID combined with the output name tells SageMaker up to what point in the transformation process you want to export to a feature store." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import FeatureStoreOutput\n", "\n", "# Output name is auto-generated from the selected node's ID + output name from the .flow file\n", "output_names = {\n", " \"track-features-music-rec\": \"19ad8e80-2002-4ee9-9753-fe9a384b1166.default\",\n", " \"user-5star-track-features-music-rec\": \"7a6dad19-2c80-43e3-b03d-ec23c3842ae9.default\",\n", " \"ratings-features-music-rec\": \"9a283380-91ca-478e-be99-6ba3bf57c680.default\",\n", "}\n", "\n", "processing_job_outputs = {}\n", "\n", "for feature_group_name in feature_group_names:\n", " processing_job_output = ProcessingOutput(\n", " output_name=output_names[feature_group_name],\n", " app_managed=True,\n", " feature_store_output=FeatureStoreOutput(feature_group_name=feature_group_name),\n", " )\n", " processing_job_outputs[feature_group_name] = processing_job_output" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Upload Flow to S3\n", "\n", "----\n", "To use the Data Wrangler flow as an input to the processing job, first upload your flow file to Amazon S3."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import json\n", "import boto3\n", "\n", "# name of the flow file which should exist in the current notebook working directory\n", "flow_file_name = \"01_music_dataprep.flow\"\n", "\n", "# Load .flow file from current notebook working directory\n", "!echo \"Loading flow file from current notebook working directory: $PWD\"\n", "\n", "with open(flow_file_name) as f:\n", " flow = json.load(f)\n", "\n", "# Upload flow to S3\n", "s3_client = boto3.client(\"s3\")\n", "s3_client.upload_file(\n", " flow_file_name, bucket, f\"{prefix}/data_wrangler_flows/{flow_export_name}.flow\"\n", ")\n", "\n", "flow_s3_uri = f\"s3://{bucket}/{prefix}/data_wrangler_flows/{flow_export_name}.flow\"\n", "\n", "print(f\"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The Data Wrangler Flow is also provided to the Processing Job as an input source which we configure below." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Input - Flow: 01_music_dataprep.flow\n", "flow_input = ProcessingInput(\n", " source=flow_s3_uri,\n", " destination=\"/opt/ml/processing/flow\",\n", " input_name=\"flow\",\n", " s3_data_type=\"S3Prefix\",\n", " s3_input_mode=\"File\",\n", " s3_data_distribution_type=\"FullyReplicated\",\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Run Processing Job\n", "\n", "----\n", "### Job Configurations\n", "\n", "
💡 Configurable Settings \n", "\n", "You can configure the following settings for Processing Jobs. If you change any configurations you will \n", "need to re-execute this and all cells below it by selecting the Run menu above and clicking \n", "Run Selected Cells and All Below.\n", "\n", "1. IAM role for executing the processing job. \n", "2. A unique name for the processing job. Give it a unique name every time you re-execute processing jobs.\n", "3. Data Wrangler Container URL.\n", "4. Instance count, instance type and storage volume size in GB.\n", "5. Content type for each output. Data Wrangler supports CSV as default and Parquet.\n", "6. Network Isolation settings.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Data Wrangler Container URL.\n", "container_uri = sagemaker.image_uris.retrieve(framework=\"data-wrangler\", region=region)\n", "\n", "# Processing Job Instance count and instance type.\n", "instance_count = 2\n", "instance_type = \"ml.m5.4xlarge\"\n", "\n", "# Size in GB of the EBS volume to use for storing data during processing\n", "volume_size_in_gb = 30\n", "\n", "# Content type for each output. Data Wrangler supports CSV as default and Parquet.\n", "output_content_type = \"CSV\"\n", "\n", "# Network Isolation mode; default is off\n", "enable_network_isolation = False" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Create Processing Job\n", "\n", "To launch a Processing Job, you will use the SageMaker Python SDK to create a `Processor` object." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import Processor\n", "from sagemaker.network import NetworkConfig\n", "\n", "processor = Processor(\n", " role=iam_role,\n", " image_uri=container_uri,\n", " instance_count=instance_count,\n", " instance_type=instance_type,\n", " volume_size_in_gb=volume_size_in_gb,\n", " network_config=NetworkConfig(enable_network_isolation=enable_network_isolation),\n", " sagemaker_session=sess,\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Job Status & S3 Output Location\n", "\n", "Below, you run one processing job per feature group and wait for each to finish. If a job finishes successfully, its feature group should be populated \n", "with transformed feature values. In addition, the raw parameters used by each Processing Job will be printed."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "feature_group_exists = False\n", "for feature_group_name in feature_group_names:\n", "    print(f\"Processing {feature_group_name}\")\n", "    # Unique processing job name. Give a unique name every time you re-execute processing jobs\n", "    processing_job_name = \"dw-flow-proc-music-rec-tracks-{}-{}\".format(\n", "        flow_export_id, str(uuid.uuid4())[:8]\n", "    )\n", "    print(f\"{processing_job_name}\")\n", "\n", "    # Output configuration used as processing job container arguments\n", "    output_config = {output_names[feature_group_name]: {\"content_type\": output_content_type}}\n", "\n", "    # Run the processing job unless it has already been run\n", "    if feature_group_exists:  # feature_group_existence[feature_group_name]\n", "        print(\n", "            \"Feature Group {0} already exists therefore we will not run a processing job to create it again\".format(\n", "                feature_group_name\n", "            )\n", "        )\n", "    else:\n", "        print(\"Creating Processing Job: {}\".format(feature_group_name))\n", "        processor.run(\n", "            inputs=[flow_input] + data_sources,\n", "            outputs=[processing_job_outputs[feature_group_name]],\n", "            arguments=[f\"--output-config '{json.dumps(output_config)}'\"],\n", "            wait=False,\n", "            logs=False,\n", "            job_name=processing_job_name,\n", "        )\n", "\n", "        job_result = sess.wait_for_processing_job(processing_job_name)\n", "        print(job_result)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "You can view the newly created feature groups in Studio; refer to [Use Amazon SageMaker Feature Store with Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-use-with-studio.html)\n", "for a detailed guide. 
[Learn more about SageMaker Feature Store](https://github.com/aws/amazon-sagemaker-examples/tree/master/sagemaker-featurestore)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Fetch Data from Offline Feature Store\n", "\n", "----\n", "There are 3 feature groups, one each for the ratings, tracks, and user preferences data. We retrieve data from all 3 before joining them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_groups = []\n", "for name in feature_group_names:\n", "    feature_group = FeatureGroup(name=name, sagemaker_session=feature_store_session)\n", "    feature_groups.append(feature_group)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_client = boto3.client(\"s3\")\n", "account_id = boto3.client(\"sts\").get_caller_identity()[\"Account\"]\n", "\n", "sagemaker_role = sagemaker.get_execution_role()\n", "\n", "s3_output_path = \"s3://\" + bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group_s3_prefixes = []\n", "for feature_group in feature_groups:\n", "    feature_group_table_name = (\n", "        feature_group.describe().get(\"OfflineStoreConfig\").get(\"DataCatalogConfig\").get(\"TableName\")\n", "    )\n", "    feature_group_s3_prefix = (\n", "        f\"{account_id}/sagemaker/{region}/offline-store/{feature_group_table_name}\"\n", "    )\n", "    feature_group_s3_prefixes.append(feature_group_s3_prefix)" ] },
 { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# Wait for data to be added to the offline feature store\n", "def wait_for_offline_store(feature_group_s3_prefix):\n", "    print(feature_group_s3_prefix)\n", "    offline_store_contents = None\n", "    while offline_store_contents is None:\n", "        objects_in_bucket = s3_client.list_objects(Bucket=bucket, Prefix=feature_group_s3_prefix)\n", "        if \"Contents\" in objects_in_bucket and len(objects_in_bucket[\"Contents\"]) > 1:\n", "            offline_store_contents = objects_in_bucket[\"Contents\"]\n", "        else:\n", "            print(\"Waiting for data in offline store...\")\n", "            time.sleep(60)\n", "    print(\"Data available.\")\n", "\n", "\n", "for s3_prefix in feature_group_s3_prefixes:\n", "    wait_for_offline_store(s3_prefix)" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tables = {\n", "    \"ratings\": {\"feature_group\": feature_groups[2], \"cols\": [\"userId\", \"trackid\", \"rating\"]},\n", "    \"tracks\": {\n", "        \"feature_group\": feature_groups[0],\n", "        \"cols\": [\n", "            \"trackid\",\n", "            \"length\",\n", "            \"energy\",\n", "            \"acousticness\",\n", "            \"valence\",\n", "            \"speechiness\",\n", "            \"instrumentalness\",\n", "            \"liveness\",\n", "            \"tempo\",\n", "            \"danceability\",\n", "            \"genre_latin\",\n", "            \"genre_folk\",\n", "            \"genre_blues\",\n", "            \"genre_rap\",\n", "            \"genre_reggae\",\n", "            \"genre_jazz\",\n", "            \"genre_rnb\",\n", "            \"genre_country\",\n", "            \"genre_electronic\",\n", "            \"genre_pop_rock\",\n", "        ],\n", "    },\n", "    \"user_5star_features\": {\n", "        \"feature_group\": feature_groups[1],\n", "        \"cols\": [\n", "            \"userId\",\n", "            \"energy_5star\",\n", "            \"acousticness_5star\",\n", "            \"valence_5star\",\n", "            \"speechiness_5star\",\n", "            \"instrumentalness_5star\",\n", "            \"liveness_5star\",\n", "            \"tempo_5star\",\n", "            \"danceability_5star\",\n", "            \"genre_latin_5star\",\n", "            \"genre_folk_5star\",\n", "            \"genre_blues_5star\",\n", "            \"genre_rap_5star\",\n", "            \"genre_reggae_5star\",\n", "            \"genre_jazz_5star\",\n", "            \"genre_rnb_5star\",\n", "            \"genre_country_5star\",\n", "            \"genre_electronic_5star\",\n", "            \"genre_pop_rock_5star\",\n", "        ],\n", "    },\n", "}" ] },
 { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Query each feature group's offline store with Athena, join the results, and split them into training and validation sets." ] }, { "cell_type": "code", "execution_count": 
null, "metadata": {}, "outputs": [], "source": [ "def get_train_val():\n", "    for k, v in tables.items():\n", "        query = v[\"feature_group\"].athena_query()\n", "        joined_cols = \", \".join(v[\"cols\"])\n", "        # Limit the number of data points to keep training time manageable\n", "        query_string = 'SELECT {} FROM \"{}\" LIMIT 500000'.format(joined_cols, query.table_name)\n", "        print(query_string, \"\\n\")\n", "\n", "        output_location = f\"s3://{bucket}/{prefix}/query_results/\"\n", "        query.run(query_string=query_string, output_location=output_location)\n", "        query.wait()\n", "\n", "        tables[k][\"df\"] = query.as_dataframe()\n", "\n", "    ratings = tables[\"ratings\"][\"df\"]\n", "    tracks = tables[\"tracks\"][\"df\"]\n", "    user_prefs = tables[\"user_5star_features\"][\"df\"]\n", "\n", "    print(\"Merging datasets...\")\n", "    print(f\"Ratings: {ratings.shape}\\nTracks: {tracks.shape}\\nUser Prefs: {user_prefs.shape}\\n\")\n", "\n", "    dataset = pd.merge(ratings, tracks, on=\"trackid\", how=\"inner\")\n", "    dataset = pd.merge(dataset, user_prefs, on=\"userId\", how=\"inner\")\n", "    dataset.drop_duplicates(inplace=True)\n", "    dataset.drop([\"userId\", \"trackid\"], axis=1, inplace=True)\n", "\n", "    # Split the data into training and validation sets\n", "    from sklearn.model_selection import train_test_split\n", "\n", "    train, val = train_test_split(dataset, test_size=0.2, random_state=42)\n", "    print(\n", "        \"Training dataset shape: {}\\nValidation dataset shape: {}\\n\".format(train.shape, val.shape)\n", "    )\n", "\n", "    return train, val" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "import pandas as pd\n", "import glob\n", "\n", "\n", "print(\"Creating training and validation sets...\\n\")\n", "train, val = get_train_val()\n", "# Write to local CSV files under ./data without headers or the index column\n", "train.to_csv(\"./data/train_data.csv\", header=False, index=False)\n", "val.to_csv(\"./data/val_data.csv\", header=False, index=False)\n", "\n", "pd.DataFrame({\"ColumnName\": 
train.columns}).to_csv(\n", " \"./data/train_data_headers.csv\", header=False, index=False\n", ")\n", "pd.DataFrame({\"ColumnName\": val.columns}).to_csv(\n", " \"./data/val_data_headers.csv\", header=False, index=False\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Notebook CI Test Results\n", "\n", "This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.\n", "\n", "![This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-2/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ca-central-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/sa-east-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This eu-west-1 badge failed to load. 
Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This eu-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-2/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-3/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-central-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-north-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This ap-southeast-2 badge failed to load. 
Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-2/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-2/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n", "\n", "![This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-south-1/end_to_end|music_recommendation|02_export_feature_groups.ipynb)\n" ] } ], "metadata": { "instance_type": "ml.m5.4xlarge", "kernelspec": { "display_name": "Python 3 (Data Science 3.0)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-data-science-310-v1" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.6" } }, "nbformat": 4, "nbformat_minor": 4 }