import boto3
import time
import traceback
import botocore.exceptions as be
import json
import re
import os
import dateutil.parser


def handler(event, context):
    print(event)
    action = event["Input"].get("action")
    current_account_id = context.invoked_function_arn.split(":")[4]
    user_config = get_json_config_from_s3(os.environ["USER_CONFIG_JSON_S3_PATH"])
    system_config = get_json_config_from_s3(os.environ["SYSTEM_CONFIG_JSON_S3_PATH"])
    cluster_identifier_prefix = os.environ["CLUSTER_IDENTIFIER_PREFIX"]
    what_if_timestamp = event["Input"].get("what_if_timestamp")
    cluster_identifier = event["Input"].get("cluster_identifier")
    sql_id = event["Input"].get("sql_id")
    job_id = event["Input"].get("job_id")
    redshift_cluster_configuration = event["Input"].get("redshift_cluster_configuration")
    redshift_cluster_index = event["Input"].get("redshift_cluster_index")
    workgroup = event["Input"].get("cluster_identifier")
    namespace = event["Input"].get("namespace_identifier")
    inresourceArn = event["Input"].get("namespace_arn")
    endpoint_type = event["Input"].get("endpoint_type")
    cluster_config = event["Input"].get("cluster_config")
    try:
        client = boto3.client("redshift")
        serverless_client = boto3.client("redshift-serverless")
        if user_config.get("DATABASE_NAME") == "N/A" or user_config.get("DATABASE_NAME") is None:
            database_name = system_config.get("DATABASE_NAME")
            print("Database name from system_config")
        else:
            database_name = user_config.get("DATABASE_NAME")
            print("Database name from user_config")
        print("Database name {}".format(database_name))
        if action == "initiate":
            what_if_timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time()))
            res = {"status": what_if_timestamp}
        elif action == "validate_user_config":
            res = {"status": validate_user_config(user_config, client, current_account_id)}
        elif action == "run_extract":
            res = {
                "job_id": run_extract(
                    what_if_timestamp=what_if_timestamp,
                    simple_replay_log_location=user_config.get("SIMPLE_REPLAY_LOG_LOCATION"),
                    simple_replay_extract_start_time=user_config.get("SIMPLE_REPLAY_EXTRACT_START_TIME"),
                    simple_replay_extract_end_time=user_config.get("SIMPLE_REPLAY_EXTRACT_END_TIME"),
                    simple_replay_extract_overwrite_s3_path=user_config.get(
                        "SIMPLE_REPLAY_EXTRACT_OVERWRITE_S3_PATH"
                    ),
                    bucket_name=system_config.get("S3_BUCKET_NAME"),
                    redshift_user_name=system_config.get("MASTER_USER_NAME"),
                    extract_prefix=system_config.get("EXTRACT_PREFIX"),
                    script_prefix=system_config.get("SCRIPT_PREFIX"),
                    extract_bootstrap_script=system_config.get("EXTRACT_BOOTSTRAP_SCRIPT"),
                    job_definition=system_config.get("JOB_DEFINITION"),
                    job_queue=system_config.get("JOB_QUEUE"),
                )
            }
        elif action == "batch_job_status":
            res = {"status": batch_job_status(job_id=job_id)}
        elif action == "get_redshift_configurations":
            res = {"status": user_config.get("CONFIGURATIONS")}
        elif action == "get_endpoint_type":
            cluster_index = int(event["Input"].get("index"))
            res = {"status": user_config.get("CONFIGURATIONS")[cluster_index]["TYPE"].upper()}
        elif action == "get_cluster_identifier":
            res = {
                "status": get_cluster_identifier(
                    client, user_config, redshift_cluster_configuration, cluster_identifier_prefix
                )
            }
        elif action == "get_serverless_identifier":
            res = {
                "status": get_serverless_identifier(
                    serverless_client, user_config, redshift_cluster_configuration, cluster_identifier_prefix
                )
            }
        elif action == "cluster_status":
            res = {"status": cluster_status(client, cluster_identifier)}
        elif action == "serverless_status":
            res = {"status": serverless_status(serverless_client, workgroup, namespace)}
        elif action == "create_parameter_group":
            res = {"status": create_parameter_group(client, cluster_identifier)}
        elif action == "update_parameter_group":
            if (
                user_config.get("PARAMETER_GROUP_CONFIG_S3_PATH") is None
                or user_config.get("PARAMETER_GROUP_CONFIG_S3_PATH") == "N/A"
            ):
                parameter_group = system_config.get("PARAMETER_GROUP_CONFIG")
            else:
                parameter_group = user_config.get("PARAMETER_GROUP_CONFIG_S3_PATH")
            res = {"status": update_parameter_group(client, cluster_identifier, parameter_group)}
        elif action == "create_cluster":
            maintenance_track = (
                "N/A"
                if redshift_cluster_configuration.get("MAINTENANCE_TRACK") is None
                else redshift_cluster_configuration.get("MAINTENANCE_TRACK")
            )
            res = {
                "status": create_cluster(
                    client,
                    cluster_identifier,
                    user_config.get("SNAPSHOT_ID"),
                    system_config.get("REDSHIFT_IAM_ROLE"),
                    cluster_identifier,
                    system_config.get("SUBNET_GROUP"),
                    system_config.get("SECURITY_GROUP_ID"),
                    user_config.get("SNAPSHOT_ACCOUNT_ID"),
                    redshift_cluster_configuration.get("NODE_TYPE"),
                    redshift_cluster_configuration.get("NUMBER_OF_NODES"),
                    master_user_name=system_config.get("MASTER_USER_NAME"),
                    database_name=database_name,
                    secrets_manager_arn=system_config.get("SECRETS_MANAGER_ARN"),
                    port=int(system_config.get("PORT")),
                    publicly_accessible=(system_config.get("PUBLICLY_ACCESSIBLE") == "true"),
                    maintenance_track=maintenance_track,
                )
            }
        elif action == "create_serverless_namespace":
            res = {
                "status": create_serverless_namespace(
                    serverless_client,
                    snapshot_id=user_config.get("SNAPSHOT_ID"),
                    redshift_iam_role=system_config.get("REDSHIFT_IAM_ROLE"),
                    snapshot_account_id=user_config.get("SNAPSHOT_ACCOUNT_ID"),
                    master_user_name=system_config.get("MASTER_USER_NAME"),
                    database_name=database_name,
                    secrets_manager_arn=system_config.get("SECRETS_MANAGER_ARN"),
                    namespace_name=namespace,
                    workgroup_name=workgroup,
                    region=system_config.get("REGION"),
                )
            }
        elif action == "restore_serverless_snapshot":
            res = {
                "status": restore_serverless_snapshot(
                    serverless_client,
                    snapshot_id=user_config.get("SNAPSHOT_ID"),
                    redshift_iam_role=system_config.get("REDSHIFT_IAM_ROLE"),
                    snapshot_account_id=user_config.get("SNAPSHOT_ACCOUNT_ID"),
                    master_user_name=system_config.get("MASTER_USER_NAME"),
                    database_name=database_name,
                    secrets_manager_arn=system_config.get("SECRETS_MANAGER_ARN"),
                    namespace_name=namespace,
                    workgroup_name=workgroup,
                    region=system_config.get("AWS_REGION"),
                )
            }
        elif action == "create_serverless_workgroup":
            res = {
                "status": create_serverless_workgroup(
                    serverless_client,
                    subnet_group=system_config.get("WORKGROUP_SUBNET"),
                    security_group_id=system_config.get("SECURITY_GROUP_ID"),
                    publicly_accessible=(system_config.get("PUBLICLY_ACCESSIBLE") == "true"),
                    base_capacity=redshift_cluster_configuration.get("BASE_RPU"),
                    namespace_name=namespace,
                    workgroup_name=workgroup,
                )
            }
        elif action == "classic_resize_cluster":
            res = {
                "status": classic_resize_cluster(
                    client,
                    cluster_identifier,
                    redshift_cluster_configuration.get("NODE_TYPE"),
                    redshift_cluster_configuration.get("NUMBER_OF_NODES"),
                )
            }
        elif action == "resume_cluster":
            client.resume_cluster(ClusterIdentifier=cluster_identifier)
            res = {"status": "initiated"}
        # elif action == "pause_cluster":
        #     res = {
        #         'status': pause_cluster(client=client,
        #                                 cluster_identifier=cluster_identifier,
        #                                 redshift_cluster_index=redshift_cluster_index,
        #                                 auto_pause=user_config.get('AUTO_PAUSE'),
        #                                 endpoint_type=endpoint_type)}
        elif action == "pause_cluster":
            res = {
                "status": pause_cluster(
                    client=client,
                    auto_pause=user_config.get("AUTO_PAUSE"),
                    cluster_config=cluster_config,
                )
            }
        elif action == "update_wlm_config":
            res = {
                "status": update_wlm_config(
                    client,
                    cluster_identifier,
                    redshift_cluster_configuration.get("WLM_CONFIG_S3_PATH"),
                )
            }
        ## Added to check for clusters in pending reboot after wlm change ##
        elif action == "check_pending_reboot_status":
            res = {"status": check_pending_reboot_status(client, cluster_identifier)}
        elif action == "run_ddl_and_copy_script":
            if endpoint_type.upper() == "SERVERLESS":
                workgroup_name = cluster_identifier["workgroup"]
                res = {
                    "sql_id": run_sql_script_from_s3(
                        script_s3_path=user_config.get("DDL_AND_COPY_SCRIPT_S3_PATH"),
                        action=action,
                        cluster_identifier="N/A",
                        redshift_iam_role=system_config.get("REDSHIFT_IAM_ROLE"),
                        bucket_name=system_config.get("S3_BUCKET_NAME"),
                        db=database_name,
                        user=system_config.get("MASTER_USER_NAME"),
                        workgroup_name=workgroup_name,
                    )
                }
            elif endpoint_type.upper() == "PROVISIONED":
                res = {
                    "sql_id": run_sql_script_from_s3(
                        script_s3_path=user_config.get("DDL_AND_COPY_SCRIPT_S3_PATH"),
                        action=action,
                        cluster_identifier=cluster_identifier,
                        redshift_iam_role=system_config.get("REDSHIFT_IAM_ROLE"),
                        bucket_name=system_config.get("S3_BUCKET_NAME"),
                        db=database_name,
                        user=system_config.get("MASTER_USER_NAME"),
                    )
                }
        elif action == "run_redshift_performance_test":
            res = {
                "job_id": run_redshift_performance_test(
                    client=client,
                    cluster_identifier=cluster_identifier,
                    bucket_name=system_config.get("S3_BUCKET_NAME"),
                    performance_test_bootstrap_script=system_config.get("PERFORMANCE_TEST_BOOTSTRAP_SCRIPT"),
                    performance_test_python_script=system_config.get("PERFORMANCE_TEST_PYTHON_SCRIPT"),
                    sql_script_s3_path=user_config.get("SQL_SCRIPT_S3_PATH"),
                    number_of_parallel_sessions_list=user_config.get("NUMBER_OF_PARALLEL_SESSIONS_LIST"),
                    job_definition=system_config.get("JOB_DEFINITION"),
                    job_queue=system_config.get("JOB_QUEUE"),
                    redshift_iam_role=system_config.get("REDSHIFT_IAM_ROLE"),
                    redshift_user_name=system_config.get("MASTER_USER_NAME"),
                    db=database_name,
                    disable_result_cache=system_config.get("DISABLE_RESULT_CACHE"),
                    default_output_limit=system_config.get("DEFAULT_OUTPUT_LIMIT"),
                    max_number_of_queries=system_config.get("MAX_NUMBER_OF_QUERIES"),
                    max_parallel_sessions=system_config.get("MAX_PARALLEL_SESSIONS"),
                    query_label_prefix=system_config.get("QUERY_LABEL_PREFIX"),
                )
            }
        elif action == "run_replay":
            if endpoint_type.upper() == "SERVERLESS":
                for key, value in workgroup.items():
                    if key == "workgroup":
                        workgroup = value
            res = {
                "job_id": run_replay(
                    client=client,
                    serverless_client=serverless_client,
                    what_if_timestamp=what_if_timestamp,
                    cluster_identifier=cluster_identifier,
                    extract_s3_path="s3://"
                    + system_config.get("S3_BUCKET_NAME")
                    + "/"
                    + system_config.get("EXTRACT_PREFIX")
                    + "/"
                    + what_if_timestamp
                    + "/",
                    simple_replay_overwrite_s3_path=user_config.get("SIMPLE_REPLAY_OVERWRITE_S3_PATH"),
                    simple_replay_log_location=user_config.get("SIMPLE_REPLAY_LOG_LOCATION"),
                    execute_unload_statements=user_config.get("SIMPLE_REPLAY_UNLOAD_STATEMENTS"),
                    bucket_name=system_config.get("S3_BUCKET_NAME"),
                    redshift_user_name=system_config.get("MASTER_USER_NAME"),
                    redshift_iam_role=system_config.get("REDSHIFT_IAM_ROLE"),
                    db=database_name,
                    extract_prefix=system_config.get("EXTRACT_PREFIX"),
                    replay_prefix=system_config.get("REPLAY_PREFIX"),
                    script_prefix=system_config.get("SCRIPT_PREFIX"),
                    snapshot_account_id=user_config.get("SNAPSHOT_ACCOUNT_ID"),
                    replay_bootstrap_script=system_config.get("REPLAY_BOOTSTRAP_SCRIPT"),
                    job_definition=system_config.get("JOB_DEFINITION"),
                    job_queue=system_config.get("JOB_QUEUE"),
                    endpoint_type=endpoint_type,
                    workgroupName=workgroup,
                )
            }
        elif action == "gather_comparison_stats":
            if endpoint_type.upper() == "SERVERLESS":
                script_s3_path = system_config.get("GATHER_COMPARISON_STATS_SERVERLESS_SCRIPT")
                comparison_stats_s3_path = system_config.get("COMPARISON_STATS_S3_PATH")
            else:
                script_s3_path = (
                    system_config.get("GATHER_COMPARISON_STATS_SCRIPT")
                    + "|"
                    + system_config.get("GATHER_COMPARISON_STATS_SERVERLESS_SCRIPT")
                )
                comparison_stats_s3_path = system_config.get("COMPARISON_STATS_S3_PATH")
            res = {
                "sql_id": gather_comparison_stats(
                    script_s3_path=script_s3_path,
                    action=action,
                    cluster_identifier=cluster_identifier,
                    redshift_iam_role=system_config.get("REDSHIFT_IAM_ROLE"),
                    bucket_name=system_config.get("S3_BUCKET_NAME"),
                    db=database_name,
                    user=system_config.get("MASTER_USER_NAME"),
                    run_type="sync",
                    what_if_timestamp=what_if_timestamp,
                    comparison_stats_s3_path=comparison_stats_s3_path,
                    external_schema_script=system_config.get("EXTERNAL_SCHEMA_SCRIPT"),
                    query_label_prefix=system_config.get("QUERY_LABEL_PREFIX"),
                    node_type=redshift_cluster_configuration.get("NODE_TYPE"),
                    number_of_nodes=redshift_cluster_configuration.get("NUMBER_OF_NODES"),
                    region=system_config.get("REGION"),
                    cluster_config_s3_path=system_config.get("CLUSTER_CONFIG_S3_PATH"),
                    endpoint_type=endpoint_type,
                    redshift_client=client,
                )
            }
        elif action == "populate_comparison_results":
            res = {
                "sql_id": populate_comparison_results(
                    script_s3_path=system_config.get("POPULATE_COMPARISON_RESULTS_SCRIPT"),
                    action=action,
                    cluster_identifier=cluster_identifier,
                    redshift_iam_role=system_config.get("REDSHIFT_IAM_ROLE"),
                    bucket_name=system_config.get("S3_BUCKET_NAME"),
                    db=database_name,
                    user=system_config.get("MASTER_USER_NAME"),
                    what_if_timestamp=what_if_timestamp,
                    endpoint_type=endpoint_type,
                    raw_comparison_results_s3_path=system_config.get("RAW_COMPARISON_RESULTS_S3_PATH"),
                    comparison_results_s3_path=system_config.get("COMPARISON_RESULTS_S3_PATH"),
                )
            }
        elif action == "sql_status":
            res = {"status": sql_status(sql_id)}
        elif action == "run_glue_crawler":
            res = {"status": run_glue_crawler(system_config.get("CRAWLER_NAME"))}
        elif action == "set_tag_resource":
            res = {"status": set_tag_resource(serverless_client, inresourceArn)}
        elif action == "set_untag":
            res = {"status": set_untag(serverless_client, inresourceArn)}
        elif action == "get_list_tags_for_resource":
            res = {"status": get_list_tags_for_resource(serverless_client, inresourceArn)}
        elif action == "list_workgroups":
            res = {"status": get_workgroups(serverless_client, workgroup)}
        else:
            raise ValueError("Invalid Task: " + action)
    except Exception as e:
        print(e)
        print(traceback.format_exc())
        raise
    print(res)
    return res


def cw_describe_log_groups(log_group_name=None, region=None):
    cloudwatch_client = boto3.client("logs", region)
    if log_group_name:
        return cloudwatch_client.describe_log_groups(logGroupNamePrefix=log_group_name)
    else:
        response_pg_1 = cloudwatch_client.describe_log_groups()
        logs = response_pg_1
        token = response_pg_1.get("nextToken", "")
        while token != "":
            response_itr = cloudwatch_client.describe_log_groups(nextToken=token)
            logs["logGroups"].extend(response_itr["logGroups"])
            token = response_itr["nextToken"] if "nextToken" in response_itr.keys() else ""
        return logs


def validate_user_config(user_config, client, current_account_id):
    snapshot_identifier = user_config.get("SNAPSHOT_ID")
    owner_account = user_config.get("SNAPSHOT_ACCOUNT_ID")
    log_location = user_config.get("SIMPLE_REPLAY_LOG_LOCATION")
    if snapshot_identifier != "N/A" and owner_account == current_account_id:
        owner_account = (
            boto3.client("sts").get_caller_identity()["Account"]
            if owner_account == "N/A"
            else owner_account
        )
        response = client.describe_cluster_snapshots(
            SnapshotIdentifier=snapshot_identifier, OwnerAccount=owner_account
        )
        cluster_id = response["Snapshots"][0]["ClusterIdentifier"]
        cluster_desc_resp = client.describe_clusters(ClusterIdentifier=cluster_id)
        cluster_param_grp = cluster_desc_resp["Clusters"][0]["ClusterParameterGroups"][0]["ParameterGroupName"]
        loggingStatusResponse = client.describe_logging_status(ClusterIdentifier=cluster_id)
        if loggingStatusResponse.get("LoggingEnabled"):
            print("Audit logging is enabled")
        else:
            raise ValueError("Audit logging is disabled, please enable")
        get_cluster_params = client.describe_cluster_parameters(ParameterGroupName=cluster_param_grp)
        # print(get_cluster_params)
        for param in get_cluster_params["Parameters"]:
            if param["ParameterName"] == "enable_user_activity_logging":
                if param["ParameterValue"] == "false":
                    raise ValueError(
                        "enable_user_activity_logging parameter is set to False, please set this to True"
                    )
                else:
                    print("enable_user_activity_logging parameter is set to True")
        if log_location != "N/A" and loggingStatusResponse.get("LogDestinationType") == "s3":
            match = re.search(r"s3://([^/]+)/(.*)", log_location)
            list_objects_response = boto3.client("s3").list_objects_v2(
                Bucket=match.group(1), Prefix=match.group(2)
            )
        elif log_location != "N/A" and loggingStatusResponse.get("LogDestinationType") == "cloudwatch":
            match = re.search(r"\/aws\/redshift\/cluster\/(.*)", log_location)
            describe_log_group_response = cw_describe_log_groups(
                log_group_name=match.group(0), region=user_config.get("REGION")
            )
    simple_replay_extract_start_time = user_config.get("SIMPLE_REPLAY_EXTRACT_START_TIME")
    simple_replay_extract_end_time = user_config.get("SIMPLE_REPLAY_EXTRACT_END_TIME")
    if log_location != "N/A" and (
        simple_replay_extract_start_time == "N/A" or simple_replay_extract_end_time == "N/A"
    ):
        raise ValueError(
            "You need to provide SIMPLE_REPLAY_EXTRACT_START_TIME and SIMPLE_REPLAY_EXTRACT_END_TIME value"
        )
    if simple_replay_extract_start_time != "N/A":
        dateutil.parser.isoparse(simple_replay_extract_start_time)
        # raise ValueError('SIMPLE_REPLAY_EXTRACT_START_TIME not formatted as ISO 8601 format')
    if simple_replay_extract_end_time != "N/A":
        dateutil.parser.isoparse(simple_replay_extract_end_time)
        # raise ValueError('SIMPLE_REPLAY_EXTRACT_END_TIME not formatted as ISO 8601 format')
    for config in user_config["CONFIGURATIONS"]:
        if config["TYPE"] == "Provisioned" and config["NODE_TYPE"] not in [
            "ds2.xlarge",
            "ds2.8xlarge",
            "dc2.large",
            "dc2.8xlarge",
            "ra3.xlplus",
            "ra3.4xlarge",
            "ra3.16xlarge",
        ]:
            raise ValueError("Invalid node type - " + config["NODE_TYPE"] + ", choose a valid node type")
    return "Validation successful"


def populate_comparison_results(
    script_s3_path,
    action,
    cluster_identifier,
    redshift_iam_role,
    bucket_name,
    db,
    user,
    what_if_timestamp,
    endpoint_type,
    raw_comparison_results_s3_path,
    comparison_results_s3_path,
):
    if endpoint_type.upper() == "SERVERLESS":
        workgroup = cluster_identifier["workgroup"]
        return run_sql_script_from_s3(
            script_s3_path=script_s3_path,
            action=action,
            cluster_identifier=cluster_identifier,
            redshift_iam_role=redshift_iam_role,
            bucket_name=bucket_name,
            db=db,
            user=user,
            what_if_timestamp=what_if_timestamp,
            raw_comparison_results_s3_path=raw_comparison_results_s3_path,
            comparison_results_s3_path=comparison_results_s3_path,
            workgroup_name=workgroup,
        )
    else:
        return run_sql_script_from_s3(
            script_s3_path=script_s3_path,
            action=action,
            cluster_identifier=cluster_identifier,
            redshift_iam_role=redshift_iam_role,
            bucket_name=bucket_name,
            db=db,
            user=user,
            what_if_timestamp=what_if_timestamp,
            raw_comparison_results_s3_path=raw_comparison_results_s3_path,
            comparison_results_s3_path=comparison_results_s3_path,
        )


def s3_put(script_s3_path, object):
    bucket, key = script_s3_path.replace("s3://", "").split("/", 1)
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=object)


def gather_comparison_stats(
    script_s3_path,
    action,
    cluster_identifier,
    redshift_iam_role,
    bucket_name,
    db,
    user,
    run_type,
    what_if_timestamp,
    comparison_stats_s3_path,
    external_schema_script,
    query_label_prefix,
    node_type,
    number_of_nodes,
    region,
    cluster_config_s3_path,
    endpoint_type,
    redshift_client,
):
    if endpoint_type.upper() == "SERVERLESS":
        workgroup = cluster_identifier["workgroup"]
        comparison_stats_s3_path = comparison_stats_s3_path + "comparison_stats"
        print("gather_comparison_stats >> Serverless >> cluster_identifier >>" + workgroup)
        print("Serverless >> script_s3_path >>" + script_s3_path)
        print("Serverless >> comparison_stats_s3_path >> " + comparison_stats_s3_path)
        return run_sql_script_from_s3(
            script_s3_path=script_s3_path,
            action=action,
            cluster_identifier=workgroup,
            redshift_iam_role=redshift_iam_role,
            bucket_name=bucket_name,
            db=db,
            user=user,
            run_type=run_type,
            what_if_timestamp=what_if_timestamp,
            comparison_stats_s3_path=comparison_stats_s3_path,
            query_label_prefix=query_label_prefix,
            workgroup_name=workgroup,
        )
    elif endpoint_type.upper() == "PROVISIONED":
        # Default to False so the flag is defined even if the checks below fail
        # before it is assigned (the exception handler only re-raises unexpected errors).
        sysQueryExists = False
        try:
            print("gather_comparison_stats >> Provisioned >> cluster_identifier >>" + str(cluster_identifier))
            # config=f'cluster_identifier,node_type,number_of_nodes,region\n{cluster_identifier},{node_type},{number_of_nodes},{region}'
            # s3_put(cluster_config_s3_path+'/'+cluster_identifier+'.csv', config)
            print("Provisioned >> script_s3_path >>" + script_s3_path)
            print("Provisioned >> comparison_stats_s3_path >> " + comparison_stats_s3_path)
            print("Checking if sys_query_history exists in the cluster")
            executeResponse = boto3.client("redshift-data").execute_statement(
                Database=db,
                DbUser=user,
                Sql="select * from sys_query_history limit 1",
                ClusterIdentifier=cluster_identifier,
                WithEvent=False,
            )
            query_id = executeResponse["Id"]
            time.sleep(30)
            descResponse = boto3.client("redshift-data").describe_statement(Id=query_id)
            if "Error" not in descResponse and descResponse["ResultRows"] == 1:
                print("sys_query_history exists")
                sysQueryExists = True
            else:
                print("sys_query_history doesn't exist")
                sysQueryExists = False
            print("Creating external schema on provisioned clusters")
            run_sql(
                clusterid=cluster_identifier,
                db=db,
                user=user,
                script=external_schema_script,
                with_event=False,
                run_type="sync",
                endpoint_type="PROVISIONED",
                workgroup_name="N/A",
            )
        except Exception as e:
            if "already exists" not in str(e):
                raise
        if sysQueryExists:
            script_s3_path_serverless = script_s3_path.split("|")[1]
            comparison_stats_s3_path = comparison_stats_s3_path + "comparison_stats"
            return run_sql_script_from_s3(
                script_s3_path=script_s3_path_serverless,
                action=action,
                cluster_identifier=cluster_identifier,
                redshift_iam_role=redshift_iam_role,
                bucket_name=bucket_name,
                db=db,
                user=user,
                run_type=run_type,
                what_if_timestamp=what_if_timestamp,
                comparison_stats_s3_path=comparison_stats_s3_path,
                query_label_prefix=query_label_prefix,
            )
        else:
            return run_sql_script_from_s3(
                script_s3_path=script_s3_path.split("|")[0],
                action=action,
                cluster_identifier=cluster_identifier,
                redshift_iam_role=redshift_iam_role,
                bucket_name=bucket_name,
                db=db,
                user=user,
                run_type=run_type,
                what_if_timestamp=what_if_timestamp,
                comparison_stats_s3_path=comparison_stats_s3_path + "comparison_stats_provisioned",
                query_label_prefix=query_label_prefix,
            )
# def pause_cluster(client, cluster_identifier, redshift_cluster_index, auto_pause, endpoint_type):
#     if endpoint_type.upper() == 'PROVISIONED':
#         if auto_pause and redshift_cluster_index > 0:
#             try:
#                 client.pause_cluster(ClusterIdentifier=cluster_identifier)
#             except be.ClientError as e:
#                 if e.response['Error']['Code'] == 'InvalidClusterState':
#                     print(e.response['Error']['Code'])
#                 else:
#                     raise
#             return "initiated"
#         else:
#             return "auto_pause config is false"


def cluster_status(client, clusterid):
    try:
        desc = client.describe_clusters(ClusterIdentifier=clusterid)["Clusters"][0]
        if isinstance(desc, dict):
            status = (
                desc.get("ClusterStatus")
                + desc.get("ClusterAvailabilityStatus")
                + (desc.get("RestoreStatus").get("Status") if desc.get("RestoreStatus") else "")
            )
        else:
            status = "Unavailable"
    except be.ClientError as e:
        msg = e.response["Error"]["Code"]
        if msg == "ClusterNotFound":
            status = "nonExistent"
        else:
            raise
    return status


def serverless_status(serverless_client, workgroupname, namespacename):
    try:
        namespace_status = ""
        response = serverless_client.get_workgroup(workgroupName=workgroupname)
        print(response)
        for key, value in response.items():
            if key == "workgroup":
                response = value
        if isinstance(response, dict):
            status = response.get("status")
            namespace_response = serverless_client.get_namespace(namespaceName=namespacename)
            for key, value in namespace_response.items():
                if key == "namespace":
                    namespace_status = value.get("namespaceArn")
        else:
            status = "Unavailable"
    except be.ClientError as e:
        msg = e.response["Error"]["Code"]
        if msg == "ResourceNotFoundException":
            status = "nonExistent"
            namespace_status = "nonExistent"
        else:
            status = "workgroupNotFound"
            namespace_status = "workgroupNotFound"
    return {"workgroupstatus": status, "namespace_arn": namespace_status}


def get_cluster_identifier(client, config, redshift_configurations, cluster_identifier_prefix):
    print("provisioned")
    print(redshift_configurations)
    if (
        redshift_configurations.get("USER_FRIENDLY_NAME_SUFFIX") is None
        or redshift_configurations.get("USER_FRIENDLY_NAME_SUFFIX") == "N/A"
    ):
        if (
            redshift_configurations.get("WLM_CONFIG_S3_PATH") is None
            or redshift_configurations.get("WLM_CONFIG_S3_PATH") == "N/A"
        ):
            wlm_name = ""
            trim_wlm_name = 64
        else:
            # wlm_name = redshift_configurations.get('WLM_CONFIG_S3_PATH').replace("s3://", "").replace("/", "").replace(".json", "")
            wlm_name = redshift_configurations.get("WLM_CONFIG_S3_PATH").split("/")[-1].replace(".json", "")
            wlm_name = re.sub("[^A-Za-z0-9]+", "", wlm_name)
            trim_wlm_name = 64 - len(wlm_name)
        cluster_suffix = (
            redshift_configurations.get("NODE_TYPE") + "-" + redshift_configurations.get("NUMBER_OF_NODES")
        )
        cluster_suffix = cluster_suffix[0:trim_wlm_name] + wlm_name
        cluster_suffix = cluster_suffix.replace(".", "-")
    else:
        cluster_suffix = redshift_configurations.get("USER_FRIENDLY_NAME_SUFFIX")
    return (cluster_identifier_prefix + "-" + cluster_suffix).lower()[0:63]


def update_wlm_config(client, cluster_identifier, wlm_config_s3_path):
    if wlm_config_s3_path is None or wlm_config_s3_path == "N/A":
        return "N/A"
    else:
        wlm_config = get_json_config_from_s3(wlm_config_s3_path)
        print("Changing {} parameter group wlm : {}".format(cluster_identifier, wlm_config))
        client.modify_cluster_parameter_group(
            ParameterGroupName=cluster_identifier,
            Parameters=[
                {
                    "ParameterName": "wlm_json_configuration",
                    "ParameterValue": json.dumps(wlm_config),
                    "ApplyType": "dynamic",
                    "IsModifiable": True,
                },
            ],
        )
        return "initiated"


## Added to check for clusters in pending reboot after wlm change ##
def check_pending_reboot_status(client, cluster_identifier):
    try:
        cluster_desc = client.describe_clusters(ClusterIdentifier=cluster_identifier)["Clusters"][0]
        desc_paramgroup_status = cluster_desc["ClusterParameterGroups"][0]["ParameterApplyStatus"]
        status = (
            cluster_desc.get("ClusterStatus")
            + cluster_desc.get("ClusterAvailabilityStatus")
            + desc_paramgroup_status
        )
        if desc_paramgroup_status == "pending-reboot":
            print("Cluster {} needs to be rebooted to apply the WLM config changes".format(cluster_identifier))
            client.reboot_cluster(ClusterIdentifier=cluster_identifier)
    except Exception as err:
        print(err)
        status = "availableAvailablein-sync"
    return status


def get_json_config_from_s3(script_s3_path):
    bucket, key = script_s3_path.replace("s3://", "").split("/", 1)
    obj = boto3.client("s3").get_object(Bucket=bucket, Key=key)
    return json.loads(obj["Body"].read().decode("utf-8"))


def create_parameter_group(client, parameter_group_name):
    try:
        client.create_cluster_parameter_group(
            ParameterGroupName=parameter_group_name,
            ParameterGroupFamily="redshift-1.0",
            Description="redshift cluster parameter group",
        )
    except be.ClientError as e:
        if e.response["Error"]["Code"] == "ClusterParameterGroupAlreadyExists":
            print(e.response["Error"]["Code"])
        else:
            raise
    return "initiated"


def parameter_group_status(client, parameter_group_name):
    parameter_group = client.describe_cluster_parameters(ParameterGroupName=parameter_group_name)
    return parameter_group


def update_parameter_group(client, parameter_group_name, parameter_group_config_s3_path):
    target_parameter_group = client.describe_cluster_parameters(
        ParameterGroupName=parameter_group_name
    )["Parameters"]
    target_parameters = {}
    for i in target_parameter_group:
        target_parameters[i["ParameterName"]] = i["ParameterValue"]
    source_parameter_group = get_json_config_from_s3(parameter_group_config_s3_path)["Parameters"]
    modified_parameter_group = []
    for i in source_parameter_group:
        source_parameter_value = i["ParameterValue"].replace(" ", "")
        target_parameter_value = target_parameters[i["ParameterName"]].replace(" ", "")
        if source_parameter_value != target_parameter_value:
            modified_parameter_group.append(i)
    if modified_parameter_group:
        client.modify_cluster_parameter_group(
            ParameterGroupName=parameter_group_name, Parameters=modified_parameter_group
        )
    return "Initiated"


def classic_resize_cluster(client, clusterid, node_type, number_of_nodes):
    client.resize_cluster(
        ClusterIdentifier=clusterid,
        NodeType=node_type,
        NumberOfNodes=int(number_of_nodes),
        ClusterType="single-node" if int(number_of_nodes) == 1 else "multi-node",
        Classic=True,
    )
    return "Initiated"


def create_cluster(
    client,
    cluster_identifier,
    snapshot_id,
    redshift_iam_role,
    parameter_group_name,
    subnet_group,
    security_group_id,
    snapshot_account_id,
    node_type,
    number_of_nodes,
    master_user_name,
    database_name,
    port,
    publicly_accessible,
    maintenance_track,
    secrets_manager_arn,
):
    try:
        if snapshot_id is None or snapshot_id == "N/A":
            master_user_secret = json.loads(
                boto3.client("secretsmanager")
                .get_secret_value(SecretId=secrets_manager_arn)
                .get("SecretString")
            )
            master_user_password = master_user_secret.get("password")
            maintenance_track_val = "Current" if maintenance_track == "N/A" else maintenance_track
            print("maintenance_track_val >> " + maintenance_track_val)
            client.create_cluster(
                DBName=database_name,
                ClusterIdentifier=cluster_identifier,
                ClusterType="single-node" if int(number_of_nodes) == 1 else "multi-node",
                NodeType=node_type,
                MasterUsername=master_user_name,
                MasterUserPassword=master_user_password,
                VpcSecurityGroupIds=[security_group_id],
                ClusterSubnetGroupName=subnet_group,
                ClusterParameterGroupName=parameter_group_name,
                Port=port,
                NumberOfNodes=int(number_of_nodes),
                PubliclyAccessible=publicly_accessible,
                IamRoles=[redshift_iam_role],
                MaintenanceTrackName=maintenance_track_val,
            )
        else:
            if snapshot_account_id is None or snapshot_account_id == "N/A":
                snapshot_account_id = boto3.client("sts").get_caller_identity()["Account"]
            if maintenance_track != "N/A":
                client.restore_from_cluster_snapshot(
                    NumberOfNodes=int(number_of_nodes),
                    NodeType=node_type,
                    ClusterIdentifier=cluster_identifier,
                    SnapshotIdentifier=snapshot_id,
                    OwnerAccount=snapshot_account_id,
                    Port=port,
                    ClusterSubnetGroupName=subnet_group,
                    PubliclyAccessible=publicly_accessible,
                    ClusterParameterGroupName=parameter_group_name,
                    VpcSecurityGroupIds=[security_group_id],
                    IamRoles=[redshift_iam_role],
                    MaintenanceTrackName=maintenance_track,
                )
            else:
                client.restore_from_cluster_snapshot(
                    NumberOfNodes=int(number_of_nodes),
                    NodeType=node_type,
                    ClusterIdentifier=cluster_identifier,
                    SnapshotIdentifier=snapshot_id,
                    OwnerAccount=snapshot_account_id,
                    Port=port,
                    ClusterSubnetGroupName=subnet_group,
                    PubliclyAccessible=publicly_accessible,
                    ClusterParameterGroupName=parameter_group_name,
                    VpcSecurityGroupIds=[security_group_id],
                    IamRoles=[redshift_iam_role],
                )
        status = "Initiated"
    except be.ClientError as e:
        msg = e.response["Error"]["Code"]
        if msg == "ClusterAlreadyExists":
            status = msg
        elif msg == "InvalidParameterValue":
            source_node_type, source_number_of_nodes = get_source_cluster_config(client, snapshot_id)
            client.restore_from_cluster_snapshot(
                NumberOfNodes=source_number_of_nodes,
                NodeType=source_node_type,
                ClusterIdentifier=cluster_identifier,
                SnapshotIdentifier=snapshot_id,
                OwnerAccount=snapshot_account_id,
                Port=port,
                ClusterSubnetGroupName=subnet_group,
                PubliclyAccessible=publicly_accessible,
                ClusterParameterGroupName=parameter_group_name,
                VpcSecurityGroupIds=[security_group_id],
                IamRoles=[redshift_iam_role],
            )
            status = "NeedClassicResize"
        else:
            raise
    return status


def get_source_cluster_config(client, snapshot_id):
    resp = client.describe_cluster_snapshots(SnapshotIdentifier=snapshot_id)
    node_type = resp["Snapshots"][0]["NodeType"]
    number_of_nodes = resp["Snapshots"][0]["NumberOfNodes"]
    return (node_type, number_of_nodes)


def run_sql_script_from_s3(
    script_s3_path,
    action,
    cluster_identifier,
    redshift_iam_role,
    bucket_name,
    db,
    user,
    run_type="async",
    result_cache="true",
    with_event=False,
    what_if_timestamp=None,
    comparison_stats_s3_path=None,
    comparison_results_s3_path=None,
    raw_comparison_results_s3_path=None,
    query_label_prefix=None,
    workgroup_name=None,
):
    if script_s3_path is None or script_s3_path == "N/A":
        return "N/A"
    else:
        bucket, key = script_s3_path.replace("s3://", "").split("/", 1)
        obj = boto3.client("s3").get_object(Bucket=bucket, Key=key)
        script = obj["Body"].read().decode("utf-8")
        # Provisioned
        script = script.format(
            redshift_iam_role=redshift_iam_role,
            bucket_name=bucket_name,
            cluster_identifier=cluster_identifier,
            what_if_timestamp=what_if_timestamp,
            comparison_stats_s3_path=comparison_stats_s3_path,
            comparison_results_s3_path=comparison_results_s3_path,
            raw_comparison_results_s3_path=raw_comparison_results_s3_path,
            query_label_prefix=query_label_prefix,
            workgroup_name=workgroup_name,
        )
        # Serverless
        query_group_statement = "set query_group to '" + action + "';\n"
        result_cache_statement = "set enable_result_cache_for_session to " + result_cache + "; \n"
        script = query_group_statement + result_cache_statement + script
        if workgroup_name is None:
            sql_id = run_sql(
                cluster_identifier, db, user, script, with_event, run_type, "PROVISIONED", "N/A"
            )
        else:
            sql_id = run_sql(
                workgroup_name, db, user, script, with_event, run_type, "SERVERLESS", workgroup_name
            )
        return sql_id


def run_sql(clusterid, db, user, script, with_event, run_type, endpoint_type, workgroup_name):
    print("Script >> " + str(script))
    if endpoint_type.upper() == "PROVISIONED":
        res = boto3.client("redshift-data").execute_statement(
            Database=db, DbUser=user, Sql=script, ClusterIdentifier=clusterid, WithEvent=with_event
        )
    elif endpoint_type.upper() == "SERVERLESS":
        serverless_client = boto3.client("redshift-serverless")
        response = serverless_client.get_workgroup(workgroupName=workgroup_name)
        status = response["workgroup"]["status"]
        while status != "AVAILABLE":
            print("gather_comparison_stats >> workgroup {} is not available".format(workgroup_name))
            time.sleep(300)
            # Refresh the workgroup status so the wait loop can eventually exit
            response = serverless_client.get_workgroup(workgroupName=workgroup_name)
            status = response["workgroup"]["status"]
        res = boto3.client("redshift-data").execute_statement(
            Database=db, Sql=script, WithEvent=with_event, WorkgroupName=workgroup_name
        )
    query_id = res["Id"]
    done = False
    while not done:
        status = sql_status(query_id)
        if run_type == "async":
            break
        elif status == "FINISHED":
            break
    return query_id


def sql_status(query_id):
    if query_id == "N/A":
        return "FINISHED"
    res = boto3.client("redshift-data").describe_statement(Id=query_id)
    status = res["Status"]
    print("sql_status for " + str(query_id) + " :" + status)
    if status == "FAILED":
        print(res)
        raise Exception(res["Error"])
    return status.strip('"')


def run_redshift_performance_test(
    client,
    cluster_identifier,
    bucket_name,
    performance_test_bootstrap_script,
    performance_test_python_script,
    sql_script_s3_path,
    number_of_parallel_sessions_list,
    job_definition,
    job_queue,
    redshift_iam_role,
    redshift_user_name,
    db,
    disable_result_cache,
    default_output_limit,
    max_number_of_queries,
    max_parallel_sessions,
    query_label_prefix,
):
    if sql_script_s3_path is None or sql_script_s3_path == "N/A":
        return "N/A"
    else:
        desc = client.describe_clusters(ClusterIdentifier=cluster_identifier)["Clusters"][0]
        cluster_endpoint = (
            desc.get("Endpoint").get("Address") + ":" + str(desc.get("Endpoint").get("Port")) + "/" + db
        )
        response = boto3.client("batch").submit_job(
            jobName="AmazonRedshiftPerformanceTesting",
            jobQueue=job_queue,
            jobDefinition=job_definition,
            containerOverrides={
                "command": [
                    "sh",
                    "-c",
                    "yum install -y awscli; aws s3 cp $BOOTSTRAP_SCRIPT ./bootstrap.sh; sh ./bootstrap.sh",
                ],
                "environment": [
                    {"name": "BOOTSTRAP_SCRIPT", "value": performance_test_bootstrap_script},
                    {"name": "BUCKET_NAME", "value": bucket_name},
                    {"name": "PYTHON_SCRIPT", "value": performance_test_python_script},
                    {"name": "REDSHIFT_CLUSTER_ENDPOINT", "value": cluster_endpoint},
                    {"name": "REDSHIFT_IAM_ROLE", "value": redshift_iam_role},
                    {"name": "REDSHIFT_USER_NAME", "value": redshift_user_name},
                    {"name": "SQL_SCRIPT_S3_PATH", "value": sql_script_s3_path},
                    {"name": "NUMBER_OF_PARALLEL_SESSIONS_LIST", "value": number_of_parallel_sessions_list},
                    {"name": "DISABLE_RESULT_CACHE", "value": disable_result_cache},
                    {"name": "DEFAULT_OUTPUT_LIMIT", "value": default_output_limit},
                    {"name": "MAX_NUMBER_OF_QUERIES", "value": max_number_of_queries},
                    {"name": "MAX_PARALLEL_SESSIONS", "value": max_parallel_sessions},
                    {"name": "QUERY_LABEL_PREFIX", "value": query_label_prefix},
                ],
            },
        )
        return response["jobId"]


def get_workload_location(extract_s3_path):
    bucket, key = extract_s3_path.replace("s3://", "").split("/", 1)
    response = boto3.client("s3").list_objects_v2(Bucket=bucket, Prefix=key)
    if response.get("Contents"):
        key = response.get("Contents")[0].get("Key").split("/")[2]
        return extract_s3_path + key
    else:
        return None


def run_extract(
    what_if_timestamp,
    simple_replay_log_location,
    simple_replay_extract_start_time,
    simple_replay_extract_end_time,
    simple_replay_extract_overwrite_s3_path,
    bucket_name,
    redshift_user_name,
    extract_prefix,
    script_prefix,
    extract_bootstrap_script,
    job_definition,
    job_queue,
):
    if simple_replay_log_location is None or simple_replay_log_location == "N/A":
        return "N/A"
    else:
        if simple_replay_extract_overwrite_s3_path is None:
            simple_replay_extract_overwrite_s3_path = "N/A"
        response = boto3.client("batch").submit_job(
            jobName="AmazonRedshiftExtract",
            jobQueue=job_queue,
            jobDefinition=job_definition,
            containerOverrides={
                "command": [
                    "sh",
                    "-c",
                    "yum install -y awscli; aws s3 cp $BOOTSTRAP_SCRIPT ./bootstrap.sh; sh ./bootstrap.sh",
                ],
                "environment": [
                    {"name": "BOOTSTRAP_SCRIPT", "value": extract_bootstrap_script},
                    {"name": "BUCKET_NAME", "value": bucket_name},
                    {
                        "name": "SIMPLE_REPLAY_EXTRACT_OVERWRITE_S3_PATH",
                        "value": simple_replay_extract_overwrite_s3_path,
                    },
                    {"name": "SIMPLE_REPLAY_LOG_LOCATION", "value": simple_replay_log_location},
                    {"name": "REDSHIFT_USER_NAME", "value": redshift_user_name},
                    {"name": "WHAT_IF_TIMESTAMP", "value": what_if_timestamp},
                    {"name": "SIMPLE_REPLAY_EXTRACT_START_TIME", "value": simple_replay_extract_start_time},
                    {"name": "SIMPLE_REPLAY_EXTRACT_END_TIME", "value": simple_replay_extract_end_time},
                    {"name": "EXTRACT_PREFIX", "value": extract_prefix},
                    {"name": "SCRIPT_PREFIX", "value": script_prefix},
                ],
            },
        )
        return response["jobId"]


def run_replay(
    client,
    serverless_client,
    what_if_timestamp,
    cluster_identifier,
    extract_s3_path,
    simple_replay_log_location,
    simple_replay_overwrite_s3_path,
    execute_unload_statements,
    bucket_name,
    redshift_user_name,
    redshift_iam_role,
    db,
    extract_prefix,
    replay_prefix,
    script_prefix,
    snapshot_account_id,
    replay_bootstrap_script,
    job_definition,
    job_queue,
    endpoint_type,
    workgroupName,
):
    if simple_replay_log_location is None or simple_replay_log_location == "N/A":
        return "N/A"
    else:
        if simple_replay_overwrite_s3_path is None:
            simple_replay_overwrite_s3_path = "N/A"
        if endpoint_type.upper() == "PROVISIONED":
            desc = client.describe_clusters(ClusterIdentifier=cluster_identifier)["Clusters"][0]
            cluster_endpoint = (
                desc.get("Endpoint").get("Address") + ":" + str(desc.get("Endpoint").get("Port")) + "/" + db
            )
        elif endpoint_type.upper() == "SERVERLESS":
            response = serverless_client.get_workgroup(workgroupName=workgroupName)
            cluster_endpoint = (
                response.get("workgroup").get("endpoint").get("address")
                + ":"
                + str(response.get("workgroup").get("endpoint").get("port"))
                + "/"
                + db
            )
            cluster_identifier = workgroupName
        workload_location = get_workload_location(extract_s3_path)
        response = boto3.client("batch").submit_job(
            jobName="AmazonRedshiftReplay",
            jobQueue=job_queue,
            jobDefinition=job_definition,
            containerOverrides={
                "command": [
                    "sh",
                    "-c",
                    "yum install -y awscli; aws s3 cp $BOOTSTRAP_SCRIPT ./bootstrap.sh; sh ./bootstrap.sh",
                ],
                "environment": [
                    {"name": "BOOTSTRAP_SCRIPT", "value": replay_bootstrap_script},
                    {"name": "WHAT_IF_TIMESTAMP", "value": what_if_timestamp},
                    {"name": "CLUSTER_IDENTIFIER", "value": cluster_identifier},
                    {"name": "CLUSTER_ENDPOINT", "value": cluster_endpoint},
                    {"name": "WORKLOAD_LOCATION", "value": workload_location},
                    {"name": "SIMPLE_REPLAY_OVERWRITE_S3_PATH", "value": simple_replay_overwrite_s3_path},
                    {"name": "SIMPLE_REPLAY_UNLOAD_STATEMENTS", "value": execute_unload_statements},
                    {"name": "SIMPLE_REPLAY_LOG_LOCATION", "value": simple_replay_log_location},
                    {"name": "BUCKET_NAME", "value": bucket_name},
                    {"name": "REDSHIFT_USER_NAME", "value": redshift_user_name},
                    {"name": "REDSHIFT_IAM_ROLE", "value": redshift_iam_role},
                    {"name": "EXTRACT_PREFIX", "value": extract_prefix},
                    {"name": "REPLAY_PREFIX", "value": replay_prefix},
                    {"name": "SCRIPT_PREFIX", "value": script_prefix},
                    {"name": "SNAPSHOT_ACCOUNT_ID", "value": snapshot_account_id},
                    {"name": "ENDPOINT_TYPE", "value": endpoint_type.upper()},
                ],
            },
        )
        return response["jobId"]


def batch_job_status(job_id, extract_s3_path=None):
    if job_id == "N/A":
        return "FINISHED"
    else:
        job_stats = boto3.client("batch").describe_jobs(jobs=[job_id]).get("jobs")[0]
        if job_stats.get("status") == "FAILED":
            raise Exception("Error:" + str(job_stats))
        elif job_stats.get("status") == "SUCCEEDED":
            return "FINISHED"
        else:
            return job_stats.get("status")


def run_glue_crawler(crawler_name):
    try:
        response = boto3.client("glue").start_crawler(Name=crawler_name)
        return "initiated"
    except be.ClientError as e:
        raise Exception("run_glue_crawler: " + e.__str__())
    except Exception as e:
        raise Exception("run_glue_crawler: " + e.__str__())


def create_serverless_namespace(
    serverless_client,
    snapshot_id,
    redshift_iam_role,
    snapshot_account_id,
    master_user_name,
    database_name,
    secrets_manager_arn,
    namespace_name,
    workgroup_name,
    region,
):
    try:
        master_user_secret = json.loads(
            boto3.client("secretsmanager")
            .get_secret_value(SecretId=secrets_manager_arn)
            .get("SecretString")
        )
        master_user_password = master_user_secret.get("password")
        serverless_client.create_namespace(
            adminUserPassword=master_user_password,
            adminUsername=master_user_name,
            dbName=database_name,
            defaultIamRoleArn=redshift_iam_role,
            iamRoles=[redshift_iam_role],
            namespaceName=namespace_name,
            tags=[{"key": "app_name", "value": "redshift_node_config_compare"}],
        )
        return "Initiated"
    except be.ClientError as e:
        msg = e.response["Error"]["Code"]
        status = msg
        return status


def restore_serverless_snapshot(
    serverless_client,
    snapshot_id,
    redshift_iam_role,
    snapshot_account_id,
    master_user_name,
    database_name,
    secrets_manager_arn,
    namespace_name,
    workgroup_name,
    region,
):
    try:
        # Restore only when a snapshot is actually provided (the original "or" made this
        # condition always true, so it is tightened to "and" here).
        if snapshot_id is not None and snapshot_id != "N/A":
            if snapshot_account_id is None or snapshot_account_id == "N/A":
                snapshot_account_id = boto3.client("sts").get_caller_identity()["Account"]
            v_snapshotArn = (
                "arn:aws:redshift:" + region + ":" + snapshot_account_id + ":snapshot:*/" + snapshot_id
            )
            print(v_snapshotArn)
            print(snapshot_account_id)
            serverless_client.restore_from_snapshot(
                namespaceName=namespace_name,
                snapshotArn=v_snapshotArn,
                workgroupName=workgroup_name,
            )
        return "Initiated"
    except be.ClientError as e:
        msg = e.response["Error"]["Code"]
        status = msg
        return status


def create_serverless_workgroup(
    serverless_client,
    subnet_group,
    security_group_id,
    publicly_accessible,
    base_capacity,
    namespace_name,
    workgroup_name,
):
    try:
        serverless_client.create_workgroup(
            baseCapacity=int(base_capacity),
            enhancedVpcRouting=False,
            namespaceName=namespace_name,
            publiclyAccessible=False,
            securityGroupIds=security_group_id.split(","),
            subnetIds=subnet_group.split(","),
            workgroupName=workgroup_name,
            tags=[{"key": "app_name", "value": "redshift_node_config_compare"}],
        )
        status = "Initiated"
    except be.ClientError as e:
        msg = e.response["Error"]["Code"]
        status = msg
    return status


def get_serverless_identifier(serverless_client, config, redshift_configurations, cluster_identifier_prefix):
    if (
        redshift_configurations.get("USER_FRIENDLY_NAME_SUFFIX") is None
        or redshift_configurations.get("USER_FRIENDLY_NAME_SUFFIX") == "N/A"
    ):
        cluster_suffix = redshift_configurations.get("BASE_RPU")
        cluster_suffix = cluster_suffix.replace(".", "-")
    else:
        cluster_suffix = redshift_configurations.get("USER_FRIENDLY_NAME_SUFFIX")
    workgroup_name = "workgroup-" + (cluster_identifier_prefix + "-" + cluster_suffix).lower()[0:63]
    namespace_name = "namespace-" + (cluster_identifier_prefix + "-" + cluster_suffix).lower()[0:63]
    return {"workgroup": workgroup_name, "namespace": namespace_name}


def set_tag_resource(serverless_client, inresourceArn):
    response = serverless_client.tag_resource(
        resourceArn=inresourceArn,
        tags=[{"key": "status", "value": "restored"}],
    )


def get_list_tags_for_resource(serverless_client, inresourceArn):
    response = serverless_client.list_tags_for_resource(resourceArn=inresourceArn)
    if not response["tags"]:
        status = "notRestored"
    else:
        for i in response["tags"]:
            for key, value in i.items():
                if value == "restored":
                    status = "restored"
                else:
                    status = "notRestored"
    return status


def set_untag(serverless_client, inresourceArn):
    response = serverless_client.untag_resource(resourceArn=inresourceArn, tagKeys=["status"])


def get_workgroups(serverless_client, workgroup):
    response = serverless_client.get_workgroup(workgroupName=workgroup)
    print(response.get("endpoint"))


def pause_cluster(client, auto_pause, cluster_config):
    max_index = max(
        index for index, item in enumerate(cluster_config) if "redshift_cluster_index" in item
    )
    max_range = max_index + 1
    if auto_pause:
        for i in range(0, max_range):
            out_cluster_id = cluster_config[i]["cluster_identifier"]["output"]["status"]
            if "workgroup" not in out_cluster_id:
                try:
                    print("Pausing cluster {}".format(out_cluster_id))
                    client.pause_cluster(ClusterIdentifier=out_cluster_id)
                except be.ClientError as e:
                    if e.response["Error"]["Code"] == "InvalidClusterState":
                        print(e.response["Error"]["Code"])
                    else:
                        raise
        return "initiated"
    else:
        return "auto_pause config is false, clusters will not be paused"