# Profiling workflow performance with the Amazon Genomics CLI

Profiling is an essential part of developing genomics workflows. By identifying and eliminating expensive bottlenecks, users can make workflow performant and cost efficient. A common way to profile a workflow is to generate a timing chart of all tasks executed. The [Amazon Genomics CLI](https://aws.amazon.com/genomics-cli/) (AGC) provides a unified experience for running workflows across [multiple workflow engines](https://aws.github.io/amazon-genomics-cli/docs/concepts/engines/). In doing so, it also enables a common way to generate timing charts and profile workflows.

This notebook provides example code that demonstrates how to profile workflows run by the Amazon Genomics CLI as described by the blog "Profiling workflow performance with the Amazon Genomics CLI".

## Prerequisites
The code in this notebook also requires the following Python packages:
* boto3
* bokeh >= 2.1.1

To export plots as PNGs, you will also need:
* selenium
* firefox or geckodriver

You'll also need to have run a workflow with the Amazon Genomics CLI and generated the following output files:
* a workflow log from `agc logs workflow`
* a JSON formated GA4GH WES RunLog that includes AWS Batch job descriptions for each task

Some examples of the above files have been included.

## Getting started
First, a few imports from the Python standard library (`datetime`, `json`, `re`), the Bokeh and Boto3 packages, and some functions from `utils.py`.

In [1]:
from datetime import datetime
import json
from os import path
import re

from bokeh.models import ColumnDataSource, Range1d, Div
from bokeh.layouts import gridplot, column
from bokeh.plotting import figure, output_notebook, show, save
from bokeh.io.export import export_png
from bokeh.resources import CDN
import boto3

from utils import parse_datetime, get_job_resources, get_job_descriptions

We'll also create a set of time conversion factors to make changing units easier when plotting

In [2]:
TIME_SCALE_FACTORS = {
    "sec": 1, "min": 1/60, "hr": 1/3600
}

## Functions
Let's define the functions we'll need for parsing the log data and generating plots.

### Parsing `agc logs workflow` output
Here is a function that can parse the text output from an `agc logs workflow` command. We won't actually use this function because the output in `agc` v1.3.x has an occasional formatting edge case (i.e. missing separators between task names and job-ids). It is here for reference purposes only.

In [3]:
def parse_agc_log(log_file):
    """
    Parses the text output from an 'agc logs workflow' command
    
    FOR REFERENCE ONLY
    """
    with open(log_file, 'r') as f:
        lines = f.readlines()
        lines = list(filter(lambda x: x.strip(), lines))

    data = {
        k: lines[i].split(":")[1].strip()
        for i, k in enumerate(['runid', 'state'])
    }

    data['tasks'] = [
        dict(zip(
            ['name', 'job_id', 'start_time', 'end_time', 'exit_code'], 
            re.split('\t+', line.strip())
        )) 
        for line in lines[4:]
    ]
    
    for i, task in enumerate(data['tasks']):
        for k in ('start_time', 'end_time'):
            task[k] = parse_datetime(task[k])
        data['tasks'][i] = task

    return data

### Parsing GA4GH WES RunLogs
This is the function we'll use to read GA4GH WES RunLogs that `agc` uses internally that have been saved as `*.json` files. There are optional arguments to fetch job descriptions from AWS Batch.

**Note:** AWS Batch only stores job details for 48hrs after a job completes. Fetching job descriptions from AWS Batch should be done as soon as a workflow completes. This notebook assumes that AWS Batch job descriptions have been pre-fetched and included with the GA4GH WES RunLog.

In [4]:
def parse_wes_runlog(runlog_json, with_job_descriptions=False, batch=boto3.client('batch')):
    """Parses JSON formated GA4GH WES RunLog"""
    with open(runlog_json, 'r') as f:
        runlog = json.load(f)
    
    tasks = runlog['task_logs']    
    for i, task in enumerate(tasks):
        for k in ('start_time', 'end_time'):
            tasks[i][k] = parse_datetime(task[k])
    
        name, job_id = task['name'].split("|")
        tasks[i]['job_name'] = name
        tasks[i]['job_id'] = job_id
    
    if with_job_descriptions:
        descriptions = get_job_descriptions([task['job_id'] for task in tasks], batch)
        for i, task in enumerate(tasks):
            tasks[i]['job_description'] = descriptions.get(task['job_id'])
    
    runlog['task_logs'] = sorted(tasks, key=lambda x: (x['start_time'], x['name']))    
    return runlog

### Computing task cost
This funcction is a linear equation that maps vCPUs and Memory (GiB) to Linux On-Demand USD per hour EC2 costs.

**Note:** AWS prices vary by region and are updated regularly, so your model constants may differ. You can collect data to generate your own model by going to the [AWS EC2 console](https://us-west-2.console.aws.amazon.com/ec2/v2/home), and exporting a table that contains [instance types](https://us-west-2.console.aws.amazon.com/ec2/v2/home#InstanceTypes:) and their Linux On-demand per hour cost.

In [5]:
def approx_instance_cost_per_hr(vcpu, mem_gib):
    # rough OLS fit model based on c, m, r instance pricing filtered for x86_64 and non-(a,d,n)
    # returns linux on-demand per hour cost in USD
    usd = 0.03550716*vcpu + 0.003633091*mem_gib + 0.00718333
    
    return usd

### Plotting
Here we have two functions. The first, `plot_tasks_bokeh1`, is simple and only plots tasks based on their timing. The second, `plot_tasks_bokeh2`, plots timing and additional information extracted from each task's AWS Batch job description.

In [6]:
def plot_tasks_bokeh1(tasks, time_units='min', **kwargs):
    """
    plots task timing using engine reported timing (wes runlog)
    
    simple version that only shows task timing
    """
    time_scale_factor = TIME_SCALE_FACTORS[time_units]
    
    tare = min([task['start_time'] for task in tasks])
    wall_time = (max([task['end_time'] for task in tasks]) - tare).total_seconds() * time_scale_factor
    
    stats = {
        'num_tasks': len(tasks),
        'timing': {
            'wallclock': wall_time,
            'units': time_units
        }
    }
    
    source = ColumnDataSource(data=dict(
        y=range(len(tasks)),
        names=[task['job_name'] for task in tasks],
        running_left=[(task['start_time'] - tare).total_seconds() * time_scale_factor for task in tasks],
        running_right=[(task['end_time'] - tare).total_seconds() * time_scale_factor for task in tasks],
        text_x=[((task['end_time'] - tare).total_seconds() + 30) * time_scale_factor for task in tasks],
    ))
    
    output_notebook()
    
    p_job = figure(width=960, height=800)
    p_job.hbar(y='y', left='running_left', right='running_right', height=0.8, source=source)
    p_job.text(x='text_x', y='y', text='names', alpha=0.4, text_baseline='middle', text_font_size='1.5ex', source=source)
    x_max = 5*3600*time_scale_factor # max expected workflow duration in hours
    x_min = -(x_max * 0.05)
    p_job.x_range = Range1d(x_min, x_max)
    p_job.y_range.flipped = True
    p_job.xaxis.axis_label = f"task execution time ({time_units})"
    p_job.title.text = (
        f"tasks: {stats['num_tasks']}, "
        f"wall_time: {stats['timing']['wallclock']:.2f} {stats['timing']['units']}"
    )
    
    show(p_job)
    
    return p_job


def plot_tasks_bokeh2(tasks, time_units='min', info=None, show_plot=True):
    """
    plots task timing using batch job description data
    
    improved version that includes additional details like per task cpu and memory utilization and cost
    """
    time_scale_factor = TIME_SCALE_FACTORS[time_units]
    millis_to_sec = 1/1000
    mebi_to_gebi = 1/1024
    
    jobs = [task['job_description'] for task in tasks]
    tare = min([job['createdAt'] for job in jobs])
    
    colors={'SUCCEEDED': 'cornflowerblue', 'FAILED': 'orangered'}
    
    source = ColumnDataSource(data=dict(
        y = range(len(jobs)),
        names=[job['jobName'] for job in jobs],
        color=[colors[job['status']] for job in jobs],
        cost=[
            approx_instance_cost_per_hr(
                get_job_resources(job)['vcpus'], 
                get_job_resources(job)['memory'] * mebi_to_gebi
            ) * ((job['stoppedAt'] - job['startedAt']) * millis_to_sec / 3600)
            for job in jobs],
        cpus=[get_job_resources(job)['vcpus'] for job in jobs],
        memory=[get_job_resources(job)['memory'] * mebi_to_gebi for job in jobs],
        queued_left=[(job['createdAt'] - tare) * millis_to_sec * time_scale_factor for job in jobs],
        queued_right=[(job['startedAt'] - tare) * millis_to_sec * time_scale_factor for job in jobs],
        running_left=[(job['startedAt'] - tare) * millis_to_sec * time_scale_factor for job in jobs],
        running_right=[(job['stoppedAt'] - tare) * millis_to_sec * time_scale_factor for job in jobs],
        text_x=[((job['stoppedAt'] - tare) * millis_to_sec + 30) * time_scale_factor for job in jobs],
    ))
    
    wall_time = (max([job['stoppedAt'] for job in jobs]) - tare) * millis_to_sec * time_scale_factor
    total_cpu_time = sum([(job['stoppedAt'] - job['startedAt']) * cpu for job, cpu in zip(jobs, source.data['cpus'])]) * millis_to_sec * time_scale_factor
    total_cost = sum(source.data['cost'])
    stats = {
        'num_tasks': len(tasks),
        'timing': {
            'total_cpu': total_cpu_time,
            'wallclock': wall_time,
            'units': time_units
        }
    }
    
    output_notebook()
    
    p_job = figure(width=960, height=800, sizing_mode="stretch_both")
    p_job.hbar(y='y', left='queued_left', right='queued_right', height=0.8, color='lightgrey', source=source, legend_label="queued")
    p_job.hbar(y='y', left='running_left', right='running_right', height=0.8, color='color', source=source, legend_label="running")
    p_job.text(x='text_x', y='y', text='names', alpha=0.4, text_baseline='middle', text_font_size='1.5ex', source=source)
    x_max = 5*3600*time_scale_factor # max expected workflow duration in hours
    x_min = -(x_max * 0.05)
    p_job.x_range = Range1d(x_min, x_max)
    p_job.y_range.flipped = True
    p_job.xaxis.axis_label = f"task execution time ({time_units})"
    p_job.yaxis.visible = False
    p_job.legend.location = "top_right"
    p_job.title.text = (
        f"tasks: {stats['num_tasks']}, "
        f"total_cpu_time: {stats['timing']['total_cpu']:.2f} {stats['timing']['units']}, "
        f"wall_time: {stats['timing']['wallclock']:.2f} {stats['timing']['units']}"
    )
       
    p_usd = figure(width=160, y_range=p_job.y_range, sizing_mode="stretch_height")
    p_usd.hbar(y='y', right='cost', height=0.8, color='limegreen', source=source)
    x_max = 0.1
    x_min = -(x_max * 0.05)
    p_usd.x_range = Range1d(x_min, x_max*1.05)
    p_usd.xaxis.axis_label = "task cost (USD)"
    p_usd.title.text = f"total: {total_cost:.2f} USD"
    
    p_cpu = figure(width=160, y_range=p_job.y_range, sizing_mode="stretch_height")
    p_cpu.hbar(y='y', right='cpus', height=0.8, color="darkgrey", source=source)
    p_cpu.x_range = Range1d(-1, 32)
    p_cpu.xaxis.axis_label = "vcpus"
    p_cpu.yaxis.visible = False
    p_cpu.title.text = f"max cpus: {max(source.data['cpus'])}"
    
    p_mem = figure(width=160, y_range=p_job.y_range, sizing_mode="stretch_height")
    p_mem.hbar(y='y', right='memory', height=0.8, color="darkgrey", source=source)
    p_mem.x_range = Range1d(-1, 32)
    p_mem.xaxis.axis_label = "memory (GiB)"
    p_mem.yaxis.visible = False
    p_mem.title.text = f"max mem: {max(source.data['memory']):.2f} GiB"
    
    title_text = ""
    if info:
        info_text = f"{info['context']['engine']} : {info['workflow']['name']} : {info['context']['launch_type']} : {info['state']} ({info['runid']})"
        title_text = info_text
    
    g = gridplot([p_usd, p_cpu, p_mem, p_job], ncols=4, toolbar_location="right")
    
    title = Div(text=f"<strong>{title_text}</strong>")
    
    layout = column(title, g)
        
    if show_plot:
        show(layout)
    
    return layout

## Demo

Start by reading in a JSON file with GA4GH WES RunLog output for a worklow that was run with Amazon Genomics CLI.

In [7]:
runlog_file = 'logs/gatk4-data-processing__onDemandCtxCromwell__8cf8e737-6584-4309-ab5f-0aae8e885369.wes_runlog__sorted-with-job-desc.json'
runlog_basename = path.basename(runlog_file).split('.')[0]
runlog = parse_wes_runlog(runlog_file)

### Plotting - timing only

In [8]:
plot1 = plot_tasks_bokeh1(runlog['task_logs'])
html1 = save(plot1, filename=f"{runlog_basename}__plot1.html", title=runlog_basename, resources=CDN)
png1 = export_png(plot1, filename=f"{runlog_basename}__plot1.png")

### Plotting - timing, compute resources, costs

In [9]:
info = {
    'state': runlog['state'],
    'runid': runlog['run_id'],
    'context': {'engine': 'cromwell', 'launch_type': 'on-demand'}, 
    'workflow': {'name': 'gatk4-data-processing'}}
plot2 = plot_tasks_bokeh2(runlog['task_logs'], info=info)
html2 = save(plot2, filename=f"{runlog_basename}__plot2.html", title=runlog_basename, resources=CDN)
png2 = export_png(plot2, filename=f"{runlog_basename}__plot2.png")

**Note:** The input for the above workflow is a small dataset used for testing and demo purposes. The overall cost shown is not representative of a workflow processing a full 30x human whole genome sample.