# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import os import json import base64 import boto3 import pytest from moto import mock_s3 @pytest.fixture def kds_event(): data = json.dumps({"msg": "hello world"}) return { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "eQRnj7LHcO9256584", "sequenceNumber": "49631468190676687901272308564704240562978646622126211730", "data": base64.b64encode(bytes(data, "utf-8")), "approximateArrivalTimestamp": 1658047150.32, }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000041:49631468190676687901272308564704240562978646622126211730", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456:role/LogHub-AppPipe-42fb0-LogProcessorLogProcessorFnSer-12R3WZ9O7QDWC", "awsRegion": "ap-northeast-1", "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:123456:stream/LogHub-AppPipe-42fb0-Stream790BDEE4-ByiPt1XoNpow", }, ] } @pytest.fixture def msk_event(): data = json.dumps({"msg": "hello world"}) return { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", "value": base64.b64encode(bytes(data, "utf-8")), "headers": [], } ] }, } @pytest.fixture def s3_client(): bucket_name = os.environ.get("BACKUP_BUCKET_NAME") with mock_s3(): s3 = boto3.resource("s3", region_name="us-east-1") s3.create_bucket(Bucket=bucket_name) yield class Response: def __init__(self, status_code, text=""): self.status_code = status_code self.text = text def json(self): return json.loads(self.text) LOAD_SUCCESS_RESP = Response( 201, json.dumps( { "took": 196, "errors": False, "items": [ { "index": { "_index": "hello-elb-2022-03-31", "_type": "_doc", "_id": "hzE-338BYaSJE47C8kGt", "_version": 1, "result": "created", "_shards": {"total": 2, "successful": 1, "failed": 0}, "_seq_no": 0, "_primary_term": 1, "status": 201, } }, { "index": { "_index": "hello-elb-2022-03-31", "_type": "_doc", "_id": "iDE-338BYaSJE47C8kGt", "_version": 1, "result": "created", "_shards": {"total": 2, "successful": 1, "failed": 0}, "_seq_no": 1, "_primary_term": 1, "status": 201, } }, ], } ), ) LOAD_FAILED_RESP = Response( 201, json.dumps( { "took": 1, "errors": True, "items": [ { "index": { "_index": "hello-elb-2022-03-31", "_type": "_doc", "_id": None, "status": 400, "error": { "type": "illegal_argument_exception", "reason": "Validation Failed", }, } }, ], } ), ) def test_lambda_handler_kds_success(mocker, kds_event): os.environ["SOURCE"] = "KDS" import lambda_function mocker.patch("util.osutil.OpenSearch.exist_index_template", return_value=True) mocker.patch("util.osutil.OpenSearch.bulk_load", return_value=LOAD_SUCCESS_RESP) lambda_function.lambda_handler(kds_event, None) os.environ.pop("SOURCE") @mock_s3 def test_lambda_handler_kds_failed(mocker, kds_event): os.environ["SOURCE"] = "KDS" bucket_name = os.environ.get("BACKUP_BUCKET_NAME") s3 = boto3.resource("s3", region_name="us-east-1") s3.create_bucket(Bucket=bucket_name) import lambda_function mocker.patch("util.osutil.OpenSearch.exist_index_template", return_value=True) mocker.patch("util.osutil.OpenSearch.bulk_load", return_value=LOAD_FAILED_RESP) lambda_function.lambda_handler(kds_event, None) def test_lambda_handler_kds_cloudfront(): event = { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "8bc2091b-cbed-43f4-9a01-d20a7854d38c", "sequenceNumber": "49635675801952128003396914033285311334825079587865624578", "data": "MTY3MTAwNTM1MS4xNDgJMjcuMC4zLjE1NgkwLjAwMQkyMDAJMjIxMglHRVQJaHR0cHMJZDJvbXN2bnJ4Mnl1NGkuY2xvdWRmcm9udC5uZXQJL2xvY2FsZXMvZW4vY29tbW9uLmpzb24/dj12MS4yLjAJNTAJTlJUNTctQzEJY0dYOUp4N0tGWDdVd1k4S3dSRlM1RnZGcmRVSkJwU0JxTWdhSFdpM3l1SEo3bWJscmh6VDVBPT0JZDJvbXN2bnJ4Mnl1NGkuY2xvdWRmcm9udC5uZXQJMC4wMDEJSFRUUC8yLjAJSVB2NAlNb3ppbGxhLzUuMCUyMChNYWNpbnRvc2g7JTIwSW50ZWwlMjBNYWMlMjBPUyUyMFglMjAxMF8xNV83KSUyMEFwcGxlV2ViS2l0LzUzNy4zNiUyMChLSFRNTCwlMjBsaWtlJTIwR2Vja28pJTIwQ2hyb21lLzEwOC4wLjAuMCUyMFNhZmFyaS81MzcuMzYJLQktCXY9djEuMi4wCUhpdAktCVRMU3YxLjMJVExTX0FFU18xMjhfR0NNX1NIQTI1NglIaXQJLQktCWFwcGxpY2F0aW9uL2pzb24JLQktCS0JMTc5MjkJSGl0CUpQCWd6aXAsJTIwZGVmbGF0ZSwlMjBicgkqLyoJKglob3N0OmQyb21zdm5yeDJ5dTRpLmNsb3VkZnJvbnQubmV0JTBBcHJhZ21hOm5vLWNhY2hlJTBBY2FjaGUtY29udHJvbDpuby1jYWNoZSUwQXNlYy1jaC11YTolMjJOb3Q/QV9CcmFuZCUyMjt2PSUyMjglMjIsJTIwJTIyQ2hyb21pdW0lMjI7dj0lMjIxMDglMjIsJTIwJTIyR29vZ2xlJTIwQ2hyb21lJTIyO3Y9JTIyMTA4JTIyJTBBc2VjLWNoLXVhLW1vYmlsZTo/MCUwQXVzZXItYWdlbnQ6TW96aWxsYS81LjAlMjAoTWFjaW50b3NoOyUyMEludGVsJTIwTWFjJTIwT1MlMjBYJTIwMTBfMTVfNyklMjBBcHBsZVdlYktpdC81MzcuMzYlMjAoS0hUTUwsJTIwbGlrZSUyMEdlY2tvKSUyMENocm9tZS8xMDguMC4wLjAlMjBTYWZhcmkvNTM3LjM2JTBBc2VjLWNoLXVhLXBsYXRmb3JtOiUyMm1hY09TJTIyJTBBYWNjZXB0OiovKiUwQXNlYy1mZXRjaC1zaXRlOnNhbWUtb3JpZ2luJTBBc2VjLWZldGNoLW1vZGU6Y29ycyUwQXNlYy1mZXRjaC1kZXN0OmVtcHR5JTBBYWNjZXB0LWVuY29kaW5nOmd6aXAsJTIwZGVmbGF0ZSwlMjBiciUwQWFjY2VwdC1sYW5ndWFnZTplbix6aC1DTjtxPTAuOSx6aDtxPTAuOCUwQQlob3N0JTBBcHJhZ21hJTBBY2FjaGUtY29udHJvbCUwQXNlYy1jaC11YSUwQXNlYy1jaC11YS1tb2JpbGUlMEF1c2VyLWFnZW50JTBBc2VjLWNoLXVhLXBsYXRmb3JtJTBBYWNjZXB0JTBBc2VjLWZldGNoLXNpdGUlMEFzZWMtZmV0Y2gtbW9kZSUwQXNlYy1mZXRjaC1kZXN0JTBBYWNjZXB0LWVuY29kaW5nJTBBYWNjZXB0LWxhbmd1YWdlJTBBCTEzCUUzMlFKUTM5NEJXQkVJCWQyb21zdm5yeDJ5dTRpLmNsb3VkZnJvbnQubmV0CS0JLQkxNjUwOQo=", "approximateArrivalTimestamp": 1671005356.48, }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49635675801952128003396914033285311334825079587865624578", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::0000000000:role/service-role/KinesisTester-role-2gt0xp91", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:0000000000:stream/LogHub-AppPipe-43ed3-KDSBufferStream21B531A6-h4zaMOJOBl51", } ] } from lambda_function import process_kds_event, parse_sinle_tsv_line assert process_kds_event(event, parse_func=parse_sinle_tsv_line) == [ { "timestamp": "1671005351.148", "c-ip": "27.0.3.156", "time-to-first-byte": "0.001", "sc-status": "200", "sc-bytes": "2212", "cs-method": "GET", "cs-protocol": "https", "cs-host": "d2omsvnrx2yu4i.cloudfront.net", "cs-uri-stem": "/locales/en/common.json?v=v1.2.0", "cs-bytes": "50", "x-edge-location": "NRT57-C1", "x-edge-request-id": "cGX9Jx7KFX7UwY8KwRFS5FvFrdUJBpSBqMgaHWi3yuHJ7mblrhzT5A==", "x-host-header": "d2omsvnrx2yu4i.cloudfront.net", "time-taken": "0.001", "cs-protocol-version": "HTTP/2.0", "c-ip-version": "IPv4", "cs-user-agent": "Mozilla/5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010_15_7)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/108.0.0.0%20Safari/537.36", "cs-referer": None, "cs-cookie": None, "cs-uri-query": "v=v1.2.0", "x-edge-response-result-type": "Hit", "x-forwarded-for": None, "ssl-protocol": "TLSv1.3", "ssl-cipher": "TLS_AES_128_GCM_SHA256", "x-edge-result-type": "Hit", "fle-encrypted-fields": None, "fle-status": None, "sc-content-type": "application/json", "sc-content-len": None, "sc-range-start": None, "sc-range-end": None, "c-port": "17929", "x-edge-detailed-result-type": "Hit", "c-country": "JP", "cs-accept-encoding": "gzip,%20deflate,%20br", "cs-accept": "*/*", "cache-behavior-path-pattern": "*", "cs-headers": "host:d2omsvnrx2yu4i.cloudfront.net%0Apragma:no-cache%0Acache-control:no-cache%0Asec-ch-ua:%22Not?A_Brand%22;v=%228%22,%20%22Chromium%22;v=%22108%22,%20%22Google%20Chrome%22;v=%22108%22%0Asec-ch-ua-mobile:?0%0Auser-agent:Mozilla/5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010_15_7)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/108.0.0.0%20Safari/537.36%0Asec-ch-ua-platform:%22macOS%22%0Aaccept:*/*%0Asec-fetch-site:same-origin%0Asec-fetch-mode:cors%0Asec-fetch-dest:empty%0Aaccept-encoding:gzip,%20deflate,%20br%0Aaccept-language:en,zh-CN;q=0.9,zh;q=0.8%0A", "cs-header-names": "host%0Apragma%0Acache-control%0Asec-ch-ua%0Asec-ch-ua-mobile%0Auser-agent%0Asec-ch-ua-platform%0Aaccept%0Asec-fetch-site%0Asec-fetch-mode%0Asec-fetch-dest%0Aaccept-encoding%0Aaccept-language%0A", "cs-headers-count": "13", "primary-distribution-id": "E32QJQ394BWBEI", "primary-distribution-dns-name": "d2omsvnrx2yu4i.cloudfront.net", "origin-fbl": None, "origin-lbl": None, "asn": "16509", } ]