{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Making predictions using streaming aggregated features\n",
"\n",
"All prior notebooks have been setting up our end to end solution. Now that all those steps are complete, it is time to see the solution in action. In this notebook, we will send credit card transactions to our input Kinesis stream and show that we can detect fraud. We take advantage of multiple online feature groups in the Amazon SageMaker Feature Store. One of those feature groups is refreshed by a processing job, which we would run nightly to provide aggregate features looking back one week. The other feature group uses streaming ingestion to aggregate features that look back over a rolling 10-minute window.\n",
"\n",
"
\n",
"\n",
"\n",
"### Recap of what is in place\n",
"\n",
"Here is a recap of what we have done so far:\n",
"\n",
"1. In [notebook 0](./0_prepare_transactions_dataset.ipynb), We generated a synthetic dataset of transactions, including simulated fraud attacks.\n",
"2. In [notebook 1](./1_setup.ipynb), we created our two feature groups. In that same notebook, we also created a Kinesis data stream and a Kinesis Data Analytics SQL application that consumes the transaction stream and produces aggregate features. These features are provided in near real time to Lambda, and they look back over a 10 minute window.\n",
"3. In [notebook 2](./2_batch_ingestion.ipynb), we used a SageMaker Processing Job to create aggregated features and used them to feed both the training dataset as well as an online feature group.\n",
"4. In [notebook 3](./3_train_and_deploy_model.ipynb), we trained and deployed an XGBoost model to detect fraud.\n",
"5. Our [CloudFormation template](https://console.aws.amazon.com/cloudformation/home) deployed a pair of Lambda functions. One listens to the KDA SQL output and keeps the `cc-agg-fg` feature group up to date. The other Lambda listens to the Kinesis data stream for transactions, pulls a set of features from multiple feature groups, and invokes our fraud detection endpoint, as seen in the above picture."
]
},
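{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before moving on, here is a minimal, hedged sketch of the first of those Lambda functions: the one that consumes the KDA SQL output and keeps the `cc-agg-fg` feature group current. The deployed function lives in the CloudFormation template, not in this notebook, so treat this cell as illustration only. It assumes KDA delivers output records to Lambda as base64-encoded JSON, and the aggregate feature names (`num_trans_last_10m`, `avg_amt_last_10m`, `trans_time`) are hypothetical placeholders."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch (NOT the deployed code): a Lambda that consumes KDA SQL output\n",
"# and writes rolling 10-minute aggregates to the 'cc-agg-fg' online feature group.\n",
"# Feature names other than cc_num are hypothetical placeholders.\n",
"import json, base64, time\n",
"import boto3\n",
"\n",
"featurestore_runtime = boto3.client('sagemaker-featurestore-runtime')\n",
"\n",
"def ingest_handler(event, context):\n",
"    results = []\n",
"    for rec in event.get('records', []):   # KDA SQL delivers base64-encoded output records\n",
"        agg = json.loads(base64.b64decode(rec['data']))\n",
"        featurestore_runtime.put_record(\n",
"            FeatureGroupName='cc-agg-fg',\n",
"            Record=[\n",
"                {'FeatureName': 'cc_num', 'ValueAsString': str(agg['cc_num'])},\n",
"                {'FeatureName': 'num_trans_last_10m', 'ValueAsString': str(agg['num_trans_last_10m'])},\n",
"                {'FeatureName': 'avg_amt_last_10m', 'ValueAsString': str(agg['avg_amt_last_10m'])},\n",
"                {'FeatureName': 'trans_time', 'ValueAsString': str(int(time.time()))}\n",
"            ])\n",
"        results.append({'recordId': rec['recordId'], 'result': 'Ok'})\n",
"    # KDA expects an acknowledgement for each delivered record\n",
"    return {'records': results}"
]
},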
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Imports and overall setup\n",
"\n",
"### Imports and initialization"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime\n",
"import numpy as np\n",
"import pandas as pd\n",
"import sagemaker\n",
"import boto3\n",
"import json\n",
"import time"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"LOCAL_DIR = './data'\n",
"BUCKET = sagemaker.Session().default_bucket()\n",
"PREFIX = 'testing'\n",
"STREAM_NAME = 'cc-stream'\n",
"\n",
"s3_client = boto3.Session().client('s3')\n",
"kinesis_client = boto3.client('kinesis')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The exact name of the Lambda function is controlled by our CloudFormation template, \n",
"# so we access that here. We will use this to help get to the proper CloudWatch log group to see the\n",
"# results of our testing.\n",
"%store -r\n",
"predict_lambda_name"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Ensure Lambda knows which SageMaker endpoint to use\n",
"In our previous notebook that deploys a SageMaker endpoint, we allow the endpoint name to be generated on the fly instead of hard-coding a specific endpoint name. Our Lambda function that invokes the endpoint thus needs a way to know the endpoint name. We handle that through a Lambda environment variable.\n",
"\n",
"This section of code simply takes care of updating end ENDPOINT_NAME Lambda environment variable. It is important to do so before we start feeding transactions into our Kinesis stream."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"lambda_client = boto3.client('lambda')\n",
"\n",
"# Grab the latest endpoint name we used in the previous notebook, as well as the ARN for the lambda\n",
"%store -r\n",
"print(f'Updating Lambda to use endpoint: {endpoint_name} for ARN: {lambda_to_model_arn}')\n",
"\n",
"variables = lambda_client.get_function_configuration(FunctionName=lambda_to_model_arn)['Environment']['Variables']\n",
"variables['ENDPOINT_NAME'] = endpoint_name\n",
"resp = lambda_client.update_function_configuration(\n",
" FunctionName=lambda_to_model_arn,\n",
" Environment={\n",
" 'Variables': variables\n",
" }\n",
")"
]
},
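{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the environment variable concrete, here is a minimal, hedged sketch of roughly what the prediction Lambda does with it. This is not the deployed function: the batch feature group name (`cc-agg-batch-fg`), the feature ordering in the CSV payload, and the 0.5 threshold are assumptions made for illustration; only `cc-agg-fg`, the transaction fields, and the `ENDPOINT_NAME` variable come from this solution."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch (NOT the deployed code): a Lambda triggered by the transaction\n",
"# Kinesis stream that assembles features from the Feature Store and invokes the\n",
"# fraud detection endpoint. 'cc-agg-batch-fg', the payload ordering, and the\n",
"# 0.5 threshold are assumptions for illustration.\n",
"import os, json, base64\n",
"import boto3\n",
"\n",
"featurestore_runtime = boto3.client('sagemaker-featurestore-runtime')\n",
"sm_runtime = boto3.client('sagemaker-runtime')\n",
"\n",
"def predict_handler(event, context):\n",
"    endpoint_name = os.environ['ENDPOINT_NAME']   # set by the cell above\n",
"    for rec in event['Records']:                  # standard Kinesis event source mapping\n",
"        txn = json.loads(base64.b64decode(rec['kinesis']['data']))\n",
"        cc_num = str(txn['cc_num'])\n",
"\n",
"        # Pull the latest streaming (10-minute) and batch (1-week) aggregates\n",
"        streaming = featurestore_runtime.get_record(\n",
"            FeatureGroupName='cc-agg-fg', RecordIdentifierValueAsString=cc_num)\n",
"        batch = featurestore_runtime.get_record(\n",
"            FeatureGroupName='cc-agg-batch-fg', RecordIdentifierValueAsString=cc_num)\n",
"        features = {f['FeatureName']: f['ValueAsString']\n",
"                    for f in streaming.get('Record', []) + batch.get('Record', [])}\n",
"\n",
"        # The order of values must match the model's training features (assumed here)\n",
"        payload = ','.join([str(txn['amount'])] + list(features.values()))\n",
"        response = sm_runtime.invoke_endpoint(EndpointName=endpoint_name,\n",
"                                              ContentType='text/csv',\n",
"                                              Body=payload)\n",
"        score = float(response['Body'].read())\n",
"        label = 'FRAUD' if score > 0.5 else 'NOT FRAUD'\n",
"        print(f'Prediction: {label} (score: {score:.3f}) for card: {cc_num}')"
]
},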
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Access the transaction test dataset"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"test_file_path = './data/test.csv'\n",
"test_df = pd.read_csv(test_file_path)\n",
"test_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"test_df.head()\n",
"print(test_df.shape)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test out the solution, end to end"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### First, a few utility functions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_cloudwatch_logs_url(start_time, end_time):\n",
" log_group_name = '/aws/lambda/' + predict_lambda_name \n",
" # get the latest log stream for our Lambda that makes fraud predictions\n",
" cw_client = boto3.client('logs')\n",
" last_cw_evt = 0\n",
" while last_cw_evt < int(start_test_time * 1000):\n",
" streams = cw_client.describe_log_streams(logGroupName=log_group_name,\n",
" orderBy='LastEventTime',\n",
" descending=True)['logStreams']\n",
" last_cw_evt = streams[0]['lastIngestionTime'] #'lastEventTimestamp']\n",
" latest_stream = str(streams[0]['logStreamName']).replace('/', '$252F').replace('[$LATEST]', '$255B$2524LATEST$255D')\n",
" if last_cw_evt < int(start_test_time * 1000):\n",
" print('waiting for updated log stream...')\n",
" time.sleep(10)\n",
"\n",
" # produce a valid URL to get to that log stream\n",
" region = boto3.session.Session().region_name\n",
" log_group_escaped = log_group_name.replace('/', '$252F')\n",
" cw_url = f'https://console.aws.amazon.com/cloudwatch/home?region={region}#logsV2:log-groups/log-group/{log_group_escaped}'\n",
" time_filter = f'$26start$3D{int(start_test_time * 1000) - 10000}$26end$3D{int(end_test_time * 1000) + 40000}'\n",
" full_cw_url = f'{cw_url}/log-events/{latest_stream}$3FfilterPattern$3DPrediction+{time_filter}'\n",
" print('Updated log stream is ready.')\n",
" return full_cw_url"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def put_to_stream(stream_name, cc_num, merchant, amount, zip_code, timestamp):\n",
" \n",
" payload = {\n",
" 'cc_num': int(cc_num),\n",
" 'merchant': merchant,\n",
" 'amount': amount,\n",
" 'zip_code': zip_code,\n",
" 'trans_ts': timestamp\n",
" }\n",
" ret_status = True\n",
" data = json.dumps(payload)\n",
" print(f'Sending transaction on card: {cc_num}...')\n",
" response = kinesis_client.put_record(StreamName = stream_name,\n",
" Data = data,\n",
" PartitionKey = 'shard1')\n",
" \n",
" if (response['ResponseMetadata']['HTTPStatusCode'] != 200):\n",
" print(\"ERROR: Kinesis put_record failed: \\n{}\".format(json.dumps(response)))\n",
" ret_status = False\n",
" \n",
" return ret_status"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def simulate_fraud(stream_name, cc_num):\n",
" min_wait = 1; max_wait = 2\n",
" for i in range(10):\n",
" random_amt = round(np.random.uniform(1.00, 50.00), 2)\n",
" seconds_to_wait = np.random.uniform(min_wait, max_wait)\n",
" print(f'waiting {seconds_to_wait:.1f} seconds to send trans {i}...')\n",
" time.sleep(seconds_to_wait)\n",
" put_to_stream(stream_name, int(cc_num), 'Random Corp', random_amt, '03099', time.time())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Send some transactions, and see the results\n",
"To show that the solution works, we send a single transaction to each of three different credit cards. Then, we simulate a fraud attack on a 4th credit card by sending many transactions in quick succession. The output from our Lambda function is then shown from CloudWatch log streams. Here's an example of what you should see as a result:\n",
"\n",
"
\n",
"\n",
"As expected, the first three one-off transactions are predicted as NOT FRAUD. Of the ten fraudulent transactions, the first is predicted as NOT FRAUD, and the rest are all correctly identified as FRAUD. Notice how the aggregate features are kept current, helping drive more accurate predictions.\n",
"\n",
"Now let's give it a shot."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cc_nums = test_df.cc_num.unique()[10:14]\n",
"\n",
"start_test_time = time.time() \n",
"\n",
"put_to_stream(STREAM_NAME, cc_nums[0], 'Merchant-0', round(np.random.uniform(100, 5000), 2), 'zip-0', time.time())\n",
"put_to_stream(STREAM_NAME, cc_nums[1], 'Merchant-1', round(np.random.uniform(100, 5000), 2), 'zip-1', time.time())\n",
"put_to_stream(STREAM_NAME, cc_nums[2], 'Merchant-2', round(np.random.uniform(100, 5000), 2), 'zip-2', time.time())\n",
"\n",
"print('\\nNow simulate a fraud attack...')\n",
"fraud_cc_num = cc_nums[3]\n",
"simulate_fraud(STREAM_NAME, fraud_cc_num)\n",
"\n",
"end_test_time = time.time() "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Results can be seen in the CloudWatch log stream of our Lambda function\n",
"The following cell dynamically creates a link to view the results. It waits for the CloudWatch log stream to have the output events from the transactions we just sent. The URL also hones in on the output from the specific timeframe of the transactions."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from IPython.core.display import display, HTML\n",
"\n",
"full_cw_url = get_cloudwatch_logs_url(start_test_time, end_test_time)\n",
"display(HTML(f'Review results in this log stream Lambda fraud detection results'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Feed a stream of transactions [optional]\n",
"If you would like to send additional credit card transactions to simulate more input traffic to the feature pipeline, you can pull from the test dataset as shown below. Just pass in how many transactions you want to send, and the max wait time between transactions (in seconds)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"def simulate_traffic(df, max_wait, num_trans):\n",
" for i in range(num_trans):\n",
" row = test_df.iloc[i]\n",
" cc_num = row['cc_num']\n",
" zip_code = '0'\n",
" merchant = 'A'\n",
" amt = row['amount']\n",
" print(f'cc_num: {cc_num}, amt: {amt}')\n",
" seconds_to_wait = int(np.random.uniform(0.1, max_wait))\n",
" print(f'waiting {seconds_to_wait} seconds to send trans {i}...')\n",
" time.sleep(seconds_to_wait)\n",
" print(f' putting trans with card: {cc_num}, amt: {amt}, zip: {zip_code}, merchant: {merchant}')\n",
" status = put_to_stream(STREAM_NAME,cc_num, merchant, amt, zip_code, time.time())\n",
" if (not status):\n",
" print('Error found during write to Kinesis Stream')\n",
" break\n",
" i += 1\n",
" if i > (num_trans -1):\n",
" break"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#simulate_traffic(test_df, 2, 5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}