"""Replicate external objects (Spectrum files, COPY files, Glue catalog
entries) referenced by a source Redshift cluster into a target S3 location.

The script scans Redshift system tables (SVV_EXTERNAL_*, SVL_S3LIST,
STL_LOAD_COMMITS) for a user-supplied time window, checks which referenced
S3 objects still exist, and — after interactive confirmation — clones the
objects and the Glue catalog to the configured target.
"""
# NOTE: `import dateutil` alone does not reliably expose the `parser`/`tz`
# submodules; import them explicitly since main() uses both.
import asyncio
import logging
import threading
from collections import OrderedDict

import dateutil.parser
import dateutil.tz
import yaml

import common.config
import common.util
import tools.ExternalObjectReplicator.util.copy_util as copy_util
from common.aws_service import redshift_execute_query
from common.log import init_logging, log_version
from tools.ExternalObjectReplicator.util.copy_util import (
    check_file_existence,
    clone_objects_to_s3,
    get_s3_folder_size,
)
from tools.ExternalObjectReplicator.util.glue_util import clone_glue_catalog

logger = logging.getLogger("ExternalObjectReplicatorLogger")

# Module-level knobs shared with the copy utilities (progress-bar rendering).
g_disable_progress_bar = None
global_lock = threading.Lock()
g_bar_format = "{desc}: {percentage:3.0f}%|{bar}"


def execute_svl_query(cluster_object, end_time, file_config, redshift_user, start_time):
    """Scan system tables for external (Glue) tables and Spectrum files.

    Runs two SQL templates against the source cluster: one listing external
    tables (for Glue catalog cloning) and one listing S3 objects touched by
    Spectrum queries in the [start_time, end_time] window. Each Spectrum
    object is then checked for existence in S3.

    :param cluster_object: dict with at least "id" and "database" keys
    :param end_time: window end (tz-aware datetime)
    :param file_config: parsed YAML config; must contain "region"
    :param redshift_user: Redshift user name used to run the queries
    :param start_time: window start (tz-aware datetime)
    :return: tuple of (SVL_S3LIST query result, replicable Spectrum file
        locations, external-table query result, Spectrum files not found)
    """
    with open(
        "tools/ExternalObjectReplicator/sql/external_table_query.sql", "r"
    ) as svv_external_table:
        external_table_query = svv_external_table.read().format(
            start=start_time, end=end_time, db=cluster_object["database"]
        )
    logger.info("Scanning system tables to find Glue databases and tables")
    external_table_response = redshift_execute_query(
        cluster_object["id"],
        redshift_user,
        cluster_object["database"],
        file_config["region"],
        external_table_query,
    )
    spectrum_source_location = []
    spectrum_obj_not_found = []
    # Query Spectrum files
    logger.info("Scanning system tables to find Spectrum files queried by source cluster")
    with open("tools/ExternalObjectReplicator/sql/svl_s3_list.sql", "r") as svl_s3_list:
        SVL_S3LIST_query = svl_s3_list.read().format(
            start=start_time, end=end_time, db=cluster_object["database"]
        )
    SVL_S3LIST_result = redshift_execute_query(
        cluster_object["id"],
        redshift_user,
        cluster_object["database"],
        file_config["region"],
        SVL_S3LIST_query,
    )
    if SVL_S3LIST_result["TotalNumRows"] > 0:
        logger.info(
            f"{len(SVL_S3LIST_result['Records'])} files detected across Spectrum queries from the SVL_S3LIST system "
            f"table between {start_time} and {end_time}"
        )
        # Existence check is async so many S3 HEAD requests can overlap.
        spectrum_source_location, spectrum_obj_not_found = asyncio.run(
            async_check_file_existence(SVL_S3LIST_result, "spectrumfiles")
        )
        logger.info(
            f"Number of Spectrum files that can be replicated: "
            f"{(len(spectrum_source_location))} files"
        )
        logger.info(
            f"Total size of Spectrum files that can be replicated: "
            f"{get_s3_folder_size(copy_file_list=spectrum_source_location)}"
        )
    else:
        logger.info("No Spectrum files found.")
    return (
        SVL_S3LIST_result,
        spectrum_source_location,
        external_table_response,
        spectrum_obj_not_found,
    )


async def async_check_file_existence(query_response, obj_type):
    """Await the S3 existence check for every object in *query_response*.

    Thin coroutine wrapper around ``check_file_existence`` so callers can
    drive it with ``asyncio.run``.

    :return: tuple of (files found, files not found)
    """
    files_found, files_not_found = await check_file_existence(query_response, obj_type)
    return files_found, files_not_found


def execute_stl_load_query(cluster_object, end_time, file_config, redshift_user, start_time):
    """Scan STL_LOAD_COMMITS for files loaded via COPY in the time window.

    :param cluster_object: dict with at least "id" and "database" keys
    :param end_time: window end (tz-aware datetime)
    :param file_config: parsed YAML config; must contain "region"
    :param redshift_user: Redshift user name used to run the query
    :param start_time: window start (tz-aware datetime)
    :return: tuple of (query result, COPY files not found in S3,
        replicable COPY file locations)
    """
    # Query COPY objects
    copy_objects_not_found = []
    copy_source_location = []
    with open("tools/ExternalObjectReplicator/sql/stl_load_query.sql", "r") as stl_load:
        STL_LOAD_query = stl_load.read().format(
            start=start_time, end=end_time, db=cluster_object["database"]
        )
    STL_LOAD_response = redshift_execute_query(
        cluster_object["id"],
        redshift_user,
        cluster_object["database"],
        file_config["region"],
        STL_LOAD_query,
    )
    logger.info("Scanning system tables to find COPY files")
    logger.debug("Executing SQL Query to find COPY files")
    if STL_LOAD_response["TotalNumRows"] > 0:
        logger.info(
            f"{len(STL_LOAD_response['Records'])} files detected across COPY queries from the STL_LOAD_COMMIT system "
            f"table between {start_time} and {end_time}"
        )
        copy_source_location, copy_objects_not_found = asyncio.run(
            async_check_file_existence(STL_LOAD_response, "copyfiles")
        )
        # Safe to divide: the TotalNumRows > 0 guard ensures Records is non-empty.
        logger.info(
            f"Percentage of COPY files that can be replicated: "
            f"{((len(copy_source_location)) / len(STL_LOAD_response['Records'])) * 100}%"
        )
        logger.info(
            f"Total size of COPY files that can be replicated: "
            f"{get_s3_folder_size(copy_file_list=copy_source_location)}"
        )
    else:
        logger.info("No COPY files found.")
    return STL_LOAD_response, copy_objects_not_found, copy_source_location


def main():
    """Entry point: scan, prompt the user, and optionally clone objects."""
    # Parse config file
    file_config = common.config.get_config_file_from_args()
    # Setup Logging
    level = logging.getLevelName(file_config.get("log_level", "INFO").upper())
    init_logging(
        "external_replicator.log",
        dir="tools/ExternalObjectReplicator/logs",
        level=level,
        preamble=yaml.dump(file_config),
        backup_count=file_config.get("backup_count", 2),
        script_type="external object replicator",
        logger_name="ExternalObjectReplicatorLogger",
    )
    log_version()
    cluster_object = common.util.cluster_dict(endpoint=file_config["source_cluster_endpoint"])
    # Normalize the configured window to UTC so it matches system-table timestamps.
    start_time = dateutil.parser.parse(file_config["start_time"]).astimezone(dateutil.tz.tzutc())
    end_time = dateutil.parser.parse(file_config["end_time"]).astimezone(dateutil.tz.tzutc())
    redshift_user = file_config["redshift_user"]
    (
        STL_LOAD_response,
        copy_objects_not_found,
        copy_source_location,
    ) = execute_stl_load_query(cluster_object, end_time, file_config, redshift_user, start_time)
    (
        SVL_S3LIST_result,
        spectrum_source_location,
        external_table_response,
        spectrum_obj_not_found,
    ) = execute_svl_query(cluster_object, end_time, file_config, redshift_user, start_time)
    options = ["1. Yes - Proceed with cloning", "2. No - Exit"]
    print("Would you like to proceed with cloning?")
    print(options[0])
    print(options[1])
    choice = input("Enter your choice: ")
    # Treat anything that is not a parseable "1" as a decline instead of
    # crashing with ValueError on non-numeric input.
    try:
        proceed = int(choice) == 1
    except ValueError:
        proceed = False
    if proceed:
        logger.info("Cloning the copy objects")
        if STL_LOAD_response["TotalNumRows"] > 0:
            logger.info(f"== Begin to clone COPY files to {file_config['target_s3_location']} ==")
            copy_util.clone_objects_to_s3(
                file_config["target_s3_location"],
                obj_type="copyfiles",
                source_location=copy_source_location,
                objects_not_found=copy_objects_not_found,
            )
        if SVL_S3LIST_result["TotalNumRows"] > 0:
            logger.info(
                f"== Begin to clone Spectrum files to {file_config['target_s3_location']} =="
            )
            logger.info("== Begin to clone Glue databases and tables ==")
            clone_glue_catalog(
                external_table_response["Records"],
                file_config["target_s3_location"],
                file_config["region"],
            )
            copy_util.clone_objects_to_s3(
                file_config["target_s3_location"],
                objects_not_found=spectrum_obj_not_found,
                source_location=spectrum_source_location,
                obj_type="spectrumfiles",
            )
        elif SVL_S3LIST_result["TotalNumRows"] == 0 and STL_LOAD_response["TotalNumRows"] == 0:
            logger.info("No object found to be replicated")
    else:
        logger.info("Customer decided not to proceed with cloning")


if __name__ == "__main__":
    main()