import os
import json
import boto3
import yfinance as yf
from time import sleep
from kafka import KafkaProducer

client = boto3.client('kafka')
secret_client = boto3.client('secretsmanager')

# Fields copied into every streamed record, in the order they are serialized.
STREAM_FIELDS = ('Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume')

# Seconds to wait after each record is handed to the producer.
SEND_INTERVAL_SECONDS = 5

# Canned sample used when the live download yields no rows. Kept at module
# level so it is built once per execution environment, not per invocation.
TEST_DATA = {
    'Open': [
        190.72000122070312, 190.52999877929688, 190.22999572753906,
        190.14999389648438, 190.13999938964844, 190.05990600585938,
        190.0, 189.7899932861328, 189.69000244140625,
        189.63999938964844, 189.50999450683594, 189.5045928955078,
        189.33999633789062, 189.27000427246094, 189.25999450683594,
        189.19500732421875, 189.1199951171875, 189.02090454101562,
        188.97999572753906, 188.89999389648438, 188.8300018310547,
        188.77000427246094, 188.47999572753906, 188.4600067138672,
        188.375, 188.36000061035156, 188.33999633789062,
        188.2993927001953, 188.2700958251953, 188.2550048828125,
        188.22079467773438, 188.19000244140625, 188.16000366210938,
        188.16000366210938, 188.05999755859375, 187.9700927734375,
        187.9600067138672, 187.9199981689453, 187.75999450683594,
        187.64999389648438, 187.5070037841797, 187.4199981689453,
        187.35000610351562, 187.30999755859375, 187.27999877929688,
        187.19000244140625, 187.164306640625, 187.15150451660156,
        187.05999755859375, 186.99000549316406, 186.6199951171875,
        186.1999969482422, 186.11000061035156, 186.02999877929688,
        185.8800048828125, 185.82000732421875, 185.63999938964844,
        185.53500366210938, 185.4550018310547, 185.389892578125,
        185.23599243164062,
    ],
    'High': [
        190.8300018310547, 190.72000122070312, 190.5500030517578,
        190.44000244140625, 190.36000061035156, 190.3300018310547,
        190.30999755859375, 190.27000427246094, 190.25999450683594,
        189.7899932861328, 189.7698974609375, 189.72999572753906,
        189.6999969482422, 189.5399932861328, 189.4600067138672,
        189.41000366210938, 189.3699951171875, 189.2050018310547,
        189.13999938964844, 189.0, 188.97999572753906,
        188.86000061035156, 188.85000610351562, 188.8300018310547,
        188.5500030517578, 188.5, 188.5,
        188.4499969482422, 188.39639282226562, 188.375,
        188.3000030517578, 188.3000030517578, 188.2993927001953,
        188.29800415039062, 188.24000549316406, 188.2100067138672,
        188.1999969482422, 188.1898956298828, 188.0500030517578,
        187.99000549316406, 187.7899932861328, 187.67999267578125,
        187.55419921875, 187.5, 187.49000549316406,
        187.47999572753906, 187.35549926757812, 187.35000610351562,
        187.25, 187.22999572753906, 187.1699981689453,
        186.64999389648438, 186.27000427246094, 186.2100067138672,
        186.125, 185.89990234375, 185.8699951171875,
        185.68829345703125, 185.6300048828125, 185.60000610351562,
        185.41000366210938,
    ],
    'Low': [
        190.72000122070312, 190.52999877929688, 190.1199951171875,
        189.97999572753906, 189.92999267578125, 189.75999450683594,
        189.72999572753906, 189.58999633789062, 189.5500030517578,
        189.38999938964844, 189.30999755859375, 189.30999755859375,
        189.2050018310547, 189.13999938964844, 189.1300048828125,
        189.01109313964844, 188.9600067138672, 188.83999633789062,
        188.8000030517578, 188.77000427246094, 188.57949829101562,
        188.44000244140625, 188.43060302734375, 188.41000366210938,
        188.27999877929688, 188.19000244140625, 188.16000366210938,
        188.16000366210938, 188.13999938964844, 188.07150268554688,
        188.05999755859375, 188.0399932861328, 187.97000122070312,
        187.91000366210938, 187.89999389648438, 187.85000610351562,
        187.77000427246094, 187.7666015625, 187.6999969482422,
        187.49000549316406, 187.36000061035156, 187.3300018310547,
        187.22000122070312, 187.15150451660156, 187.1300048828125,
        187.1199951171875, 187.05999755859375, 187.02000427246094,
        186.89999389648438, 186.74000549316406, 186.5399932861328,
        185.8800048828125, 185.80499267578125, 185.6300048828125,
        185.5500030517578, 185.54750061035156, 185.38999938964844,
        185.30999755859375, 185.3000030517578, 185.1999969482422,
        185.11000061035156,
    ],
    'Close': [
        190.72000122070312, 190.61050415039062, 190.52499389648438,
        190.25990295410156, 190.17469787597656, 190.13999938964844,
        190.11000061035156, 189.97999572753906, 189.7779998779297,
        189.64999389648438, 189.6199951171875, 189.50999450683594,
        189.49000549316406, 189.30999755859375, 189.25999450683594,
        189.24000549316406, 189.19000244140625, 189.1446075439453,
        189.03269958496094, 188.9600067138672, 188.8751983642578,
        188.85000610351562, 188.75, 188.47999572753906,
        188.4600067138672, 188.375, 188.3699951171875,
        188.30999755859375, 188.2899932861328, 188.28500366210938,
        188.25, 188.24000549316406, 188.22000122070312,
        188.16000366210938, 188.14999389648438, 188.0749969482422,
        187.97999572753906, 187.97000122070312, 187.91000366210938,
        187.77999877929688, 187.63699340820312, 187.5,
        187.42999267578125, 187.35000610351562, 187.30999755859375,
        187.27000427246094, 187.1623992919922, 187.15379333496094,
        187.14999389648438, 187.07000732421875, 187.01499938964844,
        186.58999633789062, 186.20700073242188, 186.02999877929688,
        185.86489868164062, 185.85000610351562, 185.63999938964844,
        185.52000427246094, 185.4799041748047, 185.3699951171875,
        185.24000549316406,
    ],
    'Adj Close': [
        190.72000122070312, 190.61050415039062, 190.52499389648438,
        190.25990295410156, 190.17469787597656, 190.13999938964844,
        190.11000061035156, 189.97999572753906, 189.7779998779297,
        189.64999389648438, 189.6199951171875, 189.50999450683594,
        189.49000549316406, 189.30999755859375, 189.25999450683594,
        189.24000549316406, 189.19000244140625, 189.1446075439453,
        189.03269958496094, 188.9600067138672, 188.8751983642578,
        188.85000610351562, 188.75, 188.47999572753906,
        188.4600067138672, 188.375, 188.3699951171875,
        188.30999755859375, 188.2899932861328, 188.28500366210938,
        188.25, 188.24000549316406, 188.22000122070312,
        188.16000366210938, 188.14999389648438, 188.0749969482422,
        187.97999572753906, 187.97000122070312, 187.91000366210938,
        187.77999877929688, 187.63699340820312, 187.5,
        187.42999267578125, 187.35000610351562, 187.30999755859375,
        187.27000427246094, 187.1623992919922, 187.15379333496094,
        187.14999389648438, 187.07000732421875, 187.01499938964844,
        186.58999633789062, 186.20700073242188, 186.02999877929688,
        185.86489868164062, 185.85000610351562, 185.63999938964844,
        185.52000427246094, 185.4799041748047, 185.3699951171875,
        185.24000549316406,
    ],
    'Volume': [
        1062884, 771947, 648585, 558501, 551881, 548995,
        547400, 454186, 446270, 385840, 368181, 367927,
        359384, 350527, 340830, 334830, 331323, 326692,
        320276, 319825, 319330, 316144, 311821, 301002,
        295232, 284811, 283422, 279097, 258702, 255949,
        243510, 242330, 240077, 239670, 238233, 236477,
        236448, 231915, 229654, 228022, 221868, 221630,
        218321, 217461, 215185, 213472, 213388, 208108,
        199356, 192022, 191770, 185550, 174532, 155772,
        150802, 148148, 147541, 144515, 133846, 0,
        0,
    ],
}


def lambda_handler(event, context):
    """Stream one-minute price records to a Kafka topic on an MSK cluster.

    Reads ``MSK_CLUSTER_ARN``, ``TOPIC_NAME`` and ``MSK_CLUSTER_SECRET_ARN``
    from the environment, resolves the SASL/SCRAM brokers and credentials,
    downloads data with yfinance (``TOPIC_NAME`` doubles as the symbol passed
    to ``yf.download``), and sends one JSON record per row to the topic,
    pausing ``SEND_INTERVAL_SECONDS`` after each send. When the download
    yields no rows, ``TEST_DATA`` is streamed instead.

    Args:
        event: Lambda invocation event (unused).
        context: Lambda context object (unused).

    Raises:
        KeyError: If a required environment variable is missing, or if the
            data to stream lacks one of ``STREAM_FIELDS``.
    """
    msk_cluster_arn = os.environ['MSK_CLUSTER_ARN']
    topic_name = os.environ['TOPIC_NAME']
    msk_cluster_secret_arn = os.environ['MSK_CLUSTER_SECRET_ARN']

    bootstrap_servers = get_bootstrap_servers(msk_cluster_arn)
    print(bootstrap_servers)
    username, password = get_secret_credentials(msk_cluster_secret_arn)

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=username,
        sasl_plain_password=password,
    )
    try:
        yf_data = yf.download(topic_name, period='1h', interval='1m')

        # NOTE(review): each column is sorted independently in descending
        # order, so values at the same index no longer come from the same
        # minute bar. Kept as-is because TEST_DATA appears to be sorted the
        # same way; confirm this is intended.
        kafka_data = {
            column_name: sorted(column_data.values.tolist(), reverse=True)
            for column_name, column_data in yf_data.items()
        }

        # .get() keeps the fallback working when the download returned a
        # frame without an 'Open' column at all.
        row_count = len(kafka_data.get('Open', []))
        print("Length", row_count)
        if row_count <= 0:
            kafka_data = TEST_DATA

        columns = [kafka_data[field] for field in STREAM_FIELDS]
        for row in zip(*columns):
            stream_data = dict(zip(STREAM_FIELDS, row))
            producer.send(topic_name, json.dumps(stream_data).encode('utf-8'))
            print('topic sent')
            sleep(SEND_INTERVAL_SECONDS)

        # send() only enqueues; block until buffered records are delivered
        # before the invocation ends.
        producer.flush()
    finally:
        producer.close()


def get_bootstrap_servers(cluster_arn):
    """Return the SASL/SCRAM bootstrap brokers of an MSK cluster.

    Args:
        cluster_arn: ARN of the MSK cluster to query.

    Returns:
        The ``BootstrapBrokerStringSaslScram`` value from the
        ``get_bootstrap_brokers`` response, split on commas into a list.
    """
    response = client.get_bootstrap_brokers(ClusterArn=cluster_arn)
    return response['BootstrapBrokerStringSaslScram'].split(',')


def get_secret_credentials(msk_cluster_secret_arn):
    """Fetch the username and password stored in a Secrets Manager secret.

    Args:
        msk_cluster_secret_arn: Identifier passed as ``SecretId``.

    Returns:
        A ``(username, password)`` tuple read from the secret's JSON
        ``SecretString``, which must contain ``username`` and ``password``
        keys.
    """
    response = secret_client.get_secret_value(SecretId=msk_cluster_secret_arn)
    secret = json.loads(response['SecretString'])
    return secret['username'], secret['password']