#!/usr/bin/env python3 # -*- encoding: utf-8 -*- # vim: tabstop=2 shiftwidth=2 softtabstop=2 expandtab import sys import json import os import base64 import traceback import hashlib import datetime import boto3 from elasticsearch import Elasticsearch from elasticsearch import RequestsHttpConnection from requests_aws4auth import AWS4Auth ES_INDEX, ES_TYPE = (os.getenv('ES_INDEX', 'retail'), os.getenv('ES_TYPE', 'trans')) ES_HOST = os.getenv('ES_HOST') REQUIRED_FIELDS = [e for e in os.getenv('REQUIRED_FIELDS', '').split(',') if e] DATE_TYPE_FIELDS = [e for e in os.getenv('DATE_TYPE_FIELDS', '').split(',') if e] DATE_FORMAT = os.getenv('DATE_FORMAT', '%Y-%m-%d %H:%M:%S') AWS_REGION = os.getenv('REGION_NAME', 'us-east-1') session = boto3.Session(region_name=AWS_REGION) credentials = session.get_credentials() credentials = credentials.get_frozen_credentials() access_key, secret_key, token = (credentials.access_key, credentials.secret_key, credentials.token) aws_auth = AWS4Auth( access_key, secret_key, AWS_REGION, 'es', session_token=token ) es_client = Elasticsearch( hosts = [{'host': ES_HOST, 'port': 443}], http_auth=aws_auth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection ) print('[INFO] ElasticSearch Service', json.dumps(es_client.info(), indent=2), file=sys.stderr) def lambda_handler(event, context): import collections counter = collections.OrderedDict([('reads', 0), ('writes', 0), ('invalid', 0), ('index_errors', 0), ('errors', 0)]) doc_list = [] for record in event['Records']: try: counter['reads'] += 1 payload = base64.b64decode(record['kinesis']['data']).decode('utf-8') json_data = json.loads(payload) if not any([json_data.get(k, None) for k in REQUIRED_FIELDS]): counter['invalid'] += 1 continue doc_id = ':'.join([json_data.get(k, '') for k in REQUIRED_FIELDS if k]) json_data['doc_id'] = hashlib.md5(doc_id.encode('utf-8')).hexdigest()[:8] for k in DATE_TYPE_FIELDS: if k in json_data: dt = datetime.datetime.strptime(json_data[k], DATE_FORMAT) json_data[k] = dt.strftime("%Y-%m-%dT%H:%M:%SZ") # es_index_action_meta = {"index": {"_index": ES_INDEX, "_type": ES_TYPE, "_id": json_data['doc_id']}} es_index_action_meta = {"index": {"_index": ES_INDEX, "_id": json_data['doc_id']}} doc_list.append(es_index_action_meta) doc_list.append(json_data) counter['writes'] += 1 except Exception as ex: counter['errors'] += 1 traceback.print_exc() if doc_list: try: es_bulk_body = '\n'.join([json.dumps(e) for e in doc_list]) res = es_client.bulk(body=es_bulk_body, index=ES_INDEX, refresh=True) except Exception as ex: counter['index_errors'] += 1 traceback.print_exc() print('[INFO]', ', '.join(['{}={}'.format(k, v) for k, v in counter.items()]), file=sys.stderr) if __name__ == '__main__': kinesis_data = [ '''{"Invoice": "489434", "StockCode": "85048", "Description": "15CM CHRISTMAS GLASS BALL 20 LIGHTS", "Quantity": 12, "InvoiceDate": "2009-12-01 07:45:00", "Price": 6.95, "Customer_ID": "13085.0", "Country": "United Kingdom"}''', '''{"Invoice": "489435", "StockCode": "22350", "Description": "CAT BOWL ", "Quantity": 12, "InvoiceDate": "2009-12-01 07:46:00", "Price": 2.55, "Customer_ID": "13085.0", "Country": "United Kingdom"}''', '''{"Invoice": "489436", "StockCode": "48173C", "Description": "DOOR MAT BLACK FLOCK ", "Quantity": 10, "InvoiceDate": "2009-12-01 09:06:00", "Price": 5.95, "Customer_ID": "13078.0", "Country": "United Kingdom"}''', '''{"Invoice": "491970", "StockCode": "21218", "Description": "RED SPOTTY BISCUIT TIN", "Quantity": 2, "InvoiceDate": "2009-12-14 18:03:00", "Price": 8.65, "Customer_ID": "", "Country": "United Kingdom"}''', ] records = [{ "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", "eventVersion": "1.0", "kinesis": { "approximateArrivalTimestamp": 1428537600, "partitionKey": "partitionKey-1", "data": base64.b64encode(e.encode('utf-8')), "kinesisSchemaVersion": "1.0", "sequenceNumber": "49545115243490985018280067714973144582180062593244200961" }, "invokeIdentityArn": "arn:aws:iam::EXAMPLE", "eventName": "aws:kinesis:record", "eventSourceARN": "arn:aws:kinesis:EXAMPLE", "eventSource": "aws:kinesis", "awsRegion": "us-east-1" } for e in kinesis_data] event = {"Records": records} lambda_handler(event, {})