--- AWSTemplateFormatVersion: '2010-09-09' Description: This CloudFormation Template creates a monitoring solution for the AWS Batch Jobs in transit for more than "X" seconds before moving to RUNNING state. Parameters: NumberOfSeconds: Description: Enter the desired number of seconds (upper threshold beyond which if the Job stays in transit, a notification will be triggered). Type: Number EmailAddress: Description: Enter the Email Address where you want to get the notification. Type: String NumberofMinutes: Description: Enter the frequency in minutes to trigger the Lambda Function. Please be careful around the Syntax. Format - rate(value unit), If the value is equal to 1, then the unit must be singular. Similarly, for values greater than 1, the unit must be plural. For example, rate(1 hours) and rate(5 hour) are Invalid, but rate(1 hour) and rate(5 hours) are valid. Example:- rate(10 minutes) is a valid value Type: String Resources: BatchJobSNSNotificationTopic: Type: AWS::SNS::Topic Properties: Subscription: - Endpoint: !Ref EmailAddress Protocol: email BatchLambdaFunction: Type: AWS::Lambda::Function DependsOn: LambdaExecutionRole Properties: Code: ZipFile: | #!/usr/bin/env python import sys import boto3 from pprint import pprint import datetime import argparse import json import os import time def lambda_handler(event, context): """ Lambda Handler Function """ seconds = int(event['seconds']) try: result = get_all_transit_jobs(seconds) if len(result) > 0: notify_to_sns(seconds) except: print("An exception occurred while callling get_all_transit_jobs() Function ") return result def notify_to_sns(sec): """ Notify through SNS """ try: client = boto3.client('sns') transitJobs = get_all_transit_jobs(sec) topic = os.environ['NotificationTopicARN'] #response = client.publish(TopicArn=os.environ['NotificationTopicARN'], Message=json.dumps(transitJobs)) response = client.publish(TopicArn=topic, Message=json.dumps(transitJobs)) print("Notification delivered", response) except: print("An error occurred while sending SNS notification") def get_all_jobqueues(): """ Get all the Job queues """ try: client = boto3.client('batch') response = client.describe_job_queues() queues = response['jobQueues'] myqueues = [] for queue in queues: myqueues.append(str(queue['jobQueueArn'])) return myqueues except: print("An error while getting all job queues") def get_all_transit_jobs(sec): """ Get all the jobs in transit under each queue """ try: client = boto3.client('batch') myjobqueues = get_all_jobqueues() alltransitJobs = [] for jobqueue in myjobqueues: mytransitjob = {} queuejobs = client.list_jobs(jobQueue=jobqueue, jobStatus="RUNNABLE") jobs = queuejobs['jobSummaryList'] queuejobs1=client.list_jobs(jobQueue=jobqueue, jobStatus="PENDING") jobs2=queuejobs1['jobSummaryList'] for i in jobs2: jobs.append(i) queuejobs2=client.list_jobs(jobQueue=jobqueue, jobStatus="SUBMITTED") jobs3=queuejobs2['jobSummaryList'] for i in jobs3: jobs.append(i) if jobs: for job in jobs: mytransitjob={} mytransitjob["jobQueueName"] = jobqueue mytransitjob["status"] = job['status'] mytransitjob["jobId"] = job['jobId'] unixtimestamp = job['createdAt'] currenttime = datetime.datetime.now() readable = datetime.datetime.fromtimestamp(unixtimestamp/1000.0) mytransitjob["JobStuckInSeconds"] = (currenttime - readable).seconds if mytransitjob["JobStuckInSeconds"] > sec: alltransitJobs.append(mytransitjob) time.sleep(2) return alltransitJobs except: print("Exception while calling get_all_transit_jobs Function") Handler: "index.lambda_handler" Runtime: python3.9 Timeout: 30 FunctionName: MonitorTransitBatchJobLambdaFunction Role: !GetAtt LambdaExecutionRole.Arn Environment: Variables: NotificationTopicARN: !Ref BatchJobSNSNotificationTopic LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: "*" Resource: "*" BatchTransitJobCWEventRule: Type: AWS::Events::Rule Properties: Description: "ScheduledRule to monitor the Batch jobs in transit" State: ENABLED ScheduleExpression: !Ref NumberofMinutes #ScheduleExpression: !Sub | # \"rate(${NumberofMinutes} minutes)" #ScheduleExpression: "rate(2 minutes)" Targets: - Id: BatchLambdaID #Input: !Join: ["",["{\"seconds\": \"!Ref NumberOfSeconds\"}"]] Input: !Sub | { "seconds": "${NumberOfSeconds}" } Arn: Fn::GetAtt: - BatchLambdaFunction - Arn LambdaInvokePermissions: Type: AWS::Lambda::Permission Properties: FunctionName: !Ref BatchLambdaFunction Action: lambda:InvokeFunction Principal: events.amazonaws.com SourceArn: Fn::GetAtt: - BatchTransitJobCWEventRule - Arn