{ "cells": [ { "cell_type": "markdown", "id": "8975db06", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Graph Fraud Detection with Neptune ML" ] }, { "cell_type": "markdown", "id": "73b417d6", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "In this module, we will run an end to end pipeline to train a fraud detection model using graph neural networks. The steps will include the following:\n", "\n", "\n", "* Fraud detection dataset\n", "* Export and Processing\n", "* Model training\n", "* Inference queries\n", "\n", "\n", "**Fraud Detection** is a set of techniques and analyses that allow organizations to identify and prevent unauthorized activity. Fraud can also be any kind of abuse to the system in place to gain undeserved benefits. This can include fraudulent credit card transactions, identify theft, insurance scams, etc. Fraudesters can collude to commit illegal activities and strive to make it look normal so it can be difficult to detect. The most effective solutions that fights fraud use a multifaceted approaches that integrates several of techniques. One of these techniques is the use of graphs.\n", "\n", "Graphs allow us to understand the relationship between various entities and how they are connected together which help in detecting fraud patterns that couldn't be detected by traditional methods. In this workshop, we will go through building a ML model from a graph database and train a graph neural network to estimate the probability of fraud for a certain transaction." ] }, { "cell_type": "markdown", "id": "a09eed09", "metadata": {}, "source": [ "### 1- Restore Variables" ] }, { "cell_type": "code", "execution_count": null, "id": "14e451fb", "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "%store -r" ] }, { "cell_type": "markdown", "id": "076567f8", "metadata": {}, "source": [ "### 3- Establish Connection with Neptune Graph\n", "\n", "The next cell of code will establish a connection with the Neptune graph DB using a python wrapper around Apache TinkerPop Gremlin. Apache TinkerPop is a graph computing framework for both graph databases (OLTP) and graph analytic systems (OLAP). Gremlin is the graph traversal language of TinkerPop. It is a functional, data-flow language that enables users to write complex traversals on (or queries of) their application’s property graph. \n", "\n", "Once we establish the remote graph connection, we can traverse through the graph and run different queries on the graph object" ] }, { "cell_type": "code", "execution_count": null, "id": "05c4c08e", "metadata": {}, "outputs": [], "source": [ "from __future__ import print_function # Python 2/3 compatibility\n", "\n", "from gremlin_python import statics\n", "from gremlin_python.structure.graph import Graph\n", "from gremlin_python.process.graph_traversal import __\n", "from gremlin_python.process.strategies import *\n", "from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection\n", "\n", "graph = Graph()\n", "\n", "remoteConn = DriverRemoteConnection('wss://'+NEPTUNE_ENDPOINT+':8182/gremlin','g')\n", "g = graph.traversal().withRemote(remoteConn)" ] }, { "cell_type": "markdown", "id": "bfb09daa", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### 5- Reset the Neptune Database (Optional)\n", "\n", "If you created a new Neptune cluster for this excercise, no need to run this step. 
{ "cell_type": "markdown", "id": "bfb09daa", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### 5- Reset the Neptune Database (Optional)\n", "\n", "If you created a new Neptune cluster for this exercise, there is no need to run this step. This step makes sure that the database is empty before populating it with the new data.\n", "\n", "#### 5.1- Initiate a DB reset" ] }, { "cell_type": "code", "execution_count": null, "id": "b6acea2e", "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "%%bash -s \"$NEPTUNE_ENDPOINT\" --out RESPONSE\n", "\n", "awscurl -X POST \\\n", "-H 'Content-Type: application/json' https://$1:8182/system \\\n", "-d '{ \"action\" : \"initiateDatabaseReset\" }'" ] }, { "cell_type": "markdown", "id": "45c8d460", "metadata": {}, "source": [ "#### 5.2- Process the response and get the token" ] }, { "cell_type": "code", "execution_count": null, "id": "0703b00f", "metadata": {}, "outputs": [], "source": [ "import ast\n", "reset_token = ast.literal_eval(RESPONSE)['payload']['token']" ] }, { "cell_type": "markdown", "id": "4bcec973", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "#### 5.3- Perform the DB Reset Using the Token\n", "\n", "Using the token extracted above, the next cell will perform the DB reset." ] }, { "cell_type": "code", "execution_count": null, "id": "1f3103e5", "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "%%bash -s \"$NEPTUNE_ENDPOINT\" \"$reset_token\"\n", "\n", "awscurl -X POST -H 'Content-Type: application/json' https://$1:8182/system -d '\n", "{ \n", "\"action\": \"performDatabaseReset\" ,\n", "\"token\" : \"'${2}'\"\n", "}'" ] }, { "cell_type": "markdown", "id": "b66fd0b1", "metadata": {}, "source": [ "#### 5.4- Scale up the Neptune Instance Size for the Bulk Load (if needed)\n", "The bulk loader uses most of the free CPU cycles available in the cluster. Scaling up the instance before ingesting the graph data will make the process much faster." ] }, { "cell_type": "code", "execution_count": null, "id": "2ee15079", "metadata": {}, "outputs": [], "source": [ "!aws neptune modify-db-instance --db-instance-identifier $NEPTUNE_INSTANCE_ID --apply-immediately --db-instance-class db.r5.12xlarge" ] }, { "cell_type": "markdown", "id": "d6e4c56d", "metadata": {}, "source": [ "Now you can go to the Neptune console and wait for the new, larger instance to become available, or poll its status programmatically as shown below." ] }, 
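{ "cell_type": "markdown", "id": "a1b20c03", "metadata": {}, "source": [ "A minimal polling sketch using boto3 (this assumes the notebook role is allowed to call `neptune:DescribeDBInstances`; the instance identifier is the same `NEPTUNE_INSTANCE_ID` used above):" ] }, { "cell_type": "code", "execution_count": null, "id": "a1b20c04", "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "import boto3\n", "\n", "neptune = boto3.client('neptune', region_name=REGION)\n", "\n", "# Poll until the scale-up finishes and the instance reports 'available'\n", "while True:\n", "    status = neptune.describe_db_instances(\n", "        DBInstanceIdentifier=NEPTUNE_INSTANCE_ID\n", "    )['DBInstances'][0]['DBInstanceStatus']\n", "    print(status)\n", "    if status == 'available':\n", "        break\n", "    time.sleep(30)" ] }, 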
{ "cell_type": "markdown", "id": "be3385a5", "metadata": {}, "source": [ "### 6- Preparing the data" ] }, { "cell_type": "markdown", "id": "608a2f31", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "#### 6.1- Loading the data\n", "\n", "Amazon Neptune has a Bulk Loader to ingest data into the DB. In the next block of code, we will use the loader API and point it to the location of the files to load them into Neptune.\n", "\n", "In this example, we are using the `OVERSUBSCRIBE` parallelism parameter. This parameter sets the bulk loader to use all available CPU resources when it runs. It generally takes 60%-70% of CPU capacity to keep the operation running as fast as I/O constraints permit.\n", "\n", "\n", "Loading data from an Amazon Simple Storage Service (Amazon S3) bucket requires an AWS Identity and Access Management (IAM) role that has access to the bucket. Follow the instructions here: https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM.html. In this instance, we call the role `LoadFromNeptune`." ] }, { "cell_type": "markdown", "id": "bcf8932e", "metadata": {}, "source": [ "#### 6.3- Upload the dataset to the S3 bucket" ] }, { "cell_type": "code", "execution_count": null, "id": "25a6474a", "metadata": {}, "outputs": [], "source": [ "!aws s3 cp --recursive ./data/ s3://$BUCKET/$PREFIX/ --exclude \"*\" --include \"*_vertices.csv\"\n", "!aws s3 cp data/edges.csv s3://$BUCKET/$PREFIX/" ] }, { "cell_type": "markdown", "id": "cb4ca979", "metadata": {}, "source": [ "#### 6.4- Import the data into the Cluster " ] }, { "cell_type": "code", "execution_count": null, "id": "ff668b25", "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "%%sh -s \"$NEPTUNE_ENDPOINT\" \"$BUCKET\" \"$ACCOUNT_ID\" \"$REGION\" \"$PREFIX\" --out loadId\n", "\n", "awscurl -X POST -H 'Content-Type: application/json' https://$1:8182/loader -d '\n", "    { \n", "      \"region\" : \"'${4}'\", \n", "      \"source\" : \"s3://'$2'/'$5'/\", \n", "      \"format\" : \"csv\", \n", "      \"iamRoleArn\" : \"arn:aws:iam::'$3':role/LoadFromNeptune\", \n", "      \"parallelism\" : \"OVERSUBSCRIBE\",\n", "      \"queueRequest\": \"TRUE\"\n", "    }'" ] }, { "cell_type": "code", "execution_count": null, "id": "4a06e7c7", "metadata": {}, "outputs": [], "source": [ "load_id = ast.literal_eval(loadId)['payload']['loadId']" ] }, { "cell_type": "markdown", "id": "4314f550", "metadata": {}, "source": [ "#### 6.5 Get the status of the load\n", "\n", "Loading the data can take ~7 minutes." ] }, { "cell_type": "code", "execution_count": null, "id": "0765cc34", "metadata": {}, "outputs": [], "source": [ "%%bash -s \"$NEPTUNE_ENDPOINT\" \"$load_id\"\n", "\n", "awscurl -X GET 'https://'\"$1\"':8182/loader?loadId='$2''" ] }, { "cell_type": "markdown", "id": "cf1cf472", "metadata": {}, "source": [ "#### Verbose status information\n", "\n", "If the bulk load fails for any reason, you can get more details on the error and the location of the logs with the command below." ] }, { "cell_type": "code", "execution_count": null, "id": "1a87dcab", "metadata": {}, "outputs": [], "source": [ "!awscurl -X GET 'https://$NEPTUNE_ENDPOINT:8182/loader/'$load_id'?details=true&errors=true&page=1&errorsPerPage=3'" ] }, 
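{ "cell_type": "markdown", "id": "a1b20c05", "metadata": {}, "source": [ "Once the status above reports `LOAD_COMPLETED`, a quick optional sanity check of the ingested graph, counting vertices by label and the total number of edges:" ] }, { "cell_type": "code", "execution_count": null, "id": "a1b20c06", "metadata": {}, "outputs": [], "source": [ "# Count the ingested vertices by label and the total number of edges\n", "print(g.V().label().groupCount().toList())\n", "print(g.E().count().toList())" ] }, 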
{ "cell_type": "markdown", "id": "a24ee64d", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "#### Drop 10% of the fraud labels\n", "\n", "**NOTE: You must wait for the bulk load from the previous step to complete before dropping the fraud labels**\n", "\n", "Once the data ingestion is complete, we need to simulate entities with no labels so that the algorithm learns their labels during training. In the next cell, we drop the `isFraud` property from 10% of the transactions so that the model can infer those labels after training." ] }, { "cell_type": "code", "execution_count": null, "id": "23147be5", "metadata": {}, "outputs": [], "source": [ "#pick a contiguous range of transaction IDs\n", "ids = [*range(2987000, 2992000)]\n", "idss = [str(id) for id in ids]\n", "\n", "#save their values before dropping them\n", "fraud_labels = g.V(idss).hasLabel('Transaction').valueMap('isFraud').toList()\n", "\n", "#drop their fraud labels\n", "g.V(idss).hasLabel('Transaction').properties('isFraud').drop().toList()" ] }, { "cell_type": "markdown", "id": "d38da067", "metadata": {}, "source": [ "#### Count the entities with no labels" ] }, { "cell_type": "code", "execution_count": null, "id": "1d6c97b8", "metadata": { "slideshow": { "slide_type": "skip" } }, "outputs": [], "source": [ "g.V().hasLabel('Transaction').hasNot('isFraud').count().toList()" ] }, 
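{ "cell_type": "markdown", "id": "a1b20c07", "metadata": {}, "source": [ "Optionally, check the class balance of the labels we set aside (a small sketch over the `fraud_labels` list saved above; note that `valueMap` returns each property value wrapped in a single-element list):" ] }, { "cell_type": "code", "execution_count": null, "id": "a1b20c08", "metadata": {}, "outputs": [], "source": [ "from collections import Counter\n", "\n", "# Tally the isFraud values we saved before dropping them\n", "Counter(v['isFraud'][0] for v in fraud_labels)" ] }, 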
{ "cell_type": "markdown", "id": "afdf34d8", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### 7- Preparing for Export\n", "\n", "Neptune ML requires that you provide training data for the Deep Graph Library (DGL) to create and test models using Amazon SageMaker in your account. To do this, you can export data from Neptune using an open-source tool named [neptune-export](https://github.com/awslabs/amazon-neptune-tools/tree/master/neptune-export). \n", "\n", "You can use the tool either as a service (the Neptune-Export service) or as the Java neptune-export command line tool. The next block of code shows how to trigger the Neptune export through the API.\n", "\n", "In the export command, we can pass parameters in the additionalParams field to guide the creation of a training data configuration file." ] }, { "cell_type": "markdown", "id": "8e6ca744", "metadata": {}, "source": [ "#### 7.1- Invoke the export process\n", "\n", "The request is sent to the Neptune-Export service API endpoint (the same one used for the status check below). Fill in the `endpoint` and `outputS3Path` values before running the export." ] }, { "cell_type": "code", "execution_count": null, "id": "6b26bb87", "metadata": {}, "outputs": [], "source": [ "%%bash -s \"$NEPTUNE_ENDPOINT\" --out response \n", "\n", "awscurl --region us-east-2 -X POST -H 'Content-Type: application/json' -d ' \n", "    { \"command\": \"export-pg\", \n", "      \"params\": { \n", "        \"endpoint\": \"\",\n", "        \"cloneCluster\": false,\n", "        \"cloneClusterInstanceType\": \"r5.8xlarge\"\n", "      },\n", "      \n", "      \"additionalParams\": {\n", "        \"neptune_ml\": {\n", "          \"version\": \"v2.0\",\n", "          \"split_rate\": [0.8,0.1,0.1],\n", "          \"targets\": [\n", "            {\n", "              \"node\": \"Transaction\",\n", "              \"property\": \"isFraud\",\n", "              \"type\": \"classification\"\n", "            }\n", "          ]\n", "        }\n", "      },\n", "      \"outputS3Path\": \"s3:///neptune-export\", \n", "      \"jobSize\": \"medium\" }' \\\n", "https://r7zvc0y2ji.execute-api.us-east-2.amazonaws.com/Deployment/neptune-export" ] }, { "cell_type": "markdown", "id": "b2515051", "metadata": {}, "source": [ "#### 7.2- Get the Job ID from the Export Response" ] }, { "cell_type": "code", "execution_count": null, "id": "f0d21a7a", "metadata": {}, "outputs": [], "source": [ "import ast\n", "jobId = ast.literal_eval(response)['jobId']" ] }, { "cell_type": "markdown", "id": "8928a046", "metadata": {}, "source": [ "#### 7.3- Check the Status of the Export Job\n", "\n", "If `cloneCluster` is enabled, the export job will spin up an instance and create a clone of the Neptune cluster to avoid disrupting it; the clone is torn down once the export job is complete. Wait until the export job status is **Successful** before proceeding with the next steps." ] }, { "cell_type": "code", "execution_count": null, "id": "67adf236", "metadata": {}, "outputs": [], "source": [ "%%bash -s \"$jobId\" --out export_response\n", "\n", "awscurl --region us-east-2 https://r7zvc0y2ji.execute-api.us-east-2.amazonaws.com/Deployment/neptune-export/$1" ] }, { "cell_type": "code", "execution_count": null, "id": "4ed873dc", "metadata": {}, "outputs": [], "source": [ "ast.literal_eval(export_response)" ] }, { "cell_type": "markdown", "id": "1d5e70dc", "metadata": {}, "source": [ "Re-run the status check above until the export job has completed successfully." ] }, { "cell_type": "markdown", "id": "fb8743ef", "metadata": {}, "source": [ "#### 7.4 Get the Output S3 Location" ] }, { "cell_type": "code", "execution_count": null, "id": "e9eec1f4", "metadata": {}, "outputs": [], "source": [ "outputS3Uri = ast.literal_eval(export_response)['outputS3Uri']" ] }, 
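{ "cell_type": "markdown", "id": "a1b20c09", "metadata": {}, "source": [ "Optionally, list the exported artifacts to confirm that the node/edge CSV files and the training data configuration were written:" ] }, { "cell_type": "code", "execution_count": null, "id": "a1b20c10", "metadata": {}, "outputs": [], "source": [ "# List everything the export job wrote to the output location\n", "!aws s3 ls $outputS3Uri/ --recursive" ] }, 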
{ "cell_type": "markdown", "id": "3a2fd9bd", "metadata": {}, "source": [ "#### 7.5 Examine the Training Data Configuration File" ] }, { "cell_type": "code", "execution_count": null, "id": "1416aa9d", "metadata": {}, "outputs": [], "source": [ "import json\n", "!aws s3 cp $outputS3Uri/training-data-configuration.json ./\n", "\n", "with open('training-data-configuration.json', 'r') as handle:\n", "    parsed = json.load(handle)\n", "parsed" ] }, { "cell_type": "markdown", "id": "87e4f49c", "metadata": {}, "source": [ "Neptune ML infers the data types of the entities and their properties automatically, but you can also set them manually in the training data configuration file. We've already modified some of the data types in the configuration file and defined some pre-processing steps that will be handled by Neptune ML." ] }, { "cell_type": "markdown", "id": "f8aabc7b", "metadata": {}, "source": [ "#### 7.6 Download a Local Copy of the Configuration File" ] }, { "cell_type": "code", "execution_count": null, "id": "75d7e499", "metadata": {}, "outputs": [], "source": [ "!aws s3 cp $outputS3Uri/training-data-configuration.json ./" ] }, { "cell_type": "markdown", "id": "4ecd5050", "metadata": {}, "source": [ "#### 7.7 Copy the training data configuration file to the S3 output location\n", "\n", "After modifying any necessary fields, upload the file back to the S3 output location." ] }, { "cell_type": "code", "execution_count": null, "id": "f04a7740", "metadata": {}, "outputs": [], "source": [ "!aws s3 cp ./training-data-configuration.json $outputS3Uri/" ] }, { "cell_type": "markdown", "id": "343016d5", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### 8- Model training\n" ] }, { "cell_type": "markdown", "id": "25d5ccd7", "metadata": {}, "source": [ "Model training in Neptune ML is a two-step process. The first step runs a SageMaker Processing job to carry out any data pre-processing needed before training, such as categorical feature encoding, data imputation, and numerical feature scaling. The second step launches the actual model training (with hyperparameter optimization) on the processed data.\n", "\n", "#### 8.1 Data Pre-processing and Feature Engineering\n", "##### 8.1.1 Define the Training and Processing IDs" ] }, { "cell_type": "code", "execution_count": null, "id": "0fe23c59", "metadata": {}, "outputs": [], "source": [ "import time\n", "epoch_time = int(time.time())\n", "TRAINING_ID = 'data-training-' + str(epoch_time)\n", "PROCESSING_ID = 'data-processing-' + str(epoch_time)\n", "ENDPOINT_ID = 'endpoint-' + str(epoch_time)" ] }, { "cell_type": "markdown", "id": "e44e1060", "metadata": {}, "source": [ "##### 8.1.2 Invoke the Data Processing Job" ] }, { "cell_type": "code", "execution_count": null, "id": "cce83eff", "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "%%bash -s \"$NEPTUNE_ENDPOINT\" \"$REGION\" \"$BUCKET\" \"$outputS3Uri\" \"$TRAINING_ID\" \"$PROCESSING_ID\"\n", "\n", "awscurl --region $2 --service neptune-db -X POST https://$1:8182/ml/dataprocessing -H 'Content-Type: application/json' -d '\n", "    {\n", "      \"inputDataS3Location\" : \"'${4}'/\",\n", "      \"id\" : \"'${6}'\",\n", "      \"processedDataS3Location\" : \"s3://'${3}'/neptune-export/output/\",\n", "      \"processingInstanceType\": \"ml.r5.16xlarge\"\n", "    }'" ] }, { "cell_type": "markdown", "id": "ac29827f", "metadata": {}, "source": [ "##### 8.1.3 Check the Processing Job Status" ] }, { "cell_type": "code", "execution_count": null, "id": "528be2dc", "metadata": {}, "outputs": [], "source": [ "%%bash -s \"$NEPTUNE_ENDPOINT\" \"$PROCESSING_ID\" --out preprocess_response\n", "\n", "curl -s https://${1}:8182/ml/dataprocessing/${2}" ] }, { "cell_type": "code", "execution_count": null, "id": "978a591c", "metadata": {}, "outputs": [], "source": [ "ast.literal_eval(preprocess_response)" ] }, { "cell_type": "markdown", "id": "1f73c10a", "metadata": {}, "source": [ "#### 8.2 Examine the generated HPO Config file" ] }, { "cell_type": "code", "execution_count": null, "id": "ae650b2c", "metadata": {}, "outputs": [], "source": [ "preprocess_location = ast.literal_eval(preprocess_response)['processingJob']['outputLocation']\n", "!aws s3 cp $preprocess_location/model-hpo-configuration.json ./config/\n", "with open('config/model-hpo-configuration.json', 'r') as handle:\n", "    parsed = json.load(handle)\n", "parsed" ] }, { "cell_type": "markdown", "id": "52e17f82", "metadata": {}, "source": [ "##### 8.2.1 Let's Change some HPs" ] }, { "cell_type": "code", "execution_count": null, "id": "562a2698", "metadata": {}, "outputs": [], "source": [ "with open('config/model-hpo-configuration.json', 'r') as HPO_file:\n", "    HPO_JSON = json.load(HPO_file)\n", "\n", "#change the objective metric to ROC AUC\n", "HPO_JSON['models'][0]['eval_metric']['metric'] = 'roc_auc'\n", "\n", "#change the frequency of evaluation to every 3 epochs instead of every epoch\n", "HPO_JSON['models'][0]['eval_frequency']['value'] = '3'\n", "\n", "with open('config/model-hpo-configuration.json', 'w') as HPO_file:\n", "    json.dump(HPO_JSON, HPO_file)\n", "\n", "#upload the new model HPO configuration file to the S3 processing output location\n", "!aws s3 cp config/model-hpo-configuration.json $preprocess_location/" ] }, { "cell_type": "markdown", "id": "518d9693", "metadata": {}, "source": [ "#### 8.3 Train the Model" ] }, { "cell_type": "code", "execution_count": null, "id": "88900bf5", "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "%%bash -s \"$NEPTUNE_ENDPOINT\" \"$REGION\" \"$BUCKET\" \"$TRAINING_ID\" \"$PROCESSING_ID\"\n", "\n", "awscurl --region $2 --service neptune-db -X POST https://$1:8182/ml/modeltraining -H 'Content-Type: application/json' -d '\n", "    {\n", "      \"id\" : \"'${4}'\",\n", "      \"dataProcessingJobId\" : \"'${5}'\",\n", "      \"trainModelS3Location\" : \"s3://'${3}'/neptune-export/neptune-model-graph-autotrainer\",\n", "      \"trainingInstanceType\" : \"ml.p3.2xlarge\",\n", "      \"maxHPONumberOfTrainingJobs\": 2\n", "    }'" ] }, 
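{ "cell_type": "markdown", "id": "a1b20c11", "metadata": {}, "source": [ "Behind the scenes, Neptune ML runs the HPO trials as SageMaker training jobs. As an optional sketch (assuming the notebook role is allowed to call `sagemaker:ListTrainingJobs`), you can peek at the most recently created jobs:" ] }, { "cell_type": "code", "execution_count": null, "id": "a1b20c12", "metadata": {}, "outputs": [], "source": [ "import boto3\n", "\n", "# List the most recently created SageMaker training jobs (the HPO trials)\n", "sm = boto3.client('sagemaker', region_name=REGION)\n", "jobs = sm.list_training_jobs(SortBy='CreationTime', SortOrder='Descending', MaxResults=5)\n", "for job in jobs['TrainingJobSummaries']:\n", "    print(job['TrainingJobName'], '-', job['TrainingJobStatus'])" ] }, 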
{ "cell_type": "markdown", "id": "a1b20c13", "metadata": {}, "source": [ "##### 8.3.1 Check the Training Job Status" ] }, { "cell_type": "code", "execution_count": null, "id": "f8ac5930", "metadata": {}, "outputs": [], "source": [ "%%bash -s \"$NEPTUNE_ENDPOINT\" \"$TRAINING_ID\" --out training_response\n", "\n", "curl -s https://${1}:8182/ml/modeltraining/${2}" ] }, { "cell_type": "code", "execution_count": null, "id": "9cfdd31e", "metadata": {}, "outputs": [], "source": [ "ast.literal_eval(training_response)" ] }, { "cell_type": "markdown", "id": "177a4030", "metadata": {}, "source": [ "### 9- Store the Variables" ] }, { "cell_type": "code", "execution_count": null, "id": "ba118446", "metadata": {}, "outputs": [], "source": [ "%store BUCKET\n", "%store REGION\n", "%store ACCOUNT_ID\n", "%store PREFIX\n", "%store outputS3Uri\n", "%store fraud_labels\n", "%store idss\n", "%store NEPTUNE_ENDPOINT\n", "%store NEPTUNE_LOAD_ROLE\n", "%store PROCESSING_ID\n", "%store TRAINING_ID\n", "%store ENDPOINT_ID" ] }, { "cell_type": "code", "execution_count": null, "id": "b2178d53", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 5 }