#Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#SPDX-License-Identifier: MIT-0
import os
import logging
import jsonlines
from itertools import chain, islice
from collections import Counter
import numpy as np
import pandas as pd
import multiprocessing
import sklearn.preprocessing
import nltk
#pandarallel provides the parallel_apply() used in transform_textual_records_to_numerical()
from pandarallel import pandarallel
nltk.download('punkt')
nltk.download('wordnet')
pandarallel.initialize()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
# constants
BOS_SYMBOL = "<s>"
EOS_SYMBOL = "</s>"
UNK_SYMBOL = "<unk>"
PAD_SYMBOL = "<pad>"
PAD_ID = 0
TOKEN_SEPARATOR = " "
VOCAB_SYMBOLS = [PAD_SYMBOL, UNK_SYMBOL, BOS_SYMBOL, EOS_SYMBOL]
LABEL_DICT = {1:'positive', 0:'negative'}
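# With the reserved symbols above, the vocabulary built by build_vocab_parallel() with
# use_reserved_symbols=True is expected to start with (a sketch of the intended layout):
#   {"<pad>": 0, "<unk>": 1, "<s>": 2, "</s>": 3, ...}
# so that the padding symbol always receives PAD_ID = 0.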
def pre_model_data_preprocessing(data, textual_columns, features_columns, category_column):
"""
This function is used to ensure the same robust data processing and cleaning for both training and inference.
It performs the following steps:
-Creates a common "text_information" column based on a list of defined columns (textual_columns)
-Keeps only records where the textual information is not missing
-Drops duplicates and encodes the category values using the sklearn label encoder
Arguments:
data {pandas.DataFrame} -- The input dataset
textual_columns {List} -- The list of columns that contain textual information and will be aggregated into one column
features_columns {List} -- The list of columns that will be used in the rest of the process
category_column {String} -- The name of the column used as a category
Returns:
pd.DataFrame -- The processed dataset containing the additional textual column "text_information"
"""
logger.info(f"Shape of data given in input :{data.shape}")
for column in textual_columns:
data[column] = data[column].fillna("")
data[column] = data[column].astype("str")
data["text_information"] = data[textual_columns].agg(' '.join, axis=1)
#Making sure there are no null values after processing
data = data[~data["text_information"].isnull()]
data = data[data["text_information"]!=""]
logger.info(f"After droping missing textual information: {data.shape}")
#Droping duplicates
data = data.drop_duplicates(subset=['text_information'])
logger.info(f"Total number of records after droping duplicates : {data.shape[0]}")
#Keeping only the feature columns
l_encoder = sklearn.preprocessing.LabelEncoder()
data["category_id"] = l_encoder.fit_transform(data[category_column])
data = data[features_columns]
#Making sure feature columns are in string format
for column in features_columns:
data[column] = data[column].astype("str")
data = data.reset_index(drop=True)
return data
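# Example usage (a minimal sketch; the file name and column names below are hypothetical):
#   raw_data = pd.read_csv("products.csv")
#   processed = pre_model_data_preprocessing(
#       raw_data,
#       textual_columns=["title", "description"],
#       features_columns=["id", "text_information", "category_id"],
#       category_column="category",
#   )
#   # "processed" keeps only the feature columns, including the aggregated "text_information"
#   # column and the label-encoded "category_id".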
def get_tokens(line, tokenizer):
"""
Yields tokens from an input string.
Args:
line {String} -- The input string
tokenizer {nltk.tokenize.TreebankWordTokenizer} -- The tokenizer object used to split a text string into a list of tokens
Yields:
token {String} -- Iterates over the non-empty tokens of the input string
"""
for token in tokenizer.tokenize(line):
if len(token) > 0:
yield token
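# Example (illustrative):
#   tokenizer = nltk.tokenize.TreebankWordTokenizer()
#   list(get_tokens("blue cotton t-shirt", tokenizer))  # -> ['blue', 'cotton', 't-shirt']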
def sentence_to_integers(sentence, tokenizer, word_dict):
"""
Converts a sentence of text to a list of token ids using the word dictionary
Arguments:
sentence {String} -- The input sentence to convert
tokenizer {nltk.tokenize.TreebankWordTokenizer} -- The tokenizer object used to split a text string into a list of tokens
word_dict {dict} -- The vocabulary dictionary, this dictionary maps a word to its id
Returns:
list -- The list of token ids (tokens missing from the vocabulary are dropped)
"""
return [word_dict[token] for token in get_tokens(sentence, tokenizer)
if token in word_dict]
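# Example (illustrative; the ids come from a toy vocabulary):
#   word_dict = {"blue": 7, "shirt": 12}
#   sentence_to_integers("blue cotton shirt", tokenizer, word_dict)  # -> [7, 12], "cotton" is out of vocabulary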
def get_tokens_from_pairs(input_dict, tokenizer):
"""
Parses a pair dictionary and tokenizes the textual information it contains
Arguments:
input_dict {dict} -- A pair record containing the "left_text_information" and "right_text_information" keys
tokenizer {nltk.tokenize.TreebankWordTokenizer} -- The tokenizer object used to split a text string into a list of tokens
Returns:
Iterator -- An iterator over the tokens of both sides of the pair
"""
iter_list = []
for sentence_key in ['left_text_information', 'right_text_information']:
sentence = input_dict[sentence_key]
iter_list.append(get_tokens(sentence, tokenizer))
return chain(iter_list[0], iter_list[1])
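# Example (illustrative pair record):
#   pair = {"left_text_information": "blue shirt", "right_text_information": "red shirt"}
#   list(get_tokens_from_pairs(pair, tokenizer))  # -> ['blue', 'shirt', 'red', 'shirt']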
def build_vocab(data):
"""
Collects the tokens found in a list of pairs, excluding the reserved symbols (PAD, UNK, BOS, EOS).
The actual word-to-id mapping is built from these tokens in build_vocab_parallel().
Arguments:
data {list} : List of pair dictionaries
Returns:
List -- a list of tokens
"""
results = []
tokenizer = nltk.tokenize.TreebankWordTokenizer()
for line in data:
for token in get_tokens_from_pairs(line, tokenizer):
if token not in set(VOCAB_SYMBOLS):
results.append(token)
return results
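# Example (illustrative):
#   pairs = [{"left_text_information": "blue shirt", "right_text_information": "red shirt"}]
#   build_vocab(pairs)  # -> ['blue', 'shirt', 'red', 'shirt'], raw tokens counted in build_vocab_parallel()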
def build_vocab_parallel(data_iter, num_words=50000, min_count=1, use_reserved_symbols=True,
sort=True):
"""
Calls the "build_vocab()" in a parallel mode, splits the data_iter, after loading it; given as input
based on the number of cpus available and calls the function
Arguments:
data_iter {Iterator} -- Sequence of sentences containing whitespace delimited tokens.
Keyword Arguments:
num_words {int} -- Maximum number of words in the vocabulary. (default: {50000})
min_count {int} -- Minimum occurrences of words to be included in the vocabulary. (default: {1})
use_reserved_symbols {bool} -- If we're using the reserver symbols dictionary (default: {True})
sort {bool} -- If the vocab should be sorted or not (default: {True})
Returns:
dict -- The vocabulary dictionary, this dictionary maps a word to it's id
"""
logger.info("Preparing data for prarallel processing")
unloaded_lines = [e for e in data_iter]
nb_cpus = multiprocessing.cpu_count()
chunk_size = max(1, len(unloaded_lines) // nb_cpus)
nb_chunks = int(len(unloaded_lines) / chunk_size)
# This might fill up the memory, use carefully, check with htop
list_datasets = []
for i in range(nb_chunks + 1):
sub_unloaded_lines = unloaded_lines[i * chunk_size: (i + 1) * chunk_size]
list_datasets.append(sub_unloaded_lines)
del sub_unloaded_lines
logger.info("Launching parallel generation of vocab")
logger.info(f"Number of cpus uses : {nb_cpus}")
logger.info(f"Number of params combination : {len(list_datasets)}")
p = multiprocessing.Pool(nb_cpus)
results = p.map(build_vocab, list_datasets)
p.close()
all_tokens = []
for result in results:
all_tokens.extend(result)
raw_vocab = Counter(all_tokens)
logger.info(f"Initial vocabulary: {len(raw_vocab)} types")
#For words with the same count, they will be ordered reverse alphabetically.
pruned_vocab = sorted(((c, w) for w, c in raw_vocab.items() if c >= min_count), reverse=True)
logger.info(f"Pruned vocabulary: {len(pruned_vocab)} types (min frequency {min_count})")
#Truncate the vocabulary to fit size num_words (only includes the most frequent ones)
vocab = islice((w for c, w in pruned_vocab), num_words)
if sort:
#Sort the vocabulary alphabetically
vocab = sorted(vocab)
if use_reserved_symbols:
vocab = chain(VOCAB_SYMBOLS, vocab)  #keep VOCAB_SYMBOLS in order so that PAD_SYMBOL gets PAD_ID (0)
word_to_id = {word: idx for idx, word in enumerate(vocab)}
logger.info(f"Final vocabulary: {len(word_to_id)}")
if use_reserved_symbols:
# Important: pad symbol becomes index 0
assert word_to_id[PAD_SYMBOL] == PAD_ID
return raw_vocab, word_to_id
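# Example usage (a sketch; "pairs" is a list of pair dictionaries such as the ones
# produced by generate_sentence_pairs()):
#   raw_vocab, word_to_id = build_vocab_parallel(pairs, num_words=50000, min_count=2)
#   word_to_id[PAD_SYMBOL]  # -> 0 (PAD_ID) when use_reserved_symbols=True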
def convert_text_to_integers(data_iter, word_to_id, output_path):
"""
This function converts the left and right textual information of each pair to sequences of token ids
using the vocabulary provided, and writes the converted pairs to a jsonlines file
Args:
data_iter {Iterator} -- Sequence of pair dictionaries containing the textual information and a "pair_label"
word_to_id {dict} -- The vocabulary dictionary, this dictionary maps a word to its id
output_path {String} -- The path to which the outputs are saved
"""
tokenizer = nltk.tokenize.TreebankWordTokenizer()
count = 0
max_seq_length = 0
with jsonlines.open(output_path, mode='w') as writer:
for in_dict in data_iter:
out_dict = dict()
label = in_dict['pair_label']
if label in LABEL_DICT:
rsentence1 = in_dict['left_text_information']
rsentence2 = in_dict['right_text_information']
for idx, sentence in enumerate([rsentence1, rsentence2]):
# logger.info(count, sentence)
s = sentence_to_integers(sentence, tokenizer, word_to_id)
out_dict[f'in{idx}'] = s
max_seq_length = max(len(s), max_seq_length)
out_dict['pair_label'] = label
writer.write(out_dict)
else:
logger.info(label)
count += 1
logger.info(f"There are in total {count} invalid labels")
logger.info(f"The max length of converted sequence is {max_seq_length}")
return
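# Example usage (a sketch; the output path is hypothetical):
#   convert_text_to_integers(pairs, word_to_id, "train_converted.jsonl")
#   # each written line looks like {"in0": [12, 7, ...], "in1": [3, 42, ...], "pair_label": 1}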
def generate_negative_pairs_for_cat(X, category_id, max_negative_per_cat, nb_categories):
"""[summary]
Args:
X {pd.Dataframe}: The dataframe from which we'll generate the pairs
category_id {String}: The category id for which we want to generate negative pairs
max_negative_per_cat {int}: The maximum number of samples we'll generate for this category
nb_categories {int}: The total number of categories in the dataset
Returns:
pd.Dataframe: A pandas dataframe containing left and right product information (i.e: id, textual information, category)
"""
logger.info(f"Generating negative pairs for category nb : {category_id}")
X_same_cat = X[X["category_id"]==category_id]
X_diff_cat = X[X["category_id"]!=category_id]
if len(X_same_cat) < max_negative_per_cat:
sample_left_pair = X_same_cat.sample(max_negative_per_cat, replace=True)
else:
sample_left_pair = X_same_cat.sample(max_negative_per_cat)
if len(X_diff_cat) < max_negative_per_cat:
sample_right_pair = X_diff_cat.sample(max_negative_per_cat, replace=True)
else:
sample_right_pair = X_diff_cat.sample(max_negative_per_cat)
sample_left_pair = sample_left_pair.rename(columns={"id":"left_id","category_id":"left_category_id",\
"text_information":"left_text_information"})
sample_right_pair = sample_right_pair.rename(columns={"id":"right_id","category_id":"right_category_id",\
"text_information":"right_text_information"})
sample_left_pair = sample_left_pair.reset_index(drop=True)
sample_right_pair = sample_right_pair.reset_index(drop=True)
product_pairs = pd.concat([sample_left_pair, sample_right_pair], axis=1)
return product_pairs
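# Example (a sketch; X must contain the "id", "category_id" and "text_information" columns):
#   negatives_cat_3 = generate_negative_pairs_for_cat(X, category_id=3, max_negative_per_cat=100, nb_categories=10)
#   # left items all belong to category 3, right items are sampled from the other categories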
def generate_sentence_pairs(X, limits):
"""
This function creates negative and positive pairs. It uses stratified sampling to ensure the
category distribution of the data is represented.
Instead of creating all possible pairs and then scoping down, we take the opposite approach, working
backwards from the target number of records we aim to generate.
Arguments:
X {pd.DataFrame} -- The data scoped down to "feature_columns"
limits {dict} -- A dictionary of limits containing "TOTAL_NB_OF_RECORDS" and "PC_POSITIVE" where:
- TOTAL_NB_OF_RECORDS : The total number of positive and negative pairs we want to generate
- PC_POSITIVE : The percentage of the positive pairs from TOTAL_NB_OF_RECORDS
Returns:
sentences_data_negative : The list of pairs that represent a negative combination (see note)
sentences_data_positive : The list of pairs that represent a positive combination (see note)
negative_indices : The list of unique ids that represent negative indices
positive_indices : The list of unique ids that represent positive indices
Note - Each pair contains the following keys: "pair_id", "pair_label",
"left_id", "left_category_id", "left_text_information", "right_id", "right_category_id", "right_text_information"
"""
logger.info("Using the following limits to generate pairs :")
logger.info(limits)
logger.info("----------------------------------------------")
TOTAL_NB_OF_RECORDS = limits["TOTAL_NB_OF_RECORDS"]
PC_POSITIVE = limits["PC_POSITIVE"]
TOTAL_NB_OF_POSITIVE_RECORDS = int(np.ceil(PC_POSITIVE*TOTAL_NB_OF_RECORDS))
TOTAL_NB_OF_NEGATIVE_RECORDS = TOTAL_NB_OF_RECORDS - TOTAL_NB_OF_POSITIVE_RECORDS
category_ids = X["category_id"].unique()
nb_categories = len(category_ids)
max_positive_per_cat = int(np.ceil(TOTAL_NB_OF_POSITIVE_RECORDS/nb_categories))
max_negative_per_cat = int(np.ceil(TOTAL_NB_OF_NEGATIVE_RECORDS/nb_categories))
logger.info(f"Number of categories is : {nb_categories}")
logger.info(f"Number of requested records is : {TOTAL_NB_OF_RECORDS}")
logger.info(f"Number of positive pairs to generate per category are : {max_positive_per_cat}")
logger.info(f"Number of negative pairs to generate per category are : {max_negative_per_cat}")
if max_negative_per_cat < nb_categories:
logger.warning('Warning, the max number of negative pairs per category is lower than the total number of categories')
#Generating positive pair for each category id
positive_product_pairs = []
for category_id in X["category_id"].unique():
logger.info(f"Generating positive pairs for category nb : {category_id}")
X_same_cat = X[X["category_id"]==category_id]
if len(X_same_cat) < max_positive_per_cat:
sample_left_pair = X_same_cat.sample(max_positive_per_cat, replace=True)
sample_right_pair = X_same_cat.sample(max_positive_per_cat, replace=True)
else:
sample_left_pair = X_same_cat.sample(max_positive_per_cat)
sample_right_pair = X_same_cat.sample(max_positive_per_cat)
sample_left_pair = sample_left_pair.rename(columns={"id":"left_id","category_id":"left_category_id",\
"text_information":"left_text_information"})
sample_right_pair = sample_right_pair.rename(columns={"id":"right_id","category_id":"right_category_id",\
"text_information":"right_text_information"})
sample_left_pair = sample_left_pair.reset_index(drop=True)
sample_right_pair = sample_right_pair.reset_index(drop=True)
product_pairs = pd.concat([sample_left_pair, sample_right_pair],axis=1)
positive_product_pairs.append(product_pairs)
#Concatenating and adding unique pair ids and label=1
positive_product_pairs = pd.concat(positive_product_pairs)
positive_product_pairs = positive_product_pairs.reset_index(drop=True)
positive_product_pairs["pair_id"] = range(len(positive_product_pairs))
positive_product_pairs["pair_label"] = 1
#Generating negative pair for each category id
negative_product_pairs = []
for category_id in category_ids:
negative_product_pairs.append(generate_negative_pairs_for_cat(X, category_id, max_negative_per_cat, nb_categories))
#Concatenating and adding unique pair ids and label=0
negative_product_pairs = pd.concat(negative_product_pairs, axis=0)
negative_product_pairs = negative_product_pairs.reset_index(drop=True)
negative_product_pairs["pair_id"] = range(len(negative_product_pairs))
negative_product_pairs["pair_label"] = 0
logger.info("Performing checks on generated pairs...")
if np.sum(positive_product_pairs["left_category_id"]==positive_product_pairs["right_category_id"]) == positive_product_pairs.shape[0]:
logger.info("Check passed : All positive pairs have the same category ids")
else:
#Raising an exception would be better
logger.warning("Check failed : Warning, not all positive pairs have the same category ids")
if np.sum(negative_product_pairs["left_category_id"]==negative_product_pairs["right_category_id"]) == 0:
logger.info("Check passed : All negative pairs have different category ids")
else:
#Raising an exception would be better
logger.warning("Check failed : Warning, not all negative pairs have different category ids")
#Transforming to json records
sentences_data_positive = positive_product_pairs.to_dict('records')
sentences_data_negative = negative_product_pairs.to_dict('records')
positive_indices = positive_product_pairs["pair_id"].values
negative_indices = negative_product_pairs["pair_id"].values
return sentences_data_negative, sentences_data_positive, negative_indices, positive_indices
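# Example usage (a sketch; the limits below are arbitrary):
#   limits = {"TOTAL_NB_OF_RECORDS": 100000, "PC_POSITIVE": 0.5}
#   negatives, positives, neg_idx, pos_idx = generate_sentence_pairs(processed, limits)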
def transform_textual_records_to_numerical(textual_records, tokenizer, word_to_id):
"""
This function transforms the textual information in each record to numerical
information using the word_to_id vocabulary and the tokenizer passed as parameters
Arguments:
textual_records {pd.DataFrame} -- A dataframe of pairs where each row contains two textual descriptions and a "pair_label"
tokenizer {nltk.tokenize.TreebankWordTokenizer} -- The tokenizer object used to split a text string into a list of tokens
word_to_id {dict} -- The vocabulary dictionary, this dictionary maps a word to its id
Returns:
A list of records where each textual pair is replaced by a numerical pair ("in0", "in1") and its "label"
"""
textual_records["in0"] = textual_records["left_text_information"].parallel_apply(lambda x:sentence_to_integers(x, tokenizer, word_to_id))
textual_records["in1"] = textual_records["right_text_information"].parallel_apply(lambda x:sentence_to_integers(x, tokenizer, word_to_id))
numerical_records = textual_records[["in0","in1","pair_label"]]
numerical_records = numerical_records.rename(columns={"pair_label":"label"})
numerical_records = numerical_records.to_dict('records')
return numerical_records
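# Example usage (a sketch; "pairs_df" is a DataFrame of pairs with the left/right textual columns
# and a "pair_label" column):
#   tokenizer = nltk.tokenize.TreebankWordTokenizer()
#   numerical_records = transform_textual_records_to_numerical(pairs_df, tokenizer, word_to_id)
#   # -> [{"in0": [...], "in1": [...], "label": 1}, ...]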
def check_data_leak(training_records, test_records):
"""
This function is used to ensure that no product id is present in both the training and test pairs
Arguments:
training_records {List} -- The list of training pairs (see note)
test_records {List} -- The list of test pairs (see note)
Returns:
boolean -- True meaning that at least one id is present in both the training and test pairs, False otherwise.
Note - Each pair contains the following keys: "pair_id", "pair_label",
"left_id", "left_category_id", "left_text_information", "right_id", "right_category_id", "right_text_information"
"""
train_sentence_ids = []
for e in training_records:
train_sentence_ids.append(e["left_id"])
train_sentence_ids.append(e["right_id"])
train_sentence_ids = np.unique(train_sentence_ids)
test_sentence_ids = []
for e in test_records:
test_sentence_ids.append(e["left_id"])
test_sentence_ids.append(e["right_id"])
test_sentence_ids = np.unique(test_sentence_ids)
if len(set(train_sentence_ids).intersection(test_sentence_ids)) == 0:
return False
else:
return True
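# Example usage (a sketch):
#   if check_data_leak(train_pairs, test_pairs):
#       logger.warning("Data leak detected: some ids appear in both the training and test pairs")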