{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Amazon Sagemaker Feature Store\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import io\n", "import pandas as pd\n", "import time\n", "import sagemaker\n", "import boto3\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "from sagemaker import get_execution_role\n", "\n", "role = get_execution_role()\n", "sess = sagemaker.Session()\n", "bucket = sess.default_bucket()\n", "region = sagemaker.Session().boto_region_name\n", "\n", "# read the prepared data from S3. Enter the S3 URI value. Example S3 URI: s3://sagemaker-ap-south-1-367858208265/data-preparation-using-amazon-sagemaker-and-glue-databrew/Results/DataWrangler/output_1637236520/part-00000-5f963eac-eb6b-4af8-aed3-d777ec46c79a-c000.csv\n", "source = '{S3 URI of csv file}'\n", "s3Bucket = source.replace('s3://', '')\n", "s3Key = s3Bucket[s3Bucket.find('/')+1:]\n", "s3Bucket = s3Bucket[:s3Bucket.find('/')]\n", "s3Client = boto3.client('s3')\n", "s3Obj = s3Client.get_object(Bucket = s3Bucket, Key = s3Key)\n", "df = pd.read_csv(io.BytesIO(s3Obj['Body'].read()))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When creating a feature group, you can also create the metadata for the feature group, such as a short description, storage configuration, features for identifying each record, and the event time, as well as tags to store information such as the author, data source, version, and more. 
Feature Store requires a record identifier feature and an event time feature. Since the dataset does not contain such columns, we add two extra columns, Fraud_ID and Fraud_time." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Add a unique record ID and an event time column for the Feature Store\n", "\n", "df['Fraud_ID'] = df.index + 1000\n", "current_time_sec = int(round(time.time()))\n", "df['Fraud_time'] = pd.Series([current_time_sec]*len(df), dtype=\"float64\")\n", "df = df.drop(['_c0'], axis=1)\n", "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# initialize the boto3 client\n", "\n", "boto3.setup_default_session(region_name=region)\n", "s3_client = boto3.client(\"s3\", region_name=region)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Configure the feature groups\n", "The data type for each feature is inferred from the dataframe that is passed in. Feature data types can also be set explicitly via a config variable, but they must match the corresponding Python data types in the Pandas dataframe when it’s ingested into the Feature Group." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# configure the features\n", "\n", "fraud_fg_name = \"auto-fraud\"\n", "fraud_feature_group = FeatureGroup(name=fraud_fg_name, sagemaker_session=sess)\n", "#fraud_feature_group.delete()\n", "fraud_feature_group.load_feature_definitions(data_frame=df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the feature groups\n", "You must tell the Feature Group which columns in the dataframe correspond to the required record identifier and event time features."
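The dtype-to-feature-type inference mentioned above can be sketched in plain pandas. This is a rough illustration, not the SDK's actual implementation: `infer_feature_types` is a hypothetical helper, and it assumes only the three Feature Store types (Integral, Fractional, String) and the common pandas dtypes shown.

```python
import pandas as pd

# Hypothetical helper mirroring how load_feature_definitions maps
# pandas dtypes onto Feature Store types. Anything unrecognized
# falls back to String here.
def infer_feature_types(df):
    type_map = {"int64": "Integral", "float64": "Fractional", "object": "String"}
    return {col: type_map.get(str(dtype), "String") for col, dtype in df.dtypes.items()}

sample = pd.DataFrame(
    {"Fraud_ID": [1000, 1001], "Fraud_time": [1.63e9, 1.63e9], "vendor": ["A", "B"]}
)
print(infer_feature_types(sample))
# {'Fraud_ID': 'Integral', 'Fraud_time': 'Fractional', 'vendor': 'String'}
```

This is why Fraud_time is created with `dtype="float64"` above: a float column maps cleanly onto a Fractional event time feature.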
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "record_identifier_feature_name = \"Fraud_ID\"\n", "event_time_feature_name = \"Fraud_time\"\n", "fraud_feature_group_s3_prefix=\"data-preparation-using-amazon-sagemaker-and-glue-databrew/FeatureStore\"\n", "\n", "try:\n", " fraud_feature_group.create(\n", " s3_uri=f\"s3://{bucket}/{fraud_feature_group_s3_prefix}\",\n", " record_identifier_name=record_identifier_feature_name,\n", " event_time_feature_name=event_time_feature_name,\n", " role_arn=role,\n", " enable_online_store=True,\n", " )\n", " print(f'Create \"fraud\" feature group: SUCCESS')\n", "except Exception as e:\n", " code = e.response.get(\"Error\").get(\"Code\")\n", " if code == \"ResourceInUse\":\n", " print(f\"Using existing feature group: {fraud_fg_name}\")\n", " else:\n", " raise (e)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Wait until feature group creation has fully completed" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def wait_for_feature_group_creation_complete(feature_group):\n", " status = feature_group.describe().get(\"FeatureGroupStatus\")\n", " while status == \"Creating\":\n", " print(\"Waiting for Feature Group Creation\")\n", " time.sleep(5)\n", " status = feature_group.describe().get(\"FeatureGroupStatus\")\n", " if status != \"Created\":\n", " raise RuntimeError(f\"Failed to create feature group {feature_group.name}\")\n", " print(f\"FeatureGroup {feature_group.name} successfully created.\")\n", "\n", "\n", "wait_for_feature_group_creation_complete(feature_group=fraud_feature_group)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Ingest records into the Feature Groups\n", "After the Feature Groups have been created, we can put data into each store by using the PutRecord API. This API can handle high TPS and is designed to be called by different streams. 
The data from all of these Put requests is buffered and written to S3 in chunks. The files will be written to the offline store within a few minutes of ingestion." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fraud_feature_group.ingest(data_frame=df, max_workers=3, wait=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Wait for offline store data to become available\n", "This usually takes 5-8 minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The FeatureGroup contains an OnlineStoreConfig and an OfflineStoreConfig controlling where the data is stored.\n", "\n", "fraud_feature_group_resolved_output_s3_uri = (\n", " fraud_feature_group.describe()\n", " .get(\"OfflineStoreConfig\")\n", " .get(\"S3StorageConfig\")\n", " .get(\"ResolvedOutputS3Uri\")\n", ")\n", "\n", "fraud_feature_group_s3_prefix = fraud_feature_group_resolved_output_s3_uri.replace(\n", " f\"s3://{bucket}/\", \"\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "offline_store_contents = None\n", "while offline_store_contents is None:\n", " objects_in_bucket = s3_client.list_objects(\n", " Bucket=bucket, Prefix=fraud_feature_group_s3_prefix\n", " )\n", " if \"Contents\" in objects_in_bucket and len(objects_in_bucket[\"Contents\"]) > 1:\n", " offline_store_contents = objects_in_bucket[\"Contents\"]\n", " else:\n", " print(\"Waiting for data in offline store...\")\n", " time.sleep(60)\n", "\n", "print(\"\\nData is available now.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-south-1:394103062818:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", 
"version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }