# Running WDL and Nextflow pipelines with HealthOmics Workflows

In this tutorial, you will learn how to create, run, and debug WDL- and Nextflow-based pipelines that process data from HealthOmics Storage and Amazon S3 using HealthOmics Workflows.

## Prerequisites
### Python requirements
* Python >= 3.8
* Packages:
  * boto3 >= 1.26.19
  * botocore >= 1.29.19

### AWS requirements

#### AWS CLI
You will need the AWS CLI installed and configured in your environment. Supported AWS CLI versions are:

* AWS CLI v2 >= 2.9.3 (Recommended)
* AWS CLI v1 >= 1.27.19

#### Output buckets
You will need a bucket **in the same region** you are running this tutorial in to store workflow outputs.

#### Input data
If you modify any of the workflows to retrieve input data (e.g. references or raw reads), that data **MUST reside in the same region**. AWS HealthOmics does not support cross-region read or write at this time.
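
One way to check the same-region requirement up front is to look up each bucket's region before starting. The sketch below is not part of the original tutorial, and the helper name `bucket_region` is hypothetical. It assumes S3's `get_bucket_location` call, which reports `None` for buckets in `us-east-1` and the legacy value `EU` for some older `eu-west-1` buckets.

```python
def bucket_region(s3_client, bucket):
    """Return the region name of a bucket (hypothetical helper).

    s3_client is expected to behave like boto3.client('s3').
    """
    location = s3_client.get_bucket_location(Bucket=bucket).get("LocationConstraint")
    if location is None:
        # S3 reports no constraint for buckets in us-east-1
        return "us-east-1"
    if location == "EU":
        # legacy value used by some older eu-west-1 buckets
        return "eu-west-1"
    return location
```

With a real client, `bucket_region(boto3.client('s3'), 'my_results_data_bucket')` can be compared against `boto3.session.Session().region_name` before any workflow is started.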

## Environment setup

In [None]:
import json
from datetime import datetime
import glob
import io
import os
from pprint import pprint
from textwrap import dedent
from time import sleep
from urllib.parse import urlparse
from zipfile import ZipFile, ZIP_DEFLATED

import boto3
import botocore.exceptions

## Create a service IAM role
To use AWS HealthOmics, you need to create an IAM role that grants the service permissions to access resources in your account. We'll do this below using the IAM client.

> **Note**: this step is fully automated from the HealthOmics Workflows Console when you create a run

In [None]:
dt_fmt = '%Y%m%dT%H%M%S'
ts = datetime.now().strftime(dt_fmt)

iam = boto3.client('iam')
role = iam.create_role(
    RoleName=f"OmicsServiceRole-{ts}",
    AssumeRolePolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Principal": {
                "Service": "omics.amazonaws.com"
            },
            "Effect": "Allow",
            "Action": "sts:AssumeRole"
        }]
    }),
    Description="HealthOmics service role",
)

After creating the role, we need to attach policies that grant permissions. In this case, we are allowing read/write access to all S3 buckets in the account. This is fine for this tutorial, but in a real-world setting you will want to scope this down to only the necessary resources. We are also adding permissions to create and write to CloudWatch Logs, which is where any output sent to `STDOUT` or `STDERR` is collected.

In [None]:
AWS_ACCOUNT_ID = boto3.client('sts').get_caller_identity()['Account']

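policy_s3 = iam.create_policy(
    PolicyName=f"omics-s3-access-{ts}",
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:Get*",
                    "s3:List*",
                ],
                "Resource": [
                    # bucket-level ARN so that bucket actions such as s3:ListBucket apply
                    "arn:aws:s3:::*",
                    # object-level ARN for reading and writing objects
                    "arn:aws:s3:::*/*"
                ]
            }
        ]
    })
)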

policy_logs = iam.create_policy(
    PolicyName=f"omics-logs-access-{ts}",
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup"
                ],
                "Resource": [
                    f"arn:aws:logs:*:{AWS_ACCOUNT_ID}:log-group:/aws/omics/WorkflowLog:*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:DescribeLogStreams",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": [
                    f"arn:aws:logs:*:{AWS_ACCOUNT_ID}:log-group:/aws/omics/WorkflowLog:log-stream:*"
                ]
            }
        ]
    })
)


for policy in (policy_s3, policy_logs):
    _ = iam.attach_role_policy(
        RoleName=role['Role']['RoleName'],
        PolicyArn=policy['Policy']['Arn']
    )
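
The S3 policy in this tutorial covers every bucket in the account. As a rough illustration of scoping it down, the sketch below builds a policy document limited to named input and output buckets. The helper `scoped_s3_policy` is hypothetical, and the split into list, read, and write statements is an assumption about what a typical workflow needs, not a requirement stated by this tutorial.

```python
import json


def scoped_s3_policy(read_buckets, write_buckets):
    """Build an S3 policy document limited to the named buckets.

    Hypothetical helper; assumes both lists are non-empty.
    """
    readable = sorted(set(read_buckets))
    writable = sorted(set(write_buckets))
    all_buckets = sorted(set(readable) | set(writable))
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # listing is a bucket-level action, so it targets bucket ARNs
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{b}" for b in all_buckets],
            },
            {
                # reading inputs is an object-level action
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{b}/*" for b in readable],
            },
            {
                # writing outputs is an object-level action
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": [f"arn:aws:s3:::{b}/*" for b in writable],
            },
        ],
    }


# placeholder bucket names; replace with buckets you own
document = scoped_s3_policy(["my_source_data_bucket"], ["my_results_data_bucket"])
print(json.dumps(document, indent=2))
```

The resulting dictionary can be passed through `json.dumps` as the `PolicyDocument` argument of `iam.create_policy`, in place of the account-wide policy.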

## Using AWS HealthOmics Workflows - the basics
AWS HealthOmics Workflows allows you to perform bioinformatics compute - like genomics secondary analysis - at scale on AWS. These compute workloads are defined using workflow languages like WDL and Nextflow that specify multiple compute tasks and their input and output dependencies.

The cell below creates an example WDL workflow (to learn more about WDL, see https://github.com/openwdl/wdl). This is a simple workflow with one task that creates a copy of a file. It's simple enough that we can stash the entire definition in a Python string. More complex workflows may be larger and span multiple files. In that case, it is better to create and edit the workflow in a separate text editor, ideally one that supports language-specific syntax highlighting.

In [None]:
os.makedirs('workflows/wdl/sample', exist_ok=True)

wdl = dedent("""
version 1.0

workflow Test {
	input {
		File input_file
	}

	call FileCopy {
		input:
			input_file = input_file,
	}

	output {
		File output_file = FileCopy.output_file
	}
}

task FileCopy {
	input {
		File input_file
	}

	command {
		echo "copying ~{input_file}" | tee >(cat >&2)
		cat ~{input_file} > output
	}

	output {
		File output_file = "output"
	}
}
""").strip()

with open('workflows/wdl/sample/main.wdl', 'wt') as f:
    f.write(wdl)

To run this workflow, we'll start by creating a client for the `omics` service.

In [None]:
omics = boto3.client('omics')

Now we need to bundle up the workflow as a zip-file and call the `create_workflow` API for `omics`. We'll encapsulate these operations in a function for reuse later.

In [None]:
def create_workflow(
    workflow_root_dir,
    parameters=None,
    name=None,
    description=None,
    main=None,
):
    """Zip a workflow directory in memory and register it with HealthOmics."""
    if parameters is None:
        # placeholder template; set here to avoid a mutable default argument
        parameters = {"param_name": {"description": "param_desc"}}

    buffer = io.BytesIO()
    print("creating zip file:")
    with ZipFile(buffer, mode='w', compression=ZIP_DEFLATED) as zf:
        for file in glob.iglob(os.path.join(workflow_root_dir, '**/*'), recursive=True):
            if os.path.isfile(file):
                # store each file under its path relative to the workflow root
                arcname = os.path.relpath(file, workflow_root_dir)
                print(f".. adding: {file} -> {arcname}")
                zf.write(file, arcname=arcname)

    request = {
        'name': name,
        'description': description,
        'definitionZip': buffer.getvalue(),  # this argument needs bytes
        'main': main,
        'parameterTemplate': parameters,
    }
    # boto3 rejects None values, so pass only the arguments that were provided
    response = omics.create_workflow(
        **{key: value for key, value in request.items() if value is not None}
    )

    workflow_id = response['id']
    print(f"workflow {workflow_id} created, waiting for it to become ACTIVE")

    try:
        waiter = omics.get_waiter('workflow_active')
        waiter.wait(id=workflow_id)

        print(f"workflow {workflow_id} ready for use")
    except botocore.exceptions.WaiterError as e:
        print(f"workflow {workflow_id} FAILED:")
        print(e)

    workflow = omics.get_workflow(id=workflow_id)
    return workflow

There are a few things to notice:

* To avoid polluting the local filesystem the zip-file is created in-memory with a byte buffer. If your workflow has many files such that the resultant bundle is large, you should consider alternative means of creating the zip file.
* A `main.(ext)` file, where `ext` matches the type of the workflow (e.g. `wdl`, or `nf`) must be at the root level of the zip file. HealthOmics uses this file as the primary entrypoint for the workflow. This is relevant for more modular workflows that have multiple definition files. In the call below, we explicitly point to `main.wdl`.
* The `definitionZip` argument takes a binary value and reads the byte buffer value directly.
* The `parameters` argument is a dictionary that maps each parameter name to a `parameterTemplate`, which for now provides a description of the parameter and, optionally, a flag marking it as optional. Actual parameter values are provided when the workflow is "run" - more on this below.
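
To make the first two points concrete, here is a minimal standalone sketch of building a zip bundle in memory and confirming that the entrypoint file sits at the root of the archive. The helper names `zip_workflow_dir` and `has_root_entry` are hypothetical, and the temporary directory only stands in for a real workflow directory.

```python
import io
import os
import tempfile
from zipfile import ZipFile, ZIP_DEFLATED


def zip_workflow_dir(workflow_root_dir):
    """Return the bytes of a zip archive of workflow_root_dir, built in memory."""
    buffer = io.BytesIO()
    with ZipFile(buffer, mode="w", compression=ZIP_DEFLATED) as zf:
        for dirpath, _dirnames, filenames in os.walk(workflow_root_dir):
            for filename in filenames:
                path = os.path.join(dirpath, filename)
                # store paths relative to the workflow root
                zf.write(path, arcname=os.path.relpath(path, workflow_root_dir))
    return buffer.getvalue()


def has_root_entry(zip_bytes, main):
    """Check that an entry named `main` exists at the given archive path."""
    with ZipFile(io.BytesIO(zip_bytes)) as zf:
        return main in zf.namelist()


# demonstration with a throwaway directory standing in for a workflow
with tempfile.TemporaryDirectory() as root:
    with open(os.path.join(root, "main.wdl"), "wt") as f:
        f.write("version 1.0\n")
    bundle = zip_workflow_dir(root)
    print(has_root_entry(bundle, "main.wdl"))
```

A check like this before calling the `create_workflow` API can catch a bundle that was zipped from the wrong directory level.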

We can now use this function to create a workflow in HealthOmics Workflows from our WDL definition above:

In [None]:
workflow = create_workflow(
    'workflows/wdl/sample',
    parameters={"input_file": {"description": "input text file to copy"}},
    name="Sample",
    description="Sample WDL workflow",
    main="main.wdl"
)
pprint(workflow)

Now we can start a workflow run with some input data using the `start_run` API call.

Note the following:
* Here the parameter `input_file` is set to an S3 URI. This is specific to this workflow: parameters vary depending on the workflow definition they are associated with.

* We provide the ARN to the service role we created above. You can specify different roles as needed depending on what resources your workflow needs access to.

* We provide an `outputUri` with `start_run`. This is where the workflow will place **final** outputs as they are defined by the workflow definition (e.g. values in the `workflow.output` block of a WDL workflow). All intermediate results are discarded once the workflow completes.

In the cell below, we're using `waiters` to check for when the run starts and completes. These will block the current execution thread.

It will take about **30 minutes** for this workflow to start (scale up resources), run, and stop (scale down resources). Because this workflow is simple, the time it spends in the `RUNNING` state is short relative to the scale-up and scale-down times. For more complex workflows, or ones that process large amounts of data, the `RUNNING` state will last much longer (e.g. several hours). In that case, we recommend checking the run status asynchronously rather than blocking on a waiter.

In [None]:
role['Role']['Arn']

In [None]:
## NOTE: replace these S3 URIs with ones you have access to
input_uri = "s3://my_source_data_bucket/source_file_to_copy"
output_uri = "s3://my_results_data_bucket/path/to/results"

run = omics.start_run(
    workflowId=workflow['id'],
    name="Sample workflow run",
    roleArn=role['Role']['Arn'],
    parameters={
        "input_file": input_uri
    },
    outputUri=output_uri,
)

print(f"running workflow {workflow['id']}, starting run {run['id']}")
try:
    # check every 30 seconds, up to 60 times, for the run to start
    waiter = omics.get_waiter('run_running')
    waiter.wait(id=run['id'], WaiterConfig={'Delay': 30, 'MaxAttempts': 60})

    print(f"run {run['id']} is running")

    # check every 60 seconds, up to 60 times, for the run to complete
    waiter = omics.get_waiter('run_completed')
    waiter.wait(id=run['id'], WaiterConfig={'Delay': 60, 'MaxAttempts': 60})

    print(f"run {run['id']} completed")
except botocore.exceptions.WaiterError as e:
    print(e)
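
For long runs, an alternative to blocking on waiters is to check the status only when you need it and do other work in between. The sketch below is one possible shape for that check. The helper `check_run` is hypothetical, and the set of terminal statuses is an assumption based on the run status values the service reports.

```python
# statuses after which a run will not change state again (assumed set)
TERMINAL_STATUSES = frozenset({"COMPLETED", "FAILED", "CANCELLED", "DELETED"})


def check_run(omics_client, run_id):
    """Return (status, finished) for a run without blocking.

    omics_client is expected to behave like boto3.client('omics').
    """
    status = omics_client.get_run(id=run_id)["status"]
    return status, status in TERMINAL_STATUSES
```

In the notebook this could be called as `status, finished = check_run(omics, run['id'])` from a cell you re-run occasionally, or from a scheduled job.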

Once the run completes we can verify its status by either listing it:

In [None]:
[r for r in omics.list_runs()['items'] if r['id'] == run['id']]

or getting its full details:

In [None]:
omics.get_run(id=run['id'])

We can verify that the correct output was generated by listing the `outputUri` for the workflow run:

In [None]:
s3uri = urlparse(omics.get_run(id=run['id'])['outputUri'])
boto3.client('s3').list_objects_v2(
    Bucket=s3uri.netloc,
    Prefix='/'.join([s3uri.path[1:], run['id']])
)['Contents']
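
The listing above raises a `KeyError` when nothing matches the prefix, because `list_objects_v2` omits `Contents` from an empty result. A slightly more defensive variant might look like the sketch below. The helpers `split_s3_uri` and `list_keys` are hypothetical names, and only the first page of results is read.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def list_keys(s3_client, bucket, prefix):
    """Return the object keys under a prefix (first page of results only).

    s3_client is expected to behave like boto3.client('s3').
    """
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    # 'Contents' is absent when no objects match the prefix
    return [obj["Key"] for obj in response.get("Contents", [])]
```

For example, `bucket, prefix = split_s3_uri(output_uri)` followed by `list_keys(boto3.client('s3'), bucket, f"{prefix}/{run['id']}")` returns an empty list instead of failing when the run has produced no output yet.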

Workflows typically have multiple tasks. We can list workflow tasks with:

In [None]:
tasks = omics.list_run_tasks(id=run['id'])
pprint(tasks['items'])

and get specific task details with:

In [None]:
task = omics.get_run_task(id=run['id'], taskId=tasks['items'][0]['taskId'])
pprint(task)

After running the cell above we should see that each task has an associated CloudWatch Logs LogStream. These capture any text generated by the workflow task that has been sent to either `STDOUT` or `STDERR`. These outputs are helpful for debugging any task failures and can be retrieved with:

In [None]:
events = boto3.client('logs').get_log_events(
    logGroupName="/aws/omics/WorkflowLog",
    logStreamName=f"run/{run['id']}/task/{task['taskId']}"
)
for event in events['events']:
    print(event['message'])
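
A single `get_log_events` call returns only one page of events, so tasks with verbose output may need several calls. The sketch below follows `nextForwardToken` until the end of the stream. The generator name `iter_log_messages` is hypothetical, and the loop relies on CloudWatch Logs returning the same token you passed in once the end has been reached.

```python
def iter_log_messages(logs_client, log_group, log_stream):
    """Yield every message in a log stream, oldest first.

    logs_client is expected to behave like boto3.client('logs').
    """
    kwargs = {
        "logGroupName": log_group,
        "logStreamName": log_stream,
        "startFromHead": True,  # read from the oldest event forward
    }
    token = None
    while True:
        if token is not None:
            kwargs["nextToken"] = token
        response = logs_client.get_log_events(**kwargs)
        for event in response["events"]:
            yield event["message"]
        next_token = response.get("nextForwardToken")
        # the end of the stream is signalled by getting the same token back
        if next_token is None or next_token == token:
            break
        token = next_token
```

It can be used in place of the loop above, for example `for message in iter_log_messages(boto3.client('logs'), "/aws/omics/WorkflowLog", stream_name): print(message)`, where `stream_name` is the task's log stream name.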

## Using HealthOmics Workflows RunGroups
HealthOmics Workflows Run Groups let you limit the resources available to a set of workflow runs, and therefore the costs of running them. With a Run Group you can set the maximum number of vCPUs used concurrently by tasks, the maximum duration of each run, or the maximum number of concurrent runs.

In the cell below, we'll create a run group with a maximum of 100 vCPUs and a run duration limit of 600 minutes (10 hours).

In [None]:
run_group = omics.create_run_group(
    name="TestRunGroup",
    maxCpus=100,
    maxDuration=600,
)

omics.get_run_group(id=run_group['id'])

One of the ways you can use a RunGroup is to run multiple iterations of a workflow - each with different input values. Below we'll define a simple Nextflow workflow that takes a simple string parameter that we can easily modify for multiple iterations.

In [None]:
os.makedirs('workflows/nf/sample', exist_ok=True)

nf = dedent('''
nextflow.enable.dsl = 2

params.greeting = 'hello'
params.addressee = null

if (!params.addressee) exit 1, "required parameter 'addressee' missing"

process Greet {
    publishDir '/mnt/workflow/pubdir'

    input:
        val greeting
        val addressee

    output:
        path "output", emit: output_file

    script:
    """
    echo "${greeting} ${addressee}" | tee output
    """
}

workflow {
    Greet(params.greeting, params.addressee)
}

''').strip()

with open('workflows/nf/sample/main.nf', 'wt') as f:
    f.write(nf)

We'll use the `create_workflow` function we defined above to create a HealthOmics workflow from this definition:

In [None]:
workflow = create_workflow(
    'workflows/nf/sample',
    parameters={
        "greeting": {
            "description": "(string) greeting to use",
            "optional": True
        },
        "addressee": {
            "description": "(string) who to greet"
        }
    },
    name="GreetingsNF",
    description="Greetings Nextflow workflow",
    main="main.nf"
)
pprint(workflow)
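
Because the template marks `greeting` as optional and `addressee` as required, it can be handy to check a set of run inputs against the template before starting a run. The sketch below is a small local check, not a service feature, and the helper `missing_parameters` is a hypothetical name.

```python
def missing_parameters(parameter_template, run_parameters):
    """Return the names of required template parameters absent from run_parameters."""
    return sorted(
        name
        for name, spec in parameter_template.items()
        # a parameter is required unless the template marks it optional
        if not spec.get("optional", False) and name not in run_parameters
    )


template = {
    "greeting": {"description": "(string) greeting to use", "optional": True},
    "addressee": {"description": "(string) who to greet"},
}
print(missing_parameters(template, {"greeting": "Hello"}))  # ['addressee']
```

An empty list means every required parameter has a value, so the inputs are at least structurally complete.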

We can now run this workflow with our run group. We'll start several runs of the workflow concurrently, each with different inputs to distinguish them, to see how the run group works:

In [None]:
rg_runs = []
run_inputs = [
    {"greeting": "Hello", "addressee": "AWS"},
    {"greeting": "Bonjour", "addressee": "HealthOmics"},
    {"greeting": "Hola", "addressee": "Workflows"},
]

for run_num, run_input in enumerate(run_inputs):
    run = omics.start_run(
        workflowId=workflow['id'],
        name=f"{workflow['name']} - {run_num} :: {run_input}",
        roleArn=role['Role']['Arn'],
        parameters=run_input,
        outputUri=output_uri,
        runGroupId=run_group['id'],  # <-- here is where we specify the run group
    )

    print(f"({run_num}) workflow {workflow['id']}, run group {run_group['id']}, run {run['id']}, input {run_input}")
    rg_runs.append(run)

We can now list all the runs in the RunGroup and should see all of them transition from `PENDING` to `STARTING` at once.

(run the following cell multiple times)

In [None]:
[(r['id'], r['status']) for r in omics.list_runs(runGroupId=run_group['id'])['items']]
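
When a run group holds many runs, a per-status tally is easier to read than the full list. The sketch below counts runs by status. The helper `status_counts` is a hypothetical name, and the sample items only mimic the shape of the entries returned by `list_runs`.

```python
from collections import Counter


def status_counts(run_items):
    """Count runs by status, given a list of items shaped like list_runs output."""
    return Counter(item["status"] for item in run_items)


# sample items with made-up ids, standing in for omics.list_runs(...)['items']
sample_items = [
    {"id": "1", "status": "PENDING"},
    {"id": "2", "status": "STARTING"},
    {"id": "3", "status": "STARTING"},
]
print(status_counts(sample_items))
```

In the notebook, the same function can be applied to `omics.list_runs(runGroupId=run_group['id'])['items']` each time the cell is re-run.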