# Batch Ingestion
**This notebook aggregates raw features into new derived features that is used for Fraud Detection model training/inference.**

---

## Contents

1. [Background](#Background)
1. [Setup](#Setup)
1. [Create PySpark Processing Script](#Create-PySpark-Processing-Script)
1. [Run SageMaker Processing Job](#Run-SageMaker-Processing-Job)
1. [Explore Aggregated Features](#Explore-Aggregated-Features)
1. [Validate Feature Group for Records](#Validate-Feature-Group-for-Records)

**Recommended settings to run this notebook in SageMaker Studio:**

- Image: Data Science
- Kernel: Python3
- Instance type: <font color='blue'>ml.m5.large (2 vCPU + 8 GiB)</font>

### Background

- This notebook takes raw credit card transactions data (csv) generated by 
[notebook 0](./0_prepare_transactions_dataset.ipynb) and aggregates the raw features to create new features (ratios) via <b>SageMaker Processing</b> PySpark Job. These aggregated features alongside the raw original features will be leveraged in the training phase of a Credit Card Fraud Detection model in the next step (see notebook [notebook 3](./3_train_and_deploy_model.ipynb)).

- As part of the Spark job, we also select the latest weekly aggregated features - `num_trans_last_1w` and `avg_amt_last_1w` grouped by `cc_num` (credit card number) and populate these features into the <b>SageMaker Online Feature Store</b> as a feature group. This feature group (`cc-agg-batch-fg`) was created in notebook [notebook 1](./1_setup.ipynb).

- [Amazon SageMaker Processing](https://aws.amazon.com/about-aws/whats-new/2020/09/amazon-sagemaker-processing-now-supports-built-in-spark-containers-for-big-data-processing/) lets customers run analytics jobs for data engineering and model evaluation on Amazon SageMaker easily and at scale. It provides a fully managed Spark environment for data processing or feature engineering workloads.

![SegmentLocal](images/batch_ingestion.png "connection")

### Setup

#### Imports 

In [None]:
from sagemaker.spark.processing import PySparkProcessor
import pandas as pd
import numpy as np
import sagemaker
import logging
import random
import boto3
import time

In [None]:
print(f'Using SageMaker version: {sagemaker.__version__}')

In [None]:
%store -r

#### Setup Logger

In [None]:
logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

In [None]:
logger.info('[Batch Aggregation using SageMaker PySpark Processing Job]')

#### Essentials

In [None]:
sagemaker_role = sagemaker.get_execution_role()
BUCKET = sagemaker.Session().default_bucket()
INPUT_KEY_PREFIX = 'raw'
OUTPUT_KEY_PREFIX = 'aggregated'
LOCAL_DIR = './data'

### Create PySpark Script
This PySpark script does the following:

1. Aggregates raw features to derive new features (ratios).
2. Saves the aggregated features alongside the original raw features into a CSV file and writes it to S3 - will be used in the next step for model training.
3. Groups the aggregated features by credit card number and picks selected aggregated features to write to SageMaker Feature Store (Online). <br>
<b>Note: </b> The feature group was created in the previous notebook (`1_setup.ipynb`)

In [None]:
%%writefile batch_aggregation_temp.py
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType
from pyspark.sql.functions import desc, dense_rank
from pyspark.sql import SparkSession, DataFrame
from  argparse import Namespace, ArgumentParser
from pyspark.sql.window import Window
import argparse
import logging
import boto3
import time
import sys
import os


TOTAL_UNIQUE_USERS = 10000
FEATURE_GROUP = '|agg_batch_fg_name|'

logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


feature_store_client = boto3.client(service_name='sagemaker-featurestore-runtime')


def parse_args() -> Namespace:
    parser = ArgumentParser(description='Spark Job Input and Output Args')
    parser.add_argument('--s3_input_bucket', type=str, help='S3 Input Bucket')
    parser.add_argument('--s3_input_key_prefix', type=str, help='S3 Input Key Prefix')
    parser.add_argument('--s3_output_bucket', type=str, help='S3 Output Bucket')
    parser.add_argument('--s3_output_key_prefix', type=str, help='S3 Output Key Prefix')
    args = parser.parse_args()
    return args
    

def define_schema() -> StructType:
    schema = StructType([StructField('tid', StringType(), True),
                         StructField('datetime', TimestampType(), True),
                         StructField('cc_num', LongType(), True),
                         StructField('amount', DoubleType(), True),
                         StructField('fraud_label', StringType(), True)])
    return schema


def aggregate_features(args: Namespace, schema: StructType, spark: SparkSession) -> DataFrame:
    logger.info('[Read Raw Transactions Data as Spark DataFrame]')
    transactions_df = spark.read.csv(f's3a://{os.path.join(args.s3_input_bucket, args.s3_input_key_prefix)}', \
                                     header=False, \
                                     schema=schema)
    logger.info('[Aggregate Transactions to Derive New Features using Spark SQL]')
    query = """
    SELECT *, \
           avg_amt_last_10m/avg_amt_last_1w AS amt_ratio1, \
           amount/avg_amt_last_1w AS amt_ratio2, \
           num_trans_last_10m/num_trans_last_1w AS count_ratio \
    FROM \
        ( \
        SELECT *, \
               COUNT(*) OVER w1 as num_trans_last_10m, \
               AVG(amount) OVER w1 as avg_amt_last_10m, \
               COUNT(*) OVER w2 as num_trans_last_1w, \
               AVG(amount) OVER w2 as avg_amt_last_1w \
        FROM transactions_df \
        WINDOW \
               w1 AS (PARTITION BY cc_num order by cast(datetime AS timestamp) RANGE INTERVAL 10 MINUTE PRECEDING), \
               w2 AS (PARTITION BY cc_num order by cast(datetime AS timestamp) RANGE INTERVAL 1 WEEK PRECEDING) \
        ) 
    """
    transactions_df.registerTempTable('transactions_df')
    aggregated_features = spark.sql(query)
    return aggregated_features


def write_to_s3(args: Namespace, aggregated_features: DataFrame) -> None:
    logger.info('[Write Aggregated Features to S3]')
    aggregated_features.coalesce(1) \
                       .write.format('com.databricks.spark.csv') \
                       .option('header', True) \
                       .mode('overwrite') \
                       .option('sep', ',') \
                       .save('s3a://' + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix))
    
def group_by_card_number(aggregated_features: DataFrame) -> DataFrame: 
    logger.info('[Group Aggregated Features by Card Number]')
    window = Window.partitionBy('cc_num').orderBy(desc('datetime'))
    sorted_df = aggregated_features.withColumn('rank', dense_rank().over(window))
    grouped_df = sorted_df.filter(sorted_df.rank == 1).drop(sorted_df.rank)
    sliced_df = grouped_df.select('cc_num', 'num_trans_last_1w', 'avg_amt_last_1w')
    return sliced_df


def transform_row(sliced_df: DataFrame) -> list:
    logger.info('[Transform Spark DataFrame Row to SageMaker Feature Store Record]')
    records = []
    for row in sliced_df.rdd.collect():
        record = []
        cc_num, num_trans_last_1w, avg_amt_last_1w = row
        if cc_num:
            record.append({'ValueAsString': str(cc_num), 'FeatureName': 'cc_num'})
            record.append({'ValueAsString': str(num_trans_last_1w), 'FeatureName': 'num_trans_last_1w'})
            record.append({'ValueAsString': str(round(avg_amt_last_1w, 2)), 'FeatureName': 'avg_amt_last_1w'})
            records.append(record)
    return records


def write_to_feature_store(records: list) -> None:
    logger.info('[Write Grouped Features to SageMaker Online Feature Store]')
    success, fail = 0, 0
    for record in records:
        event_time_feature = {
                'FeatureName': 'trans_time',
                'ValueAsString': str(int(round(time.time())))
            }
        record.append(event_time_feature)
        response = feature_store_client.put_record(FeatureGroupName=FEATURE_GROUP, Record=record)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            success += 1
        else:
            fail += 1
    logger.info('Success = {}'.format(success))
    logger.info('Fail = {}'.format(fail))
    assert success == TOTAL_UNIQUE_USERS
    assert fail == 0


def run_spark_job():
    spark = SparkSession.builder.appName('PySparkJob').getOrCreate()
    args = parse_args()
    schema = define_schema()
    aggregated_features = aggregate_features(args, schema, spark)
    write_to_s3(args, aggregated_features)
    sliced_df = group_by_card_number(aggregated_features)
    records = transform_row(sliced_df)
    write_to_feature_store(records)
    
    
if __name__ == '__main__':
    run_spark_job()

In [None]:
# Read in the file
with open('batch_aggregation_temp.py', 'r') as file :
  filedata = file.read()

# Replace the target string
filedata = filedata.replace('|agg_batch_fg_name|', aggregate_batch_feature_group_name)

# Write the file out again
with open('batch_aggregation.py', 'w') as file:
  file.write(filedata)

### Run SageMaker Processing Job

In [None]:
spark_processor = PySparkProcessor(base_job_name='sagemaker-processing', 
                                   framework_version='2.4', # spark version
                                   role=sagemaker_role, 
                                   instance_count=1, 
                                   instance_type='ml.m5.4xlarge', 
                                   env={'AWS_DEFAULT_REGION': boto3.Session().region_name},
                                   max_runtime_in_seconds=1200)

In [None]:
%%time

spark_processor.run(submit_app='batch_aggregation.py', 
                    arguments=['--s3_input_bucket', BUCKET, 
                               '--s3_input_key_prefix', INPUT_KEY_PREFIX, 
                               '--s3_output_bucket', BUCKET, 
                               '--s3_output_key_prefix', OUTPUT_KEY_PREFIX],
                    spark_event_logs_s3_uri='s3://{}/logs'.format(BUCKET),
                    logs=False)

### Explore Aggregated Features 
<p> The SageMaker Processing Job above creates the aggregated features alongside the raw features and writes it to S3. 
Let us verify this output using the code below and prep it to be used in the next step for model training.</p>


Copy results csv from S3 to local directory. Below step may fail if you are running the notebook for the first time. Hence any file missing error could be safely ignored.

In [None]:
!rm {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part*.csv

In [None]:
!aws s3 cp s3://{BUCKET}/{OUTPUT_KEY_PREFIX}/ {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/ --recursive --exclude '_SUCCESS'

In [None]:
!mv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part*.csv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv 

In [None]:
agg_features = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv')
agg_features.dropna(inplace=True)
agg_features['cc_num'] = agg_features['cc_num'].astype(np.int64)
agg_features['fraud_label'] = agg_features['fraud_label'].astype(np.int64)
agg_features.head()

In [None]:
agg_features.to_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/processing_output.csv', index=False)

Remove the intermediate `part.csv` file

In [None]:
!rm {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv

### Validate Feature Group for Records
Let's randomly pick N credit card numbers from the `processing_output.csv` and verify if records exist in the feature group `<aggregate_batch_feature_group_name>` for these card numbers.

In [None]:
N = 3 # number of random records to validate
FEATURE_GROUP = aggregate_batch_feature_group_name

In [None]:
processing_out_df = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/processing_output.csv', nrows=10000)
processing_out_df.head()

In [None]:
cc_nums = random.sample(processing_out_df['cc_num'].tolist(), N)
cc_nums

Using SageMaker Feature Store run-time client, we can verify if records exist in the feature group for the picked `cc_nums` 

In [None]:
feature_store_client = boto3.Session().client(service_name='sagemaker-featurestore-runtime')

In [None]:
success, fail = 0, 0
for cc_num in cc_nums:
    response = feature_store_client.get_record(FeatureGroupName=FEATURE_GROUP, 
                                               RecordIdentifierValueAsString=str(cc_num))
    if response['ResponseMetadata']['HTTPStatusCode'] == 200 and 'Record' in response.keys():
        success += 1
        print(response['Record'])
    else:
        print(response)
        fail += 1
assert success == N