B W`:M@sddlZddlmZddlZddlZddlZddlmZmZddlm Z ddl Z ddl Z ddl Z ej ejdGdddeZdS)N) ClientError)datetime timedelta)uuid4)levelc@seZdZdZd.ddZddZd/d d Zd0d d ZddZd1ddZ ddZ ddZ d2ddZ d3ddZ d4ddZd5dd Zd!d"Zd#d$Zd%d&Zd'd(Zd)d*Zd+d,Zd-S)6FarOptaFarOpt class used to initialize, configure and submit jobs to the back end :param framework: Currently only ortools, TO DO is to extend to other frameworks. Note that other frameworks like pyomo, DEAP, inspyred and pulp are supported :type framework: string, optional :param stackname: Points to the backend CDK stack that needs to be launched separately. Default name is faropt, but you many need to pass in another name while testing :type stackname: string, optional ortoolsfaroptc Cstd}y$|j|d}|ddddkr.tdd|_||_|ddd }x|D]}|d d kr|d |_td |j|d dkr|d |_td|j|d dkr|d |_ td|j |d dkr|d |_ td|j |d dkr\|d |_ td|j q\Wd|_ d|_ Wn4tk rd}zd|_t|Wdd}~XYnXdg|_||jkrtd|n||_dS)z_Constructor method: Gets buckets and tables associated with the already launched stack Zcloudformation)Z StackNameZStacksrZ StackStatus)ZCREATE_COMPLETEZUPDATE_COMPLETEzFarOpt backend is ready!TZOutputsZ OutputKeyZ s3asyncoutputZ OutputValuezAsync Bucket: Zs3outputzBucket: recipetablezRecipe Table: jobtablez Job table: lambdaoptzLambda Opt function: FNrz/Only ortools is supported for now. You entered )boto3clientZdescribe_stackslogginginfoready stackname asyncbucketbucketr r r configured submitted ExceptionerrorZallowed_frameworkswarning framework)selfrrcfresponseoutputsoutputer!3/tmp/pip-unpacked-wheel-9of43mn_/faropt/__init__.py__init__sB                zFarOpt.__init__c Cstdd}tdd}xbt|D]T\}}}t|||x<|D]4}t||tj ||tj |d||qBWq&W| tj ||_ ||_d|_tddS)aRZips up a local folder containing your main.py code, and any other subfolders/files required to run your project. Make note of the output structure printed to see if all files that you need are printed.ArithmeticError :param source_dir: path to your source, such as './home/src/' :type source_dir: string zListing project files ...z source.zipw/TzConfigured job!N)rrzipfileZipFileoswalkprintwritepathjoinrelpathcloseabspathpath_file_name file_namer)r source_dirr2zfdirnamesubdirsfilesfilenamer!r!r" configureIs    0zFarOpt.configureFaropt SDK usercCs|jr~td|_|j|j|_tt}|j rN|||j |j d|dd}n |||j |j d|j|dd}|jj|dn tddS) a;Adds a recipe referencing the job that you submitted (see self object params). :param recipe_name: Friendly name for your recipe :type recipe_name: string :param maintainer: Recipe author/maintainer :type source_dir: string, optional. Defaults to 'Faropt SDK User' dynamodbz /source.zipzsee path)recipeid descriptionrr, maintainercoder%)ItemzPlease configure the job first!N)rr resource ddb_resourceTabler ddb_tablestrrmicror stagedkeyrjobnamer2put_itemrr)rZ recipe_namer>ZUIDjobr!r!r" add_recipeas&   zFarOpt.add_recipeFc Csxtdtd}tdd}||||WdQRXtjd|_ d|_ tdd|_ |rh| |j |d dS) aRuns job based on a source file in bucket/key. For example, place a source.zip in s3://bucket/key/source.zip and submit a job :param bucket: Bucket name :type bucket: string :param key: path/key on S3 that looks like path/to/s3/key/source.zip inside the bucket :type key: string :param micro: Submit a micro job. :type micro: bool zDownloading source...s3z/tmp/source.zipwbNz source.zipzConfigured job!T)rF)rrr ropendownload_fileobjr(r,r0r1r2rstagesubmit)rrkeyrFrLfr!r!r" run_s3_jobs    zFarOpt.run_s3_jobcCs4|jddx"|jD]}|d|kr|dSqWdS)aReturns UUID of a recipe based on friendly description/ recipe name :param description: friendly description/ recipe name :type description: string :return: First UUID that matches the description of the recipe :rtype: uuid4() F)verboser=r<N) list_recipesrecipes)rr=rr!r!r"get_recipe_id_from_descriptions   z%FarOpt.get_recipe_id_from_descriptionc Csytd|_|j|j|_|jjd|id}|dd}|dd}tdt d}t d d }| |||Wd QRXt j d |_d |_td d|_|j|dWn,tk r}zt|dSd }~XYnXd S)zRuns already registered recipe :param recipe_id: UUID of recipe :type recipe_id: string :param micro: Submit as a micro job :type recipe_id: bool r;r<)ZKeyr@r,rzDownloading recipe...rLz/tmp/source.ziprMNz source.zipzConfigured job!T)rFF)r rArBrCr rDZget_itemrrrrNrOr(r,r0r1r2rrQrr) rZ recipe_idrFrr,rrLrSr r!r!r" run_recipes$        zFarOpt.run_recipecCs6x&|dkr&t|tdqWtddS)zPolls for the primary status of the container task that runs this job. You should see PROVISIONING > PENDING > RUNNING > STOPPED > JOB COMPLETED :return: primary status of the job that was submitted :rtype: list STOPPEDzJOB COMPLETED!N)primary_statusr*timesleeprr)rr!r!r"waits z FarOpt.waitcCstdtd}yxtddtt}| |j |j d|d}tdt|||_ d||_ td|j d |j dWntd YnXd S) zaUploads the source.zip but does not submit to fargate. Useful when you want to run later z Staging jobrLz%Y-%m-%d-%H-%M-%S-zstaged/z /source.zipzStaged job! id: zLook for s3://z/staged/zCould not stage jobN)rrr rrnowstrftimerEr upload_filer1rrHrGr)r s3_clienteventidrr!r!r"rPs    z FarOpt.stagec Cs|jrtdtd}td|_|j|j|_ |syvd|_ t ddtt}||j|j|d}tdt|||_|j|j|dd }|j j|d Wn,tk r}zt|dSd }~XYnXny|d |_ td td}|j|jddt|j|jdd}|j|j|jdd }|j j|d |d}tt !|"t !|"|_#Wn.tk r}zt|dSd }~XYnXn tdd |_$d S)aRuns job defined in object params. Creates a new job ID to track and sets submitted to True. Check self.jobname to reference the job that was submitted. View self.logs() once the job has completed :param micro: Submit a micro job. By submitting a micro job, you are restricted to using ortools, pyomo and deap libraries for jobs that last up to 5 minutes :type micro: bool zSubmitting jobrLr;Fz%Y-%m-%d-%H-%M-%Sraz /source.zipzSubmitted job! id: )jobidrr,)r@NTz{By submitting a micro job, you are restricted to using ortools, pyomo and deap libraries for jobs that last up to 5 minuteslambdaZRequestResponseZTail)Zs3bucketZs3key)Z FunctionNameZInvocationTypeZLogTypeZPayloadZ LogResultzPlease configure the job first!)%rrrr rrArBrCr rDrFrrbrcrErrdr1rrHrIrrrPZinvoker jsondumpsrrGencoder*base64 b64decodedecode micrologsr) rrFrerfrrJr Z lamclientZbase64_messager!r!r"rQsR         z FarOpt.submit Tc Cstd}|j|j|d}g}x|dD]}||dd|dd|dd|dd|d d|d dd |r(td |ddd |ddd|ddd|ddd|d d q(W||_|S)a[Returns list of recipes registered :param limit: Number of recipes to return, Defaults to 10 :type limit: int, optional :param verbose: Verbose print of the recipe table, Defaults to True :type verbose: bool, optional :return: Recipe table scan (raw) results :rtype: boto3 response r;) TableNameLimitItemsr<Srr,r=r>r?)r<rr,r=r>r?z recipeid:z | bucket:z | path:z | description:z | maintainer:)r rscanr appendr*rW)rlimitrU ddb_clientrZ allrecipesrJr!r!r"rV2s HRzFarOpt.list_recipesc Cstd}|j|j|d}g}xp|dD]d}||dd|dd|ddd|r(td |ddd |ddd |ddq(W||_|S) aNReturns list of jobs submitted :param limit: Number of jobs to return, Defaults to 10 :type limit: int, optional :param verbose: Verbose print of the job table, Defaults to True :type verbose: bool, optional :return: job table scan (raw) results :rtype: boto3 response r;)rqrrrsrgrtrr,)rgrr,zjobid:z | bucket:z | path:)r rrur rvr*jobs)rrwrUrxrZalljobsrJr!r!r" list_jobsKs *6zFarOpt.list_jobsrc csd}d}x|dkr|dk r$d|i}ni}|ddddd}td }|jfd d ||d d |}|d}|d} t| }||kr| |d} d}n ||}g} x| D] } | VqWq WdS)z7Internal, use self.logs() instead of streaming NrZ nextTokentaskstaskArnr%logsfaroptlogGroupzfaroptlogs/FarOptImage/T) logGroupName logStreamNameZ startTimeZ startFromHeadZnextForwardTokenevents)statussplitr rget_log_eventslen) r start_timeskipZ next_tokenZ event_countZ token_argtaskarnrrrevr!r!r" stream_logsfs0     zFarOpt.stream_logscCsX|jrJ|jdkrJtd}|ddddd}|jd|d d }n td d S) zStops a submitted task Fecsr|rr}r%r~ FarOptClusterzUser stopped task)clusterZtaskreasonz0Please ensure you have submitted a non-micro jobN) rrFr rrrZ stop_taskrr)rrrrr!r!r"stops  z FarOpt.stopcCs2x,|dD] }tt|dd|dq WdS)Nr timestampz | message)r*rE)rrrr!r!r" printlogsszFarOpt.printlogscCs||dkrh|jdkr\|ddddd}td}|jd d |d }||qxt|j ntd |d S)z)Prints logs of a submitted job. )r[ZDEPROVISIONINGZRUNNINGFr|rr}r%r~rrzfaroptlogs/FarOptImage/)rrz,Please wait for the task to start running | N) r]rFrrr rrrr*ro)rrrrr!r!r"rs      z FarOpt.logscCs&|jdkr|dddSdSdS)zReturns the last status of the submitted job; Can be PROVISIONING > PENDING > RUNNING > STOPPED > JOB COMPLETED :return: primary staus :rtype: string Fr|rZ lastStatusr[N)rFr)rr!r!r"r]s zFarOpt.primary_statuscCs|jrz|jdkrztd}|jd|jd}|d}|gkr^td|jd|jdd}|d}|jd|dd gd }|St d d S) zxReturns the full status of the submitted job; used in primary_status, which should be enough for most use cases Frr)r startedByZtaskArnsz-No running tasks. Checking completed tasks...r[)rrZ desiredStatusr)rr|z$Please submit a non-micro job first!N) rrFr rZ list_tasksrHrrZdescribe_tasksr)rrZ response1Z running_tasksZ stopped_tasksrr!r!r"rs"   z FarOpt.statuscCsPtd}|d|}|jd|jdgttddtddd d gd }|S) a"Returns raw metric data that was submitted from the backend. To use this, do from utils import * in your main.py, and then use log_metric like this, for e.g: log_metric('total_distance',total_distance) :return: response from cloudwatch :rtype: json string cloudwatchrrg)NameValue)minutesr{ZAverageZMinimumZMaximum)Z DimensionsZ StartTimeZEndTimeZPeriodZ Statistics)r rAZMetricZget_statisticsrHrrbr)rZ metric_namerZmetricrr!r!r"get_metric_datas   zFarOpt.get_metric_dataN)rr )r:)F)F)F)rpT)rpT)rr)__name__ __module__ __qualname____doc__r#r9rKrTrYrZr`rPrQrVrzrrrrr]rrr!r!r!r"rs& 3 (    B   & r)r Zbotocore.exceptionsrrr(r&rruuidrr^rlri basicConfigINFOobjectrr!r!r!r"s