#!/opt/workflow/python/bin/python2.7
#
# Copyright 2013-2018 Edico Genome Corporation. All rights reserved.
#
# This file contains confidential and proprietary information of the Edico Genome
# Corporation and is protected under the U.S. and international copyright and other
# intellectual property laws.
#
# $Id$
# $Author$
# $Change$
# $DateTime$
#
# Executable python script that runs in a cloud instance to copy Dragen input files
# from S3 and output files to S3
#
from __future__ import division
from __future__ import print_function

from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from builtins import object
from past.utils import old_div

import getopt
import os
import sys
import urllib.request, urllib.parse, urllib.error

# For testing multipart downloads using byte ranges
from multiprocessing import Pool, TimeoutError
import requests  # TODO: replace this with urllib2
from urllib.request import urlopen

import scheduler.aws_utils as aws
import scheduler.scheduler_utils as utils
from scheduler.logger import Logger

# Constants ...
DOWNLOAD_CHUNK_SIZE = 16*1024                   # Chunk size in bytes for downloads
VALID_MODES = ['download', 'import', 'upload']  # Operational modes

#
# Global command line args
#
run_mode = None
source_url = None
s3_bucket = None
s3_obj_key = None
local_path = None
work_dir = "/staging/tmp"
log_dir = "/tmp"
stdout_flag = False
nosign_flag = False
multipart_flag = False
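
#
# Example invocations of the three modes. The bucket names, keys, and paths below
# are hypothetical and shown only for illustration; see usage() for the full
# option list:
#
#   d_haul -m import -u https://example.com/ref.tar -b my-bucket -k inputs/ref.tar
#   d_haul -m download -b my-bucket -k inputs/ref.tar -p /staging/tmp/ref.tar
#   d_haul -m upload -p /staging/out/ -b my-bucket -k results/
#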

class DHaul(object):
    ########################################################################################
    # constructor
    #
    def __init__(self, logger):
        self.logger = logger
        self.multipart_flag = multipart_flag
        self.nosign_flag = nosign_flag
        self.download_dir = work_dir
        self.file_name = None
        self.download_full_path = None
        self.download_len = 0

    ########################################################################################
    # download_from_url - Download file from the specified URL to the pre-configured
    # download dir. Returns the number of bytes downloaded (i.e. the file size)
    #
    def download_from_url(self, url):
        url_parts = url.split('?')

        # Get the file name from the URL
        main_url = url_parts[0]
        self.file_name = main_url.split('/')[-1]

        # Since we are operating on a URL string, decode it to remove any percent encodings
        self.file_name = urllib.parse.unquote(self.file_name)

        # Make sure the path exists for the target directory
        utils.check_create_dir(self.download_dir)

        # Configure the full path of the download file
        self.download_full_path = self.download_dir + '/' + self.file_name

        # If multipart download is enabled, use this new method
        if self.multipart_flag:
            # Total number of bytes to download
            urlinfo = urllib.request.urlopen(url).info()
            numBytes = int(urlinfo['content-length'])

            # Multipart download
            pool = Pool()
            numThreads = 8  # hardcoded to 8 workers
            partialBytes = old_div(numBytes, numThreads) + 1
            results = []

            # Spawn workers to download the partial files
            for idx in range(numThreads):
                results.append(pool.apply_async(self, (url, self.download_full_path, idx, partialBytes)))

            # Wait for the partial files to complete
            try:
                for idx in range(numThreads):
                    results[idx].get(timeout=5000)
            except TimeoutError:
                self.logger.error('ERROR: Timed out waiting for partial download to complete!')
                sys.exit(1)

            # Concatenate the partial files. The single quotes protect against spaces
            # in filenames, since the command is run through the shell.
            fileList = ["'%s.part.%03d'" % (self.download_full_path, idx) for idx in range(numThreads)]
            cmdList = ['cat'] + fileList + ['>', '"%s"' % self.download_full_path]
            cmd = " ".join(cmdList)
            os.system(cmd)

            # Downloaded length is the content length
            self.download_len = numBytes

            # Clean up the partial files, stripping the quotes that were added above
            for x in fileList:
                os.remove(x.strip("'"))
        else:
            r = urlopen(url)
            if r.getcode() >= 400:
                self.logger.error('HTTP error status code=%d' % r.getcode())
                return 0
            with open(self.download_full_path, 'wb') as f:
                while True:
                    chunk = r.read(DOWNLOAD_CHUNK_SIZE)
                    if not chunk:
                        break
                    self.download_len += len(chunk)
                    f.write(chunk)

        # Check to make sure the downloaded byte count matches the file size
        file_len = os.path.getsize(self.download_full_path)
        assert (file_len == self.download_len), "File size does not match download len!"
        return file_len
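
    ####################################################################################
    # Worked example of the partitioning used above (the numbers are illustrative
    # only): for numBytes=100 and numThreads=4, partialBytes = 100//4 + 1 = 26, so
    # the workers request byte ranges 0-25, 26-51, 52-77 and 78-103. The last range
    # overshoots the end of the object; HTTP servers clamp a Range request to the
    # actual content length, so part 3 simply receives bytes 78-99.
    #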

    ########################################################################################
    # class functor which calls the multi_download thread function
    #
    def __call__(self, url, filename, n, partialBytes):
        self.multi_download(url, filename, n, partialBytes)

    ########################################################################################
    # multi_download - Downloads a byte range of a file from a presigned URL
    # TODO: Refactor to use urllib2 instead of requests
    #
    def multi_download(self, url, filename, n, partialBytes):
        # Create a partial filename from the full filename
        partname = "%s.part.%03d" % (filename, n)

        # Calculate the byte range and place it in the GET header
        byte_start = n * partialBytes
        byte_stop = (n + 1) * partialBytes - 1
        byte_range = "bytes=%d-%d" % (byte_start, byte_stop)
        headers = {'Range': byte_range}

        # Download in 'stream' mode to make sure we don't run out of memory
        r = requests.get(url, stream=True, headers=headers)

        # Write the chunks to the partial file
        with open(partname, 'wb') as f:
            for chunk in r.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                f.write(chunk)

    ########################################################################################
    # run_import - Run the 'import' function to import a file from a URL to local S3
    #
    def run_import(self):
        global source_url
        self.logger.log('Starting d_haul import ...')

        # Download from source URL
        try:
            file_size = self.download_from_url(source_url)
        except Exception as e:
            raise e
        if not file_size:
            self.logger.error('Could not properly download from %s' % source_url)
            sys.exit(1)
        self.logger.log("Downloaded file with size=%s" % file_size)

        # Upload to cloud S3 bucket
        try:
            upload_size = aws.s3_upload(self.download_full_path, s3_bucket, s3_obj_key)
        except Exception as e:
            os.remove(self.download_full_path)
            raise e

        # Final clean-up work
        if not upload_size:
            self.logger.error('Could not upload file to S3!')
        else:
            self.logger.log("Uploaded file to S3 with size=%s" % upload_size)
        try:
            os.remove(self.download_full_path)
        except OSError:
            self.logger.error("Error deleting temporary file.")
        return

    ########################################################################################
    # run_download - Run the 'download' function to download locally from an S3
    # directory or object
    #
    def run_download(self):
        global s3_bucket, s3_obj_key, local_path, source_url
        self.logger.log('Starting d_haul download ...')

        # Figure out whether the path is a dir or an object
        if local_path.endswith('/'):
            # Assume local path is a directory; the 'full' path is not used
            self.download_dir = local_path
            self.download_full_path = None
        else:
            # Assume local path points to an absolute file location and the source is
            # a single object. In this case download_dir is not used (it is implicit
            # in the full path)
            self.download_full_path = local_path
            self.download_dir = None

        try:
            if source_url:
                # Download from source URL
                tot_size = self.download_from_url(source_url)
            elif self.download_dir:
                # Call the full bucket download function
                tot_size = aws.s3_download_dir(s3_bucket, s3_obj_key, self.download_dir,
                                               nosign=self.nosign_flag)
            else:
                # Assume we are only downloading one object
                obj_info = {
                    "bucket": s3_bucket,
                    "obj_key": s3_obj_key,
                    "tgt_path": self.download_full_path,
                    "region": "us-east-1"
                }
                tot_size = aws.s3_download_file(obj_info, nosign=self.nosign_flag)
        except Exception as e:
            raise e

        if not tot_size:
            self.logger.error('Could not properly download to %s' % local_path)
            return
        self.logger.log("Downloaded %d bytes to location %s" % (tot_size, local_path))
        return
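
    ####################################################################################
    # Path semantics used by run_download above, shown with hypothetical values:
    #   -p /staging/data/       -> directory download: objects under the key prefix
    #                              are written into /staging/data/
    #   -p /staging/data/x.bam  -> single-object download written to exactly that file
    #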

    ########################################################################################
    # run_upload - Run the 'upload' function to upload a local file or directory to an
    # S3 bucket
    #
    def run_upload(self):
        global s3_bucket, s3_obj_key, local_path, source_url
        try:
            self.logger.log('Starting d_haul upload ...')
            tot_size = aws.s3_upload(local_path, s3_bucket, s3_obj_key)
            self.logger.log("Uploaded %d bytes to S3" % tot_size)
        # Log the error if the file is missing, but do not exit with an error code
        except ValueError as v:
            self.logger.error(str(v))
        return
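
########################################################################################
# The TODO in multi_download suggests replacing 'requests' with urllib. Below is a
# minimal sketch of the same ranged GET using urllib.request (already imported above).
# It is illustrative only and is not called anywhere in this script.
#
def _ranged_get_sketch(url, partname, byte_start, byte_stop):
    # Request only the given byte range, as multi_download does via requests
    req = urllib.request.Request(url, headers={'Range': 'bytes=%d-%d' % (byte_start, byte_stop)})
    resp = urllib.request.urlopen(req)
    # Stream the body to disk in chunks so memory use stays bounded
    with open(partname, 'wb') as f:
        while True:
            chunk = resp.read(DOWNLOAD_CHUNK_SIZE)
            if not chunk:
                break
            f.write(chunk)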

########################################################################################
# usage
#
def usage():
    print()
    print("Usage: d_haul -m <mode> [parameters]")
    print("  Mode 'import' (from URL to S3): 'url', 'bucket', 'key' (for object)")
    print("  Mode 'download' (from S3): 'bucket', 'key' (used as prefix if dir download), 'path' (dir or file)")
    print("  Mode 'upload' (to S3): 'path' (local dir or file), 'bucket', 'key' (used as prefix if dir upload)")
    print()
    print("  -m ,--mode=      Select mode: 'import','download','upload'")
    print("  -u ,--url=       Source URL (import only)")
    print("  -b ,--bucket=    S3 Bucket")
    print("  -k ,--key=       S3 Object Key or Prefix (dir)")
    print("  -p ,--path=      Local path for directory (ending with /) or file")
    print("  -w ,--work-dir=  Working directory (Optional, defaults to /staging/tmp/)")
    print("  -l ,--log-dir=   Logging and status directory (Optional, defaults to /tmp/)")
    print("  -s ,--stdout     Log to stdout, instead of to log-dir")
    print("  -n ,--nosign     Use unsigned S3 requests (bucket/key downloads only)")
    print("  -x ,--multipart  Use multipart byte-range download (URL downloads only)")
    print("  -h ,--help       This help message")
    print()
    sys.exit(1)


########################################################################################
# process_args - Process command line args and update globals.
#
def process_args():
    global run_mode, source_url, s3_bucket, s3_obj_key, local_path, work_dir, log_dir, \
        stdout_flag, nosign_flag, multipart_flag
    try:
        opts, args = getopt.getopt(sys.argv[1:], "m:u:b:k:p:w:l:snxh",
                                   ["mode=", "url=", "bucket=", "key=", "path=",
                                    "work-dir=", "log-dir=", "stdout", "nosign",
                                    "multipart", "help"])
    except getopt.GetoptError as err:
        print(str(err))
        usage()
        sys.exit(1)

    for o, v in opts:
        if o in ("-m", "--mode"):
            run_mode = v.lower()
        elif o in ("-u", "--url"):
            source_url = v
        elif o in ("-b", "--bucket"):
            s3_bucket = v
        elif o in ("-k", "--key"):
            s3_obj_key = v
        elif o in ("-p", "--path"):
            local_path = v
        elif o in ("-w", "--work-dir"):
            work_dir = v.strip().rstrip('/')
        elif o in ("-l", "--log-dir"):
            log_dir = v.strip().rstrip('/')
        elif o in ("-s", "--stdout"):
            stdout_flag = True
        elif o in ("-n", "--nosign"):
            nosign_flag = True
        elif o in ("-x", "--multipart"):
            multipart_flag = True
        else:
            print("Unrecognized option %s %s" % (o, v))
            usage()

    if not run_mode or run_mode not in VALID_MODES:
        print("ERROR: Missing or invalid mode!")
        usage()
    if not s3_bucket and run_mode == 'import':
        print("ERROR: S3 Bucket option required for import mode!")
        usage()
    if not source_url and not s3_obj_key:
        print("ERROR: S3 Object Key option required!")
        usage()
    if not source_url and run_mode == 'import':
        print("ERROR: Source URL is required for import mode!")
        usage()
    if not s3_bucket and nosign_flag:
        print("ERROR: --nosign option is only valid for bucket/key!")
        usage()
    if not source_url and multipart_flag:
        print("ERROR: --multipart flag is only valid for URL downloads!")
        usage()
    if run_mode != 'download' and multipart_flag:
        print("ERROR: --multipart flag is only valid for URL downloads!")
        usage()
    if not local_path and (run_mode == 'download' or run_mode == 'upload'):
        print("ERROR: Local path is required for download or upload mode!")
        usage()
    return


########################################################################################
# main
#
def main():
    global stdout_flag

    process_args()

    # Set up log_dir ... try to create it if it doesn't exist
    utils.check_create_dir(log_dir)
    if stdout_flag:
        logger = Logger()
    else:
        logger = Logger(logpath="%s/d_haul.log" % log_dir, stdout=True)

    try:
        d_haul = DHaul(logger)
        if run_mode == 'import':
            d_haul.run_import()
        elif run_mode == 'download':
            d_haul.run_download()
        else:
            d_haul.run_upload()
    except Exception as e:
        print(str(e))
        sys.exit(1)


if __name__ == "__main__":
    main()