# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file.
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import time
import argparse
from awsbatch.common import AWSBatchCliConfig, Boto3ClientFactory, config_logger
from awsbatch.utils import convert_to_date, fail, get_job_type
def _get_parser():
"""
Parse input parameters and return the ArgumentParser object.
If the command is executed without the --cluster parameter, the command will use the default cluster_name
specified in the [main] section of the user's awsbatch-cli.cfg configuration file and will search
for the [cluster cluster-name] section, if the section doesn't exist, it will ask to CloudFormation
the required information.
If the --cluster parameter is set, the command will search for the [cluster cluster-name] section
in the user's awsbatch-cli.cfg configuration file or, if the file doesn't exist, it will ask to CloudFormation
the required information.
:return: the ArgumentParser object
"""
parser = argparse.ArgumentParser(description="Shows the output of the given Job.")
parser.add_argument("-c", "--cluster", help="Cluster to use")
parser.add_argument("-hd", "--head", help="Gets the first
lines of the job output", type=int)
parser.add_argument("-t", "--tail", help="Gets the last lines of the job output", type=int)
parser.add_argument(
"-s",
"--stream",
help="Gets the job output and waits for additional output to be produced. "
"It can be used in conjunction with --tail to start from the "
"latest lines of the job output",
action="store_true",
)
parser.add_argument("-sp", "--stream-period", help="Sets the streaming period. Default is 5", type=int)
parser.add_argument("-ll", "--log-level", help=argparse.SUPPRESS, default="ERROR")
parser.add_argument("job_id", help="The job ID")
return parser
def _validate_parameters(args):
"""
Validate input parameters.
:param args: args variable
"""
if args.head:
if args.tail:
fail("Parameters validation error: --tail and --head option cannot be set at the same time")
if args.stream:
fail("Parameters validation error: --stream and --head option cannot be set at the same time")
if args.stream_period and not args.stream:
fail("Parameters validation error: --stream-period can be used only with --stream option")
class AWSBoutCommand:
"""awsbout command."""
def __init__(self, log, boto3_factory):
"""
Initialize the object.
:param log: log
:param boto3_factory: an initialized Boto3ClientFactory object
"""
self.log = log
self.boto3_factory = boto3_factory
def run(self, job_id, head=None, tail=None, stream=None, stream_period=None):
"""Print job output."""
log_stream = self.__get_log_stream(job_id)
if log_stream:
self.log.info("Log stream is (%s)" % log_stream)
self.__print_log_stream(log_stream, head, tail, stream, stream_period)
def __get_log_stream(self, job_id):
"""
Get log stream for the given job.
:param job_id: job id (ARN)
:return: the log_stream if there, or None
"""
log_stream = None
try:
batch_client = self.boto3_factory.get_client("batch")
jobs = batch_client.describe_jobs(jobs=[job_id])["jobs"]
if len(jobs) == 1:
job = jobs[0]
self.log.debug(job)
if "nodeProperties" in job:
# MNP job
container = job["nodeProperties"]["nodeRangeProperties"][0]["container"]
elif "container" in job:
container = job["container"]
else:
container = {}
if get_job_type(job) != "SIMPLE":
fail("No output available for the Job (%s). Please ask for its children." % job["jobId"])
else:
if "logStreamName" in container:
log_stream = container.get("logStreamName")
else:
print("No log stream found for job (%s) in the status (%s)" % (job_id, job["status"]))
else:
fail("Error asking job output for job (%s). Job not found." % job_id)
except Exception as e:
fail("Error listing jobs from AWS Batch. Failed with exception: %s" % e)
return log_stream
def __print_log_stream(self, log_stream, head=None, tail=None, stream=None, stream_period=None): # noqa: C901 FIXME
"""
Ask for log stream and print it.
:param log_stream: job log stream
"""
logs_client = self.boto3_factory.get_client("logs")
try:
# The maximum number of log events returned by the get_log_events function is as many log events
# as can fit in a response size of 1 MB, up to 10,000 log events
max_limit = 10000
if head:
limit = head
start_from_head = True
elif tail:
limit = tail
start_from_head = False
else:
limit = max_limit
start_from_head = False
response = logs_client.get_log_events(
logGroupName="/aws/batch/job", logStreamName=log_stream, limit=limit, startFromHead=start_from_head
)
events = response["events"]
self.log.debug(response)
if not events:
print("No events found.")
self.__print_events(events)
if limit == max_limit or stream:
# get paginated items
next_token = response["nextForwardToken"]
while next_token is not None or stream:
self.log.info("Next Forward Token is (%s)" % next_token)
if stream:
period = stream_period if stream_period else 5
self.log.info("Waiting other %s seconds..." % period)
time.sleep(period)
response = logs_client.get_log_events(
logGroupName="/aws/batch/job", logStreamName=log_stream, nextToken=next_token
)
self.__print_events(response["events"])
# if nextForwardToken is the same we passed in, we reached the end of the stream
if stream:
next_token = response["nextForwardToken"]
else:
next_token = (
response["nextForwardToken"] if response["nextForwardToken"] != next_token else None
)
except KeyboardInterrupt:
self.log.info("Interrupted by the user")
sys.exit(0)
except Exception as e:
fail("Error listing jobs from AWS Batch. Failed with exception: %s" % e)
@staticmethod
def __print_events(events):
"""
Print given events.
:param events: events to print
"""
for event in events:
print("{0}: {1}".format(convert_to_date(event["timestamp"]), event["message"]))
def main():
"""Command entrypoint."""
try:
# parse input parameters and config file
args = _get_parser().parse_args()
_validate_parameters(args)
log = config_logger(args.log_level)
log.info("Input parameters: %s", args)
config = AWSBatchCliConfig(log=log, cluster=args.cluster)
boto3_factory = Boto3ClientFactory(region=config.region, proxy=config.proxy)
AWSBoutCommand(log, boto3_factory).run(
job_id=args.job_id, head=args.head, tail=args.tail, stream=args.stream, stream_period=args.stream_period
)
except KeyboardInterrupt:
print("Exiting...")
sys.exit(0)
except Exception as e:
fail("Unexpected error. Command failed with exception: %s" % e)
if __name__ == "__main__":
main()