# Making predictions using streaming aggregated features

All prior notebooks have been setting up our end-to-end solution. Now that those steps are complete, it is time to see the solution in action. In this notebook, we send credit card transactions to our source Kafka topic and show that we can detect fraud. We take advantage of multiple online feature groups in the Amazon SageMaker Feature Store. One feature group is refreshed by a processing job, which we would run nightly, and provides aggregate features that look back one week. The other feature group uses streaming ingestion to aggregate features over a rolling 10-minute window.
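To make the idea of a rolling 10-minute window concrete, here is a minimal, illustrative sketch in plain Python that keeps a per-card transaction count and average amount over the trailing 600 seconds. The class name `RollingWindowAggregator` and the two features it returns are hypothetical. In this solution the streaming aggregation runs in the streaming application set up in notebook 1, not in notebook code, and the sketch assumes transactions arrive in timestamp order.

```python
from collections import defaultdict, deque


class RollingWindowAggregator:
    """Hypothetical per-card count and average amount over a trailing window."""

    def __init__(self, window_seconds=600):
        self.window_seconds = window_seconds
        # card number -> deque of (timestamp, amount), oldest first
        self._events = defaultdict(deque)

    def add(self, cc_num, amount, timestamp):
        events = self._events[cc_num]
        events.append((timestamp, amount))
        # evict transactions that have fallen out of the trailing window
        while events and events[0][0] <= timestamp - self.window_seconds:
            events.popleft()
        count = len(events)
        avg_amount = sum(a for _, a in events) / count
        return {'count': count, 'avg_amount': avg_amount}


agg = RollingWindowAggregator(window_seconds=600)
print(agg.add(1234, 10.0, 0))    # {'count': 1, 'avg_amount': 10.0}
print(agg.add(1234, 30.0, 60))   # {'count': 2, 'avg_amount': 20.0}
print(agg.add(1234, 50.0, 700))  # earlier two expired: {'count': 1, 'avg_amount': 50.0}
```

A burst of transactions on one card quickly drives up the windowed count, which is exactly the kind of signal the weekly batch features cannot capture.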

![SegmentLocal](images/streaming_prediction.png "connection")

### Recap of what is in place

Here is a recap of what we have done so far:

1. In [notebook 0](./0_prepare_transactions_dataset.ipynb), we generated a synthetic dataset of transactions, including simulated fraud attacks.
2. In [notebook 1](./1_setup.ipynb), we created our two feature groups. In that same notebook, we also created a Kinesis data stream and a Kinesis Data Analytics SQL application that consumes the transaction stream and produces aggregate features. These features are provided in near real time to Lambda, and they look back over a 10-minute window.
3. In [notebook 2](./2_batch_ingestion.ipynb), we used a SageMaker Processing Job to create aggregated features and used them to populate both the training dataset and an online feature group.
4. In [notebook 3](./3_train_and_deploy_model.ipynb), we trained and deployed an XGBoost model to detect fraud.
5. Our [CloudFormation template](https://console.aws.amazon.com/cloudformation/home) deployed a pair of Lambda functions. One listens to the MSK topic output and keeps the `` feature group up to date. The other Lambda listens to the Kinesis data stream for transactions, pulls a set of features from multiple feature groups, and invokes our fraud detection endpoint, as shown in the diagram above.
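The prediction Lambda has to combine features from more than one feature group before calling the endpoint. As a hedged illustration of that step, the sketch below flattens responses shaped like those of the Feature Store runtime `get_record` call (a `Record` list of `FeatureName`/`ValueAsString` pairs) and joins them into one CSV row. The helper names, the feature names, and the CSV payload format are assumptions for illustration, not taken from the deployed Lambda.

```python
def record_to_dict(get_record_response):
    """Flatten a GetRecord-style response into {feature_name: value_string}."""
    # tolerate a response that carries no 'Record' key (e.g. no record found)
    return {f['FeatureName']: f['ValueAsString']
            for f in get_record_response.get('Record', [])}


def build_payload(responses, feature_order):
    """Merge records from several feature groups and emit one CSV row.

    feature_order is the column order the model expects (assumed here).
    """
    merged = {}
    for response in responses:
        merged.update(record_to_dict(response))
    missing = [name for name in feature_order if name not in merged]
    if missing:
        raise KeyError(f'missing features: {missing}')
    return ','.join(merged[name] for name in feature_order)


# Hypothetical responses from a batch and a streaming feature group
batch = {'Record': [{'FeatureName': 'cc_num', 'ValueAsString': '4000'},
                    {'FeatureName': 'avg_amt_last_1w', 'ValueAsString': '42.5'}]}
streaming = {'Record': [{'FeatureName': 'cc_num', 'ValueAsString': '4000'},
                        {'FeatureName': 'num_trans_last_10m', 'ValueAsString': '3'}]}
print(build_payload([batch, streaming], ['num_trans_last_10m', 'avg_amt_last_1w']))  # 3,42.5
```

A feature present in more than one group (such as the card number) simply takes its value from the last response merged.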

**Recommended settings to run this notebook in SageMaker Studio:**

- Image: Data Science
- Kernel: Python3
- Instance type: ml.m5.large (2 vCPU + 8 GiB)

**Important Note:**

Do NOT use "Run All Cells" on this notebook; some steps must be performed manually for it to run successfully.

Do not proceed with this notebook unless the upstream notebooks listed in the recap above, including their manual steps, have been fully executed.

## Imports and overall setup

### Imports and initialization

In [None]:
!pip install pykafka

In [None]:
from datetime import datetime
from pykafka import KafkaClient
import numpy as np
import pandas as pd
import sagemaker
import boto3
import json
import time

The required MSK cluster was created by the CloudFormation template. Retrieve the connection string for this pre-created cluster by following the steps shown in the animation below, then paste it into `KAFKA_HOST` in the next cell.

![SegmentLocal](images/get-msk-cluster-connection-string.gif "connection")
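Because `KAFKA_HOST` starts out empty, a quick sanity check before sending any transactions can save a confusing connection error later. The helper below, `parse_bootstrap_brokers`, is hypothetical (it is not part of pykafka or of this solution); it assumes the comma-separated `host:port` format that pykafka's `KafkaClient(hosts=...)` accepts.

```python
def parse_bootstrap_brokers(connection_string):
    """Hypothetical check: split 'host1:port1,host2:port2' into (host, port) pairs."""
    brokers = []
    for entry in connection_string.split(','):
        entry = entry.strip()
        if not entry:
            continue
        # split on the last ':' so the host part may itself contain dots and dashes
        host, sep, port = entry.rpartition(':')
        if not sep or not host or not port.isdigit():
            raise ValueError(f'malformed broker entry: {entry!r}')
        brokers.append((host, int(port)))
    if not brokers:
        raise ValueError('KAFKA_HOST is empty; paste the MSK connection string first')
    return brokers


print(parse_bootstrap_brokers('b-1.example.com:9092,b-2.example.com:9092'))
# [('b-1.example.com', 9092), ('b-2.example.com', 9092)]
```

Once `KAFKA_HOST` is set in the next cell, calling `parse_bootstrap_brokers(KAFKA_HOST)` should fail fast if the string was not pasted correctly.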

In [None]:
LOCAL_DIR = './data'
BUCKET = sagemaker.Session().default_bucket()
PREFIX = 'testing'
TOPIC_NAME = 'cctopic'
KAFKA_HOST = ''  # paste the MSK cluster connection string (bootstrap brokers) here

s3_client = boto3.Session().client('s3')
kinesis_client = boto3.client('kinesis')

In [None]:
# The exact name of the Lambda function is controlled by our CloudFormation template,
# so we retrieve it here. We use it to locate the CloudWatch log group that holds
# the results of our testing.
%store -r
predict_lambda_name

### Ensure Lambda knows which SageMaker endpoint to use
In the previous notebook, which deploys the SageMaker endpoint, we let the endpoint name be generated on the fly instead of hard-coding it. The Lambda function that invokes the endpoint therefore needs a way to learn the endpoint name, which we provide through a Lambda environment variable.

The next cell updates the `ENDPOINT_NAME` environment variable of that Lambda function. It is important to do this before we start sending transactions, so that every prediction request goes to the newly deployed endpoint.
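On the Lambda side, the function only needs to read that variable when it runs. A minimal sketch, assuming the handler looks the value up through `os.environ`; the helper name `get_endpoint_name` is hypothetical and not taken from the deployed function.

```python
import os


def get_endpoint_name(environ=os.environ):
    """Hypothetical helper: return the SageMaker endpoint name set on the Lambda."""
    endpoint_name = environ.get('ENDPOINT_NAME', '').strip()
    if not endpoint_name:
        # fail loudly instead of invoking an endpoint with an empty name
        raise RuntimeError('ENDPOINT_NAME is not set on this Lambda function')
    return endpoint_name


print(get_endpoint_name({'ENDPOINT_NAME': 'fraud-detection-endpoint'}))  # fraud-detection-endpoint
```

Passing the mapping in as a parameter keeps the helper testable outside Lambda; inside the function it would default to the real environment.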

In [None]:
lambda_client = boto3.client('lambda')

# Grab the latest endpoint name we used in the previous notebook, as well as the ARN for the Lambda
%store -r
print(f'Updating Lambda to use endpoint: {endpoint_name} for ARN: {lambda_to_model_arn}')

# Start from the existing variables: update_function_configuration replaces the
# whole environment, so any variable we omit here would be removed
variables = lambda_client.get_function_configuration(FunctionName=lambda_to_model_arn)['Environment']['Variables']
variables['ENDPOINT_NAME'] = endpoint_name
resp = lambda_client.update_function_configuration(
    FunctionName=lambda_to_model_arn,
    Environment={
        'Variables': variables
    }
)

### Access the transaction test dataset

In [None]:
test_file_path = './data/test.csv'
test_df = pd.read_csv(test_file_path)
test_df.head()

In [None]:
print(test_df.shape)

## Test out the solution, end to end

### First, a few utility functions

In [None]:
def get_cloudwatch_logs_url(start_time, end_time):
    log_group_name = '/aws/lambda/' + predict_lambda_name
    # get the latest log stream for our Lambda that makes fraud predictions
    cw_client = boto3.client('logs')
    last_cw_evt = 0
    # poll until the newest log stream has received events from this test run
    while last_cw_evt < int(start_time * 1000):
        streams = cw_client.describe_log_streams(logGroupName=log_group_name,
                                                 orderBy='LastEventTime',
                                                 descending=True)['logStreams']
        last_cw_evt = streams[0]['lastIngestionTime']  # alternatively: 'lastEventTimestamp'
        latest_stream = str(streams[0]['logStreamName']).replace('/', '$252F').replace('[$LATEST]', '$255B$2524LATEST$255D')
        if last_cw_evt < int(start_time * 1000):
            print('waiting for updated log stream...')
            time.sleep(10)

    # produce a valid URL to get to that log stream, covering 10 s before the test to 40 s after
    region = boto3.session.Session().region_name
    log_group_escaped = log_group_name.replace('/', '$252F')
    cw_url = f'https://console.aws.amazon.com/cloudwatch/home?region={region}#logsV2:log-groups/log-group/{log_group_escaped}'
    time_filter = f'$26start$3D{int(start_time * 1000) - 10000}$26end$3D{int(end_time * 1000) + 40000}'
    full_cw_url = f'{cw_url}/log-events/{latest_stream}$3FfilterPattern$3DPrediction+{time_filter}'
    print('Updated log stream is ready.')
    return full_cw_url
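The `replace` chains in `get_cloudwatch_logs_url` apply a two-level escaping: each special character is percent-encoded, and the resulting `%` is then written as `$25`. A more general way to express the same scheme is sketched below with `urllib.parse.quote`; `console_escape` is a hypothetical helper, and it assumes the console keeps accepting the URL format used in that cell.

```python
from urllib.parse import quote


def console_escape(text):
    """Hypothetical helper: percent-encode text, then write each '%' as '$25'.

    For example '/' -> '%2F' -> '$252F' and '[$LATEST]' -> '$255B$2524LATEST$255D'.
    """
    return quote(text, safe='').replace('%', '$25')


stream = '2024/01/01/[$LATEST]0123abcd'
print(console_escape(stream))
# 2024$252F01$252F01$252F$255B$2524LATEST$255D0123abcd
```

Unlike the `[$LATEST]`-specific replacement, this also handles log streams for other function versions, whose names contain a different bracketed qualifier.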

In [None]:
def put_to_topic(topic_name, cc_num, merchant, amount, zip_code, timestamp):
    # connect to the MSK cluster and look up the requested topic
    client = KafkaClient(hosts=KAFKA_HOST)
    topic = client.topics[topic_name]
    payload = {
        'cc_num': int(cc_num),
        'merchant': merchant,
        'amount': amount,
        'zip_code': zip_code,
        'trans_ts': timestamp
    }
    ret_status = True
    encoded_message = json.dumps(payload).encode('utf-8')

    print(f'Sending transaction on card: {cc_num}...')

    # a synchronous producer waits for delivery and raises an exception if it fails
    with topic.get_sync_producer() as producer:
        try:
            producer.produce(encoded_message)
        except Exception as e:
            print(f'Failed to send transaction on card {cc_num}: {e}')
            ret_status = False

    return ret_status
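Factoring the serialization out of `put_to_topic` makes the message format easy to check without a running broker. A minimal sketch; `encode_transaction` and `decode_transaction` are hypothetical names, and the field names simply mirror the payload built in `put_to_topic`.

```python
import json


def encode_transaction(cc_num, merchant, amount, zip_code, timestamp):
    """Serialize one transaction to the UTF-8 JSON bytes put_to_topic sends."""
    payload = {
        'cc_num': int(cc_num),
        'merchant': merchant,
        'amount': amount,
        'zip_code': zip_code,  # kept as a string so leading zeros survive
        'trans_ts': timestamp,
    }
    return json.dumps(payload).encode('utf-8')


def decode_transaction(message_bytes):
    """Inverse of encode_transaction, as a downstream consumer might parse it."""
    return json.loads(message_bytes.decode('utf-8'))


msg = encode_transaction(4000, 'Random Corp', 12.5, '03099', 1700000000.0)
print(decode_transaction(msg)['zip_code'])  # 03099
```

Keeping `zip_code` as a string matters for values such as `'03099'`, which the fraud simulation uses; as an integer the leading zero would be lost.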

In [None]:
def simulate_fraud_kafka(topic_name, cc_num):
    min_wait = 1
    max_wait = 2
    # send 10 small transactions on the same card, 1 to 2 seconds apart
    for i in range(10):
        random_amt = round(np.random.uniform(1.00, 50.00), 2)
        seconds_to_wait = np.random.uniform(min_wait, max_wait)
        print(f'waiting {seconds_to_wait:.1f} seconds to send trans {i}...')
        time.sleep(seconds_to_wait)
        put_to_topic(topic_name, int(cc_num), 'Random Corp', random_amt, '03099', time.time())
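If you want to inspect or replay an attack without waiting, the timing and amounts can be generated up front. This is a sketch that swaps NumPy for Python's `random` module with an optional seed; `fraud_burst_schedule` is a hypothetical helper that uses the same ranges as `simulate_fraud_kafka` but does not send anything.

```python
import random


def fraud_burst_schedule(n=10, min_wait=1.0, max_wait=2.0,
                         min_amt=1.00, max_amt=50.00, seed=None):
    """Hypothetical helper: (seconds_since_start, amount) pairs for a fraud burst."""
    rng = random.Random(seed)  # a fixed seed makes the schedule reproducible
    schedule = []
    elapsed = 0.0
    for _ in range(n):
        elapsed += rng.uniform(min_wait, max_wait)
        schedule.append((elapsed, round(rng.uniform(min_amt, max_amt), 2)))
    return schedule


for offset, amount in fraud_burst_schedule(seed=7):
    print(f'{offset:5.1f}s  ${amount:.2f}')
```

Ten transactions with 1 to 2 second gaps span at most about 20 seconds, well inside the 10-minute window, so the whole burst can show up in the streaming aggregates for that card.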

### Send some transactions, and see the results
To show that the solution works, we send a single transaction on each of three different credit cards. Then we simulate a fraud attack on a fourth credit card by sending many transactions in quick succession. The output of our Lambda function can then be viewed in its CloudWatch log stream. Here's an example of what you should see:

![SegmentLocal](images/log_results.png "connection")

As expected, the three one-off transactions are predicted as NOT FRAUD. Of the ten fraudulent transactions, the first is predicted as NOT FRAUD, most likely because the 10-minute aggregate features for that card do not yet reflect the burst; the rest are all correctly identified as FRAUD. Notice how the aggregate features are kept current, which helps drive more accurate predictions.

Now let's give it a shot.

In [None]:
# Send three one-off transactions via Kafka, then simulate a fraud attack on a fourth card
cc_nums = test_df.cc_num.unique()[10:14]

start_test_time = time.time() 

put_to_topic(TOPIC_NAME, cc_nums[0], 'Merchant-0', round(np.random.uniform(100, 5000), 2), 'zip-0', time.time())
put_to_topic(TOPIC_NAME, cc_nums[1], 'Merchant-1', round(np.random.uniform(100, 5000), 2), 'zip-1', time.time())
put_to_topic(TOPIC_NAME, cc_nums[2], 'Merchant-2', round(np.random.uniform(100, 5000), 2), 'zip-2', time.time())

print('\nNow simulate a fraud attack...')
fraud_cc_num = cc_nums[3]
simulate_fraud_kafka(TOPIC_NAME, fraud_cc_num)

end_test_time = time.time()

Results can be seen in the Apache Flink dashboard created in [notebook 1](./1_setup.ipynb).

![SegmentLocal](images/apache-flink-dashboard.png "connection")

### Results can also be seen in the CloudWatch log stream of our Lambda function
The following cell dynamically creates a link to view the results. It waits until the CloudWatch log stream contains the output events for the transactions we just sent, and the URL narrows the view to the timeframe of those transactions.

In [None]:
from IPython.display import display, HTML

full_cw_url = get_cloudwatch_logs_url(start_test_time, end_test_time)
display(HTML(f'<b>Review results in this log stream: <a target="_blank" href="{full_cw_url}">Lambda fraud detection results</a></b>'))
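If you prefer to read the predictions without leaving the notebook, the same log events can be pulled through the CloudWatch Logs `filter_log_events` API, using the `Prediction` filter pattern that the console URL in `get_cloudwatch_logs_url` applies. This is a sketch: `fetch_prediction_logs` is a hypothetical helper, and because log events can take a little while to arrive, an empty result right after the test may just mean you need to wait and retry.

```python
def fetch_prediction_logs(logs_client, log_group_name, start_time, end_time,
                          filter_pattern='Prediction'):
    """Hypothetical helper: collect matching log messages between two epoch times (s).

    logs_client is expected to be boto3.client('logs'); filter_log_events takes
    millisecond timestamps and pages its results through nextToken.
    """
    kwargs = {
        'logGroupName': log_group_name,
        'startTime': int(start_time * 1000),
        'endTime': int(end_time * 1000),
        'filterPattern': filter_pattern,
    }
    messages = []
    while True:
        response = logs_client.filter_log_events(**kwargs)
        messages.extend(event['message'] for event in response.get('events', []))
        next_token = response.get('nextToken')
        if not next_token:
            break  # no more pages
        kwargs['nextToken'] = next_token
    return messages


# Example (requires AWS credentials and the variables from earlier cells):
# import boto3
# for line in fetch_prediction_logs(boto3.client('logs'),
#                                   '/aws/lambda/' + predict_lambda_name,
#                                   start_test_time - 10, end_test_time + 40):
#     print(line)
```

The example window (10 seconds before the test, 40 seconds after) mirrors the time filter built into the console URL.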

##### KDA application monitoring

When transactions are sent to the source MSK topic, they are aggregated and sent to the target MSK topic, from which the associated feature group is populated.
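The Lambda function that keeps the streaming feature group current has to turn each aggregated message from the target topic into a Feature Store record. A minimal sketch, assuming the aggregated messages are JSON objects whose keys match the feature names. The Lambda MSK event delivers messages under `records`, keyed by topic-partition, with base64-encoded values. The helper names and feature names here are hypothetical, and the deployed Lambda may differ.

```python
import base64
import json


def records_from_msk_event(event):
    """Hypothetical helper: decode every Kafka message value in a Lambda MSK event."""
    decoded = []
    # event['records'] maps 'topic-partition' to a list of messages
    for partition_records in event.get('records', {}).values():
        for record in partition_records:
            decoded.append(json.loads(base64.b64decode(record['value'])))
    return decoded


def to_feature_store_record(features):
    """Convert a flat dict into the FeatureName/ValueAsString list PutRecord expects."""
    return [{'FeatureName': name, 'ValueAsString': str(value)}
            for name, value in features.items()]


agg = {'cc_num': 4000, 'num_trans_last_10m': 3}
event = {'eventSource': 'aws:kafka',
         'records': {'aggtopic-0': [{'topic': 'aggtopic', 'partition': 0, 'offset': 0,
                                     'value': base64.b64encode(json.dumps(agg).encode('utf-8')).decode('ascii')}]}}
for features in records_from_msk_event(event):
    print(to_feature_store_record(features))
```

Each resulting list could then be passed as `Record=` to `put_record` on a `sagemaker-featurestore-runtime` client, together with the target `FeatureGroupName`.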

![SegmentLocal](images/kda-app-workflow.png "connection")

Verify record transmission in the Apache Flink dashboard created in the last manual step of [notebook 1](./1_setup.ipynb).

##### AWS Glue Catalogs

Additionally, you can check out the AWS Glue tables and related metadata by navigating to the AWS Glue console.

![SegmentLocal](images/glue-tables.png "connection")