{ "cells": [ { "cell_type": "markdown", "source": [ "# Amazon Fraud Detector - Send Events Example\n", "\n", "\n", "Once you’ve defined an event, you can start ingesting your transactions into Amazon Fraud Detector. This example notebook shows you how to stream your events from a file stored in S3 to Amazon Fraud Detector.\n", "\n", "### Setup permissions\n", "------\n", "\n", "First, set up your AWS credentials so that Fraud Detector can store and access training data. See more details: https://docs.aws.amazon.com/frauddetector/latest/ug/set-up.html\n", "\n", "To use Amazon Fraud Detector, you have to set up permissions that allow access to the Amazon Fraud Detector console and API operations. You also have to allow Amazon Fraud Detector to perform tasks on your behalf and to access resources that you own. We recommend creating an AWS Identity and Access Management (IAM) user with access restricted to Amazon Fraud Detector operations and required permissions. You can add other permissions as needed. If you are using a SageMaker Notebook Instance, attach the two policies __*AmazonFraudDetectorFullAccessPolicy*__ and __*AmazonS3FullAccess*__ to the instance's IAM role and restart your kernel.\n", "\n", "\n", "### Preparation\n", "-----\n", "\n", "Before this step, you should have:\n", "1. created an S3 bucket and uploaded the CSV file you want to send to Amazon Fraud Detector
\n", "https://docs.aws.amazon.com/frauddetector/latest/ug/step-1-get-s3-data.html\n", "2. defined your Entity and Event in the Amazon Fraud Detector console
\n", "https://docs.aws.amazon.com/frauddetector/latest/ug/define-event.html\n", "\n", "\n", "
💡 Concepts \n", "\n", "- ENTITY represents the \"what or who\" that is performing the event you want to evaluate\n", "- EVENT is a business activity that you want evaluated for fraud risk \n", "\n", "
\n", "\n" ], "metadata": {} }, { "cell_type": "code", "execution_count": 1, "source": [ "# -- Import packages -- \n", "import os\n", "import logging\n", "import boto3\n", "import pandas as pd\n", "import numpy as np\n", "from datetime import datetime\n", "from multiprocessing import Pool\n", "\n", "# -- Display --\n", "from IPython.display import display, HTML, clear_output\n", "display(HTML(\"\"))" ], "outputs": [ { "output_type": "display_data", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {} } ], "metadata": {} }, { "cell_type": "code", "execution_count": 9, "source": [ "# -- initialize the AFD client \n", "client = boto3.client('frauddetector')" ], "outputs": [], "metadata": { "scrolled": true } }, { "cell_type": "markdown", "source": [ "\n", "### Setup Notebook\n", "-----\n", "\n", "1. Plug in your S3 Bucket and CSV File \n", "2. Plug in the major components of Fraud Detector you have defined on the console \n", "3. Plug in the column names of the major components\n", "\n", "
💡 Plug in Column Names of the Major Components: \n", " \n", "- ENTITY_ID_COL is the column name of the entity ID, e.g. \"customer_id\"\n", "- EVENT_ID_COL is the column name of the event ID, e.g. \"transaction_id\"\n", "- TIMESTAMP_COL is the column name of the event timestamp (when the event happened), e.g. \"EVENT_TIMESTAMP\"\n", "- LABEL_COL is the column name of the target variable, e.g. \"EVENT_LABEL\"\n", "
" ], "metadata": {} }, { "cell_type": "code", "execution_count": 3, "source": [ "# -- this is all you need to fill out; once complete, run each code cell in order -- \n", "S3_BUCKET = \"your-s3-bucket\" \n", "S3_FILE = \"path-to-your-file\" \n", "\n", "EVENT_TYPE = \"event-type-defined-on-console\"\n", "\n", "ENTITY_ID_COL = \"column-name-of-entity-id\" \n", "EVENT_ID_COL = \"column-name-of-event-id\" \n", "TIMESTAMP_COL = \"column-name-of-event-timestamp\"\n", "LABEL_COL = \"column-name-of-event-label\" " ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "\n", "### Load Your Dataset from S3\n", "-----" ], "metadata": {} }, { "cell_type": "code", "execution_count": 5, "source": [ "# -- connect to S3, fetch the file, and load it into a pandas DataFrame --\n", "s3 = boto3.resource('s3')\n", "obj = s3.Object(S3_BUCKET, S3_FILE)\n", "body = obj.get()['Body']\n", "df = pd.read_csv(body, dtype=object)\n", "\n", "# -- convert the event timestamp to standard format --\n", "df[TIMESTAMP_COL] = pd.to_datetime(df[TIMESTAMP_COL]).apply(lambda x: x.strftime(\"%Y-%m-%dT%H:%M:%SZ\"))\n", "\n", "print(\"Number of events to send:\", df.shape[0])" ], "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "Number of events to send: 116715\n" ] } ], "metadata": {} }, { "cell_type": "markdown", "source": [ "\n", "### Send Events\n", "-----\n", "\n", "The following section turns on eventIngestion and streams your data from the S3 bucket to Amazon Fraud Detector.\n", "\n", "_**Note**: the following code uses the multiprocessing package to parallelize the_ `send_event` _call._" ], "metadata": {} }, { "cell_type": "code", "execution_count": 6, "source": [ "#record_count = 5000 # -- uncomment to send only the first 5000 records for testing -- \n", "record_count = df.shape[0] # -- send all of the records -- \n", "\n", "# -- check the eventType; if eventIngestion is disabled, enable it before sending events --\n", "response = client.get_event_types(name = 
EVENT_TYPE)\n", "event_variables = response['eventTypes'][0]['eventVariables']\n", "event_labels = response['eventTypes'][0]['labels']\n", "entity_type = response['eventTypes'][0]['entityTypes'][0]\n", "\n", "if response['eventTypes'][0]['eventIngestion'] != 'ENABLED':\n", "    response = client.put_event_type(\n", "        name = EVENT_TYPE,\n", "        eventVariables = event_variables,\n", "        labels = event_labels,\n", "        eventIngestion = 'ENABLED',\n", "        entityTypes = [entity_type])\n", "    response = client.get_event_types(name = EVENT_TYPE)\n", "print('Event Ingestion of eventType \"{0}\" has been {1}.'.format(EVENT_TYPE, response['eventTypes'][0]['eventIngestion']))" ], "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "Event Ingestion of eventType \"transaction_event\" has been ENABLED.\n" ] } ], "metadata": {} }, { "cell_type": "code", "execution_count": 7, "source": [ "%%time\n", "def _send_event(record):\n", "    # record layout: [event_id, entity_id, event_timestamp, event_label, *event variables]\n", "    event_id = str(record[0])\n", "    entity_id = str(record[1])\n", "    event_timestamp = str(record[2])\n", "    event_label = record[3]\n", "\n", "    try:\n", "        # keep only the event variables that have a value in this record\n", "        rec_content = {event_variables[i]: record[4:][i] for i in range(len(event_variables)) if not pd.isnull(record[4+i])}\n", "        if pd.isnull(event_label):\n", "            # If the event label is missing, do not assign a label\n", "            response = client.send_event(\n", "                eventId = event_id,\n", "                eventTypeName = EVENT_TYPE,\n", "                eventTimestamp = event_timestamp,\n", "                eventVariables = rec_content,\n", "                entities = [\n", "                    {\n", "                        'entityType': entity_type,\n", "                        'entityId': entity_id\n", "                    },\n", "                ]\n", "            )\n", "        else:\n", "            response = client.send_event(\n", "                eventId = event_id,\n", "                eventTypeName = EVENT_TYPE,\n", "                eventTimestamp = event_timestamp,\n", "                eventVariables = rec_content,\n", "                assignedLabel = str(event_label),\n", "                labelTimestamp = event_timestamp,\n", "                entities = [\n", "                    {\n", "                        'entityType': entity_type,\n", "                        'entityId': entity_id\n", "                    },\n", "                ]\n", "            )\n", "        http_status_code = 
response['ResponseMetadata']['HTTPStatusCode']\n", "        if http_status_code != 200:\n", "            logging.error('EventId: {0}, httpStatusCode: {1}'.format(event_id, str(http_status_code)))\n", "    except Exception as e:\n", "        logging.error('EventId: {0}, Failed to send_event: {1}'.format(event_id, str(e)))\n", "\n", "\n", "# -- send events in parallel --\n", "cols_keep = [EVENT_ID_COL, ENTITY_ID_COL, TIMESTAMP_COL, LABEL_COL] + event_variables\n", "df_list = df.iloc[0:record_count,:]\n", "df_list = df_list[cols_keep].values.tolist()\n", "with Pool(processes = 8) as p:\n", "    result = p.map(_send_event, df_list)" ], "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "CPU times: user 1.67 s, sys: 334 ms, total: 2.01 s\n", "Wall time: 14min 57s\n" ] } ], "metadata": { "scrolled": true } }, { "cell_type": "markdown", "source": [ "#### After sending all your events, we recommend waiting 10-20 minutes to ensure the events are fully ingested by the system. Then you can go back to the console to create and train your model.\n" ], "metadata": {} }, { "cell_type": "markdown", "source": [ "### (Optional) Delete All Events by Event Type\n", "----\n", "If you no longer want to use an event type and want to delete all of its events, you can use the `deleteEventsByEventType` API by running the following:\n", "\n", "```python\n", "# delete all the events with the event type\n", "client.delete_events_by_event_type(eventTypeName = EVENT_TYPE)\n", "# check the deletion status; once it is COMPLETE, proceed to delete the event type\n", "client.get_delete_events_by_event_type_status(eventTypeName = EVENT_TYPE)\n", "# delete the event type\n", "client.delete_event_type(name = EVENT_TYPE)\n", "```\n", "\n", "_**Note**: If you have created any models, detectors, or rules with this event type, you need to delete those associated objects first._\n", "\n" ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [], "outputs": [], "metadata": {} } ], "metadata": { "kernelspec": { 
"display_name": "conda_mxnet_p36", "language": "python", "name": "conda_mxnet_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }