import os import boto3 import paramiko import json import datetime def worker_handler(event, context): try: json_message = event['Records'][0]['Sns']['Message'] print "invoked as SNS" except Exception as e: print "invoked as API Gateway" json_message = json.dumps(event) message = json.loads(json_message) launch_command = message['launch'] date_time = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') print date_time print "Downloading SSH Key" try: s3 = boto3.client('s3') print "Connecting to S3" #Download private key file from secure S3 bucket hpc_bucket_s3=os.environ['SLURM_BUCKET_S3'] hpc_key_s3=os.environ['SLURM_KEY_S3'] print "Downloading key: s3://%s/%s"%(hpc_bucket_s3,hpc_key_s3) s3.download_file('%s'%hpc_bucket_s3,'%s'%hpc_key_s3,'/tmp/keyname.pem') except Exception as e: print e print "Downloaded key to /tmp" k = paramiko.RSAKey.from_private_key_file("/tmp/keyname.pem") c = paramiko.SSHClient() c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) host=os.environ['HEADNODE'] print "Connecting to " + host c.connect( hostname = host, username = "centos", pkey = k ) print "Connected to " + host sbatch_file = "\ #!/bin/bash\n\ #SBATCH --job-name=%s\n\ #SBATCH --nodes=%s\n\ #SBATCH --ntasks-per-node=%s\n\ #SBATCH --cpus-per-task=%s\n\ #SBATCH --constraint=%s\n\ #SBATCH --chdir=%s\n\ \n\ aws s3 cp %s .\n\ %s\n\ tar -czvf %s.tar.gz .\n\ aws s3 cp %s.tar.gz %s\n\ "%(message['jobname'],message['nodes'],message['tasks-per-node'],message['cpus-per-task'],message['feature'],message['io'][0]['workdir'],message['io'][1]['input'],launch_command,message['jobname'],message['jobname'],message['io'][2]['output']) f = open("/tmp/job_%s.sbatch"%date_time, "w") f.write(sbatch_file) f.close() c_ftp = c.open_sftp() c_ftp.put("/tmp/job_%s.sbatch"%date_time,"/home/centos/job_%s.sbatch"%date_time) c_ftp.close() commands= [ "mkdir -p %s"%message['io'][0]['workdir'], "sbatch job_%s.sbatch"%date_time ] for command in commands: print "Executing {}".format(command) stdin , stdout, stderr = c.exec_command(command) exec_out = str(stdout.read()) exec_err = str(stderr.read()) print stdout.read() print stderr.read() out={'jobid': exec_out, 'error': exec_err, 'name': message['username']} return { "statusCode": 200, "body": json.dumps(out) }