{ "cells": [ { "cell_type": "markdown", "id": "203c59ca", "metadata": {}, "source": [ "Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\n", "\n", "SPDX-License-Identifier: Apache-2.0" ] }, { "cell_type": "markdown", "id": "be84818d", "metadata": {}, "source": [ "# This notebook walks through intermediate results for data processing on Reddit user-behavior data" ] }, { "cell_type": "markdown", "id": "166d8a80", "metadata": {}, "source": [ "## Table of contents\n", "1. Download Reddit comments dataset from PushShift.io for May 2008\n", "2. Set rules for anomalous vs benign users along with data processing\n", "3. Generate author/user labels and save to a csv file\n", "4. Generate user and subreddit index files\n", "5. Save edgelist data as csv file\n", "6. Train/validation/test split\n", "7. Get node features using NLP models" ] }, { "cell_type": "code", "execution_count": null, "id": "88a28db8", "metadata": {}, "outputs": [], "source": [ "import json\n", "import pandas as pd\n", "import os \n", "import sys" ] }, { "cell_type": "code", "execution_count": null, "id": "21dc1334", "metadata": {}, "outputs": [], "source": [ "sys.path.append('../../src/')" ] }, { "cell_type": "markdown", "id": "84c53674", "metadata": {}, "source": [ "### 1. Download Reddit dataset and save it in a dataframe " ] }, { "cell_type": "code", "execution_count": null, "id": "5bb8a451", "metadata": {}, "outputs": [], "source": [ "reddit_raw_data_file_path = '../../data/01_raw/user_behavior/RC_2008-05.zst'" ] }, { "cell_type": "code", "execution_count": null, "id": "ac73d56a", "metadata": {}, "outputs": [], "source": [ "records = map(json.loads, open(reddit_raw_data_file_path.rstrip(\".zst\"), encoding=\"utf8\"))\n", "df = pd.DataFrame.from_records(records)" ] }, { "cell_type": "code", "execution_count": null, "id": "b2856db9", "metadata": {}, "outputs": [], "source": [ "df.head(10)" ] }, { "cell_type": "code", "execution_count": null, "id": "0cb9e61c", "metadata": {}, "outputs": [], "source": [ "df.info()" ] }, { "cell_type": "markdown", "id": "a5783b0c", "metadata": {}, "source": [ "### Observation about the data:\n", "1. There are 536380 rows and 20 columns where each row is an unique post with 20 attributes/columns related to that comment\n", "2. Most important attributes include author, sub-reddit, body and score. Body is the comment thread content, and score is the total votes received on Reddit (1 for one upvote and -1 for downvote). Each record represents one author posts something (body) related to the sub-reddit topic. \n", "3. Each unique author can have multiple comments across more than one subreddit with varying scores for each comment\n" ] }, { "cell_type": "markdown", "id": "bdfb57da", "metadata": {}, "source": [ "### 2. Data processing" ] }, { "cell_type": "markdown", "id": "a7e43bd0", "metadata": {}, "source": [ "#### Data processing steps to get input for ELAND model. Steps include:\n", "1. Drop records of absolute scores lesser than 10\n", "2. Drop user if they have posted less than 10 times\n", "3. Drop users that are [deleted]\n", "\n", "#### We don't have ground truth labels for training the model. To generate labels on users that are neeeded for next step, we used a rule to group users into either benign and anomalous users based on their posts scores stats. \n", " - Anomalous user: An author who has commented atleast 10 times and every score of theirs is lesser than or equal to -10\n", " - Benign user: An author who has commented atleast 10 times and every score of theirs is greater than or equal to 10" ] }, { "cell_type": "code", "execution_count": null, "id": "7ab02791", "metadata": {}, "outputs": [], "source": [ "#Drop records if their absoulte value of score is lesser than 10\n", "df_score = df.drop(df[abs(df.score) < 10].index)" ] }, { "cell_type": "code", "execution_count": null, "id": "df290b2c", "metadata": {}, "outputs": [], "source": [ "df.shape, df_score.shape #a lot of comments with less than score of 10" ] }, { "cell_type": "code", "execution_count": null, "id": "ba7cf919", "metadata": {}, "outputs": [], "source": [ "#check lowest score and highest score\n", "df_score.score.min(), df_score.score.max()" ] }, { "cell_type": "code", "execution_count": null, "id": "4a697f51", "metadata": {}, "outputs": [], "source": [ "df_score['author'].value_counts()" ] }, { "cell_type": "code", "execution_count": null, "id": "425382cd", "metadata": {}, "outputs": [], "source": [ "df_score['subreddit'].value_counts()" ] }, { "cell_type": "code", "execution_count": null, "id": "6095c965", "metadata": {}, "outputs": [], "source": [ "#Drop user if they have posted less than 10 times\n", "counts = df_score['author'].value_counts()\n", "res = df_score[~df_score['author'].isin(counts[counts < 10].index)]\n", "\n", "#Drop users that are [deleted]\n", "res = res.drop(res[res.author=='[deleted]'].index)" ] }, { "cell_type": "code", "execution_count": null, "id": "c0d0e489", "metadata": {}, "outputs": [], "source": [ "res['author'].value_counts()" ] }, { "cell_type": "code", "execution_count": null, "id": "2bf096dc", "metadata": {}, "outputs": [], "source": [ "#Number of unique users\n", "len(res.author.unique())" ] }, { "cell_type": "markdown", "id": "8095eebd", "metadata": {}, "source": [ "## Create user labels" ] }, { "cell_type": "code", "execution_count": null, "id": "d8f48182", "metadata": {}, "outputs": [], "source": [ "benign = pd.DataFrame()\n", "anomaly = pd.DataFrame()" ] }, { "cell_type": "code", "execution_count": null, "id": "8a87bc22", "metadata": {}, "outputs": [], "source": [ "benign = benign.append(res)\n", "print(benign.shape)" ] }, { "cell_type": "code", "execution_count": null, "id": "04aae8a7", "metadata": {}, "outputs": [], "source": [ "#remove records that score less than 10 \n", "benign = benign.drop(benign[benign.score < 10].index)" ] }, { "cell_type": "code", "execution_count": null, "id": "c5bac6cf", "metadata": {}, "outputs": [], "source": [ "#check one example of benign author\n", "benign.loc[benign['author'] == 'jonknee'].T" ] }, { "cell_type": "code", "execution_count": null, "id": "79eae1c3", "metadata": {}, "outputs": [], "source": [ "##Anomalous author\n", "anomaly = anomaly.append(res)\n", "\n", "#Remove records with score larger than -10 \n", "anomaly = anomaly.drop(anomaly[anomaly.score > -10].index)" ] }, { "cell_type": "code", "execution_count": null, "id": "8ad70413", "metadata": {}, "outputs": [], "source": [ "#Example author\n", "anomaly.loc[anomaly['author'] == 'I_AM_A_NEOCON']" ] }, { "cell_type": "code", "execution_count": null, "id": "21eb530b", "metadata": {}, "outputs": [], "source": [ "#Same author can have high score comments and low score comments at the same time \n", "benign.loc[benign['author'] == 'I_AM_A_NEOCON']" ] }, { "cell_type": "code", "execution_count": null, "id": "ff6c6df2", "metadata": {}, "outputs": [], "source": [ "anomaly_author_names = anomaly.author.unique()\n", "benign_author_names = benign.author.unique()" ] }, { "cell_type": "code", "execution_count": null, "id": "5c95c6fc", "metadata": {}, "outputs": [], "source": [ "def common_member(a, b):\n", " \"\"\"check common elements of a and b\"\"\"\n", " a_set = set(a)\n", " b_set = set(b)\n", " \n", " if (a_set & b_set):\n", " return (a_set & b_set)\n", " else:\n", " print(\"No common elements\")" ] }, { "cell_type": "code", "execution_count": null, "id": "afb0ef18", "metadata": {}, "outputs": [], "source": [ "#Remove authors that overlap in benign and anomalous\n", "overlap_authors = common_member(benign_author_names, anomaly_author_names)\n", "len(overlap_authors)" ] }, { "cell_type": "code", "execution_count": null, "id": "e930b1a5", "metadata": {}, "outputs": [], "source": [ "benign = benign[~benign['author'].isin(overlap_authors)]\n", "benign_author_names = benign.author.unique()\n", "print(\"Number of benign users: \", len(benign.author.unique()))\n", "print(\"Number of anomalous users: \", len(anomaly.author.unique()))" ] }, { "cell_type": "markdown", "id": "b36ef710", "metadata": {}, "source": [ "### 3. Generate author/user labels and save to a csv file" ] }, { "cell_type": "code", "execution_count": null, "id": "35b2de67", "metadata": {}, "outputs": [], "source": [ "benign_user_label = pd.DataFrame()\n", "benign_user_label['author'] = benign_author_names\n", "benign_user_label['label'] = 0 #0 as benign user\n", "anomalous_user_label = pd.DataFrame()\n", "anomalous_user_label['author'] = anomaly_author_names\n", "anomalous_user_label['label'] = 1" ] }, { "cell_type": "code", "execution_count": null, "id": "7a9a94f4", "metadata": {}, "outputs": [], "source": [ "benign_user_label.shape, anomalous_user_label.shape" ] }, { "cell_type": "code", "execution_count": null, "id": "e607dd4c", "metadata": {}, "outputs": [], "source": [ "benign_user_label.head(2)" ] }, { "cell_type": "code", "execution_count": null, "id": "8ea69f38", "metadata": {}, "outputs": [], "source": [ "anomalous_user_label.head(2)" ] }, { "cell_type": "code", "execution_count": null, "id": "610d0b6c", "metadata": {}, "outputs": [], "source": [ "user_label = pd.concat([benign_user_label, anomalous_user_label])" ] }, { "cell_type": "code", "execution_count": null, "id": "dfd762c7", "metadata": {}, "outputs": [], "source": [ "# Save user label\n", "user_label_filepath = '../../data/02_intermediate/user_behavior/user_labels.csv'" ] }, { "cell_type": "code", "execution_count": null, "id": "91f26847", "metadata": {}, "outputs": [], "source": [ "from anomaly_detection_spatial_temporal_data.utils import ensure_directory" ] }, { "cell_type": "code", "execution_count": null, "id": "9648006a", "metadata": {}, "outputs": [], "source": [ "ensure_directory(user_label_filepath)\n", "user_label.to_csv(user_label_filepath, index=False)" ] }, { "cell_type": "markdown", "id": "fc456a03", "metadata": {}, "source": [ "### 4. Generate user and subreddit index files" ] }, { "cell_type": "markdown", "id": "d848dd49", "metadata": {}, "source": [ "#### Each subreddit topic is given an index and saved as a pickle file. We will be naming the file p2index.pkl\n", "#### Each author is also given an index and saved as a pickle file. We will be naming the file u2index.pkl" ] }, { "cell_type": "code", "execution_count": null, "id": "7c4627d0", "metadata": {}, "outputs": [], "source": [ "benign_prod_names = benign.subreddit.unique()\n", "benign_prod_names = benign_prod_names.tolist()\n", "\n", "anomaly_prod_names = anomaly.subreddit.unique()\n", "anomaly_prod_names = anomaly_prod_names.tolist()" ] }, { "cell_type": "code", "execution_count": null, "id": "c0517379", "metadata": {}, "outputs": [], "source": [ "total_prod_names = benign_prod_names + anomaly_prod_names\n", "total_prod_names = sorted(list(set(total_prod_names)))" ] }, { "cell_type": "code", "execution_count": null, "id": "6aa94b90", "metadata": {}, "outputs": [], "source": [ "p2index={}\n", "count = 0\n", "for subreddit in total_prod_names:\n", " p2index[subreddit]=count\n", " count+=1" ] }, { "cell_type": "code", "execution_count": null, "id": "50d7baa1", "metadata": {}, "outputs": [], "source": [ "total_author_names = benign_author_names.tolist() + anomaly_author_names.tolist()\n", "total_author_names = sorted(list(set(total_author_names)))" ] }, { "cell_type": "code", "execution_count": null, "id": "829004fa", "metadata": {}, "outputs": [], "source": [ "u2index={}\n", "count = 0\n", "for author in total_author_names:\n", " u2index[author]=count\n", " count+=1" ] }, { "cell_type": "markdown", "id": "d11fd5c4", "metadata": {}, "source": [ "### Save the index mapping for author/user and subreddit topic " ] }, { "cell_type": "code", "execution_count": null, "id": "000ac31e", "metadata": {}, "outputs": [], "source": [ "import pickle\n", "with open(\"../../data/02_intermediate/user_behavior/u2index.pkl\",\"wb\") as f:\n", " pickle.dump(u2index, f)" ] }, { "cell_type": "code", "execution_count": null, "id": "430e3f05", "metadata": {}, "outputs": [], "source": [ "with open(\"../../data/02_intermediate/user_behavior/p2index.pkl\",\"wb\") as f:\n", " pickle.dump(p2index,f)" ] }, { "cell_type": "markdown", "id": "fab0268e", "metadata": {}, "source": [ "### 5. Save edge list as csv file" ] }, { "cell_type": "code", "execution_count": null, "id": "34a38c22", "metadata": {}, "outputs": [], "source": [ "benign.shape, anomaly.shape" ] }, { "cell_type": "code", "execution_count": null, "id": "52203248", "metadata": {}, "outputs": [], "source": [ "edgelist_df = benign.append(anomaly, ignore_index=True)\n", "edgelist_df = edgelist_df.sort_values(by = 'retrieved_on')\n", "print(edgelist_df.shape)" ] }, { "cell_type": "code", "execution_count": null, "id": "009968ca", "metadata": {}, "outputs": [], "source": [ "edgelist_df[['author','subreddit','retrieved_on']].head(10)" ] }, { "cell_type": "code", "execution_count": null, "id": "44560bc6", "metadata": {}, "outputs": [], "source": [ "edge_list_file_path = \"../../data/02_intermediate/user_behavior/edge_list.csv\"\n", "edgelist_df[['author','subreddit','retrieved_on']].to_csv(edge_list_file_path, index=False)" ] }, { "cell_type": "markdown", "id": "09ba034b", "metadata": {}, "source": [ "### 6. Train/validation/test split " ] }, { "cell_type": "code", "execution_count": null, "id": "eaba6b83", "metadata": {}, "outputs": [], "source": [ "import random\n", "\n", "def generate_n_lists(num_of_lists, num_of_elements, value_from=0, value_to=100):\n", " s = random.sample(range(value_from, value_to + 1), num_of_lists * num_of_elements)\n", " return [s[i*num_of_elements:(i+1)*num_of_elements] for i in range(num_of_lists)]\n", "\n", "l = generate_n_lists(2, 393, 0, 786)" ] }, { "cell_type": "code", "execution_count": null, "id": "94bf6ef1", "metadata": {}, "outputs": [], "source": [ "len(l), len(l[0]), len(l[1])" ] }, { "cell_type": "code", "execution_count": null, "id": "177c3843", "metadata": {}, "outputs": [], "source": [ "import numpy as np" ] }, { "cell_type": "code", "execution_count": null, "id": "f245d445", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "data_tvt = (np.array(l[0][:195]), np.array(l[0][195:]), np.array(l[1]))\n", "print(type(data_tvt))\n", "print(len(data_tvt[0]),len(data_tvt[1]), len(data_tvt[2]))" ] }, { "cell_type": "code", "execution_count": null, "id": "d939f0f4", "metadata": {}, "outputs": [], "source": [ "with open(\"../../data/02_intermediate/user_behavior/data_tvt.pkl\",\"wb\") as f:\n", " pickle.dump(data_tvt,f)" ] }, { "cell_type": "markdown", "id": "2d788dea", "metadata": {}, "source": [ "### 7. Get node features using NLP models" ] }, { "cell_type": "markdown", "id": "d5ee3d5a", "metadata": {}, "source": [ "- To get node feature for user/author, we preprocess comments from each author, get their Top 10 used words and feed these words into word2vec model to get embeddings as author node features.\n", "- To get node feature for subreddit topic, we get the Top 10 used words for each topic and feed these words into word2vec model to get embeddings as subreddit topic node features. \n" ] }, { "cell_type": "markdown", "id": "57b1ba2c", "metadata": {}, "source": [ "#### Steps for comments/posts body processing are:\n", "1. Convert words to lower\n", "2. Remove numbers\n", "3. Remove punctuation and symbols\n", "4. Normalize the words (lemmatize and stem the words)" ] }, { "cell_type": "code", "execution_count": null, "id": "b55ab585", "metadata": {}, "outputs": [], "source": [ "from gensim.test.utils import common_texts\n", "from gensim.models import Word2Vec\n", "import re\n", "import collections\n", "import nltk\n", "nltk.download('punkt')\n", "nltk.download('stopwords')\n", "from nltk.tokenize import word_tokenize\n", "from nltk.stem import PorterStemmer\n", "import gensim.downloader" ] }, { "cell_type": "markdown", "id": "0c42a1bc", "metadata": {}, "source": [ "### Download the pretrained models" ] }, { "cell_type": "code", "execution_count": null, "id": "8fdb5e5b", "metadata": {}, "outputs": [], "source": [ "vectors = gensim.downloader.load('word2vec-google-news-300')" ] }, { "cell_type": "code", "execution_count": null, "id": "a1afc15e", "metadata": {}, "outputs": [], "source": [ "stopwords = set(nltk.corpus.stopwords.words('english'))\n", "stemmer= PorterStemmer()" ] }, { "cell_type": "markdown", "id": "8e415159", "metadata": {}, "source": [ "### Get the user node features (user2vec) " ] }, { "cell_type": "code", "execution_count": null, "id": "21654a07", "metadata": {}, "outputs": [], "source": [ "type(vectors['hi']),vectors['hi'].shape" ] }, { "cell_type": "code", "execution_count": null, "id": "d1bfec48", "metadata": {}, "outputs": [], "source": [ "final_user2vec_npy = np.zeros((len(u2index), 300))\n", "\n", "for u in u2index:\n", " user = edgelist_df.loc[edgelist_df['author'] == u]\n", " comment_row_list = []\n", " for index, rows in user.iterrows():\n", " my_list = rows.body\n", " my_list = my_list.replace('\\n',\" \")\n", " my_list = my_list.replace('\\t',\" \")\n", " my_list = my_list.lower()\n", " my_list = ''.join([i for i in my_list if not i.isdigit()])\n", " my_list = re.sub(r'[^\\w\\s]', ' ', my_list)\n", " tokens = word_tokenize(my_list)\n", " my_list = [i for i in tokens if not i in stopwords]\n", " comment_row_list.append(my_list)\n", " \n", " flat_list = [x for xs in comment_row_list for x in xs]\n", " counter = collections.Counter(flat_list)\n", " top10 = counter.most_common(10)\n", " #print(f'top 10 words used by {u} are:', top10)\n", " final_vectors = np.zeros((10, 300))\n", " for i, w in enumerate(top10):\n", " try:\n", " embedding = vectors[w[0]]\n", " #embedding = embedding.tolist()\n", " except:\n", " #print('no embeddings created for word: {}'.format(w[0]))\n", " embedding = np.array([0] * 300)\n", " final_vectors[i,:]=embedding\n", " final_embeddings = np.sum(final_vectors, axis=0) \n", "\n", "# if u2index[u] < 1:\n", "# print(final_vectors.shape, final_embeddings.shape)\n", " final_user2vec_npy[u2index[u],:] = final_embeddings" ] }, { "cell_type": "code", "execution_count": null, "id": "e48705b4", "metadata": {}, "outputs": [], "source": [ "final_user2vec_npy.shape" ] }, { "cell_type": "code", "execution_count": null, "id": "68e61049", "metadata": {}, "outputs": [], "source": [ "# Save the user2vec feature matrix \n", "userfeat_file = \"../../data/02_intermediate/user_behavior/user2vec_npy.npz\"\n", "np.savez(userfeat_file,data=final_user2vec_npy)" ] }, { "cell_type": "markdown", "id": "04cd3d2d", "metadata": {}, "source": [ "#### Get the subreddit topic node features (prod2vec)" ] }, { "cell_type": "code", "execution_count": null, "id": "6f062f63", "metadata": {}, "outputs": [], "source": [ "final_prod2vec_npy = np.zeros((len(p2index), 300))\n", "\n", "for p in p2index:\n", " subreddit = edgelist_df.loc[edgelist_df['subreddit'] == p]\n", " subreddit_row_list = []\n", " for index, rows in subreddit.iterrows():\n", " my_list = rows.body\n", " my_list = my_list.replace('\\n',\" \")\n", " my_list = my_list.replace('\\t',\" \")\n", " my_list = my_list.lower()\n", " my_list = ''.join([i for i in my_list if not i.isdigit()])\n", " my_list = re.sub(r'[^\\w\\s]', ' ', my_list)\n", " tokens = word_tokenize(my_list)\n", " my_list = [i for i in tokens if not i in stopwords]\n", " subreddit_row_list.append(my_list)\n", " \n", " flat_list = [x for xs in subreddit_row_list for x in xs]\n", " counter = collections.Counter(flat_list)\n", " top10 = counter.most_common(10)\n", " #print(f'top 10 words for subreddit topic {p} are:', top10)\n", "\n", " final_vectors = np.zeros((10, 300))\n", " for i, w in enumerate(top10):\n", " try:\n", " embedding = vectors[w[0]]\n", " #embedding = embedding.tolist()\n", " except:\n", " #print('no embeddings created for word: {}'.format(w[0]))\n", " embedding = np.array([0] * 300)\n", " final_vectors[i,:]=embedding\n", " final_embeddings = np.sum(final_vectors, axis=0)\n", " final_prod2vec_npy[p2index[p],:] = final_embeddings" ] }, { "cell_type": "code", "execution_count": null, "id": "4850cd9a", "metadata": {}, "outputs": [], "source": [ "type(final_prod2vec_npy),final_prod2vec_npy.shape" ] }, { "cell_type": "code", "execution_count": null, "id": "cdb8db6b", "metadata": {}, "outputs": [], "source": [ "# Save the prod2vec feature matrix \n", "prodfeat_file = \"../../data/02_intermediate/user_behavior/prod2vec_npy.npz\"\n", "np.savez(prodfeat_file,data=final_prod2vec_npy)" ] }, { "cell_type": "markdown", "id": "f2fa0ca1", "metadata": {}, "source": [ "# References\n", "\n", "Jason Baumgartner, Savvas Zannettou, Brian Keegan, Megan Squire, and Jeremy Blackburn. 2020. The Pushshift Reddit Dataset." ] } ], "metadata": { "kernelspec": { "display_name": "kedro-eland-venv", "language": "python", "name": "kedro-eland-venv" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.12" } }, "nbformat": 4, "nbformat_minor": 5 }