#!/usr/bin/env python
"""Script for auto-configuring the ``s3.max_concurrent_requests``
configuration value based on transfer speed.

Note that this script only accounts for upload speed. To run this script,
the installed AWS CLI version must be 1.11.24 or higher.
"""
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time

BUCKET = None
KEY = None
TRANSFER_SPEED_REGEX = re.compile(r'\((.*)/s\)')
MAX_POLLING_INTERVAL = 10
TEMPFILE_SIZE = 5 * (1024 ** 3)
TRANSFER_SPEED_DISPLAY = 'Transfer speed: %s/s'
MAX_THREAD_INCREASE = 3
SIZE_SUFFIX = {
    'kb': 1024,
    'mb': 1024 ** 2,
    'gb': 1024 ** 3,
    'tb': 1024 ** 4,
    'kib': 1024,
    'mib': 1024 ** 2,
    'gib': 1024 ** 3,
    'tib': 1024 ** 4,
}

assert BUCKET, (
    'Please set the BUCKET variable in the auto-configure script to a '
    'bucket that you can upload to')
assert KEY, (
    'Please set the KEY variable in the auto-configure script to '
    'a key that you want to upload to')


def human_readable_to_bytes(value):
    """Convert a human-readable size such as '5.2 MiB' to bytes."""
    value = value.lower()
    if value.endswith('ib'):
        # Assume an IEC suffix (e.g. 'mib', 'gib').
        suffix = value[-3:]
    else:
        suffix = value[-2:]
    has_size_identifier = len(value) >= 2 and suffix in SIZE_SUFFIX
    if not has_size_identifier:
        try:
            return int(value)
        except ValueError:
            raise ValueError("Invalid size value: %s" % value)
    multiplier = SIZE_SUFFIX[suffix]
    return float(value[:-len(suffix)]) * multiplier


def create_transfer_file(directory):
    # Truncating the empty file out to TEMPFILE_SIZE creates a sparse
    # file, so this does not eagerly consume 5 GiB of disk space.
    filename = os.path.join(directory, 'file')
    with open(filename, 'a+') as f:
        f.truncate(TEMPFILE_SIZE)
    return filename


def determine_transfer_speed(filename):
    p = upload(filename)
    try:
        return _get_transfer_speed_from_process(p)
    except BaseException:
        p.terminate()
        raise


def _get_transfer_speed_from_process(p):
    # Poll the CLI's progress output for up to MAX_POLLING_INTERVAL
    # seconds, keeping the most recently reported transfer speed.
    start_time = time.time()
    total_time = 0
    current_transfer_speed = ''
    len_previous_print_statement = 0
    while total_time < MAX_POLLING_INTERVAL:
        line = p.stdout.read(40).decode('utf-8')
        transfer_speed_match = TRANSFER_SPEED_REGEX.search(line)
        if transfer_speed_match:
            transfer_speed = transfer_speed_match.group(1)
            print_statement = TRANSFER_SPEED_DISPLAY % transfer_speed
            # Pad with spaces so a shorter statement fully overwrites the
            # previous one when the carriage return rewinds the cursor.
            print_statement = print_statement.ljust(
                len_previous_print_statement, ' ')
            sys.stdout.write(print_statement + '\r')
            sys.stdout.flush()
            len_previous_print_statement = len(print_statement)
            current_transfer_speed = transfer_speed
        total_time = time.time() - start_time
    p.terminate()
    sys.stdout.write('\n')
    return current_transfer_speed


def upload(filename):
    return subprocess.Popen(
        'aws s3 cp %s s3://%s/%s' % (filename, BUCKET, KEY),
        stdout=subprocess.PIPE, shell=True)


def set_max_concurrent_requests(num):
    print('Setting max_concurrent_requests to: %s' % num)
    subprocess.check_call(
        'aws configure set s3.max_concurrent_requests %s' % num,
        shell=True)


def get_next_max_concurrent_requests(current, stats, history):
    stats_in_bytes = _get_stats_to_bytes(stats)
    return _calculate_next_max_concurrent_requests(
        current, stats_in_bytes, history)


def _calculate_next_max_concurrent_requests(current, stats, history):
    if not history:
        return current + MAX_THREAD_INCREASE
    previous = history[-1]
    # Scale the size of the next increase by how much the previous
    # increase improved throughput.
    ratio_difference = stats[current] / stats[previous]
    multiplier = 0
    if ratio_difference > 1.75:
        multiplier = 1
    elif ratio_difference > 1.50:
        multiplier = 0.67
    elif ratio_difference > 1.25:
        multiplier = 0.33
    elif ratio_difference <= 1:
        # Throughput did not improve, so fall back to the previous value.
        return previous
    return current + int(round(MAX_THREAD_INCREASE * multiplier))


def _get_stats_to_bytes(stats):
    stats_in_bytes = {}
    for num_threads, speed in stats.items():
        stats_in_bytes[num_threads] = human_readable_to_bytes(
            speed.replace(' ', ''))
    return stats_in_bytes


def _cleanup_orphan_multipart_uploads():
    subprocess.check_call(
        'aws s3api list-multipart-uploads --bucket %s '
        '--prefix %s --query Uploads[].[UploadId] --output text | '
        'xargs -I {} aws s3api abort-multipart-upload --bucket '
        '%s --key %s --upload-id {}' % (
            BUCKET, KEY, BUCKET, KEY),
        shell=True
    )


def autoconfigure():
    temp_directory = tempfile.mkdtemp()
    stats = {}
    history = []
    max_concurrent_requests = 1
    try:
        print('Creating temporary file for uploads...')
        filename = create_transfer_file(temp_directory)
        print('Created temporary file: %s' % filename)
        # Keep increasing max_concurrent_requests until the suggested
        # next value no longer exceeds the current one.
        while True:
            set_max_concurrent_requests(max_concurrent_requests)
            print('Starting upload...')
            stats[max_concurrent_requests] = determine_transfer_speed(
                filename)
            next_max_concurrent_requests = get_next_max_concurrent_requests(
                max_concurrent_requests, stats, history)
            if next_max_concurrent_requests <= max_concurrent_requests:
                break
            history.append(max_concurrent_requests)
            max_concurrent_requests = next_max_concurrent_requests
        print('Done tuning max_concurrent_requests...')
        set_max_concurrent_requests(next_max_concurrent_requests)
    finally:
        print('Cleaning up temporary file and any remaining multipart '
              'uploads...')
        shutil.rmtree(temp_directory)
        _cleanup_orphan_multipart_uploads()


if __name__ == '__main__':
    autoconfigure()