{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup environment for data transformation and ingestion workflow\n", "This notebook sets up needed resources and parameters for a custom SageMaker project which provision a data transformation and ingestion workflow:\n", "\n", "\"solution\n", "\n", "1. Data file or files uploaded to an Amazon S3 bucket\n", "2. Data processing and transformation process is launched\n", "3. Extracted, processed, and transformed features are ingested into a designated feature group in Feature Store\n", "\n", "The notebook takes you through following activites to create the pre-requisite resources:\n", "- Get an Amazon S3 bucket for data upload\n", "- download the dataset and explore the data\n", "- create Amazon Data Wrangler flow for data transformation and feature ingestion\n", "- create a new feature group in Feature Store where features are stored\n", "\n", "⭐ Depending on your specific use case and requirements, for your own custom project you can consider to create all these resources as part of the project provisioning." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load packages:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import json\n", "import boto3\n", "import pandas as pd\n", "import sagemaker\n", "from sagemaker.session import Session\n", "from sagemaker.feature_store.feature_definition import FeatureDefinition\n", "from sagemaker.feature_store.feature_definition import FeatureTypeEnum\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "\n", "import time\n", "from time import gmtime, strftime\n", "import uuid\n", "\n", "\n", "print(sagemaker.__version__)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store -r\n", "%store" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get `domain_id` and `execution_role`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "NOTEBOOK_METADATA_FILE = \"/opt/ml/metadata/resource-metadata.json\"\n", "domain_id = None\n", "\n", "if os.path.exists(NOTEBOOK_METADATA_FILE):\n", " with open(NOTEBOOK_METADATA_FILE, \"rb\") as f:\n", " domain_id = json.loads(f.read()).get('DomainId')\n", " print(f\"SageMaker domain id: {domain_id}\")\n", "\n", "%store domain_id" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r = boto3.client(\"sagemaker\").describe_domain(DomainId=domain_id)\n", "execution_role = r[\"DefaultUserSettings\"][\"ExecutionRole\"]\n", "\n", "%store execution_role" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get S3 bucket for data\n", "We use the SageMaker default bucket for storing all solution artifacts and data. You can choose to create or use your own bucket. Make sure you have corresponding permissions attached to the SageMaker execution role and to `AmazonSageMakerServiceCatalogProductsUseRole` role to be able to list, read, and put objects into the bucket." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_bucket = None # you can use your own S3 bucket name\n", "sagemaker_session = Session()\n", "\n", "if data_bucket is None:\n", " data_bucket = sagemaker_session.default_bucket()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(data_bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "⭐ You can keep the following literals set to their default values or change them if you would like." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# set some literals\n", "s3_data_prefix = f\"{data_bucket}/feature-store-ingestion-pipeline/dataset/\"\n", "s3_flow_prefix = f\"{data_bucket}/feature-store-ingestion-pipeline/dw-flow/\"\n", "s3_fs_query_output_prefix = f\"{data_bucket}/feature-store-ingestion-pipeline/fs_query_results/\"\n", "\n", "dw_flow_name = \"dw-flow\" # change to your custom file name if you use a different one\n", "unique_suffix = f\"{strftime('%d-%H-%M-%S', gmtime())}-{str(uuid.uuid4())[:8]}\"\n", "abalone_dataset_file_name = \"abalone.csv\"\n", "abalone_dataset_local_path = \"../dataset/\"\n", "abalone_dataset_local_url = f\"{abalone_dataset_local_path}{abalone_dataset_file_name}\"\n", "\n", "print(f\"Data Wrangler flow upload and a feature group will have this unique suffix: {unique_suffix}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Download the dataset\n", "We use a well-known [Abalone dataset](https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression.html#abalone) in this solution. The dataset contains 4177 rows of data, and 8 features.\n", "\n", "Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir -p ../dataset\n", "!rm -fr ../dataset/*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Download the dataset from [UCI website](http://archive.ics.uci.edu/ml/datasets/Abalone):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!cd {abalone_dataset_local_path} && wget -t inf http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data\n", "!cd {abalone_dataset_local_path} && wget -t inf http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.names" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load the dataset and print first five rows." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# dictionary of dataset columns and data types\n", "columns = {\n", " \"sex\":\"string\", \n", " \"length\":\"float\", \n", " \"diameter\":\"float\", \n", " \"height\":\"float\", \n", " \"whole_weight\":\"float\", \n", " \"shucked_weight\":\"float\", \n", " \"viscera_weight\":\"float\", \n", " \"shell_weight\":\"float\",\n", " \"rings\":\"long\"\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_df = pd.read_csv(f\"{abalone_dataset_local_path}abalone.data\", names=columns.keys())\n", "print(f\"Data shape: {data_df.shape}\")\n", "data_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# save the dataframe as CSV with the header and index\n", "data_df.to_csv(abalone_dataset_local_url, index_label=\"record_id\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upload the data to the data S3 bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp {abalone_dataset_local_path}. s3://{s3_data_prefix} --recursive" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"Data uploaded to s3://{s3_data_prefix}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Wrangler flow\n", "You can use the provided [Data Wrangler flow file](dw-flow.flow) and skip the **Create Data Wrangler flow** section and move on directly to **Set output name** step. Alternatively you can follow the instructions how to create a new flow with data transformations." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Data Wrangler flow (OPTIONAL)\n", "\n", "
💡 The creation of Data Wrangler flow is optional\n", "
\n", "\n", "Follow these step-by-step instructions to create a new Data Wrangler flow and add data transformation steps to the flow. Refer to [Data Wrangler documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/data-wrangler.html) for more details.\n", "\n", "1. Select **Data Wrangler** in **SageMaker resources** widget:\n", "\n", "\n", "\n", "2. Click on **New flow**: \n", "\n", "\n", "\n", "3. Select **Amazon S3** as your source:\n", "\n", "\n", "\n", "4. Navigate to the S3 bucket path to import the dataset you uploaded to the S3 prefix in the previous section:\n", "\n", "\n", "\n", "5. Select `abalone.csv` file, check that **First row is header** is selected and **Delimiter** set to `COMMA`. Click on **Import**:\n", "\n", "\n", "\n", "6. Right-click on the untitled.flow flow if you want to rename it to `dw-flow.flow`. ⭐ You can use your own name and don't overwrite the provided original flow file. In this case you have to change the value of `dw_flow_name` variable correspondingly.\n", "\n", "\n", "\n", "7. We add Data Wrangler transformations containing several custom Python Pandas commands for the following:\n", " - scale all numeric columns using sklearn `StandardScaler`\n", " - one-hot encoding of the categorical column `sex`\n", " - add a timestamp column `record_time`, which is required by Feature Store\n", "\n", "In order to add transformations, go to the **Data Flow** tab and click on the + sign next to **Date types** box and select **Add transform**:\n", "\n", "\n", "\n", "8. Click on **+ Add step** and select **Custom transform** and **Python (Pandas)** in the selection box:\n", "\n", "\n", "\n", "\n", "\n", "Enter the following code into the editor:\n", "```python\n", "import pandas as pd\n", "from sklearn.preprocessing import StandardScaler\n", "\n", "df_scaled = df.drop(['record_id', 'sex','rings'], axis=1)\n", "df_scaled = StandardScaler().fit_transform(df_scaled.to_numpy())\n", "df_scaled = pd.DataFrame(df_scaled, columns=['length','diameter','height','whole_weight','shucked_weight','viscera_weight','shell_weight'])\n", "\n", "df = pd.concat((df_scaled, df[['record_id', 'sex','rings']]), 1)\n", "```\n", "\n", "We scale all numeric columns in one step by using this Custom Transform in Python (Pandas) and sklearn. \n", "\n", "Click on **Preview**, then **Add** to add the transform to the data flow.\n", "\n", "9. We use the native **Encode Categorical** feature of Data Wrangler to one hot encode the `Sex` variable. Click on **+ Add step** and under **Add Transform** on the right select **Encode Categorical**:\n", "\n", "\n", "\n", "Select `One-hot encode` for Transform, `sex` for Input column, and `Columns` as Output style.\n", " \n", "Click on **Preview** to see the changes and then on **Add**.\n", "\n", "10. Finally, click on **+ Add step** and select **Custom transform** and **Python (Pandas)**:\n", "\n", "\n", "\n", "Enter the following code into the editor:\n", "```python\n", "import time\n", "import pandas as pd\n", "\n", "record_time_feature_name = 'record_time'\n", "current_time_sec = int(round(time.time()))\n", "df[record_time_feature_name] = pd.Series([current_time_sec]*len(df), dtype=\"float\")\n", "```\n", "\n", "Click on **Preview**, then **Add** to add the transform to the data flow.\n", "\n", "11. Now you have three transformation steps in **Transforms** overview:\n", "\n", "\n", "\n", "12. Save your Data Wrangler flow. 
"\n",
"Click on **Back to data flow** and navigate to the **Export** tab.\n",
"\n",
"Select the last step in your Data Wrangler flow.\n",
"\n",
"Choose **Export step** and select the export option **Feature Store**:\n",
"\n",
"13. A newly generated notebook opens in a new window. Navigate to the **Output: Feature Store** section in the notebook and locate the `output_name` variable:\n",
"\n",
"Copy the value of the `output_name` variable and paste it into the following code cell in this notebook.\n",
"\n",
"### End of Data Wrangler flow manual creation\n",
"---" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Set output name\n", "The Data Wrangler processor needs the `node_id` of the last transformation step, after which the transformed data is exported to the output destination.\n", "If you created your own Data Wrangler flow or added more transformation steps to the flow, you need to set `dw_output_name` to the correct `node_id` value as described in steps 12 and 13 of the previous section. Otherwise, run the following code cell." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set the dw_output_name to your export node_id, otherwise keep None if you use the provided flow\n", "dw_output_name = None\n", "\n", "if dw_output_name is None:\n", "    flow_content = json.loads(open(f\"{dw_flow_name}.flow\").read())\n", "    dw_output_name = f\"{flow_content['nodes'][-1]['node_id']}.default\"\n", "\n", "print(f\"Data Wrangler flow output name: {dw_output_name}\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Upload the Data Wrangler flow to the S3 bucket\n", "Finally, we upload the Data Wrangler flow to the S3 bucket. The data processing pipeline uses this flow file to run the data transformation." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dw_flow_file_url = f\"s3://{s3_flow_prefix}{dw_flow_name}-{unique_suffix}.flow\"" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp {dw_flow_name}.flow {dw_flow_file_url}" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Create feature group\n", "We must create a new feature group in SageMaker Feature Store to store the data features. A feature group is a predefined schema for a \n", "collection of features - each feature in the feature group has a specified data type and name. \n", "\n", "A single record in a feature group corresponds to a row in your dataframe. A feature store is a \n", "collection of feature groups. To learn more about SageMaker Feature Store, see the \n", "[Amazon SageMaker Feature Store documentation](http://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html)." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Select the Record identifier and Record time feature name. These are required parameters for feature group creation."
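, "\n", "\n",
"Feature Store identifies each record by the combination of the record identifier and the event time, so the record identifier values should uniquely identify the rows you are going to ingest. A quick, optional sanity check, assuming the `data_df` dataframe from the earlier cells is still in memory (its index is what was exported as the `record_id` column):\n",
"\n",
"```python\n",
"# the dataframe index was written out as the record_id column\n",
"assert data_df.index.is_unique, \"record_id values must be unique\"\n",
"print(f\"{data_df.index.nunique()} unique record_id values for {len(data_df)} rows\")\n",
"```\n"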
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "record_identifier_feature_name = 'record_id'\n", "if record_identifier_feature_name is None:\n", " raise SystemExit(\"Select a column name as the feature group record identifier.\")\n", "\n", "record_time_feature_name = 'record_time'\n", "if record_time_feature_name is None:\n", " raise SystemExit(\"Select a column name as the event time feature name.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following is a list of the feature names and data types of the **final dataset** that will be produced when your data flow is used to process your input dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " columns\n", " \n", "except NameError:\n", " # dictionary of dataset columns and data types\n", " columns = {\n", " \"sex\":\"string\", \n", " \"length\":\"float\", \n", " \"diameter\":\"float\", \n", " \"height\":\"float\", \n", " \"whole_weight\":\"float\", \n", " \"shucked_weight\":\"float\", \n", " \"viscera_weight\":\"float\", \n", " \"shell_weight\":\"float\",\n", " \"rings\":\"long\"\n", " }" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# since we added one-hot encoding for the categorical column `sex`, adjust the column list for the feature group\n", "if columns.get(\"sex\") is not None: \n", " del columns[\"sex\"]\n", " \n", "for i in ('M', 'I', 'F'):\n", " columns[f\"sex_{i}\"] = \"float\"\n", "\n", "columns" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "column_schemas = [\n", " *[{\"name\":c[0], \"type\":c[1]} for c in columns.items()],\n", " {\n", " \"name\": record_identifier_feature_name,\n", " \"type\": \"long\"\n", " },\n", " {\n", " \"name\": record_time_feature_name,\n", " \"type\": \"float\"\n", " },\n", "]\n", "\n", "column_schemas" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Below we create the SDK input for those feature definitions. Some schema types in Data Wrangler are not \n", "supported by Feature Store. The following will create a `default_feature_type` set to String for these types." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "default_feature_type = FeatureTypeEnum.STRING\n", "column_to_feature_type_mapping = {\n", " \"float\": FeatureTypeEnum.FRACTIONAL,\n", " \"long\": FeatureTypeEnum.INTEGRAL\n", "}\n", "\n", "feature_definitions = [\n", " FeatureDefinition(\n", " feature_name=column_schema['name'], \n", " feature_type=column_to_feature_type_mapping.get(column_schema['type'], default_feature_type)\n", " ) for column_schema in column_schemas\n", "]\n", "print(f\"feature definitions: {feature_definitions}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define some literals:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group_name_prefix = \"FG-abalone\"\n", "feature_group_name = f\"{feature_group_name_prefix}-{unique_suffix}\"\n", "feature_store_offline_s3_uri = f\"s3://{data_bucket}\"\n", "\n", "# controls if online store is enabled. 
Enabling the online store allows quick access to \n", "# the latest value for a Record via the GetRecord API.\n", "enable_online_store = False" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Create a feature group using the SageMaker Python SDK:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group = FeatureGroup(\n", "    name=feature_group_name,\n", "    sagemaker_session=sagemaker_session,\n", "    feature_definitions=feature_definitions)\n", "\n", "feature_group.create(\n", "    s3_uri=feature_store_offline_s3_uri,\n", "    record_identifier_name=record_identifier_feature_name,\n", "    event_time_feature_name=record_time_feature_name,\n", "    role_arn=execution_role,\n", "    enable_online_store=enable_online_store\n", ")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Wait until the feature group is ready; it takes around one minute:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = feature_group.describe().get(\"FeatureGroupStatus\")\n", "while status == \"Creating\":\n", "    print(\"Waiting for Feature Group Creation\")\n", "    time.sleep(5)\n", "    status = feature_group.describe().get(\"FeatureGroupStatus\")\n", "\n", "if status != \"Created\":\n", "    raise SystemExit(f\"Failed to create feature group {feature_group.name}: {status}\")\n", "print(f\"FeatureGroup {feature_group.name} successfully created.\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [
"**💡 Dealing with an `AccessDenied` exception**\n",
\n", "\n", "If you get `AccessDenied` exception during creation of a feature group, it may be caused by Lake Formation permissions on `sagemaker_featurestore` database. \n", "\n", "You have to grant permissions to that database to the SageMaker execution role (or role that you use to access Feature Store) in Lake Formation as described in [this document](../grant-sm-execution-role-access-to-fs.pdf)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Query data in feature group\n", "Upon creation, the feature group in Feature Store is empty and contains no data. You can browse the feature group meta data by selecting **Feature Store** in **SageMaker resources** widget:\n", "\n", "\n", "\n", "or use SageMaker SDK:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can query data in the feature group by using Athena query, as demonstrated in the next two code cells." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Build SQL query to features group\n", "fs_query = feature_group.athena_query()\n", "\n", "query_string = f'SELECT * FROM \"{fs_query.table_name}\"'\n", "print(f'Prepared query {query_string}')\n", "print(fs_query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Run Athena query. The output is loaded to a Pandas dataframe.\n", "fs_query.run(\n", " query_string=query_string, \n", " output_location=f\"s3://{s3_fs_query_output_prefix}\"\n", ")\n", "\n", "fs_query.wait()\n", "fs_df = fs_query.as_dataframe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As expected, the feature group doesn't contain any data.\n", "Now is everything ready for deployment of the data transformation and ingestion pipeline, which is going to ingest features into our feature group." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Store the parameters\n", "We store the parameters for data transformation and ingestion pipeline using `%store` magic. We are going to use these parameters in the next notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store data_bucket\n", "%store dw_flow_file_url\n", "%store dw_output_name\n", "%store feature_group_name\n", "%store s3_data_prefix\n", "%store s3_flow_prefix \n", "%store s3_fs_query_output_prefix\n", "%store abalone_dataset_file_name\n", "%store abalone_dataset_local_url\n", "\n", "%store" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Release resources" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%html\n", "\n", "

"<p><b>Shutting down your kernel for this notebook to release resources.</b></p>\n",
"<button class=\"sm-command-button\" data-commandlinker-command=\"kernelmenu:shutdown\" style=\"display:none;\">Shutdown Kernel</button>\n",
"\n",
"<script type=\"application/javascript\">\n",
"// click the hidden button above to shut down this notebook's kernel\n",
"try {\n",
"    els = document.getElementsByClassName(\"sm-command-button\");\n",
"    els[0].click();\n",
"}\n",
"catch(err) {\n",
"    // NoOp\n",
"}\n",
"</script>

\n", "\n", " \n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Proceed to the [`01-feature-store-ingest-pipeline` notebook](01-feature-store-ingest-pipeline.ipynb)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.12 64-bit", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "vscode": { "interpreter": { "hash": "aee8b7b246df8f9039afb4144a1f6fd8d2ca17a180786b69acc140d282b71a49" } } }, "nbformat": 4, "nbformat_minor": 4 }