{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Sentence tokenizer with file slicing with size of 5000\n", "\n", "This notebook helps to tokenize the paragraph into tokens of sentence using IO tokenizer. Also this has the functionality slice them into files of 5KB required as part of the further processing like translation and comprehend services.This note book also uses KMS encryption." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "import io\n", "import sys\n", "import logging\n", "import json\n", "import boto3\n", "import pandas as pd\n", "from pandas.errors import ParserError" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We would initialize the S3 object call the stringIO " ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "bucket = < enter your bucket name >\n", "subfolder = < enter the subfolder>\n", "conn = boto3.client('s3')\n", "contents = conn.list_objects(Bucket=bucket, Prefix=subfolder)['Contents']\n", "\n", "# getting the execution role to download the file\n", "role = get_execution_role()\n", "\n", "s3_resource = boto3.resource('s3')\n", "csv_buffer = StringIO()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We would write function to write files to S3 \n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "def write_file(s3_key, file1, file2, count, serial_object, bucket):\n", " \"\"\"\n", " this function used for writing the files to S3\n", " \"\"\"\n", " s3_resource = boto3.resource('s3')\n", " if s3_key.endswith('.txt'):\n", " \n", " object_name = '{}'.format(s3_key.split('.txt')[0])\n", " state2_obj_name = object_name.replace(file1, file2)\n", " object_key = state2_obj_name + '_' + str(count + 1) + '.txt'\n", "\n", " s3_resource.Object(bucket, f\"{object_key}\").put(Body=serial_object,\n", " ServerSideEncryption='aws:kms')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We would have function for splitting files into less than 5000 bytes size. 
,
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define a function that splits a dataframe into chunks of less than 5000 bytes. Because a residual sentence can be around 500 bytes in size, we keep a tolerance of 4500 bytes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "def split_df4(S3_Key, df, file1, file2, bucket):\n",
    "    \"\"\"\n",
    "    Split the dataframe into chunks of less than 5000 bytes and write each chunk to S3.\n",
    "    \"\"\"\n",
    "    # start above the 5000-byte limit so the while loop runs on the first pass\n",
    "    re_size = 5002\n",
    "    df2 = pd.DataFrame()\n",
    "    # a residual sentence can be around 500 bytes, so use a tolerance of 5000 - 500 = 4500 bytes\n",
    "    rest = sys.getsizeof(df) // 4500\n",
    "\n",
    "    if sys.getsizeof(df) < 5000:\n",
    "        # the whole dataframe already fits into a single chunk\n",
    "        serial_object = df.to_csv(None, index=False, header=False, sep=' ')\n",
    "        write_file(S3_Key, file1, file2, 0, serial_object, bucket)\n",
    "\n",
    "    else:\n",
    "\n",
    "        for i in range(rest):\n",
    "\n",
    "            if i > 0:\n",
    "                # handle the case of the carried-over rows still being above 5000 bytes\n",
    "                re_size = sys.getsizeof(df2)\n",
    "                df = df2\n",
    "                df2 = pd.DataFrame()\n",
    "\n",
    "            else:\n",
    "                # first pass: start the carry-over frame with the last row\n",
    "                last_row = df.tail(1)\n",
    "                df2 = df2.append(last_row, ignore_index=True)\n",
    "\n",
    "            # move rows from the end of df to df2 until df fits under the limit\n",
    "            while re_size > 5000:\n",
    "                df = df.dropna()\n",
    "                last_row = df.tail(1)\n",
    "                df2 = df2.append(last_row, ignore_index=True)\n",
    "                re_size = sys.getsizeof(df)\n",
    "                df = df[:-1]\n",
    "\n",
    "            # restore the original row order of the carried-over rows\n",
    "            df2 = df2.iloc[::-1]\n",
    "            df2 = df2.reset_index(drop=True)\n",
    "            serial_object = df.to_csv(None, index=False, header=False, sep=' ')\n",
    "            # the serialized column name alone is 62 bytes, so anything at or below that is an empty chunk\n",
    "            if sys.getsizeof(serial_object) > 62:\n",
    "                write_file(S3_Key, file1, file2, i, serial_object, bucket)\n",
    "            df2 = df2[:-1]\n",
    "            df = pd.DataFrame()  # do not remove: required to reset the dataframe on every loop\n",
    "\n",
    "        # write whatever is left over, skipping empty chunks and any untreated junk characters\n",
    "        serial_object = df2.to_csv(None, index=False, header=False, sep=' ')\n",
    "        if 62 < sys.getsizeof(serial_object) < 5000:\n",
    "            write_file(S3_Key, file1, file2, i + 1, serial_object, bucket)\n",
    "        i = 0\n",
    "        df2 = pd.DataFrame()  # do not remove: required to reset the dataframe on every loop"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def main():\n",
    "    \"\"\"\n",
    "    Read each file under the input prefix, load it into a dataframe, and split it into chunks.\n",
    "    \"\"\"\n",
    "    conn = boto3.client('s3')\n",
    "    contents = conn.list_objects(Bucket=bucket, Prefix=subfolder)['Contents']\n",
    "    file_1 = input_prefix + \"/\" + run_date\n",
    "    file_2 = output_prefix + \"/\" + run_date\n",
    "    for file in contents:\n",
    "        # a single file, for example 1613953.txt, can also be selected here for tokenization\n",
    "        s3_key = file['Key']\n",
    "        if s3_key == subfolder:\n",
    "            continue\n",
    "        s3_object = conn.get_object(Bucket=bucket, Key=s3_key)\n",
    "        try:\n",
    "            # a multi-character sep forces the python engine to read each line as a single field\n",
    "            df = pd.read_csv(io.BytesIO(s3_object['Body'].read()), sep='delimiter', engine='python')\n",
    "        except ParserError:\n",
    "            df = pd.read_csv(io.BytesIO(s3_object['Body'].read()), error_bad_lines=False,\n",
    "                             names=['token'], encoding='utf-8')\n",
    "\n",
    "        split_df4(s3_key, df, file_1, file_2, bucket)\n",
    "\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    main()"
   ]
  }
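,
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The sketch below is a small local illustration (with made-up sentences and no S3 access) of the size checks that `split_df4` relies on: `to_csv(None, ...)` returns the serialized chunk as a string, and `sys.getsizeof` gives the size that is compared against the 5000-byte limit."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# made-up sentence tokens, purely for illustration\n",
    "sample_df = pd.DataFrame({'token': ['This is the first sentence.',\n",
    "                                    'This is the second sentence.',\n",
    "                                    'This is the third sentence.']})\n",
    "\n",
    "# to_csv(None) returns the serialized text instead of writing a file\n",
    "serial_object = sample_df.to_csv(None, index=False, header=False, sep=' ')\n",
    "\n",
    "print('dataframe size :', sys.getsizeof(sample_df), 'bytes')\n",
    "print('serialized size:', sys.getsizeof(serial_object), 'bytes')\n",
    "print('fits in one chunk:', sys.getsizeof(serial_object) < 5000)"
   ]
  }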
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}