import boto3 import botocore import sys import time client = boto3.client('redshift-data') def execute_sql(sql_string,sql_name): cluster_id='sfdev-appflow-redshift-cluster' db_name='sfdev-appflow' secret_arn='dev/sfdev-appflow/awsuser' response = client.execute_statement(ClusterIdentifier=cluster_id,Database=db_name,SecretArn=secret_arn,Sql=sql_string,StatementName=sql_name) print('Running {} : {}'.format(sql_name,sql_string)) print(response) sresponse = client.describe_statement(Id=response['Id']) print(sresponse) sresponse_status = sresponse['Status'] while sresponse_status: if sresponse_status == 'FINISHED' or sresponse_status == 'ABORTED' or sresponse_status == 'FAILED': print('Run {} for {} : {}'.format(sresponse_status,sql_name,sql_string)) break else: sresponse = client.describe_statement(Id=response['Id']) print(sresponse) sresponse_status = sresponse['Status'] time.sleep(5) continue return sresponse_status def lambda_handler(event, context): insert_stage_sql='insert into stg_sfdev_appflow.account(select id, isdeleted, name, type, billingstreet, billingcity, billingstate, cast(billingpostalcode as integer), billingcountry, phone, fax, accountnumber, website, industry, tickersymbol, description, rating, sysdate as createddate, sysdate as lastmodifieddate from ext_sfdev_appflow.account);' start_trans_sql='begin transaction;' upd_tgt_sql='update tgt_sfdev_appflow.account set isdeleted = stga.isdeleted, name = stga.name, type = stga.type, billingstreet = stga.billingstreet, billingcity = stga.billingcity, billingstate = stga.billingstate, billingpostalcode = stga.billingpostalcode, billingcountry = stga.billingcountry, phone = stga.phone, fax = stga.fax, accountnumber = stga.accountnumber, website = stga.website, industry = stga.industry, tickersymbol = stga.tickersymbol, description = stga.description, rating = stga.rating, lastmodifieddate = stga.lastmodifieddate from stg_sfdev_appflow.account stga where tgt_sfdev_appflow.account.id = stga.id;' del_stg_sql='delete from stg_sfdev_appflow.account using tgt_sfdev_appflow.account tgta where tgta.id = stg_sfdev_appflow.account.id;' ins_tgt_sql='insert into tgt_sfdev_appflow.account select * from stg_sfdev_appflow.account;' end_trans_sql='end transaction;' trunc_stg_sql='truncate table stg_sfdev_appflow.account;' # Executing the sqls if execute_sql(insert_stage_sql,'Insert to stage') != 'FINISHED': sys.exit(1) if execute_sql(start_trans_sql,'Start transaction') != 'FINISHED': sys.exit(2) if execute_sql(upd_tgt_sql,'Update target') != 'FINISHED': sys.exit(3) if execute_sql(del_stg_sql,'Delete stage') != 'FINISHED': sys.exit(4) if execute_sql(ins_tgt_sql,'Insert target') != 'FINISHED': sys.exit(5) if execute_sql(end_trans_sql,'End transaction') != 'FINISHED': sys.exit(6) if execute_sql(trunc_stg_sql,'Truncate stage') != 'FINISHED': sys.exit(7)