import json
import boto3
import psycopg2  # PostgreSQL driver used by SQLAlchemy's postgresql:// dialect
import time
import pandas
import re
import os
from sqlalchemy import create_engine, text
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote_plus as urlquote

# Configuration is read from environment variables.
SQL_SCRIPT_S3_PATH = os.environ['SQL_SCRIPT_S3_PATH']
REDSHIFT_CLUSTER_ENDPOINT = os.environ['REDSHIFT_CLUSTER_ENDPOINT']
REDSHIFT_IAM_ROLE = os.environ['REDSHIFT_IAM_ROLE']
BUCKET_NAME = os.environ['BUCKET_NAME']
REDSHIFT_USER_NAME = os.environ['REDSHIFT_USER_NAME']
NUMBER_OF_PARALLEL_SESSIONS_LIST = os.environ['NUMBER_OF_PARALLEL_SESSIONS_LIST']
DISABLE_RESULT_CACHE = os.environ['DISABLE_RESULT_CACHE']
DEFAULT_OUTPUT_LIMIT = os.environ['DEFAULT_OUTPUT_LIMIT']
MAX_NUMBER_OF_QUERIES = os.environ['MAX_NUMBER_OF_QUERIES']
MAX_PARALLEL_SESSIONS = os.environ['MAX_PARALLEL_SESSIONS']
QUERY_LABEL_PREFIX = os.environ['QUERY_LABEL_PREFIX']


def connect_to_redshift(host, username):
    # Fetch temporary cluster credentials and build a SQLAlchemy engine.
    # The endpoint is expected in the form
    # <cluster-id>.<hash>.<region>.redshift.amazonaws.com:<port>/<database>.
    client = boto3.client('redshift')
    cluster_creds = client.get_cluster_credentials(
        DbUser=username,
        DbName=host.split('/')[1],
        ClusterIdentifier=host.split('.')[0])
    connection_string = ('postgresql://'
                         + urlquote(cluster_creds['DbUser']) + ':'
                         + urlquote(cluster_creds['DbPassword']) + '@'
                         + host)
    return create_engine(connection_string, pool_size=0, max_overflow=-1)


def get_json_config_from_s3(script_s3_path):
    bucket, key = script_s3_path.replace("s3://", "").split("/", 1)
    obj = boto3.client('s3').get_object(Bucket=bucket, Key=key)
    return json.loads(obj['Body'].read().decode('utf-8'))


def get_sql_scripts_from_s3():
    # Download the SQL script, substitute the IAM role and bucket placeholders,
    # and split it into individual statements, capped at MAX_NUMBER_OF_QUERIES.
    bucket, key = SQL_SCRIPT_S3_PATH.replace("s3://", "").split("/", 1)
    obj = boto3.client('s3').get_object(Bucket=bucket, Key=key)
    script = obj['Body'].read().decode('utf-8')
    script = script.format(redshift_iam_role=REDSHIFT_IAM_ROLE, bucket_name=BUCKET_NAME)
    split_scripts = script.split(';')[:-1]
    if len(split_scripts) > int(MAX_NUMBER_OF_QUERIES):
        split_scripts = split_scripts[0:int(MAX_NUMBER_OF_QUERIES)]
    return split_scripts


def get_sql(engine, number_of_parallel_sessions):
    # Build one SQL batch: append a LIMIT to unbounded queries, optionally
    # disable the result cache, and tag the session with a query group label.
    sql_script = ""
    pattern = re.compile(r'limit\s+\d+\s*$', re.IGNORECASE)
    for query in get_sql_scripts_from_s3():
        if not re.search(pattern, query):
            query += " limit " + DEFAULT_OUTPUT_LIMIT
        sql_script += query + ";\n"
    if DISABLE_RESULT_CACHE == 'true':
        sql_script = "set enable_result_cache_for_session to false;\n" + sql_script
    sql_script = ("set query_group to '" + QUERY_LABEL_PREFIX
                  + str(number_of_parallel_sessions) + "';\n" + sql_script)
    df = pandas.read_sql(text(sql_script), engine)
    return df


def run_concurrency_test(number_of_parallel_sessions):
    # Run the SQL batch in N parallel sessions and report elapsed wall-clock time.
    engine = connect_to_redshift(REDSHIFT_CLUSTER_ENDPOINT, REDSHIFT_USER_NAME)
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=number_of_parallel_sessions) as executor:
        futures = [executor.submit(get_sql, engine, number_of_parallel_sessions)
                   for _ in range(number_of_parallel_sessions)]
        for future in as_completed(futures):
            future.result()  # re-raise any exception from the worker thread
    elapsed_time_in_secs = time.time() - start_time
    print("--- %s seconds ---" % elapsed_time_in_secs)
    return elapsed_time_in_secs


print(f'script:{SQL_SCRIPT_S3_PATH}, cluster:{REDSHIFT_CLUSTER_ENDPOINT},'
      f'role:{REDSHIFT_IAM_ROLE},bucket:{BUCKET_NAME},user:{REDSHIFT_USER_NAME},'
      f'sessions:{NUMBER_OF_PARALLEL_SESSIONS_LIST}')

for sessions in NUMBER_OF_PARALLEL_SESSIONS_LIST.split(','):
    number_of_parallel_sessions = int(sessions)
    if number_of_parallel_sessions <= int(MAX_PARALLEL_SESSIONS):
        print(f'running {number_of_parallel_sessions} parallel threads ..')
        run_concurrency_test(number_of_parallel_sessions)
    else:
        print(f'parallel sessions {number_of_parallel_sessions} exceeds maximum allowed {MAX_PARALLEL_SESSIONS} ..')
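
# Example environment setup (illustrative only): every value below is a
# placeholder, not a real resource; substitute your own S3 paths, cluster
# endpoint, IAM role, and user before running the script, e.g.:
#
#   export SQL_SCRIPT_S3_PATH=s3://example-bucket/scripts/test_queries.sql
#   export REDSHIFT_CLUSTER_ENDPOINT=example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev
#   export REDSHIFT_IAM_ROLE=arn:aws:iam::111111111111:role/example-redshift-role
#   export BUCKET_NAME=example-bucket
#   export REDSHIFT_USER_NAME=awsuser
#   export NUMBER_OF_PARALLEL_SESSIONS_LIST=1,5,10
#   export DISABLE_RESULT_CACHE=true
#   export DEFAULT_OUTPUT_LIMIT=1000
#   export MAX_NUMBER_OF_QUERIES=25
#   export MAX_PARALLEL_SESSIONS=50
#   export QUERY_LABEL_PREFIX=concurrency_test_
#   python <this_script>.py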