# Graph Fraud Detection with Neptune ML

In this module, we will run an end-to-end pipeline to train a fraud detection model using graph neural networks. The steps include:


* Fraud detection dataset
* Export and Processing
* Model training
* Inference queries


**Fraud Detection** is a set of techniques and analyses that allow organizations to identify and prevent unauthorized activity. Fraud can also be any kind of abuse of a system to gain undeserved benefits, such as fraudulent credit card transactions, identity theft, or insurance scams. Fraudsters can collude to commit illegal activities and strive to make them look normal, so fraud can be difficult to detect. The most effective anti-fraud solutions take a multifaceted approach that integrates several techniques, and one of these techniques is the use of graphs.

Graphs let us understand how various entities are related and connected, which helps in detecting fraud patterns that traditional methods can miss. In this workshop, we will build an ML model from data stored in a graph database and train a graph neural network to estimate the probability that a given transaction is fraudulent.

### 1- Restore Variables

In [None]:
%store -r

### 3- Establish Connection with Neptune Graph

The next cell establishes a connection to the Neptune graph database using `gremlin_python`, the Python client for Apache TinkerPop Gremlin. Apache TinkerPop is a graph computing framework for both graph databases (OLTP) and graph analytic systems (OLAP). Gremlin is TinkerPop's graph traversal language: a functional, data-flow language that lets users write complex traversals on (or queries of) their application's property graph.

Once the remote connection is established, we can use the traversal source `g` to run queries against the graph.

In [None]:
from __future__ import print_function  # Python 2/3 compatibility

from gremlin_python import statics
from gremlin_python.structure.graph import Graph
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection

graph = Graph()

# Neptune serves Gremlin over secure WebSockets on port 8182
remoteConn = DriverRemoteConnection('wss://' + NEPTUNE_ENDPOINT + ':8182/gremlin', 'g')
# Traversal source bound to the remote Neptune graph
g = graph.traversal().withRemote(remoteConn)

### 5- Reset the Neptune Database (Optional)

If you created a new Neptune cluster for this exercise, you can skip this step. Otherwise, it makes sure the database is empty before we populate it with the new data.

#### 5.1- Initiate a DB reset

In [None]:
%%bash -s "$NEPTUNE_ENDPOINT" --out RESPONSE

awscurl -X POST \
-H 'Content-Type: application/json' https://$1:8182/system \
-d '{ "action" : "initiateDatabaseReset" }'

#### 5.2- Process the response and get the token

In [None]:
import ast
reset_token = ast.literal_eval(RESPONSE)['payload']['token']
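`ast.literal_eval` works on this response only because it happens to be valid Python literal syntax; any JSON containing `true`, `false`, or `null` would make it raise an error. The following is a minimal sketch of a more tolerant parser, assuming the captured output is a single JSON (or Python-dict) string. `parse_response` is a hypothetical helper, not part of any AWS or Neptune library, and the example body is made up.

```python
import ast
import json


def parse_response(text):
    """Parse a captured HTTP response body into a dict.

    Tries strict JSON first (which understands true/false/null), then falls
    back to ast.literal_eval for Python-style dict strings.
    """
    text = text.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return ast.literal_eval(text)


# A body shaped like the reset response, plus a JSON boolean field
body = '{"status": "200 OK", "payload": {"token": "abc-123"}, "ok": true}'
token = parse_response(body)["payload"]["token"]
print(token)  # abc-123
```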

#### 5.3- Perform the DB Reset Using the Token

The next cell completes the reset by passing the token captured above (`reset_token`) to the `performDatabaseReset` action.

In [None]:
%%bash -s "$NEPTUNE_ENDPOINT" "$reset_token"

awscurl -X POST -H 'Content-Type: application/json' https://$1:8182/system -d '
{ 
"action": "performDatabaseReset" ,
"token" : "'${2}'"
}'

#### 5.4- Scale up the Neptune Instance Size for Loading and Export (if needed)
The bulk loader uses most of the free CPU cycles available in the cluster, so scaling up the instance before ingesting the graph data can make the load considerably faster.

In [None]:
!aws neptune modify-db-instance --db-instance-identifier $NEPTUNE_INSTANCE_ID --apply-immediately --db-instance-class db.r5.12xlarge

Next, open the Neptune console and wait until the instance modification has finished and the instance status is back to **Available** before continuing.

### 6- Preparing the data

#### 6.1- Loading the data

Amazon Neptune has a bulk loader for ingesting data into the database. In the next steps, we upload the files to S3 and then call the loader API, pointing it at the S3 location of the files so they are loaded into Neptune.

In this example, we set the `parallelism` parameter to `OVERSUBSCRIBE`, which tells the bulk loader to use all available CPU resources. Because the load is I/O-bound, CPU utilization in practice typically peaks at around 60%-70% even with this setting.
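To make the `parallelism` choices more concrete, here is a small sketch of how many loader threads each setting implies. The ratios (vCPUs divided by 8 for `LOW`, divided by 2 for `MEDIUM`, equal to the vCPU count for `HIGH`, and doubled for `OVERSUBSCRIBE`) reflect our reading of the Neptune bulk-loader documentation; `loader_threads` is a hypothetical helper, and rounding down with a minimum of one thread is our own assumption.

```python
def loader_threads(vcpus, parallelism="HIGH"):
    """Approximate bulk-loader thread count for a given parallelism setting.

    Rounding down with a floor of one thread is an assumption of this sketch.
    """
    factors = {"LOW": 1 / 8, "MEDIUM": 1 / 2, "HIGH": 1, "OVERSUBSCRIBE": 2}
    try:
        factor = factors[parallelism.upper()]
    except KeyError:
        raise ValueError(f"unknown parallelism: {parallelism!r}") from None
    return max(1, int(vcpus * factor))


# db.r5.12xlarge has 48 vCPUs
for level in ("LOW", "MEDIUM", "HIGH", "OVERSUBSCRIBE"):
    print(level, loader_threads(48, level))
```

With 48 vCPUs this prints 6, 24, 48, and 96 threads respectively, which is one reason scaling up the instance before the load pays off.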


Loading data from an Amazon Simple Storage Service (Amazon S3) bucket requires an AWS Identity and Access Management (IAM) role that has access to the bucket. Follow the instructions at https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM.html to create one. In this example, the role is named `LoadFromNeptune`.

#### 6.3- Upload the dataset to S3 bucket

In [None]:
!aws s3 cp --recursive ./data/ s3://$BUCKET/$PREFIX/ --exclude "*" --include "*_vertices.csv"
!aws s3 cp data/edges.csv s3://$BUCKET/$PREFIX/
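The first command relies on the AWS CLI's ordered `--exclude`/`--include` filters: every file is excluded, then files matching `*_vertices.csv` are re-included, because filters that appear later take precedence. The sketch below mimics that rule with Python's `fnmatch` module so you can check which local files would be uploaded. The file names are hypothetical, and `included` is an illustrative helper, not part of the AWS CLI.

```python
from fnmatch import fnmatchcase


def included(path, filters):
    """Return True if `path` survives an ordered list of (kind, pattern) filters.

    Files start out included; each matching filter overrides the previous
    decision, so the last matching filter wins.
    """
    keep = True
    for kind, pattern in filters:
        if fnmatchcase(path, pattern):
            keep = kind == "include"
    return keep


filters = [("exclude", "*"), ("include", "*_vertices.csv")]
files = ["Transaction_vertices.csv", "Card_vertices.csv", "edges.csv", "notes.txt"]
print([f for f in files if included(f, filters)])
# ['Transaction_vertices.csv', 'Card_vertices.csv']
```

`edges.csv` is filtered out by the first command, which is why the second command copies it explicitly.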

#### 6.4- Import the data into the Cluster 

In [None]:
%%sh -s "$NEPTUNE_ENDPOINT" "$BUCKET" "$ACCOUNT_ID" "$REGION" "$PREFIX" --out loadId

awscurl --region $4 --service neptune-db -X POST -H 'Content-Type: application/json' https://$1:8182/loader -d '
 { 
 "region" : "'${4}'", 
 "source" : "s3://'$2'/'$5'/", 
 "format" : "csv", 
 "iamRoleArn" : "arn:aws:iam::'$3':role/LoadFromNeptune", 
 "parallelism" : "OVERSUBSCRIBE",
 "queueRequest": "TRUE"
 }'

In [None]:
load_id = ast.literal_eval(loadId)['payload']['loadId']
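Splicing shell variables into single-quoted JSON, as the loader cell does, is easy to get wrong. As an alternative sketch, the same request body can be built in Python with `json.dumps`, which takes care of quoting. The bucket, prefix, account ID, and region below are placeholders, and `loader_request` is a hypothetical helper; the resulting string could then be sent as the body of the `/loader` request.

```python
import json


def loader_request(bucket, prefix, account_id, region, role_name="LoadFromNeptune"):
    """Build the JSON body of a Neptune bulk-load (POST /loader) request."""
    body = {
        "region": region,
        "source": f"s3://{bucket}/{prefix.strip('/')}/",
        "format": "csv",
        "iamRoleArn": f"arn:aws:iam::{account_id}:role/{role_name}",
        "parallelism": "OVERSUBSCRIBE",
        "queueRequest": "TRUE",
    }
    return json.dumps(body)


# Placeholder values, for illustration only
payload = loader_request("my-bucket", "credit-transaction-fraud", "123456789012", "us-east-2")
parsed_payload = json.loads(payload)
print(parsed_payload["source"])      # s3://my-bucket/credit-transaction-fraud/
print(parsed_payload["iamRoleArn"])  # arn:aws:iam::123456789012:role/LoadFromNeptune
```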

#### 6.5 Get the status of the load

Loading the data can take ~7 minutes.

In [None]:
%%bash -s "$NEPTUNE_ENDPOINT" "$load_id"

awscurl -X GET "https://$1:8182/loader?loadId=$2"
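The status call returns JSON in which, per our reading of the Neptune loader Get-Status API, the overall state appears under `payload.overallStatus.status`, alongside error counters. A minimal sketch for interpreting a parsed response, assuming that shape; `load_state` and `error_counts` are hypothetical helpers, and treating every status not listed as a failure is a deliberately conservative assumption.

```python
IN_PROGRESS = {"LOAD_NOT_STARTED", "LOAD_IN_QUEUE", "LOAD_IN_PROGRESS"}


def load_state(response):
    """Classify a parsed loader status response as 'done', 'running' or 'failed'."""
    status = response["payload"]["overallStatus"]["status"]
    if status == "LOAD_COMPLETED":
        return "done"
    if status in IN_PROGRESS:
        return "running"
    # Any other status (for example LOAD_FAILED) is treated as a failure here
    return "failed"


def error_counts(response):
    """Sum the per-category error counters reported in overallStatus."""
    overall = response["payload"]["overallStatus"]
    keys = ("parsingErrors", "datatypeMismatchErrors", "insertErrors")
    return sum(overall.get(k, 0) for k in keys)


# A hand-written example response, for illustration only
example = {
    "status": "200 OK",
    "payload": {
        "overallStatus": {
            "status": "LOAD_IN_PROGRESS",
            "parsingErrors": 0,
            "datatypeMismatchErrors": 0,
            "insertErrors": 0,
        }
    },
}
print(load_state(example), error_counts(example))  # running 0
```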

#### Verbose status information

If the bulk load fails for any reason, the following command returns more details about the errors and the location of the logs.

In [None]:
!awscurl -X GET 'https://$NEPTUNE_ENDPOINT:8182/loader/'$load_id'?details=true&errors=true&page=1&errorsPerPage=3'

#### Drop 10% of the fraud labels

**NOTE: You must wait for the bulk load from the previous step to complete before dropping the fraud labels.**

Once the data ingestion is complete, we simulate entities with no labels so that the model can predict their labels after training. In the next cell, we save the `isFraud` values of a block of transactions and then drop that property, so that the trained model can infer the labels of this 10% and the predictions can later be compared with the saved values.

In [None]:
# Pick a contiguous block of 5,000 transaction IDs
ids = list(range(2987000, 2992000))
idss = [str(i) for i in ids]  # Neptune vertex IDs are strings

# Save the current labels before dropping them
fraud_labels = g.V(idss).hasLabel('Transaction').valueMap('isFraud').toList()

# Drop the isFraud property from these transactions
g.V(idss).hasLabel('Transaction').properties('isFraud').drop().toList()
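The cell above hides the labels of a contiguous block of IDs. If you would rather hide a uniformly random fraction of all transactions, a sketch like the following could choose the IDs, assuming you have first retrieved the full list of `Transaction` vertex IDs from the graph; the result would then take the place of `idss`. `holdout_ids`, the seed value, and the sample ID range are assumptions for illustration.

```python
import random


def holdout_ids(all_ids, fraction=0.1, seed=42):
    """Return a reproducible random sample of IDs whose labels will be hidden."""
    if not 0 < fraction < 1:
        raise ValueError("fraction must be between 0 and 1")
    k = int(len(all_ids) * fraction)
    rng = random.Random(seed)  # seeded so the same IDs are chosen on every run
    return sorted(rng.sample(list(all_ids), k))


# Hypothetical set of 100 transaction IDs, for illustration only
universe = [str(i) for i in range(2987000, 2987100)]
hidden = holdout_ids(universe, fraction=0.1)
print(len(hidden))  # 10
```

Seeding a private `random.Random` instance keeps the choice reproducible without affecting any other code that uses the global random state.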

#### Count the entities with no labels

In [None]:
g.V().hasLabel('Transaction').hasNot('isFraud').count().toList()

### 7- Preparing for Export

Neptune ML requires that you provide training data for the Deep Graph Library (DGL) to create and test models using Amazon SageMaker in your account. To do this, you can export data from Neptune using an open-source tool named [neptune-export](https://github.com/awslabs/amazon-neptune-tools/tree/master/neptune-export). 

You can use the tool either as a service (the Neptune-Export service) or as the Java neptune-export command-line tool. The next block of code shows how to trigger an export through the Neptune-Export service API.

In the export command, we can pass parameters in the `additionalParams` field to guide the creation of a training data configuration file.
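The `additionalParams.neptune_ml` block is ordinary JSON, so it can also be assembled and sanity-checked in Python before it goes into the request. This is a minimal sketch: `neptune_ml_params` is a hypothetical helper, `split_rate` holds the fractions of nodes used for training, validation, and testing, and the check that the three fractions sum to 1 is our own assumption about a sensible value.

```python
import json


def neptune_ml_params(node, prop, task="classification", split_rate=(0.8, 0.1, 0.1)):
    """Build an additionalParams dict with a single node-property target.

    split_rate is (train, validation, test); requiring three fractions that
    sum to 1 is an assumption of this sketch.
    """
    if len(split_rate) != 3 or abs(sum(split_rate) - 1.0) > 1e-9:
        raise ValueError("split_rate must be three fractions that sum to 1")
    return {
        "neptune_ml": {
            "version": "v2.0",
            "split_rate": list(split_rate),
            "targets": [{"node": node, "property": prop, "type": task}],
        }
    }


additional_params = neptune_ml_params("Transaction", "isFraud")
print(json.dumps(additional_params, indent=1))
```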

#### 7.1- Invoke the export process

In [None]:
%%bash -s "$NEPTUNE_ENDPOINT" --out response 

# Before running: add the Neptune-Export service API URL to the awscurl command
# and your bucket name to "outputS3Path" below (both are left blank here).
awscurl --region us-east-2 -X POST -H 'Content-Type: application/json' -d ' 
 { "command": "export-pg", 
 "params": { 
 "endpoint": "'${1}'",
 "cloneCluster": false,
 "cloneClusterInstanceType": "r5.8xlarge"
 },
 
 "additionalParams": {
 "neptune_ml": {
 "version": "v2.0",
 "split_rate": [0.8,0.1,0.1],
 "targets": [
 {
 "node": "Transaction",
 "property": "isFraud",
 "type": "classification"
 }
 ]
 }
 },
 "outputS3Path": "s3:///neptune-export", 
 "jobSize": "medium" }' 

 

#### 7.2- Get the job ID from the Previous Job

In [None]:
import ast
jobId = ast.literal_eval(response)['jobId']

#### 7.3- Check the Status of the Export Job

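The export job runs on a separate compute instance. Because `cloneCluster` is set to `false` in the request above, it reads directly from the Neptune cluster; setting it to `true` makes the service first clone the cluster (using `cloneClusterInstanceType`) so the export does not disrupt the source cluster, and tear the clone down once the job is complete. Wait until the export job has succeeded before proceeding with the next steps.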

In [None]:
%%bash -s "$jobId" --out export_response

awscurl --region us-east-2 https://r7zvc0y2ji.execute-api.us-east-2.amazonaws.com/Deployment/neptune-export/$1

In [None]:
ast.literal_eval(export_response)

Re-run the two cells above until the export job has completed successfully.
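Instead of re-running the status cells by hand, you could wrap the status request in a function and poll it. The sketch below is a generic polling loop: `wait_until` is a hypothetical helper, `fetch_status` is any zero-argument callable you supply (for example, one that issues the status request and returns its status field), and the status strings in the example are assumptions, not values confirmed by the Neptune-Export documentation.

```python
import time


def wait_until(fetch_status, done, failed, interval=60, timeout=3600, sleep=time.sleep):
    """Poll fetch_status() until it returns a status in `done` or `failed`.

    Returns the final status on success, raises RuntimeError if a failure
    status is seen, and raises TimeoutError once `timeout` seconds of
    sleeping have elapsed without a terminal status.
    """
    waited = 0
    while True:
        status = fetch_status()
        if status in done:
            return status
        if status in failed:
            raise RuntimeError(f"job ended with status {status}")
        if waited >= timeout:
            raise TimeoutError(f"job still {status} after {waited} seconds")
        sleep(interval)
        waited += interval


# Simulated status sequence; `sleep` is stubbed out so the example runs instantly
statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
result = wait_until(lambda: next(statuses), done={"SUCCEEDED"}, failed={"FAILED"},
                    sleep=lambda seconds: None)
print(result)  # SUCCEEDED
```

Injecting `sleep` keeps the loop testable; in the notebook you would leave the default `time.sleep` in place.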

#### 7.4 Get the Output S3 Location

In [None]:
outputS3Uri = ast.literal_eval(export_response)['outputS3Uri']

#### 7.5 Examine the Training Configurations File

In [None]:
import json
!aws s3 cp $outputS3Uri/training-data-configuration.json ./

with open('training-data-configuration.json', 'r') as handle:
 parsed = json.load(handle)
parsed 

Neptune ML infers the data types of the entities and their properties automatically, but you can also set them manually in the training configuration file. We've already modified some of the data types in the configuration file and defined some preprocessing steps that Neptune ML will handle.
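As an illustration of such a manual change, the sketch below updates the type of one node feature in the parsed configuration dict. It assumes the v2.0 layout in which each node entry has a `"node"` pair `["~id", <label>]` and a `"features"` list whose `"feature"` entries are `[column, feature_name, type]`; `set_feature_type` is a hypothetical helper and the configuration fragment is made up for the example.

```python
def set_feature_type(config, node_label, column, new_type):
    """Change the type of a node feature in a training-data-configuration dict.

    Returns True if at least one matching feature was updated.
    """
    updated = False
    for node in config["graph"]["nodes"]:
        node_spec = node.get("node", [])
        if len(node_spec) < 2 or node_spec[1] != node_label:
            continue
        for feat in node.get("features", []):
            if feat["feature"][0] == column:
                feat["feature"][2] = new_type
                updated = True
    return updated


# Hypothetical fragment of a training-data-configuration.json file
example_config = {
    "version": "v2.0",
    "graph": {
        "nodes": [
            {
                "node": ["~id", "Transaction"],
                "features": [
                    {"feature": ["TransactionAmt", "TransactionAmt", "category"]}
                ],
            }
        ]
    },
}
changed = set_feature_type(example_config, "Transaction", "TransactionAmt", "numerical")
print(changed, example_config["graph"]["nodes"][0]["features"][0]["feature"])
# True ['TransactionAmt', 'TransactionAmt', 'numerical']
```

After editing `parsed` this way, you would write it back to `training-data-configuration.json` with `json.dump` before uploading it in step 7.7.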


#### 7.7 Copy the training configurations file to S3 output location

After modifying any necessary fields, upload the file back to the S3 output location.

In [None]:
!aws s3 cp ./training-data-configuration.json $outputS3Uri/

### 8- Model training


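Model training in Neptune ML is a two-step process. The first step runs a SageMaker Processing job to carry out any data preprocessing needed before training, such as categorical feature encoding, data imputation, and numerical feature scaling. The second step runs the model training job, which uses SageMaker to train graph neural network models on the processed data, with hyperparameter optimization (HPO) over a number of training jobs.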

#### 8.1 Data Pre-processing and Feature Engineering
##### 8.1.1 Define the Processing, Training, and Endpoint IDs

In [None]:
import time
epoch_time = int(time.time())
TRAINING_ID = 'data-training-' + str(epoch_time)
PROCESSING_ID = 'data-processing-' + str(epoch_time)
ENDPOINT_ID = 'endpoint-' + str(epoch_time)

##### 8.1.2 Invoke the Data Processing Job

In [None]:
%%bash -s "$NEPTUNE_ENDPOINT" "$REGION" "$BUCKET" "$outputS3Uri" "$TRAINING_ID" "$PROCESSING_ID"

awscurl --region $2 --service neptune-db -X POST https://$1:8182/ml/dataprocessing -H 'Content-Type: application/json' -d '
 {
 "inputDataS3Location" : "'${4}'/",
 "id" : "'${6}'",
 "processedDataS3Location" : "s3://'${3}'/neptune-export/output/",
 "processingInstanceType": "ml.r5.16xlarge"
 }'

##### 8.1.3 Check the Processing Job Status

In [None]:
%%bash -s "$NEPTUNE_ENDPOINT" "$PROCESSING_ID" "$REGION" --out preprocess_response

awscurl --region $3 --service neptune-db https://$1:8182/ml/dataprocessing/$2

In [None]:
ast.literal_eval(preprocess_response)

#### 8.2 Examine the generated HPO Config file

In [None]:
preprocess_location = ast.literal_eval(preprocess_response)['processingJob']['outputLocation']
!aws s3 cp $preprocess_location/model-hpo-configuration.json ./config/

# The file was downloaded into ./config/, so read it from there
with open('config/model-hpo-configuration.json', 'r') as handle:
    parsed = json.load(handle)
parsed

##### 8.2.1 Let's Change some HPs

In [None]:
HPO_PATH = "config/model-hpo-configuration.json"

with open(HPO_PATH, "r") as HPO_file:
    HPO_JSON = json.load(HPO_file)

# Change the objective metric to ROC AUC
HPO_JSON["models"][0]['eval_metric']['metric'] = 'roc_auc'

# Change the frequency of evaluation to 3 epochs instead of 1
HPO_JSON["models"][0]['eval_frequency']['value'] = 3

# Write the changes back to the same file that gets uploaded below
with open(HPO_PATH, "w") as HPO_file:
    json.dump(HPO_JSON, HPO_file, indent=2)

# Upload the new model HPO configuration file to the S3 processing output location
!aws s3 cp $HPO_PATH $preprocess_location/

#### 8.3 Train the Model

In [None]:
%%bash -s "$NEPTUNE_ENDPOINT" "$REGION" "$BUCKET" "$TRAINING_ID" "$PROCESSING_ID"

awscurl --region $2 --service neptune-db -X POST https://$1:8182/ml/modeltraining -H 'Content-Type: application/json' -d '
 {
 "id" : "'${4}'",
 "dataProcessingJobId" : "'${5}'",
 "trainModelS3Location" : "s3://'${3}'/neptune-export/neptune-model-graph-autotrainer",
 "trainingInstanceType" : "ml.p3.2xlarge",
 "maxHPONumberOfTrainingJobs": 2
 }'

In [None]:
%%bash -s "$NEPTUNE_ENDPOINT" "$TRAINING_ID" "$REGION" --out training_response

awscurl --region $3 --service neptune-db https://$1:8182/ml/modeltraining/$2

In [None]:
ast.literal_eval(training_response)

### 9- Store the Variables

In [None]:
%store BUCKET
%store REGION
%store ACCOUNT_ID
%store PREFIX
%store outputS3Uri
%store fraud_labels
%store idss
%store NEPTUNE_ENDPOINT
%store NEPTUNE_LOAD_ROLE
%store PROCESSING_ID
%store TRAINING_ID
%store ENDPOINT_ID