#!/usr/bin/env python3 import argparse from audioop import mul import boto3 import warnings import configparser import humanfriendly import io import logging import multiprocessing import os import random import string import sys import tarfile import traceback from boto3.s3.transfer import TransferConfig from datetime import datetime from tqdm import tqdm warnings.filterwarnings('ignore', message='Unverified HTTPS request') def str2bool(value): if value.lower() in ('yes', 'true', 't', 'y', '1'): return True elif value.lower() in ('no', 'false', 'f', 'n', '0'): return False else: raise argparse.ArgumentTypeError('Boolean value expected.') quit_flag = 'DONE' current_time = datetime.now().strftime('%Y%m%d_%H%M%S') partition_command = 'gen_list' copy_command = 'upload' log_list = [] # treating arguments # create the top-level parser parser = argparse.ArgumentParser(prog='SnowTransferTool') subparsers = parser.add_subparsers(dest='cmd') # create the parser for the "gen_list" command parser_a = subparsers.add_parser( partition_command, help=partition_command + ' --help') parser_a.add_argument('--config_file', help='path of config file e) /tmp/config. If this argument is not present in command line, --src, --filelist_dir, --partition_size, and --log_dir are required', action='store', default='') parser_a.add_argument('--src', help='source directory e) /data/dir1/', action='store', required='--config_file' not in sys.argv) parser_a.add_argument('--filelist_dir', help='output destination e) /tmp/file_list/', action='store', required='--config_file' not in sys.argv) parser_a.add_argument('--partition_size', help='size limit for each partition e) 1Tb', action='store', required='--config_file' not in sys.argv, default="10Gb") parser_a.add_argument('--device_capacity', help='the storage capacity of the device to which you want to upload e) 80Tb', action='store', required='--config_file' not in sys.argv, default="80Tb") parser_a.add_argument('--log_dir', help='directory that stores log files e)/tmp/JID01', action='store', required='--config_file' not in sys.argv, default='/tmp/snowtransfertool/log') # create the parser for the "upload" command parser_b = subparsers.add_parser(copy_command, help=copy_command + ' --help') parser_b.add_argument('--config_file', help='path of config file e) /tmp/config. If this argument is not present in command line, --src, --bucket_name, --endpoint, and --log_dir are required', action='store', default='') parser_b.add_argument('--src', help='source directory e) /data/dir1/', action='store', required='--config_file' not in sys.argv) parser_b.add_argument('--bucket_name', help='your bucket name e) your-bucket', action='store', required='--config_file' not in sys.argv) parser_b.add_argument('--endpoint', help='snowball endpoint e) http://10.10.10.10:8080', action='store', required='--config_file' not in sys.argv) parser_b.add_argument('--log_dir', help='directory that stores log files e)/tmp/JID01', action='store', required='--config_file' not in sys.argv, default='/tmp/snowtransfertool/log') parser_b.add_argument('--profile_name', help='aws_profile_name e) sbe1', action='store', default='default') parser_b.add_argument('--aws_access_key_id', help='aws access key id', action='store', default='') parser_b.add_argument('--aws_secret_access_key', help='aws secret access key', action='store', default='') parser_b.add_argument( '--prefix_root', help='prefix root e) dir1/', action='store', default='') parser_b.add_argument('--max_process', help='max number of process e) 5', action='store', default=5, type=int) parser_b.add_argument( '--partition_size', help='size limit of batched files e) 1Gb', action='store', default='1Gb') parser_b.add_argument('--max_files', help='max number of file that each tar file contains e) 100000', action='store', default=100000, type=int) parser_b.add_argument('--compression', help='True|False compress file to "gz" format', action='store', default="False", type=str2bool, choices=[True, False]) parser_b.add_argument('--extract_flag', help='True|False; Set the autoextract flag', action='store', default="True", type=str2bool, choices=[True, False]) parser_b.add_argument('--target_file_prefix', help='prefix of the target file we are creating into the snowball', action='store', default='') parser_b.add_argument('--upload_logs', help='True|False; Upload the generated logs to Snowball and S3', action='store', default="True", type=str2bool, choices=[True, False]) parser_b.add_argument('--ignored_path_prefix', help='The path prefix need to be ignored when source is partition file', action='store', default='') # parse argument lists args = parser.parse_args() if (not args.cmd): help_mesg = f'''usage: SnowTransferTool [-h] {{{partition_command},{copy_command}}} ... positional arguments: {{{partition_command},{copy_command}}} {partition_command} {partition_command} --help {copy_command} {copy_command} --help''' print(help_mesg) # sys.exit() # generate random 6 character def gen_rand_char(): char_set = string.ascii_uppercase + string.digits return (''.join(random.sample(char_set*6, 6))) randchar = gen_rand_char() # src and log_dir are required in both commands. # src = args.src # log_dir = args.log_dir if (args.cmd == partition_command): # src and log_dir are required in both commands. src = args.src log_dir = args.log_dir # config for gen_list filelist_dir = args.filelist_dir partition_size = humanfriendly.parse_size(args.partition_size, binary=True) device_capacity = humanfriendly.parse_size( args.device_capacity, binary=True) if (args.cmd == copy_command): # src and log_dir are required in both commands. src = args.src log_dir = args.log_dir # config for upload prefix_root = args.prefix_root # Don't forget to add last slash '/' bucket_name = args.bucket_name profile_name = args.profile_name aws_access_key_id = args.aws_access_key_id aws_secret_access_key = args.aws_secret_access_key endpoint = args.endpoint max_process = args.max_process max_tarfile_size = humanfriendly.parse_size( args.partition_size, binary=True) # 10GiB, 100GiB is max limit of snowball max_files = args.max_files # default for no compression, "gz" to enable compression = 'gz' if args.compression else '' target_file_prefix = args.target_file_prefix extract_flag = args.extract_flag # default True upload_logs = args.upload_logs ignored_path_prefix = args.ignored_path_prefix # set up logger def setup_logger(logger_name, log_file, level=logging.INFO, sHandler=False, format='%(message)s'): l = logging.getLogger(logger_name) l.setLevel(level) formatter = logging.Formatter(format) fileHandler = logging.FileHandler(log_file, mode='a', encoding='utf-8') fileHandler.setFormatter(formatter) streamHandler = logging.StreamHandler() streamHandler.setFormatter(formatter) l.addHandler(fileHandler) if sHandler: l.addHandler(streamHandler) return l def create_logger(): try: os.makedirs(log_dir, exist_ok=True) except: raise ValueError('Error while trying to make directory:' + log_dir) error_batch = log_dir + '/error-batch-%s.log' % current_time success_batch = log_dir + '/success-batch-%s.log' % current_time error_upload = log_dir + '/error-upload-%s.log' % current_time success_upload = log_dir + '/success-upload-%s.log' % current_time snow_transfer_log = log_dir + '/snowTransfer-full-%s.log' % current_time global log_list log_list = [error_batch, success_batch, error_upload, success_upload, snow_transfer_log] # define logger global error_log_batch, success_log_batch, error_log_upload, success_log_upload, snow_transfer_full snow_transfer_full = setup_logger('snow_transfer_full', snow_transfer_log, level=log_level, sHandler=True, format='%(asctime)s : %(funcName)s : [%(levelname)s] : %(message)s') if (args.cmd == copy_command): error_log_batch = setup_logger( 'error_log_batch', error_batch, level=log_level, sHandler=False) success_log_batch = setup_logger( 'success_log_batch', success_batch, level=log_level, sHandler=False) error_log_upload = setup_logger( 'error_log_upload', error_upload, level=log_level, sHandler=False) success_log_upload = setup_logger( 'success_log_upload', success_upload, level=log_level, sHandler=False) log_level = logging.INFO # DEBUG, INFO, WARNING, ERROR def setup_config(cmd): if (not bool(args.config_file)): return if (not os.path.isfile(args.config_file)): raise IOError('Config file not found! ' + args.config_file) list_of_globals = globals() config = configparser.ConfigParser() config.read(args.config_file) if (cmd == partition_command): list_of_globals['src'] = config['GENLIST']['src'] list_of_globals['filelist_dir'] = config['GENLIST']['filelist_dir'] list_of_globals['partition_size'] = humanfriendly.parse_size( config['GENLIST']['partition_size'], binary=True) list_of_globals['device_capacity'] = humanfriendly.parse_size( config['GENLIST']['device_capacity'], binary=True) list_of_globals['log_dir'] = config['GENLIST']['log_dir'] elif (cmd == copy_command): # if the config file was provided, overwrite the default setting with values in config file. list_of_globals['src'] = config[copy_command.upper()]['src'] list_of_globals['endpoint'] = config[copy_command.upper()]['endpoint'] list_of_globals['bucket_name'] = config[copy_command.upper() ]['bucket_name'] list_of_globals['prefix_root'] = config[copy_command.upper() ]['prefix_root'] list_of_globals['profile_name'] = config[copy_command.upper() ]['profile_name'] list_of_globals['aws_access_key_id'] = config[copy_command.upper( )]['aws_access_key_id'] list_of_globals['aws_secret_access_key'] = config[copy_command.upper( )]['aws_secret_access_key'] list_of_globals['max_process'] = int( config[copy_command.upper()]['max_process']) list_of_globals['max_tarfile_size'] = humanfriendly.parse_size( config[copy_command.upper()]['partition_size'], binary=True) list_of_globals['max_files'] = int( config[copy_command.upper()]['max_files']) list_of_globals['compression'] = 'gz' if eval( config[copy_command.upper()]['compression'].capitalize()) else '' list_of_globals['target_file_prefix'] = config[copy_command.upper( )]['target_file_prefix'] list_of_globals['extract_flag'] = eval( config[copy_command.upper()]['extract_flag'].capitalize()) list_of_globals['log_dir'] = config[copy_command.upper()]['log_dir'] list_of_globals['upload_logs'] = eval( config[copy_command.upper()]['upload_logs'].capitalize()) list_of_globals['ignored_path_prefix'] = config[copy_command.upper( )]['ignored_path_prefix'] def human_readable_size(size, decimal_places=2): for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']: if size < 1024.0 or unit == 'PiB': break size /= 1024.0 return f'{size:.{decimal_places}f} {unit}' def valid_setting(cmd): if (cmd == partition_command): # check if the filelist_dir and partition_size were correctly set up if (not src or not filelist_dir or not partition_size or not log_dir or not device_capacity): raise ValueError( 'src, filelist_dir, partition_size, device_capacity, and log_dir must be specified!') if not os.path.isdir(src): raise IOError('Source directory not found: ' + src) elif (cmd == copy_command): if (not src or not bucket_name or not endpoint or not log_dir): raise ValueError( 'src, bucket_name, endpoint and log_dir must be specified!') def print_setting(): if (args.cmd == partition_command): output_config = f''' Command: {args.cmd} src: {src} filelist_dir: {filelist_dir} partition_size: {human_readable_size(partition_size)} device_capacity: {human_readable_size(device_capacity)} log_dir: {log_dir}''' snow_transfer_full.info(output_config) elif (args.cmd == copy_command): output_config = f''' command: {args.cmd} src: {src} endpoint: {endpoint} bucket_name: {bucket_name} log_dir: {log_dir} profile_name: {profile_name} prefix_root: {prefix_root} max_process: {max_process} partition_size: {human_readable_size(max_tarfile_size)} max_files: {max_files} compression: {bool(compression)} target_file_prefix: {target_file_prefix} extract_flag: {extract_flag} upload_log: {upload_logs} ignored_path_prefix: {ignored_path_prefix}''' snow_transfer_full.info(output_config) # Adding this to store the previous state so that we can change the file suffix when one file list was fully generated last_key = '' def write_to_file(dict, fl_type_creating, fl_type_done, is_last_write=False): global last_key for key, value in dict.items(): if key != last_key: if os.path.isfile(last_key): snow_transfer_full.info("Changing the suffix of file %s from %s to %s" % ( last_key, fl_type_creating, fl_type_done)) # Rename the file os.rename(last_key, last_key.replace( fl_type_creating, fl_type_done, 1)) last_key = key with open(key, 'a') as f: for line in value: f.write('%s\n' % line) if (is_last_write): if os.path.isfile(last_key): snow_transfer_full.info("Changing the suffix of file %s from %s to %s" % ( last_key, fl_type_creating, fl_type_done)) # Rename the file os.rename(last_key, last_key.replace( fl_type_creating, fl_type_done, 1)) return 0 # error handler for ``shutil.rmtree`` def handle_rmtree_err(func, path, exc_info): if not os.path.isdir(path): snow_transfer_full.info('Directory does not exsit, creating one...') return snow_transfer_full.error( 'Error while trying to clean: %s\n%s' % (path, exc_info)) raise ValueError() def isPathEmpty(path): if os.path.isdir(path): # Checking if the directory is empty or not if not os.listdir(path): return True else: return False def isPathExsits(path): return os.path.isdir(path) def scan_recursive(src): for entry in os.scandir(src): if entry.is_dir(follow_symlinks=False): yield from scan_recursive(entry.path) elif entry.is_file(): yield entry def gen_filelist(src): sum_size = 0 fl_prefix = 'fl_' + randchar + "_" fl_type_creating = '.creating' fl_type_done = '.ready' fl_index = 1 delimiter = ' ' num_file_processed = 0 file_target_mp = dict() device_num = 1 device_folder_name = 'device_' + \ human_readable_size(device_capacity).replace(' ', '') + '_' device_sum_size = 0 if (not isPathExsits(filelist_dir)): try: snow_transfer_full.info( 'Directory does not exsit, creating one...') os.makedirs(filelist_dir) except: snow_transfer_full.error( 'Error while trying to make directory:' + filelist_dir) raise ValueError() snow_transfer_full.info( 'Generating file list by size %s bytes' % partition_size) for entry in scan_recursive(src): fl_name = filelist_dir + '/' + device_folder_name + \ str(device_num) + '/' + fl_prefix + \ str(fl_index) + fl_type_creating file_name = entry.path f_size = entry.stat().st_size sum_size = sum_size + f_size f_info = [file_name, str(f_size)] f_info_str = delimiter.join(f_info) if partition_size < sum_size: if (sum_size - f_size != 0): fl_index += 1 snow_transfer_full.info('Part #%d: size = %d' % (fl_index - 1, sum_size - f_size)) device_sum_size = device_sum_size + sum_size - f_size # If this file will fall into the next device, we need to create a new folder with a new device id. # Use max(partition_size, f_size) to handle some extreme cases that the size of one file is larger that the partition size. if device_sum_size + max(partition_size, f_size) > device_capacity: device_num += 1 device_sum_size = 0 fl_name = filelist_dir + '/' + device_folder_name + \ str(device_num) + '/' + fl_prefix + \ str(fl_index) + fl_type_creating sum_size = f_size if (not isPathExsits(filelist_dir + '/' + device_folder_name + str(device_num))): try: snow_transfer_full.info( 'Creating a new folder for device ' + str(device_num)) os.makedirs(filelist_dir + '/' + device_folder_name + str(device_num)) except: snow_transfer_full.error( 'Error while trying to make new folder for device ' + str(device_num)) raise ValueError() if fl_name not in file_target_mp: file_target_mp[fl_name] = [] file_target_mp[fl_name].append(f_info_str) num_file_processed += 1 # flush the dict every 100000 file to avoid OOM if (num_file_processed % 100000 == 0): snow_transfer_full.info( 'Number of scanned file: %d', num_file_processed) write_to_file(file_target_mp, fl_type_creating, fl_type_done) file_target_mp = dict() # write the remaining info to file write_to_file(file_target_mp, fl_type_creating, fl_type_done, True) snow_transfer_full.info('Part #%d: size = %d' % (fl_index, sum_size)) snow_transfer_full.info( 'Number of scanned file: %d, symlink and empty folders were ignored', num_file_processed) snow_transfer_full.info('File lists are generated!!') snow_transfer_full.info('Check %s' % filelist_dir) return 0 def set_up_transfer_config(): global mpu_max_concurrency, transfer_config, s3_client if os.name == 'posix': multiprocessing.set_start_method('fork') mpu_max_concurrency = 10 transfer_config = TransferConfig(max_concurrency=mpu_max_concurrency) # S3 session session = boto3.Session(profile_name=profile_name) if (aws_secret_access_key and aws_secret_access_key): session = boto3.Session(region_name='snow', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key) s3_client = session.client('s3', endpoint_url=endpoint) # execute multi-threading def run_multip(max_process, exec_func, q, job_meta): p_list = [] for i in range(max_process): p = multiprocessing.Process(target=exec_func, args=(q, job_meta)) p_list.append(p) p.daemon = True p.start() return p_list def finishq(q, p_list): for j in range(max_process): q.put(quit_flag) for pi in p_list: pi.join() def conv_obj_name(file_name, prefix_root, sub_prefix): if len(prefix_root) == 0: pass elif prefix_root[-1] != '/': prefix_root = prefix_root + '/' if sub_prefix[-1] != '/': sub_prefix = sub_prefix + '/' if os.name == 'nt': sub_prefix = sub_prefix.replace('\\', '/') obj_name = prefix_root + \ file_name.replace('\\', '/').replace(sub_prefix, '', 1) else: file_name = file_name.replace(sub_prefix, '', 1) if (file_name[0] == '/'): file_name = file_name[1:] obj_name = prefix_root + file_name if (len(sub_prefix) == 0 and obj_name[0] == '/'): obj_name = obj_name[1:] return obj_name def copy_to_snowball(tar_name, org_files_list, job_meta): delimeter = ', ' tar_file_size = 0 recv_buf = io.BytesIO() collected_files_no = 0 if (len(org_files_list) == 0): return 0 # do not perform batch when there is only one file in file_list because this could be an very large file # that could not tar in memory. Use mpu instead. if (len(org_files_list) == 1): file_name, obj_name, file_size = org_files_list[0] file_meta_data = delimeter.join([file_name, obj_name, str(file_size)]) error_log_batch.info(file_meta_data) try: s3_client.upload_file(file_name, bucket_name, obj_name, Config=transfer_config) except Exception as e: snow_transfer_full.error(str(e)) error_log_upload.info(file_meta_data) try: s3_client.head_object(Bucket=bucket_name, Key=obj_name) success_log_upload.info(file_meta_data) snow_transfer_full.info('%s was uploaded' % (file_name)) lock.acquire() try: job_meta['uploaded_obj_num'] += 1 finally: lock.release() except: error_log_upload.info(file_meta_data) return 1 lock.acquire() try: job_meta['tar_num'] += 1 tmp_name = '' if os.path.isfile( src) else '-' + str(job_meta['tar_num']) if compression == '': tar_name = tar_name + tmp_name + '.tar' elif compression == 'gz': tar_name = tar_name + tmp_name + '.tgz' finally: lock.release() with tarfile.open(fileobj=recv_buf, mode='w:' + compression) as tar: for file_name, obj_name, file_size in tqdm(org_files_list, desc=tar_name, bar_format='{l_bar}{bar:50}| {n_fmt}/{total_fmt}'): file_meta_data = delimeter.join( [tar_name, file_name, obj_name, str(file_size)]) try: tar.add(file_name, arcname=obj_name) tar_file_size += file_size collected_files_no += 1 success_log_batch.info(file_meta_data) except Exception as e: snow_transfer_full.error(str(e)) error_log_batch.info(file_meta_data) recv_buf.seek(0) lock.acquire() try: job_meta['batched_obj_num'] += collected_files_no job_meta['total_tar_size'] += + recv_buf.getbuffer().nbytes finally: lock.release() tar_meta_data = delimeter.join( [tar_name, bucket_name, str(tar_file_size), str(collected_files_no)]) try: if extract_flag: s3_client.upload_fileobj(recv_buf, bucket_name, tar_name, ExtraArgs={'Metadata': { 'snowball-auto-extract': 'true', 'snowball-auto-batch': 'true'}, 'StorageClass': 'STANDARD'}, Config=transfer_config) else: s3_client.upload_fileobj( recv_buf, bucket_name, tar_name, Config=transfer_config) except Exception as e: snow_transfer_full.error(str(e)) error_log_upload.info(tar_meta_data) try: s3_client.head_object(Bucket=bucket_name, Key=tar_name) success_log_upload.info(tar_meta_data) snow_transfer_full.info('%s was uploaded' % (tar_name)) lock.acquire() try: job_meta['uploaded_obj_num'] += 1 finally: lock.release() except: error_log_upload.info(tar_meta_data) return collected_files_no def upload_file(q, job_meta): global target_file_prefix while True: org_files_list = q.get() tar_name = ('%ssnowball-%s-%s' % (target_file_prefix, current_time, os.path.basename(src) if os.path.isfile(src) else job_meta['random_str'])) if org_files_list == quit_flag: break try: copy_to_snowball(tar_name, org_files_list, job_meta) except Exception as e: snow_transfer_full.error(traceback.format_exc()) def push_file_to_list(sum_size, org_files_list, file_name, sub_prefix, q, job_meta, src_dir="", is_partition_file=False): try: sub_prefix = src_dir if src_dir else sub_prefix obj_name = conv_obj_name(file_name, prefix_root, sub_prefix) if (os.path.islink(file_name)): # continue if it is a symlink return try: f_size = os.stat(file_name).st_size except Exception as e: snow_transfer_full.error(str(e)) job_meta['total_size'] += f_size file_info = (file_name, obj_name, f_size) if (is_partition_file or (sum_size + f_size < max_tarfile_size and len(org_files_list) < max_files)): org_files_list.append(file_info) sum_size += f_size else: # since upload is much slower than list generation, do not keep pushing list to the queue to avoid OOM while (q.qsize() > max_process): pass # put files into queue in max_tarfile_size q.put(org_files_list) org_files_list = [file_info] sum_size = f_size job_meta['total_file_num'] += 1 except Exception as e: snow_transfer_full.error(str(e)) return (org_files_list, sum_size) # get files to upload def upload_get_files(q, job_meta): sum_size = 0 org_files_list = [] if (os.path.isfile(src)): snow_transfer_full.info( "Uploading from file partition list, please make sure the partition file contains your files' absolute path") with open(src) as src_file: while True: # split from the end of the string to handle space in file names. file_name = src_file.readline().rsplit(' ', 1)[0] if not file_name: break if (os.path.isfile(file_name)): tmp = push_file_to_list( sum_size, org_files_list, file_name, src, q, job_meta, ignored_path_prefix, True) if (tmp): org_files_list, sum_size = tmp # put remained files into queue q.put(org_files_list) return # get all files from given directory for entry in scan_recursive(src): file_name = entry.path tmp = push_file_to_list(sum_size, org_files_list, file_name, src, q, job_meta) if (tmp): org_files_list, sum_size = tmp # put remained files into queue q.put(org_files_list) def upload_file_multi(file_list_queue, job_meta): p_list = run_multip(max_process, upload_file, file_list_queue, job_meta) # get object list and ingest to processes upload_get_files(file_list_queue, job_meta) # sending quit_flag and join processes finishq(file_list_queue, p_list) def batch_and_upload(): file_list_queue = multiprocessing.Manager().Queue() job_meta = multiprocessing.Manager().dict() job_meta['total_file_num'] = 0 job_meta['total_size'] = 0 job_meta['tar_num'] = 0 job_meta['batched_obj_num'] = 0 job_meta['total_tar_size'] = 0 job_meta['uploaded_obj_num'] = 0 job_meta['random_str'] = randchar upload_file_multi(file_list_queue, job_meta) snow_transfer_full.info('%d out of %d files were batched into %d tar files' % ( job_meta['batched_obj_num'], job_meta['total_file_num'], job_meta['tar_num'])) snow_transfer_full.info('Total size: %s' % ( human_readable_size(job_meta['total_size']))) snow_transfer_full.info('Total size after batching: %s' % ( human_readable_size(job_meta['total_tar_size']))) if (job_meta['total_file_num'] != 0): snow_transfer_full.info('Avg file size before: %s' % ( human_readable_size(job_meta['total_size'] / job_meta['total_file_num']))) snow_transfer_full.info('Avg file size after: %s' % (human_readable_size( job_meta['total_size'] / ((job_meta['total_file_num'] - job_meta['batched_obj_num']) + job_meta['tar_num'])))) snow_transfer_full.info('%d files were uploaded' % (job_meta['uploaded_obj_num'])) # upload log files to S3 def upload_log(): for file in log_list: s3_client.upload_file(file, bucket_name, file[1:]) snow_transfer_full.info('Log files were uploaded!') # set up config outside of main function to support Windows "spawn" mechanism of starting new processes if (args.cmd): setup_config(args.cmd) create_logger() if (args.cmd == copy_command): set_up_transfer_config() lock = multiprocessing.Lock() # start main function if __name__ == '__main__': if (not args.cmd): sys.exit() print_setting() valid_setting(args.cmd) if args.cmd == copy_command: snow_transfer_full.info('Batching and uploading files...') batch_and_upload() if (upload_logs): upload_log() elif args.cmd == partition_command: gen_filelist(src) snow_transfer_full.info('Program finished!')