import ast
import os
import boto3
from collections import OrderedDict
import dateutil.tz
from datetime import datetime, timedelta, date
from pprint import pprint
# Service clients are created once, at import time, and shared by the functions below.
session = boto3.Session()
glue_client = session.client('glue')
# NOTE(review): built via boto3.client rather than `session.client` like the Glue client above.
cost_explorer_client = boto3.client('ce')
# Module-level list that get_job_rundetail fills and returns.
runs = []
# --- Key names ---------------------------------------------------------
# J_N, JS, ET, DPU and COST are keys/labels of the report rows built in publish_in_ses.
# JR is the key indexed in the get_job_runs response; JN, SO, JRS, AT and CO are
# keys read from each job-run entry (SO, AT and CO are reused as report row keys).
J_N = "Job Name"
JR = "JobRuns"
JN = "JobName"
SO = "StartedOn"
JS = "Job State"
JRS = "JobRunState"
AT = "Attempt"
CO = "CompletedOn"
ET = "Execution Time"
DPU = "Capacity/Workers"
COST = "Job Run Cost($)"
# Rate used by get_run_cost when pricing a run.
# NOTE(review): the time unit of this rate is not stated here (presumably per DPU-hour) - confirm.
GLUE_JOB_COST_PER_DPU = 0.44
# Leading text of the report body; publish_in_ses prepends it to the summary lines.
table_html = """'
Glue Job Status (Past 24 Hours)
'"""
# Column heading line; the labels mirror the columns emitted per run in publish_in_ses.
table_header = """
Job Name | Job State | Attempt | Started On | Completed On | Execution Time(Secs) | Capacity/Workers | Job Run Cost($) |
"""
def get_prev_day():
    """Return yesterday's date as an ISO ``YYYY-MM-DD`` string.

    "Yesterday" is relative to ``date.today()``, i.e. the local system date.
    """
    one_day = timedelta(days=1)
    yesterday = date.today() - one_day
    return yesterday.isoformat()
def get_today():
    """Return the current local date as an ISO ``YYYY-MM-DD`` string."""
    today = date.today()
    return today.isoformat()
def get_total_billing(client) -> dict:
    """Fetch the AWS Glue cost for the one-day window ending today.

    Args:
        client: Object exposing ``get_cost_and_usage`` (this file passes a
            boto3 ``ce`` client).

    Returns:
        A dict with ``'start'`` and ``'end'`` (the period bounds echoed back in
        the first result) and ``'billing'`` (the ``NetUnblendedCost`` amount
        of that result, rounded to cents).

    Raises:
        IndexError, KeyError: If the response has no results or lacks the
            expected fields.
    """
    # Read the clock once so the window is always exactly one day, even if
    # the call happens to straddle midnight.
    today = date.today()
    start_date = (today - timedelta(days=1)).isoformat()
    end_date = today.isoformat()
    print(start_date, end_date)

    response = client.get_cost_and_usage(
        TimePeriod={'Start': start_date, 'End': end_date},
        Filter={'Dimensions': {'Key': 'SERVICE', 'Values': ['AWS Glue']}},
        Granularity='DAILY',
        Metrics=['NetUnblendedCost'],
    )

    first_result = response['ResultsByTime'][0]
    amount = float(first_result['Total']['NetUnblendedCost']['Amount'])
    return {
        'start': first_result['TimePeriod']['Start'],
        'end': first_result['TimePeriod']['End'],
        # Keep cents: rounding to whole dollars reported sub-dollar days as 0
        # and disagreed with the per-run costs, which are rounded to cents.
        'billing': round(amount, 2),
    }
def get_jb_rn_dtl(jobName, lbh=24):
    """Collect the recent runs of one Glue job.

    Args:
        jobName: Name of the Glue job to query.
        lbh: Look-back window in hours; anything ``int()`` accepts.

    Returns:
        A list of dicts, one per run started within the window. Each dict has
        the job name, start time, run state, attempt and capacity/worker
        count; completed runs also carry the completion time and the
        execution time in whole seconds.
    """
    max_runs_inspected = 26
    window_seconds = int(lbh) * 60 * 60
    now = datetime.now(dateutil.tz.tzlocal())

    data = glue_client.get_job_runs(JobName=jobName, MaxResults=30)
    job_runs = []
    for detail in data[JR][:max_runs_inspected]:
        pprint(detail)
        # Stop at the first run older than the window.
        # NOTE(review): this relies on the service listing runs newest-first - confirm.
        if (now - detail[SO]).total_seconds() > window_seconds:
            break

        run = {
            JN: detail[JN],
            SO: detail[SO],
            JRS: detail[JRS],
            AT: detail[AT],
        }

        # Prefer the field matching the Glue version, but fall back to the
        # other one instead of raising KeyError when a run reports only one.
        if detail.get('GlueVersion') == '1.0':
            capacity = detail.get('MaxCapacity', detail.get('NumberOfWorkers'))
        else:
            capacity = detail.get('NumberOfWorkers', detail.get('MaxCapacity'))
        run[DPU] = 0 if capacity is None else capacity

        # Runs that have not finished have no completion time; leave both
        # fields unset for them.
        completed_on = detail.get(CO)
        if completed_on is not None:
            run[CO] = completed_on
            # total_seconds() keeps whole days; timedelta.seconds silently
            # wraps for runs longer than 24 hours. Truncated to int because
            # downstream code parses this value with int().
            run[ET] = int((completed_on - run[SO]).total_seconds())

        job_runs.append(run)

    print(job_runs)
    return job_runs
def get_job_rundetail(job_names, lbh):
    """Gather run details for every job in ``job_names``.

    Args:
        job_names: Iterable of Glue job names; ``None`` is treated as empty.
        lbh: Look-back window in hours, forwarded to ``get_jb_rn_dtl``.

    Returns:
        The module-level ``runs`` list, holding one list of run dicts per job
        (in the order of ``job_names``) for this call only.
    """
    # Replace the contents in place rather than appending: the module-level
    # list outlives a single call, so appending would carry results from
    # earlier calls in the same process into the next report.
    runs[:] = [get_jb_rn_dtl(job_name, lbh) for job_name in (job_names or [])]
    return runs
def get_job_names():
    """Return the names of all Glue jobs visible to ``glue_client``.

    Follows ``NextToken`` so that jobs beyond the first page of 100 are
    included as well.

    Returns:
        A list of job-name strings (empty when there are no jobs).
    """
    names = []
    request = {'MaxResults': 100}
    while True:
        page = glue_client.list_jobs(**request)
        names.extend(page.get('JobNames') or [])
        next_token = page.get('NextToken')
        if not next_token:
            return names
        request['NextToken'] = next_token
def get_run_cost(number_of_dpu, execution_time, cost_per_dpu_hour=None):
    """Estimate the cost of one job run.

    The execution time is rounded up to whole minutes, converted to hours and
    multiplied by the capacity and the hourly rate.

    Args:
        number_of_dpu: Capacity of the run (DPUs or worker count).
            NOTE(review): a worker is not necessarily one DPU for every
            worker type - confirm against the job configuration.
        execution_time: Run duration in seconds, as an int or numeric string.
        cost_per_dpu_hour: Price per DPU-hour; defaults to
            ``GLUE_JOB_COST_PER_DPU``.

    Returns:
        The estimated cost rounded to cents, or ``0.0`` when the duration or
        the capacity is unknown (``None`` or an empty string), e.g. for a run
        that has not completed yet.
    """
    if execution_time in (None, "") or number_of_dpu in (None, ""):
        return 0.0

    rate = GLUE_JOB_COST_PER_DPU if cost_per_dpu_hour is None else cost_per_dpu_hour

    billed_minutes, leftover_seconds = divmod(int(execution_time), 60)
    if leftover_seconds > 0:
        billed_minutes += 1  # a started minute is billed as a full minute

    # The rate is per hour, so the billed minutes are divided by 60;
    # multiplying minutes by the hourly rate overstated the cost 60-fold.
    return round(billed_minutes * number_of_dpu * rate / 60, 2)
def publish_in_ses(all_runs, table_html, table_header):
    """Render the job-run report and its summary line.

    Args:
        all_runs: Iterable of per-job run lists, as returned by
            ``get_job_rundetail``.
        table_html: Text placed at the very start of the report body.
        table_header: Column heading text placed before the rows.

    Returns:
        A ``(subject, body)`` tuple: the summary line with job/run counts and
        the full report text.
    """
    lines = []
    for job_runs in all_runs:
        for run in job_runs:
            if not run:
                continue
            execution_time = str(run.get(ET, ""))
            item = OrderedDict()
            item[J_N] = run.get(JN, "")
            item[JS] = run.get(JRS, "")
            item[AT] = run.get(AT, "")
            # Drop the fractional-second suffix from the timestamps.
            item[SO] = str(run.get(SO, "")).split('.')[0]
            item[CO] = str(run.get(CO, "")).split('.')[0]
            item[ET] = execution_time
            item[DPU] = run.get(DPU, "")
            # A run without a completion time (e.g. still running) has no
            # execution time; pricing it used to raise ValueError and abort
            # the whole report, so its cost cell is left blank instead.
            if execution_time and item[DPU] != "":
                item[COST] = get_run_cost(item[DPU], execution_time)
            else:
                item[COST] = ""
            lines.append(item)

    lines.sort(key=lambda row: (row[JS], row[SO], row[CO]))

    # NOTE(review): cells and rows are delimited with plain text only; the
    # body is later sent as an HTML part, so confirm no markup is missing.
    columns = [J_N, JS, AT, SO, CO, ET, DPU, COST]
    html_table = []
    for line in lines:
        print(line.keys())
        html_table.extend(f"{line[key]} | " for key in columns)
        html_table.append("\n")
    html_table.append("\n")

    total_job = len({line[J_N] for line in lines})
    states = [line[JS] for line in lines]
    count_fail = states.count("FAILED")
    count_running = states.count("RUNNING")
    count_success = states.count("SUCCEEDED")
    # Only the three states above are tallied; runs in any other state are
    # listed in the table but not included in this total.
    count_all = count_fail + count_running + count_success

    aws_glue_cost = get_total_billing(cost_explorer_client)
    subject1 = f"Number of Jobs {total_job}: Executions {count_all}: Failed {count_fail} Running {count_running} Succeeded {count_success} "
    subject2 = f"AWS Glue Cost: ${aws_glue_cost['billing']} Billing Start Date: {aws_glue_cost['start']} Billing End Date: {aws_glue_cost['end']}"

    full_html = table_html + subject1 + subject2 + table_header + "".join(html_table)
    print(full_html)
    return subject1, full_html
def main(snd, to_a, lbh, table_html, table_header):
    """Build the Glue job-run report and send it by e-mail via SES.

    Args:
        snd: Sender address used as the message source.
        to_a: String containing a Python literal (parsed with
            ``ast.literal_eval``) that is used as the recipient list.
        lbh: Look-back window in hours, forwarded to the run collection.
        table_html: Leading text of the report body.
        table_header: Column heading text of the report.
    """
    names = get_job_names()
    collected_runs = get_job_rundetail(names, lbh)
    subject, html_content = publish_in_ses(collected_runs, table_html, table_header)

    ses = boto3.client('ses')
    destination = {'ToAddresses': ast.literal_eval(to_a)}
    message = {
        'Subject': {'Data': subject},
        'Body': {'Html': {'Data': html_content}},
    }
    response = ses.send_email(Source=snd, Destination=destination, Message=message)
    print(response)
def handler(event, context):
    """Entry point: read the report settings from the environment and run ``main``.

    Neither ``event`` nor ``context`` is used. The environment variables
    ``fromEmail``, ``toEmail`` and ``lookBackHours`` must all be set; a
    missing one raises ``KeyError``.
    """
    env = os.environ
    sender = env['fromEmail']
    recipients = env['toEmail']
    window_hours = env['lookBackHours']
    main(sender, recipients, window_hours, table_html, table_header)