import boto3
import csv
import json
from time import sleep
from datetime import datetime

# S3 bucket details (UPDATE BUCKET_NAME to point at your own bucket)
BUCKET_NAME = "ecommerce-raw-us-east-1-dev"
KEY = "ecomm_user_activity_sample/202019-Nov-sample.csv"

# AWS settings: every client/resource below is created for the same region.
AWS_REGION = 'us-east-1'
s3 = boto3.client('s3', region_name=AWS_REGION)
s3_resource = boto3.resource('s3', region_name=AWS_REGION)
kinesis_client = boto3.client('kinesis', region_name=AWS_REGION)

# Kinesis details: the destination stream, and the name of the CSV column
# whose value is used as the partition key of each record.
kinesis_stream_name = 'ecommerce-raw-user-activity-stream-1'
streaming_partition_key = 'category_id'

# Function can be converted to Lambda;
#   i.e. by iterating the S3-put events records; e.g. record['s3']['bucket']['name']
def stream_data_simulator(input_s3_bucket, input_s3_key):
    """Replay a CSV object stored in S3 into Kinesis, one record per row.

    The object is downloaded in full, decoded as UTF-8 and parsed with
    ``csv.DictReader`` (the first line is the header). Each row is sent to
    the stream named by ``kinesis_stream_name`` as a JSON document with an
    extra ``txn_timestamp`` field, partitioned by the row's
    ``streaming_partition_key`` column.

    Args:
        input_s3_bucket: Name of the bucket holding the CSV object.
        input_s3_key: Key of the CSV object inside that bucket.

    Returns:
        None. Progress and per-row errors are printed to stdout.

    Notes:
        Sending is best-effort per row: any exception raised while building
        or sending a record is printed and the row is skipped. Exceptions
        raised while fetching or decoding the S3 object are not caught here.
    """
    import io  # local import: the module-level import block does not provide it

    csv_object = s3_resource.Object(input_s3_bucket, input_s3_key)
    csv_text = csv_object.get()['Body'].read().decode('utf-8')

    # Hand the csv module an untranslated text stream (newline='') instead of
    # pre-splitting on '\n': pre-splitting silently drops newlines embedded in
    # quoted fields.
    reader = csv.DictReader(io.StringIO(csv_text, newline=''))

    for row in reader:
        try:
            # The row is already a mapping; copy it so the reader's row is
            # left untouched when the synthetic field is added.
            record = dict(row)

            # Adding fake txn ts (naive local time, ISO-8601 formatted):
            record['txn_timestamp'] = datetime.now().isoformat()

            # Write to Kinesis Streams.
            # NOTE(review): the payload is pretty-printed (indent=4), which
            # inflates every record; kept to preserve the existing wire format.
            response = kinesis_client.put_record(
                StreamName=kinesis_stream_name,
                Data=json.dumps(record, indent=4),
                PartitionKey=str(record[streaming_partition_key]),
            )

            # .get(): a missing 'category_code' column must not make a
            # successfully sent record look like a failure.
            print('HttpStatusCode:', response['ResponseMetadata']['HTTPStatusCode'],
                  ', ', record.get('category_code'))

            # Adding a temporary pause, for demo-purposes:
            sleep(0.5)

        except Exception as e:
            # Best-effort: report the failure and move on to the next row.
            print('Error: {}'.format(e))

# Run stream: replay the sample object five times, one pass after another.
for _ in range(5):
    stream_data_simulator(BUCKET_NAME, KEY)