{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Part 1 : Data Preparation, Process, and Store Features"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Uncomment and install s3fs, this is required to read CSV files from S3 directly into Pandas dataframe\n",
"# Once installed, please restart the Notebook Kernel (Kernel > Restart Kernel) before proceeding\n",
"\n",
"#%pip install s3fs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"## Overview\n",
"* **[Notebook 1: Data Preparation, Process, and Store Features](./1-data-analysis-prep.ipynb)**\n",
" * **[Getting started](#aud-getting-started)**\n",
" * **[Analyzing Data and use SageMaker Data Wranger](#wrangler)**\n",
" * **[SageMaker Feature Store](#aud-feature-store)**\n",
" * **[Create Training DataSet](#dataset)**\n",
" * **[Conclusion](#conclusion)**\n",
"* [Notebook 2: Amazon Fraud Detector Model Setup](./2-afd-model-setup.ipynb)\n",
"* [Notebook 3: Model training, deployment, real-time and batch inference](./3-afd-model-train-deploy.ipynb)\n",
"* [Notebook 4: Create an end-to-end pipeline](./4-afd-pipeline.ipynb)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The purpose of this notebook is to perform the Data Prep phase of the ML life cycle. The main Data Wrangling, data ingestion, and multiple transformations will be done through the SageMaker Studio Data Wrangler GUI.\n",
"\n",
"In this notebook, we will take the `.flow` files that define the transformations to the raw data. and apply them using a SageMaker Processing job that will apply those transformations to the raw data deposited in the S3 bucket as `.csv` files.\n",
"\n",
"
"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Install required and/or update third-party libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!python -m pip install -Uq pip\n",
"!python -m pip install -q awswrangler==2.4.0 imbalanced-learn==0.7.0 sagemaker==2.23.1 boto3==1.16.48"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Loading stored variables\n",
"If you ran this notebook before, you may want to re-use the resources you aready created with AWS. Run the cell below to load any prevously created variables. You should see a print-out of the existing variables. If you don't see anything printed then it's probably the first time you are running the notebook! "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# %store -z #deletes all previous stored variables\n",
"%store -r\n",
"%store"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Import libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import time\n",
"import boto3\n",
"import string\n",
"import sagemaker\n",
"import pandas as pd\n",
"import awswrangler as wr\n",
"\n",
"from sagemaker.feature_store.feature_group import FeatureGroup"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Getting started: Creating Resources \n",
"\n",
"[overview](#all-up-overview)\n",
"___\n",
"In order to successfully run this notebook you will need to create some AWS resources. \n",
"First, an S3 bucket will be created to store all the data for this tutorial. \n",
"Once created, you will then need to create an AWS Glue role using the IAM console then attach a policy to the S3 bucket to allow FeatureStore access to this notebook. If you've already run this notebook and are picking up where you left off, then running the cells below should pick up the resources you already created without creating any additional resources."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Add FeatureStore policy to Studio's execution role\n",
"\n",
"
\n",
"\n",
"\n",
"1. In a separate brower tab go to the IAM section of the AWS Console\n",
"2. Navigate to the Roles section and select the execution role you're using for your SageMaker Studio user\n",
" * If you're not sure what role you're using, run the cell below to print it out\n",
"3. Attach the AmazonSageMakerFeatureStoreAccess policy to this role. Once attached, the changes take effect immediately."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('SageMaker Role:', sagemaker.get_execution_role().split('/')[-1])\n",
"print('SageMaker Role:', sagemaker.get_execution_role())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.1 Set region, boto3 and SageMaker SDK variables"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Stored 'region' (str)\n",
"Using AWS Region: us-east-2\n"
]
}
],
"source": [
"#You can change this to a region of your choice\n",
"import sagemaker\n",
"region = sagemaker.Session().boto_region_name\n",
"%store region\n",
"print(\"Using AWS Region: {}\".format(region))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"boto3.setup_default_session(region_name=region)\n",
"\n",
"boto_session = boto3.Session(region_name=region)\n",
"\n",
"s3_client = boto3.client('s3', region_name=region)\n",
"\n",
"sagemaker_boto_client = boto_session.client('sagemaker')\n",
"\n",
"sagemaker_session = sagemaker.session.Session(\n",
" boto_session=boto_session,\n",
" sagemaker_client=sagemaker_boto_client)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\"\"\"\n",
"Note: if you are not running this notebook from SageMaker Studio or SageMaker Classic Notebooks you will need to instanatiate \n",
"the sagemaker_execution_role_name with an AWS role that has SageMakerFullAccess and SageMakerFeatureStoreFullAccess\n",
"\"\"\"\n",
"sagemaker_execution_role_name = 'AmazonSageMaker-ExecutionRole-20210107T234882'\n",
"try:\n",
" sagemaker_role = sagemaker.get_execution_role()\n",
"except ValueError:\n",
" iam = boto3.client('iam')\n",
" sagemaker_role = iam.get_role(RoleName=sagemaker_execution_role_name)['Role']['Arn']\n",
" print(f\"\\n instantiating sagemaker_role with supplied role name : {sagemaker_role}\")\n",
"\n",
"account_id = boto3.client('sts').get_caller_identity()[\"Account\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.2 Create a directory (prefix) in the SageMaker default S3 bucket"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime\n",
"current_time = datetime.now()\n",
"\n",
"if 'afd_bucket' not in locals():\n",
" afd_bucket = sagemaker_session.default_bucket()\n",
" afd_prefix = \"amazon-fraud-detector\"\n",
" %store afd_bucket\n",
" %store afd_prefix\n",
" print(f'Using default bucket: {afd_bucket}... Initialized folder {afd_prefix}')\n",
"else:\n",
" print(f'Bucket name already in local cache')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[**Optoinal**] If you want to use your own S3 bucket that's already existing, uncomment and utilize the following example code."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"try:\n",
" s3_client.create_bucket(Bucket=bucket, ACL='private', CreateBucketConfiguration={'LocationConstraint': region})\n",
" print('Create S3 bucket: SUCCESS')\n",
" \n",
"except Exception as e:\n",
" if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':\n",
" print(f'Using existing bucket: {bucket}/{prefix}')\n",
" else:\n",
" raise(e)\n",
"'''"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the following cell, we will initialize a few variables which will be used in either this Notebook or subsequent notebooks. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"traing_job_output_path = f's3://{afd_bucket}/{afd_prefix}/training_jobs'\n",
"bias_report_1_output_path = f's3://{afd_bucket}/{afd_prefix}/clarify-bias-1'\n",
"bias_report_2_output_path = f's3://{afd_bucket}/{afd_prefix}/clarify-bias-2'\n",
"explainability_output_path = f's3://{afd_bucket}/{afd_prefix}/clarify-explainability'\n",
"\n",
"train_data_uri = f's3://{afd_bucket}/{afd_prefix}/data/train/train.csv'\n",
"test_data_uri = f's3://{afd_bucket}/{afd_prefix}/data/test/test.csv'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Analyzing Data and use SageMaker Data Wranger \n",
"---\n",
"\n",
"We will use two data sets in file `signup_attempts.csv` and `signup_outcomes.csv`. The `signup_attempts.csv` contains data about each signup session such as the user's ip_address, email address, city, state, address, and a timestamp of when the user attempted to sign-up for our service.\n",
"\n",
"**NOTE: We are assuming that this dataset is historical from your environment. To train the model, Amazon Fraud Detector requires at least 10,000 records, with at least 500 of those records identified as fraudulent; however, the more records and variety of fraudulent examples you provide, the better. Refer to [this blog](https://aws.amazon.com/blogs/machine-learning/catching-fraud-faster-by-building-a-proof-of-concept-in-amazon-fraud-detector/) for more.**\n",
"\n",
"The next data set i.e. `signup_outcomes.csv`, is the corresponding predictions of each of the signup attempts in the `signup_attempts.csv` file. In this case, we are assuming that this dataset is generated by your existing FD/FP service.\n",
"\n",
"Finally, we will need to use a combination of these two data sets to construct our training data set required for Amazon Fraud Detector model training."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Importing required libraries.\n",
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"# load the signup_attempts dataset into a dataframe\n",
"df_signups = pd.read_csv(\"./data/signup_attempts.csv\", index_col=0)\n",
"df_signups.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Importing required libraries.\n",
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"# load the signup_outcomes dataset into a dataframe\n",
"df_signups_pred = pd.read_csv(\"./data/signup_outcomes.csv\", index_col=0)\n",
"# df_signups_pred.head()\n",
"df_signups_pred.isnull().sum()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As we can see that the `df_signups` dataset consists of a number of features including the `ip_address`, `email_address` and the `timestamp`. Note that the `timestamp` here is in Epoch/UNIX timestamp format. In order to be able to make it compatible for Amazon Fraud Detector model training, we need to convert it into either one of date formats listed below-\n",
"\n",
"* %yyyy-%mm-%ddT%hh:%mm:%ssZ (ISO 8601 standard in UTC only with no milliseconds)\n",
"* %yyyy/%mm/%dd %hh:%mm:%ss (AM/PM)\n",
"* %mm/%dd/%yyyy %hh:%mm:%ss\n",
"* %mm/%dd/%yy %hh:%mm:%ss\n",
"\n",
"For more information on the \"Timestamp formats\" check out the [documentation](https://docs.aws.amazon.com/frauddetector/latest/ug/online-fraud-insights.html)\n",
"\n",
"We will use Amazon Sagemaker Data Wrangler to perform this transformation to convert the timestamp to a compatible date timestamp, in this case we will use `%yyyy/%mm/%dd %hh24:%mm:%ss` format. "
]
},
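{
"cell_type": "markdown",
"metadata": {},
"source": [
"_Optional sanity check:_ the cell below is a minimal local preview of the same conversion the Data Wrangler custom transform will apply, assuming the raw `timestamp` column holds milliseconds since the Unix epoch. It only inspects a few rows and does not change the data used by the `.flow` files."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: preview the epoch -> date conversion locally on a few rows.\n",
"# Assumes the raw column is named 'timestamp' and holds milliseconds since the Unix epoch,\n",
"# matching the Data Wrangler custom transform shown later in this notebook.\n",
"preview = df_signups[['timestamp']].head().copy()\n",
"preview['converted'] = pd.to_datetime(preview['timestamp'], unit='ms').dt.strftime('%m/%d/%y %H:%M:%S')\n",
"preview"
]
},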
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_signups.isnull().sum()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2.1 Upload raw data to S3\n",
"Before you can preprocess the raw data with Data Wrangler, it must exist in S3."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"s3_client.upload_file(Filename='data/signup_attempts.csv', Bucket=afd_bucket, Key=f'{afd_prefix}/data/raw/signup_attempts.csv')\n",
"s3_client.upload_file(Filename='data/signup_outcomes.csv', Bucket=afd_bucket, Key=f'{afd_prefix}/data/raw/signup_outcomes.csv')"
]
},
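{
"cell_type": "markdown",
"metadata": {},
"source": [
"_Optional:_ a minimal check, assuming the keys used in the upload above, that the raw files actually landed in S3 before we point Data Wrangler at them."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: confirm the raw CSV files are present under the raw data prefix.\n",
"raw_objects = s3_client.list_objects_v2(Bucket=afd_bucket, Prefix=f'{afd_prefix}/data/raw/')\n",
"for obj in raw_objects.get('Contents', []):\n",
"    print(obj['Key'], obj['Size'])"
]
},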
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2.2 Use Data Wrangler `flow` files to do transformations"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As we can see that there are quite a few empty/null values for our dataset features for both the files. We will use SageMaker Data Wrangler to fill these missing values. We are going to perform these transformations on our `signup_attempts.csv` and `signup_outcomes.csv` file-\n",
"\n",
"* First we will convert the `timestamp` feature from Epoch/Unix timestamp to a Amazon Fraud Detector compatible date time format in the `signup_attempts.csv` file. We will use a small piece of custom Pandas code for this transformation using \"Custom Transform\" in Data Wrangler.\n",
"\n",
"```python\n",
"import pandas as pd\n",
"df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms').dt.strftime('%m/%d/%y %H:%M:%S')\n",
"```\n",
"\n",
"
\n",
"\n",
"**NOTE: Your entire dataset will be available in a built in variable named `df` within the Data Wrangler `.flow` file.**\n",
"\n",
"* Rename the column `timestamp` to `EVENT_TIMESTAMP`. This is a requirement for Amazon Fraud Detector. For more information on this please refer to the [documentation](https://docs.aws.amazon.com/frauddetector/latest/ug/online-fraud-insights.html)\n",
"\n",
"
\n",
"\n",
"* We will fill missing values for the features. A sample missing value transformation looks like this -\n",
"\n",
"
"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2.3 Update attributes within the `.flow` file \n",
"DataWrangler will generate a .flow file, in our case the file is named `signup_attempts.flow` and `signup_outcomes.flow` and they are included in this project. Each flow file contains a reference to an S3 bucket used during the Wrangling. This may be different from the one you have as a default in this notebook eg. if the Wrangling was done by someone else, you will probably not have access to their bucket and you now need to point to your own S3 bucket so you can actually load the `.flow` file into Wrangler or access the data.\n",
"\n",
"After running the cell below you can open the `signup_attempts.flow` and `signup_outcomes.flow` file and export the data to S3 (described in the next step) or you can continue this guide using the provided `data/signup_attempts_preprocessed.csv` and `data/signup_outcomes_preprocessed.csv` files.\n",
"\n",
"_Optionally, you may create your own Data Wrangler `.flow` file. Refer [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/data-wrangler-getting-started.html) for step-by-step instructions on how to create one_."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Run this in case you want to run the .flow files to generate the preprocessed data sets\n",
"# if you want to save some time you may continue with this notebook by utilizing the preprocessed files already included in the data/ directory\n",
"\n",
"signup_attempts_flow_template_file = \"signup_attempts_flow_template\"\n",
"\n",
"with open(signup_attempts_flow_template_file, 'r') as f:\n",
" variables = {'bucket': afd_bucket, 'prefix': afd_prefix}\n",
" template = string.Template(f.read())\n",
" signup_attempts_flow = template.substitute(variables)\n",
" signup_attempts_flow = json.loads(signup_attempts_flow)\n",
"\n",
"with open('signup_attempts.flow', 'w') as f:\n",
" json.dump(signup_flow, f)\n",
" \n",
" \n",
"signup_outcomes_flow_template_file = \"signup_outcomes_flow_template\"\n",
"\n",
"with open(signup_outcomes_flow_template_file, 'r') as f:\n",
" variables = {'bucket': afd_bucket, 'prefix': afd_prefix}\n",
" template = string.Template(f.read())\n",
" signup_outcomes_flow = template.substitute(variables)\n",
" signup_outcomes_flow = json.loads(signup_outcomes_flow)\n",
"\n",
"with open('signup_outcomes.flow', 'w') as f:\n",
" json.dump(signup_attempts_flow, f)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Our `.flow` file looks as shown below. Note that `.flow` visualization created with Data Wrangler is native to SageMaker Studio. You may still use your flow file in SageMaker Instances, however you may have to make updates to it using the method showed in the above code cell since the `.flow` file is based on underlying JSON data (Edit - `.flow` file in a text editor)\n",
"\n",
"
"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will now run the flow file by first exporting it. There are a few options to export your flow file. In our case, we will export the results of the transformation defined in the flow file into our S3 bucket which will generate the pre-processed data set and save it in the bucket using a _SageMaker Processing Job_. If you select \"Save to S3\" SageMaker Studio will generate a New Notebook which you can tweak and run to generate the pre-processed data, you can also generate a Python code using the \"Python Code\". \n",
"\n",
"
\n",
"\n",
"You can generate the notebook using the details mentioned above to generate the pre-processing export to S3 notebook or, continue using this tutorial with pre-processed data file that is already generated for you in the `/data` directory -- file name `signup_attempts_preprocessed.csv` and `signup_outcomes_preprocessed.csv`. To continue to use this file we will create a preprocessed directory in our S3 bucket and upload it there, ideally your Data Wrangler job would output the final processed file into this location. \n",
"\n",
"Once the files are in the designated S3 buckets we will load them into dataframes using Data Wrangler."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. DataSets and Feature Types \n",
"[overview](#all-up-overview)\n",
"___"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"signups_dtypes = {'ip_address': str,\n",
" 'email_address': str,\n",
" 'user_agent': str,\n",
" 'customer_city': str,\n",
" 'customer_state': str,\n",
" 'customer_postal': str,\n",
" 'customer_name': str,\n",
" 'customer_address': str,\n",
" 'phone_number': str,\n",
" 'EVENT_TIMESTAMP': str\n",
" }\n",
"\n",
"outcomes_dtypes = {'ip_address': str,\n",
" 'email_address': str,\n",
" 'EVENT_LABEL': str\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#======> This is your DataFlow output path if you decide to redo the work in DataFlow on your own\n",
"flow_output_path = 'YOUR_PRE_PROCESSED_DATA_S3_PATH_HERE'\n",
"\n",
"try:\n",
" # this will try to load the exported dataframes from the .flow files\n",
" signups_s3_preprocessed_path = f'{flow_output_path}/signups'\n",
" \n",
" signups_preprocessed = wr.s3.read_csv(\n",
" path=signups_s3_preprocessed_path, \n",
" dataset=True, \n",
" index_col=0,\n",
" dtype=signups_dtypes)\n",
" \n",
" outcomes_s3_preprocessed_path = f'{flow_output_path}/outcomes'\n",
" \n",
" outcomes_preprocessed = wr.s3.read_csv(\n",
" path=outcomes_s3_preprocessed_path, \n",
" dataset=True, \n",
" index_col=0,\n",
" dtype=outcomes_dtypes)\n",
"\n",
"except:\n",
" # if the Data Wrangler job was not run, the claims and customers dataframes will be loaded from local copies\n",
" timestamp = pd.to_datetime('now').timestamp()\n",
" print('Unable to load Data Wrangler output. Loading local pre-processed files into s3 and dataframes...')\n",
" \n",
" signup_attempts_s3_path = f'{afd_prefix}/data/preprocessed/signups/signup_attempts_preprocessed.csv'\n",
" signup_outcomes_s3_path = f'{afd_prefix}/data/preprocessed/outcomes/signup_outcomes_preprocessed.csv'\n",
" \n",
" # Upload the local pre-processed files to S3\n",
" s3_client.upload_file(Filename='data/signup_outcomes_preprocessed.csv', Bucket=afd_bucket, Key=signup_outcomes_s3_path)\n",
" s3_client.upload_file(Filename='data/signup_attempts_preprocessed.csv', Bucket=afd_bucket, Key=signup_attempts_s3_path)\n",
" \n",
" signups_preprocessed = wr.s3.read_csv(\n",
" path=f's3://{afd_bucket}/{afd_prefix}/data/preprocessed/signups' , \n",
" dataset=True, \n",
"# index_col=0,\n",
" dtype=signups_dtypes)\n",
" \n",
" # a timestamp column is required by the feature store, so one is added with a current timestamp\n",
" signups_preprocessed['EventTime'] = timestamp\n",
" \n",
" outcomes_preprocessed = wr.s3.read_csv(\n",
" path=f's3://{afd_bucket}/{afd_prefix}/data/preprocessed/outcomes', \n",
" dataset=True, \n",
"# index_col=0,\n",
" dtype=outcomes_dtypes)\n",
" \n",
" outcomes_preprocessed['EventTime'] = timestamp\n",
" \n",
" print('Complete')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We now have a set of Pandas DataFrames that contain the signup and fraud outcome data. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"signups_preprocessed.isnull().sum()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"signups_preprocessed.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"outcomes_preprocessed.isnull().sum()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"outcomes_preprocessed.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. SageMaker Feature Store \n",
"\n",
"[overview](#all-up-overview)\n",
"___\n",
"Amazon SageMaker Feature Store is a purpose-built repository where you can store and access features so it’s much easier to name, organize, and reuse them across teams. SageMaker Feature Store provides a unified store for features during training and real-time inference without the need to write additional code or create manual processes to keep features consistent. SageMaker Feature Store keeps track of the metadata of stored features (e.g. feature name or version number) so that you can query the features for the right attributes in batches or in real time using Amazon Athena, an interactive query service. SageMaker Feature Store also keeps features updated, because as new data is generated during inference, the single repository is updated so new features are always available for models to use during training and inference.\n",
"\n",
"A feature store consists of an offline componet stored in S3 and an online component stored in a low-latency database. The online database is optional, but very useful if you need supplemental features to be available at inference. In this section, we will create a feature groups for our Claims and Customers datasets. After inserting the claims and customer data into their respective feature groups, you need to query the offline store with Athena to build the training dataset.\n",
"\n",
"You can reference the [SageMaker Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html) for more information about the SageMaker Feature Store.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"featurestore_runtime = boto_session.client(\n",
" service_name='sagemaker-featurestore-runtime', \n",
" region_name=region\n",
")\n",
"\n",
"feature_store_session = sagemaker.Session(\n",
" boto_session=boto_session,\n",
" sagemaker_client=sagemaker_boto_client,\n",
" sagemaker_featurestore_runtime_client=featurestore_runtime\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4.1 Configure the feature groups\n",
"Feature data types can also be set via a config variable `feature_definitions`, but it will have to match the correspongin Python data type in the Pandas dataframe when it's ingested to the Feature Group. We will define the schemas for both our data sets in order to use it in the Feature Group"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following is a list of the feature names and data types of the final dataset that will be produced when your data flow is used to process your input dataset."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"signups_schema = [\n",
" {\n",
" \"name\": \"ip_address\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"email_address\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"user_agent\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"customer_city\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"customer_state\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"customer_postal\",\n",
" \"type\": \"float\"\n",
" },\n",
" {\n",
" \"name\": \"customer_name\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"customer_address\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"phone_number\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"EVENT_TIMESTAMP\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"EventTime\",\n",
" \"type\": \"float\"\n",
" }\n",
"]\n",
"\n",
"outcomes_schema = [\n",
" {\n",
" \"name\": \"ip_address\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"email_address\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"EVENT_LABEL\",\n",
" \"type\": \"string\"\n",
" },\n",
" {\n",
" \"name\": \"EventTime\",\n",
" \"type\": \"float\"\n",
" }\n",
"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Below we create the SDK input for those feature definitions. Some schema types in Data Wrangler are not \n",
"supported by Feature Store. The following will create a default_FG_type set to String for these types."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sagemaker.feature_store.feature_definition import FeatureDefinition\n",
"from sagemaker.feature_store.feature_definition import FeatureTypeEnum\n",
"\n",
"default_feature_type = FeatureTypeEnum.STRING\n",
"column_to_feature_type_mapping = {\n",
" \"float\": FeatureTypeEnum.FRACTIONAL,\n",
" \"long\": FeatureTypeEnum.INTEGRAL\n",
"}\n",
"\n",
"# the signups data set schema\n",
"signups_feature_definitions = [\n",
" FeatureDefinition(\n",
" feature_name=column_schema['name'], \n",
" feature_type=column_to_feature_type_mapping.get(column_schema['type'], default_feature_type)\n",
" ) for column_schema in signups_schema\n",
"]\n",
"\n",
"# the outcome dataset schema\n",
"outcomes_feature_definitions = [\n",
" FeatureDefinition(\n",
" feature_name=column_schema['name'], \n",
" feature_type=column_to_feature_type_mapping.get(column_schema['type'], default_feature_type)\n",
" ) for column_schema in outcomes_schema\n",
"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"signups_fg_name = f'{afd_prefix}-signups'\n",
"outcomes_fg_name = f'{afd_prefix}-outcomes'\n",
"%store signups_fg_name \n",
"%store outcomes_fg_name\n",
"\n",
"signups_feature_group = FeatureGroup(\n",
" name=signups_fg_name, \n",
" sagemaker_session=feature_store_session,\n",
" feature_definitions=signups_feature_definitions)\n",
"\n",
"outcomes_feature_group = FeatureGroup(\n",
" name=outcomes_fg_name, \n",
" sagemaker_session=feature_store_session,\n",
" feature_definitions=outcomes_feature_definitions)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4.2 Create the feature groups\n",
"You must tell the Feature Group which columns in the dataframe correspond to the required record indentifier and event time features."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(f\"{signups_fg_name} -- {outcomes_fg_name} are the feature group names in use\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"record_identifier_feature_name = 'ip_address' #we will use 'ip_address' as the record identifier for both datasets\n",
"event_time_feature_name = 'EventTime'\n",
"\n",
"try:\n",
" print(f\"\\n Using s3://{afd_bucket}/{afd_prefix}\")\n",
" signups_feature_group.create(\n",
" s3_uri=f\"s3://{afd_bucket}/{afd_prefix}\",\n",
" record_identifier_name=record_identifier_feature_name,\n",
" event_time_feature_name=event_time_feature_name,\n",
" role_arn=sagemaker_role,\n",
" enable_online_store=True\n",
" )\n",
" print(f'Create \"signups\" feature group: SUCCESS')\n",
"except Exception as e:\n",
" code = e.response.get('Error').get('Code')\n",
" if code == 'ResourceInUse':\n",
" print(f'Using existing feature group: {signups_fg_name}')\n",
" else:\n",
" raise(e)\n",
"\n",
"try:\n",
" outcomes_feature_group.create(\n",
" s3_uri=f\"s3://{afd_bucket}/{afd_prefix}\",\n",
" record_identifier_name=record_identifier_feature_name,\n",
" event_time_feature_name=event_time_feature_name,\n",
" role_arn=sagemaker_role,\n",
" enable_online_store=True\n",
" )\n",
" print(f'Create \"outcomes\" feature group: SUCCESS')\n",
"except Exception as e:\n",
" code = e.response.get('Error').get('Code')\n",
" if code == 'ResourceInUse':\n",
" print(f'Using existing feature group: {outcomes_fg_name}')\n",
" else:\n",
" raise(e)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Wait until feature group creation has fully completed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def wait_for_feature_group_creation_complete(feature_group):\n",
" status = feature_group.describe().get(\"FeatureGroupStatus\")\n",
" while status == \"Creating\":\n",
" print(\"Waiting for Feature Group Creation\")\n",
" time.sleep(5)\n",
" status = feature_group.describe().get(\"FeatureGroupStatus\")\n",
" if status != \"Created\":\n",
" raise RuntimeError(f\"Failed to create feature group {feature_group.name}\")\n",
" print(f\"FeatureGroup {feature_group.name} successfully created.\")\n",
" \n",
"wait_for_feature_group_creation_complete(feature_group=signups_feature_group)\n",
"wait_for_feature_group_creation_complete(feature_group=outcomes_feature_group)"
]
},
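{
"cell_type": "markdown",
"metadata": {},
"source": [
"_Optional:_ a minimal sketch of inspecting a feature group after creation. `describe()` returns the feature group metadata, including the offline store configuration; the keys below are read defensively in case they differ across SDK versions."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: inspect the signups feature group metadata (keys read defensively).\n",
"signups_fg_description = signups_feature_group.describe()\n",
"print('Status:', signups_fg_description.get('FeatureGroupStatus'))\n",
"print('Offline store S3 config:', signups_fg_description.get('OfflineStoreConfig', {}).get('S3StorageConfig', {}))"
]
},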
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4.3 Ingest records into the Feature Groups\n",
"After the Feature Groups have been created, we can put data into each store by using the PutRecord API. This API can handle high TPS and is designed to be called by different streams. The data from all of these Put requests is buffered and written to s3 in chunks. The files will be written to the offline store within a few minutes of ingestion."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if 'training_data_table' in locals():\n",
" print(\"You may have already ingested the data into your Feature Groups. If you'd like to do this again, you can run the ingest methods outside of the 'if/else' statement.\")\n",
"\n",
"else:\n",
" signups_feature_group.ingest(\n",
" data_frame=signups_preprocessed, max_workers=3, wait=True\n",
" );\n",
"\n",
" outcomes_feature_group.ingest(\n",
" data_frame=outcomes_preprocessed, max_workers=3, wait=True\n",
" );"
]
},
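{
"cell_type": "markdown",
"metadata": {},
"source": [
"_Optional:_ a minimal sketch of reading a single record back from the online store to confirm ingestion, using the `sagemaker-featurestore-runtime` client created earlier. The sample identifier simply reuses the first `ip_address` in the dataframe; it is only an illustration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: fetch one record from the online store to confirm ingestion.\n",
"# 'ip_address' is the record identifier configured when the feature groups were created.\n",
"sample_ip = str(signups_preprocessed['ip_address'].iloc[0])\n",
"record = featurestore_runtime.get_record(\n",
"    FeatureGroupName=signups_fg_name,\n",
"    RecordIdentifierValueAsString=sample_ip\n",
")\n",
"record.get('Record', [])"
]
},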
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Wait for offline store data to become available\n",
"This usually takes 5-8 minutes. The Feature Store offline database works with S3 Athena query engine and generates parquet files which can then be queried using SQL like Athena query."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"signups_feature_group_s3_prefix = f'{afd_prefix}/{account_id}/sagemaker/{region}/offline-store/{signups_fg_name}/data'\n",
"outcomes_feature_group_s3_prefix = f'{afd_prefix}/{account_id}/sagemaker/{region}/offline-store/{outcomes_fg_name}/data'\n",
"\n",
"offline_store_contents = None\n",
"while (offline_store_contents is None):\n",
" objects_in_bucket = s3_client.list_objects(Bucket=afd_bucket, Prefix=signups_feature_group_s3_prefix)\n",
" if ('Contents' in objects_in_bucket and len(objects_in_bucket['Contents']) > 1):\n",
" offline_store_contents = objects_in_bucket['Contents']\n",
" else:\n",
" print('Waiting for data in offline store...')\n",
" time.sleep(60)\n",
" \n",
"print('\\nData available.')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Our feature store has now been created and data has been ingested. Now we will use our SageMaker Feature Store offline data store to create training dataset.\n",
"\n",
"
"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Create training dataset \n",
"\n",
"[overview](#all-up-overview)\n",
"___\n",
"Once the data is available in the offline store, it will automatically be cataloged and loaded into an Athena table (this is done by default, but can be turned off). In order to build our training and test datasets, you will submit a SQL query to join the the Signup attempts and outcomes tables created in Athena."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"signup_attempts_query = signups_feature_group.athena_query()\n",
"signup_outcomes_query = outcomes_feature_group.athena_query()\n",
"\n",
"signup_attempts_table = signup_attempts_query.table_name\n",
"signup_outcomes_table = signup_outcomes_query.table_name\n",
"afd_database_name = signup_outcomes_query.database\n",
"\n",
"%store signup_attempts_table\n",
"%store signup_outcomes_table\n",
"%store afd_database_name\n",
"\n",
"feature_columns = list( set(signups_preprocessed.columns) ^ set(outcomes_preprocessed.columns) )\n",
"feature_columns_string = \", \".join(f'\\\"{c}\\\"' for c in feature_columns)\n",
"feature_columns_string = f'\"EVENT_TIMESTAMP\", \"EVENT_LABEL\", \"{signup_attempts_table}\".ip_address as ip_address, \"{signup_attempts_table}\".email_address as email_address, \"user_agent\" ,\"customer_name\", \"phone_number\", \"customer_city\", \"customer_postal\", \"customer_state\", \"customer_address\"' #+ feature_columns_string\n",
"\n",
"query_string = f\"\"\"\n",
"SELECT DISTINCT {feature_columns_string}\n",
"FROM \"{signup_attempts_table}\" LEFT JOIN \"{signup_outcomes_table}\" \n",
"ON \"{signup_attempts_table}\".ip_address = \"{signup_outcomes_table}\".ip_address\n",
"AND \"{signup_attempts_table}\".email_address = \"{signup_outcomes_table}\".email_address\n",
"\"\"\"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"response=signup_attempts_query.run(query_string=query_string, output_location=f's3://{afd_bucket}/{afd_prefix}/query_results')\n",
"signup_attempts_query.wait()\n",
"dataset = signup_attempts_query.as_dataframe()\n",
"\n",
"output_file_name=f\"{response}.csv\"\n",
"output_file_path=f's3://{afd_bucket}/{afd_prefix}/query_results/{output_file_name}'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(output_file_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dataset.head()"
]
},
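{
"cell_type": "markdown",
"metadata": {},
"source": [
"_Optional:_ a quick sanity check on the joined dataset before training. As noted in section 2, Amazon Fraud Detector expects at least 10,000 records with at least 500 labeled as fraudulent, so the cell below prints the record count and label distribution; the exact label values depend on your dataset."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: verify the joined dataset meets Amazon Fraud Detector's minimums\n",
"# (at least 10,000 records with at least 500 fraudulent examples).\n",
"print('Total records:', len(dataset))\n",
"print(dataset['EVENT_LABEL'].value_counts(dropna=False))"
]
},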
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The query results will now be saved to the S3 bucket under `query_results/` prefix. The final training data file `afd_training_data.csv` is also included in this project. This concludes our data preperation for Amazon Fraud Detector. In the next notebook, we will build an AFD Model and train it with this dataset. The subsequent code cells will take the query output and move it to the final training data S3 location."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import s3fs # This is required to read CSV data directly from S3 into Pandas dataframe\n",
"import pandas as pd\n",
"import csv\n",
"\n",
"df_train = pd.read_csv(output_file_path)\n",
"df_train.to_csv(\"./data/afd_training_data.csv\", index=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"s3_client.upload_file(Filename='data/afd_training_data.csv', \n",
" Bucket=afd_bucket, \n",
" Key=f'{afd_prefix}/training_data/afd_training_data.csv')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Conclusion \n",
"\n",
"[overview](#all-up-overview)\n",
"___"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this notebook, we processed our raw data with SageMaker Data Wrangler and then stored our features into SageMaker Feature store. We then ingested the preprocessed data into Feature Store offline datastore. We then used Athena query to join our pre-processed datasets to generate our training dataset and copied it into an S3 location. \n",
"\n",
"In the next notebook, we will be setting up our Amazon Fraud Detector model."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "Python 3 (Data Science)",
"language": "python",
"name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-2:429704687514:image/datascience-1.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}