## [Cromwell on AWS](https://docs.opendata.aws/genomics-workflows/)

[Cromwell](https://github.com/broadinstitute/cromwell) is a Workflow Management System geared towards scientific workflows. Cromwell is open sourced under the [BSD 3-Clause license](https://github.com/broadinstitute/cromwell/blob/develop/LICENSE.txt).

![Image of Cromwell](https://docs.opendata.aws/genomics-workflows/cromwell/images/cromwell-on-aws_infrastructure.png)

### Initialize Notebook Environment

In [None]:
import boto3
import sys
import os
import json
import base64
import project_path # path to helper methods
import pprint
import time
import pandas as pd

from lib import workshop
from botocore.exceptions import ClientError

cfn = boto3.client('cloudformation')
batch = boto3.client('batch')
iam = boto3.client('iam')
ec2_client = boto3.client('ec2')
session = boto3.session.Session()
region = session.region_name

key_name = 'genomics-ami'

### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)

We will create an S3 bucket that will be used throughout the workshop for storing our data.

[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation

In [None]:
bucket = workshop.create_bucket_name('genomics-')
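# us-east-1 is the S3 default location and rejects an explicit LocationConstraint
s3 = session.resource('s3')
if region == 'us-east-1':
    s3.create_bucket(Bucket=bucket)
else:
    s3.create_bucket(Bucket=bucket, CreateBucketConfiguration={'LocationConstraint': region})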
print(bucket)

response = session.client('s3').put_bucket_encryption(
 Bucket=bucket,
 ServerSideEncryptionConfiguration={
 'Rules': [
 {
 'ApplyServerSideEncryptionByDefault': {
 'SSEAlgorithm': 'AES256'
 }
 },
 ]
 }
)
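
To confirm that default encryption took effect, the bucket's configuration can be read back. The following is a minimal sketch and not part of the original workshop: `default_sse_algorithm` is a hypothetical helper name, and `s3_client` is assumed to behave like `boto3.client('s3')`.

```python
def default_sse_algorithm(s3_client, bucket_name):
    """Return the default server-side encryption algorithm set on a bucket.

    `s3_client` is expected to behave like boto3.client('s3').
    """
    response = s3_client.get_bucket_encryption(Bucket=bucket_name)
    rules = response['ServerSideEncryptionConfiguration']['Rules']
    # The cell above configures a single rule, so read the first one
    return rules[0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
```

In the notebook this could be called as `default_sse_algorithm(session.client('s3'), bucket)`, which should report `AES256` for the bucket configured above.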

### [Create VPC](https://aws.amazon.com/vpc/)

Amazon Virtual Private Cloud (Amazon VPC) lets you provision a logically isolated section of the AWS Cloud where you can launch AWS resources in a virtual network that you define. You have complete control over your virtual networking environment, including selection of your own IP address range, creation of subnets, and configuration of route tables and network gateways. You can use both IPv4 and IPv6 in your VPC for secure and easy access to resources and applications.

In [None]:
vpc, subnet, subnet2 = workshop.create_and_configure_vpc()
vpc_id = vpc.id
subnet_id = subnet.id
subnet2_id = subnet2.id
print(vpc_id)
print(subnet_id)
print(subnet2_id)

### [Create a custom AMI for Cromwell](https://docs.opendata.aws/genomics-workflows/aws-batch/create-custom-ami/)

In all cases, you will need an AMI ID for the AWS Batch compute resource AMI that you created using the ["Create a Custom AMI"](https://docs.opendata.aws/genomics-workflows/aws-batch/create-custom-ami/) guide. No default value is provided because most genomics workloads need more storage than the default AWS Batch AMI provides. We will download and execute a Python script to generate the custom AMI for use with Cromwell.

In [None]:
!wget https://s3.amazonaws.com/aws-genomics-workflows/artifacts/aws-custom-ami.tgz

In [None]:
!tar --warning=no-unknown-keyword -xzf aws-custom-ami.tgz && rm aws-custom-ami.tgz

Run the Python script, passing in the VPC and subnet created above. This example uses the Cromwell UserData file, and the script creates the key pair.

Replace the `{{vpc_id}}` and `{{subnet_id}}` placeholders in the command below with the values printed when the VPC was created.

In [None]:
!python ./custom-ami/create-genomics-ami.py --user-data custom-ami/cromwell-genomics-ami.cloud-init.yaml --key-pair-name 'genomics-ami' \
--vpc-id '{{vpc_id}}' --subnet-id '{{subnet_id}}' --use-instance-profile


### [Create the Batch Environment](https://docs.opendata.aws/genomics-workflows/aws-batch/configure-aws-batch-cfn/)

We will create the required AWS Batch environment for genomics workflows in the next few cells. If you would rather use a quickstart, there is a self-contained [`Full Stack`](https://console.aws.amazon.com/cloudformation/home?#/stacks/new?stackName=GenomicsEnv-Full&templateURL=https://s3.amazonaws.com/aws-genomics-workflows/templates/aws-genomics-root.template.yaml) template that creates all of the AWS resources, including the VPC network and security groups.

#### Helper Methods for the Batch Environment

In [None]:
def create_compute_environment(computeEnvironmentName, computeType, unitVCpus, imageId, serviceRole, instanceRole,
                               subnets, securityGroups, keyPair, bidPercentage=None, spotFleetRole=None):

    # Settings shared by on-demand (EC2) and Spot compute environments
    compute_resources = {
        'type': computeType,
        'imageId': imageId,
        'minvCpus': unitVCpus * 1,
        'maxvCpus': unitVCpus * 16,
        'desiredvCpus': unitVCpus * 1,
        'instanceTypes': ['optimal'],
        'subnets': subnets,
        'securityGroupIds': securityGroups,
        'ec2KeyPair': keyPair,
        'instanceRole': instanceRole
    }

    # Spot environments additionally need a bid percentage and a Spot Fleet role
    if computeType == 'SPOT':
        compute_resources['bidPercentage'] = bidPercentage
        compute_resources['spotIamFleetRole'] = spotFleetRole

    response = batch.create_compute_environment(
        computeEnvironmentName=computeEnvironmentName,
        type='MANAGED',
        serviceRole=serviceRole,
        computeResources=compute_resources
    )

    # Poll until AWS Batch reports the environment as VALID or INVALID
    while True:
        describe = batch.describe_compute_environments(computeEnvironments=[computeEnvironmentName])
        computeEnvironment = describe['computeEnvironments'][0]
        status = computeEnvironment['status']
        if status == 'VALID':
            print('\rSuccessfully created compute environment {}'.format(computeEnvironmentName))
            break
        elif status == 'INVALID':
            reason = computeEnvironment['statusReason']
            raise Exception('Failed to create compute environment: {}'.format(reason))
        print('\rCreating compute environment...')
        time.sleep(5)

    return response
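
The polling loop in `create_compute_environment` runs until AWS Batch reports `VALID` or `INVALID`, with no upper bound on the wait. A bounded variant might look like the sketch below. The names `wait_for_valid`, `get_resource`, `timeout`, `interval`, and `sleep` are hypothetical and introduced only for illustration; `get_resource` is assumed to be any zero-argument callable returning a dict with a `status` key and, on failure, a `statusReason` key.

```python
import time


def wait_for_valid(get_resource, timeout=300, interval=5, sleep=time.sleep):
    """Poll until a resource reports VALID, raising on INVALID or timeout."""
    waited = 0
    while True:
        resource = get_resource()
        status = resource['status']
        if status == 'VALID':
            return resource
        if status == 'INVALID':
            raise RuntimeError('Resource is INVALID: {}'.format(resource.get('statusReason')))
        if waited >= timeout:
            raise TimeoutError('Gave up after {} seconds (last status: {})'.format(waited, status))
        # Wait before asking again, and keep track of the total time spent
        sleep(interval)
        waited += interval
```

For a compute environment, `get_resource` could be `lambda: batch.describe_compute_environments(computeEnvironments=[name])['computeEnvironments'][0]`, where `name` is the environment being created.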

### Create the AWS Batch Service Role

In [None]:
role_doc = {
 "Version": "2012-10-17", 
 "Statement": [
 {"Sid": "", 
 "Effect": "Allow", 
 "Principal": {
 "Service": "batch.amazonaws.com"
 }, 
 "Action": "sts:AssumeRole"
 }]
 }

batch_role_arn = workshop.create_role(iam=iam, policy_name='GenomicsEnvBatchServiceRole', \
 assume_role_policy_document=json.dumps(role_doc), \
 managed_policy='arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole')
print(batch_role_arn)

### Create the AWS Batch Spot Fleet Role

In [None]:
role_doc = {
 "Version": "2012-10-17", 
 "Statement": [
 {"Sid": "", 
 "Effect": "Allow", 
 "Principal": {
 "Service": "spotfleet.amazonaws.com"
 }, 
 "Action": "sts:AssumeRole"
 }]
 }

spot_fleet_role_arn = workshop.create_role(iam=iam, policy_name='GenomicsEnvBatchSpotFleetRole', \
 assume_role_policy_document=json.dumps(role_doc), \
 managed_policy='arn:aws:iam::aws:policy/service-role/AmazonEC2SpotFleetTaggingRole')
print(spot_fleet_role_arn)
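
The two trust policies above differ only in the service principal. A small helper can make that explicit. This is a sketch; `trust_policy` is a hypothetical name and not part of the `workshop` library.

```python
import json


def trust_policy(service):
    """Build an assume-role (trust) policy document for one AWS service principal."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
        ],
    }


# The same helper covers both roles created above
batch_trust = json.dumps(trust_policy("batch.amazonaws.com"))
spot_fleet_trust = json.dumps(trust_policy("spotfleet.amazonaws.com"))
```

Either string could then be passed as `assume_role_policy_document` to `workshop.create_role`.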

#### Create Default and High Priority Environments

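Take the following values from the output of the custom AMI script above and substitute them for the `{{...}}` placeholders in the next cell:
* `imageId`: the value printed after `EC2 AMI ImageId:`
* `instanceRole`: the value printed after `IAM Instance Profile:`
* `securityGroups`: the value printed after `EC2 Security Group:`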

### Create the Default Compute Environment

In [None]:
image_id = '{{image_id}}'
instance_role = '{{instance_role}}'
security_groups = ['{{security_group}}']

bid_percentage = 100
default_env = 'DefaultCromwellEnvironment'
hp_env = 'HighPriorityCromwellEnvironment'
desired_cpu = 4

resp = create_compute_environment(default_env, 'SPOT', desired_cpu, image_id, batch_role_arn, instance_role, \
 [subnet_id], security_groups, key_name, bid_percentage, spot_fleet_role_arn)
default_ce_arn = resp['computeEnvironmentArn']
default_ce = resp['computeEnvironmentName']
print(default_ce_arn)

### Create the High Priority Compute Environment

In [None]:
resp = create_compute_environment(hp_env, 'EC2', desired_cpu, image_id, batch_role_arn, instance_role, \
 [subnet_id], security_groups, key_name)
hp_ce_arn = resp['computeEnvironmentArn']
hp_ce = resp['computeEnvironmentName']
print(hp_ce_arn)
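
Both environments are sized from `desired_cpu` through the multipliers in `create_compute_environment`. The arithmetic is restated below as a standalone sketch; the function name `vcpu_limits` and the `max_multiplier` parameter are made up for this illustration.

```python
def vcpu_limits(unit_vcpus, max_multiplier=16):
    """Mirror the vCPU sizing used by create_compute_environment above."""
    return {
        'minvCpus': unit_vcpus,
        'desiredvCpus': unit_vcpus,
        'maxvCpus': unit_vcpus * max_multiplier,
    }


limits = vcpu_limits(4)
print(limits)  # {'minvCpus': 4, 'desiredvCpus': 4, 'maxvCpus': 64}
```

With `desired_cpu = 4`, each environment maintains at least 4 vCPUs and can scale up to 64.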


### Create the AWS Batch Job Queues

We will create two job queues, one each for the default and high-priority environments. Each queue lists its own environment first and the other environment second.

In [None]:
def create_job_queue(primaryComputeEnvironmentName, secondaryComputeEnvironment, priority):
    jobQueueName = primaryComputeEnvironmentName + '_queue'
    # AWS Batch tries the compute environments in ascending 'order'
    response = batch.create_job_queue(jobQueueName=jobQueueName,
                                      priority=priority,
                                      computeEnvironmentOrder=[
                                          {
                                              'order': 1,
                                              'computeEnvironment': primaryComputeEnvironmentName
                                          },
                                          {
                                              'order': 2,
                                              'computeEnvironment': secondaryComputeEnvironment
                                          }
                                      ])

    # Poll until AWS Batch reports the job queue as VALID or INVALID
    while True:
        describe = batch.describe_job_queues(jobQueues=[jobQueueName])
        jobQueue = describe['jobQueues'][0]
        status = jobQueue['status']
        if status == 'VALID':
            print('\rSuccessfully created job queue {}'.format(jobQueueName))
            break
        elif status == 'INVALID':
            reason = jobQueue['statusReason']
            raise Exception('Failed to create job queue: {}'.format(reason))
        print('\rCreating job queue... ')
        time.sleep(5)

    return response

In [None]:
resp = create_job_queue(hp_env, default_env, 1000)
hp_queue_arn = resp['jobQueueArn']
hp_queue = resp['jobQueueName']
print(hp_queue_arn)

In [None]:
resp = create_job_queue(default_env, hp_env, 1)
default_queue_arn = resp['jobQueueArn']
default_queue = resp['jobQueueName']
print(default_queue_arn)
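
Each queue lists both compute environments in opposite order, and AWS Batch evaluates queues with a higher `priority` value first. The order list can be generated for any number of environments. The following is a sketch; `compute_environment_order` is a hypothetical helper, not part of boto3.

```python
def compute_environment_order(*environment_names):
    """Build the computeEnvironmentOrder list, preferring earlier names."""
    return [
        {'order': position, 'computeEnvironment': name}
        for position, name in enumerate(environment_names, start=1)
    ]


# Same layout as the high-priority queue created above
order = compute_environment_order('HighPriorityCromwellEnvironment',
                                  'DefaultCromwellEnvironment')
```

The result has the same shape as the `computeEnvironmentOrder` argument that `create_job_queue` builds by hand.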


### [Launch the Cromwell Server CloudFormation stack](https://docs.opendata.aws/genomics-workflows/cromwell/cromwell-aws-batch/)

#### Cromwell Server
To ensure the highest level of security, and robustness for long running workflows, it is recommended that you use an EC2 instance as your Cromwell server for submitting workflows to AWS Batch.

A couple things to note:

* This server does not need to be permanent. In fact, when you are not running workflows, you should stop or terminate the instance so that you are not paying for resources you are not using.

* You can launch a Cromwell server just for yourself and exactly when you need it.

* This server does not need to be in the same VPC as the one that Batch will launch instances in.

We will download the latest Cromwell server CloudFormation template, build the required parameters, and create the stack.

In [None]:
!wget https://s3.amazonaws.com/aws-genomics-workflows/templates/cromwell/cromwell-server.template.yaml

In [None]:
!cat cromwell-server.template.yaml

### Required parameters for CloudFormation

In [None]:
print('VpcId={}'.format(vpc_id))
print('PublicSubnetId={}'.format(subnet_id))
print('KeyName={}'.format(key_name))
print('S3BucketName={}'.format(bucket))
print('BatchQueue={}'.format(default_queue_arn))

**In the file below, replace each `{{...}}` placeholder in a `ParameterValue` with the matching value printed above.**

In [None]:
%%writefile cromwell-params.json

[
 {
 "ParameterKey": "InstanceType",
 "ParameterValue": "t2.medium"
 }, 
 {
 "ParameterKey": "VpcId",
 "ParameterValue": "{{VpcId}}"
 }, 
 {
 "ParameterKey": "PublicSubnetID",
 "ParameterValue": "{{PublicSubnetId}}"
 }, 
 {
 "ParameterKey": "LatestAmazonLinuxAMI",
 "ParameterValue": "/aws/service/ami-amazon-linux-latest/amzn-ami-hvm-x86_64-gp2"
 }, 
 {
 "ParameterKey": "InstanceName",
 "ParameterValue": "cromwell-server"
 }, 
 {
 "ParameterKey": "KeyName",
 "ParameterValue": "{{KeyName}}"
 }, 
 {
 "ParameterKey": "SSHLocation",
 "ParameterValue": "0.0.0.0/0"
 }, 
 {
 "ParameterKey": "HTTPLocation",
 "ParameterValue": "0.0.0.0/0"
 }, 
 {
 "ParameterKey": "S3BucketName",
 "ParameterValue": "{{S3BucketName}}"
 }, 
 {
 "ParameterKey": "BatchQueue",
 "ParameterValue": "{{BatchQueue}}"
 }
]
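
Instead of editing the placeholders by hand, the parameter file could be generated from the variables already defined in the notebook. The following is a sketch under that assumption; `build_cromwell_params` and `write_params` are hypothetical helpers, and the parameter keys and fixed values are copied from the file above.

```python
import json


def build_cromwell_params(vpc_id, subnet_id, key_name, bucket, batch_queue):
    """Return the CloudFormation parameter list for the Cromwell server stack."""
    values = {
        'InstanceType': 't2.medium',
        'VpcId': vpc_id,
        'PublicSubnetID': subnet_id,
        'LatestAmazonLinuxAMI': '/aws/service/ami-amazon-linux-latest/amzn-ami-hvm-x86_64-gp2',
        'InstanceName': 'cromwell-server',
        'KeyName': key_name,
        'SSHLocation': '0.0.0.0/0',
        'HTTPLocation': '0.0.0.0/0',
        'S3BucketName': bucket,
        'BatchQueue': batch_queue,
    }
    return [{'ParameterKey': key, 'ParameterValue': value} for key, value in values.items()]


def write_params(params, path='cromwell-params.json'):
    """Write the parameter list as JSON for use with --parameters file://..."""
    with open(path, 'w') as handle:
        json.dump(params, handle, indent=2)
```

In the notebook this could be used as `write_params(build_cromwell_params(vpc_id, subnet_id, key_name, bucket, default_queue_arn))`.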

### Create the Cromwell Server CloudFormation stack

In [None]:
stack_name = 'CromwellServer'

!aws cloudformation create-stack --stack-name $stack_name \
 --template-body file://cromwell-server.template.yaml \
 --parameters file://cromwell-params.json \
 --capabilities CAPABILITY_IAM \
 --region $region

### Wait for Cromwell CloudFormation stack completion 

In [None]:
print('waiting for stack complete...')
waiter = cfn.get_waiter('stack_create_complete')
waiter.wait(
 StackName=stack_name
)
print('stack complete.')

### Get Outputs from CloudFormation

In [None]:
response = cfn.describe_stacks(StackName=stack_name)

outputs = response["Stacks"][0]["Outputs"]
pd.set_option('display.max_colwidth', None)  # show full values; pandas 1.0 and later expect None instead of -1
pd.DataFrame(outputs, columns=["OutputKey", "OutputValue"])
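
The `HostName` output is needed in later cells. Rather than copying it from the table, the outputs list can be turned into a dictionary. This is a sketch that assumes the `Outputs` structure returned by `describe_stacks` (a list of dicts with `OutputKey` and `OutputValue`); `outputs_to_dict` and the sample value are hypothetical.

```python
def outputs_to_dict(outputs):
    """Map each CloudFormation OutputKey to its OutputValue."""
    return {item['OutputKey']: item['OutputValue'] for item in outputs}


# Hypothetical sample in the shape describe_stacks returns
sample_outputs = [
    {'OutputKey': 'HostName', 'OutputValue': 'ec2-0-0-0-0.compute-1.amazonaws.com'},
]
cromwell_host = outputs_to_dict(sample_outputs)['HostName']
```

In the notebook, `outputs_to_dict(outputs)['HostName']` would read the value from the real stack outputs.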

### Create the Hello World WDL file for Cromwell

In [None]:
%%writefile simple-hello.wdl

task echoHello{
 command {
 echo "Hello AWS!"
 }
 runtime {
 docker: "ubuntu:latest"
 }

}

workflow printHelloAndGoodbye {
 call echoHello
}

### Submit Hello World WDL file

In the curl command below, replace `{{cromwell server}}` with the `HostName` output of the CloudFormation stack above.

In [None]:
!curl -X POST "http://{{cromwell server}}/api/workflows/v1" \
 -H "accept: application/json" \
 -F "workflowSource=@simple-hello.wdl"
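
The submission returns a JSON body containing the workflow `id`, which can then be used to query Cromwell's status endpoint. The following is a sketch using only the standard library; `parse_submission`, `status_url`, and `fetch_status` are hypothetical helper names, and `host` stands for whatever `HostName` the stack reported.

```python
import json
from urllib.request import urlopen


def parse_submission(body):
    """Extract the workflow id from the JSON text returned by a submission."""
    return json.loads(body)['id']


def status_url(host, workflow_id):
    """Build the URL of Cromwell's per-workflow status endpoint."""
    return 'http://{}/api/workflows/v1/{}/status'.format(host, workflow_id)


def fetch_status(host, workflow_id):
    """Ask the Cromwell server for the current status of one workflow."""
    with urlopen(status_url(host, workflow_id)) as response:
        return json.loads(response.read().decode('utf-8'))['status']
```

Only `fetch_status` contacts the server; the other two functions are pure and can be checked without a running Cromwell instance.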

### Logging

All Cromwell server logs get sent to CloudWatch Logs. With these logs you can diagnose and troubleshoot any issues that may arise with submitting jobs to the AWS Batch compute environments.

In [None]:
print('https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#logStream:group=cromwell-server'.format(region))
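
The same log group can also be read from the notebook. This is a minimal sketch, assuming the log group is named `cromwell-server` as in the console URL above; `log_messages_since` is a hypothetical helper, and `logs_client` is expected to behave like `boto3.client('logs')`.

```python
def log_messages_since(logs_client, start_ms, log_group='cromwell-server', limit=20):
    """Return up to `limit` messages logged at or after `start_ms`.

    `start_ms` is a timestamp in milliseconds since the Unix epoch.
    """
    response = logs_client.filter_log_events(
        logGroupName=log_group,
        startTime=start_ms,
        limit=limit,
    )
    return [event['message'] for event in response['events']]
```

For example, `log_messages_since(boto3.client('logs'), int((time.time() - 600) * 1000))` would ask for messages from roughly the last ten minutes.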

### Real world example

In [None]:
%%writefile HaplotypeCaller.aws.wdl

## Copyright Broad Institute, 2017
##
## This WDL workflow runs HaplotypeCaller from GATK4 in GVCF mode on a single sample
## according to the GATK Best Practices (June 2016), scattered across intervals.
##
## Requirements/expectations :
## - One analysis-ready BAM file for a single sample (as identified in RG:SM)
## - Set of variant calling intervals lists for the scatter, provided in a file
##
## Outputs :
## - One GVCF file and its index
##
## Cromwell version support
## - Successfully tested on v29
## - Does not work on versions < v23 due to output syntax
##
## IMPORTANT NOTE: HaplotypeCaller in GATK4 is still in evaluation phase and should not
## be used in production until it has been fully vetted. In the meantime, use the GATK3
## version for any production needs.
##
## Runtime parameters are optimized for Broad's Google Cloud Platform implementation.
##
## LICENSING :
## This script is released under the WDL source code license (BSD-3) (see LICENSE in
## https://github.com/broadinstitute/wdl). Note however that the programs it calls may
## be subject to different licenses. Users are responsible for checking that they are
## authorized to run all programs before running this script. Please see the dockers
## for detailed licensing information pertaining to the included programs.

# WORKFLOW DEFINITION
workflow HaplotypeCallerGvcf_GATK4 {
 File input_bam
 File input_bam_index
 File ref_dict
 File ref_fasta
 File ref_fasta_index
 File scattered_calling_intervals_list

 String gatk_docker

 String gatk_path

 Array[File] scattered_calling_intervals = read_lines(scattered_calling_intervals_list)

 String sample_basename = basename(input_bam, ".bam")

 String gvcf_name = sample_basename + ".g.vcf.gz"
 String gvcf_index = sample_basename + ".g.vcf.gz.tbi"

 # Call variants in parallel over grouped calling intervals
 scatter (interval_file in scattered_calling_intervals) {

 # Generate GVCF by interval
 call HaplotypeCaller {
 input:
 input_bam = input_bam,
 input_bam_index = input_bam_index,
 interval_list = interval_file,
 gvcf_name = gvcf_name,
 ref_dict = ref_dict,
 ref_fasta = ref_fasta,
 ref_fasta_index = ref_fasta_index,
 docker_image = gatk_docker,
 gatk_path = gatk_path
 }
 }

 # Merge per-interval GVCFs
 call MergeGVCFs {
 input:
 input_vcfs = HaplotypeCaller.output_gvcf,
 vcf_name = gvcf_name,
 vcf_index = gvcf_index,
 docker_image = gatk_docker,
 gatk_path = gatk_path
 }

 # Outputs that will be retained when execution is complete
 output {
 File output_merged_gvcf = MergeGVCFs.output_vcf
 File output_merged_gvcf_index = MergeGVCFs.output_vcf_index
 }
}

# TASK DEFINITIONS

# HaplotypeCaller per-sample in GVCF mode
task HaplotypeCaller {
 File input_bam
 File input_bam_index
 String gvcf_name
 File ref_dict
 File ref_fasta
 File ref_fasta_index
 File interval_list
 Int? interval_padding
 Float? contamination
 Int? max_alt_alleles

 Int preemptible_tries
 Int disk_size
 String mem_size

 String docker_image
 String gatk_path
 String java_opt

 command {
 ${gatk_path} --java-options ${java_opt} \
 HaplotypeCaller \
 -R ${ref_fasta} \
 -I ${input_bam} \
 -O ${gvcf_name} \
 -L ${interval_list} \
 -ip ${default=100 interval_padding} \
 -contamination ${default=0 contamination} \
 --max-alternate-alleles ${default=3 max_alt_alleles} \
 -ERC GVCF
 }

 runtime {
 docker: docker_image
 memory: mem_size
 cpu: 1
 disks: "local-disk"
 }

 output {
 File output_gvcf = "${gvcf_name}"
 }
}

# Merge GVCFs generated per-interval for the same sample
task MergeGVCFs {
 Array [File] input_vcfs
 String vcf_name
 String vcf_index

 Int preemptible_tries
 Int disk_size
 String mem_size

 String docker_image
 String gatk_path
 String java_opt

 command {
 ${gatk_path} --java-options ${java_opt} \
 MergeVcfs \
 --INPUT=${sep=' --INPUT=' input_vcfs} \
 --OUTPUT=${vcf_name}
 }

 runtime {
 docker: docker_image
 memory: mem_size
 cpu: 1
 disks: "local-disk"
}

 output {
 File output_vcf = "${vcf_name}"
 File output_vcf_index = "${vcf_index}"
 }
}

### Input parameters

In [None]:
%%writefile HaplotypeCaller.aws.json

{
 "##_COMMENT1": "INPUT BAM",
 "HaplotypeCallerGvcf_GATK4.input_bam": "s3://gatk-test-data/wgs_bam/NA12878_24RG_hg38/NA12878_24RG_small.hg38.bam",
 "HaplotypeCallerGvcf_GATK4.input_bam_index": "s3://gatk-test-data/wgs_bam/NA12878_24RG_hg38/NA12878_24RG_small.hg38.bai",

 "##_COMMENT2": "REFERENCE FILES",
 "HaplotypeCallerGvcf_GATK4.ref_dict": "s3://broad-references/hg38/v0/Homo_sapiens_assembly38.dict",
 "HaplotypeCallerGvcf_GATK4.ref_fasta": "s3://broad-references/hg38/v0/Homo_sapiens_assembly38.fasta",
 "HaplotypeCallerGvcf_GATK4.ref_fasta_index": "s3://broad-references/hg38/v0/Homo_sapiens_assembly38.fasta.fai",

 "##_COMMENT3": "INTERVALS",
 "HaplotypeCallerGvcf_GATK4.scattered_calling_intervals_list": "s3://gatk-test-data/intervals/hg38_wgs_scattered_calling_intervals.txt",
 "HaplotypeCallerGvcf_GATK4.HaplotypeCaller.interval_padding": 100,

 "##_COMMENT4": "DOCKERS",
 "HaplotypeCallerGvcf_GATK4.gatk_docker": "broadinstitute/gatk:4.0.0.0",

 "##_COMMENT5": "PATHS",
 "HaplotypeCallerGvcf_GATK4.gatk_path": "/gatk/gatk",

 "##_COMMENT6": "JAVA OPTIONS",
 "HaplotypeCallerGvcf_GATK4.HaplotypeCaller.java_opt": "-Xms8000m",
 "HaplotypeCallerGvcf_GATK4.MergeGVCFs.java_opt": "-Xms8000m",

 "##_COMMENT7": "MEMORY ALLOCATION",
 "HaplotypeCallerGvcf_GATK4.HaplotypeCaller.mem_size": "10 GB",
 "HaplotypeCallerGvcf_GATK4.MergeGVCFs.mem_size": "30 GB",

 "##_COMMENT8": "DISK SIZE ALLOCATION",
 "HaplotypeCallerGvcf_GATK4.HaplotypeCaller.disk_size": 100,
 "HaplotypeCallerGvcf_GATK4.MergeGVCFs.disk_size": 100,

 "##_COMMENT9": "PREEMPTION",
 "HaplotypeCallerGvcf_GATK4.HaplotypeCaller.preemptible_tries": 3,
 "HaplotypeCallerGvcf_GATK4.MergeGVCFs.preemptible_tries": 3
}
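
Every file input above is an `s3://` URI, and a mistyped path may only surface once the workflow is running. One way to check the inputs beforehand is to split each URI into bucket and key, which can then be passed to an S3 call such as `head_object`. The following is a sketch; `s3_inputs` is a hypothetical helper.

```python
from urllib.parse import urlparse


def s3_inputs(inputs):
    """Return {input name: (bucket, key)} for every s3:// value in an inputs dict."""
    found = {}
    for name, value in inputs.items():
        # Skip numbers and plain strings such as the "##_COMMENT" entries
        if isinstance(value, str) and value.startswith('s3://'):
            parsed = urlparse(value)
            found[name] = (parsed.netloc, parsed.path.lstrip('/'))
    return found
```

Loading `HaplotypeCaller.aws.json` with `json.load` and passing the result to `s3_inputs` would list each bucket and key referenced by the workflow.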

### Submit job to Cromwell server

In [None]:
!curl -X POST "http://{{cromwell server}}/api/workflows/v1" \
 -H "accept: application/json" \
 -F "workflowSource=@HaplotypeCaller.aws.wdl" \
 -F "workflowInputs=@HaplotypeCaller.aws.json"

## Cleanup

In [None]:
def delete_compute_environment(computeEnvironment):
    # A compute environment is disabled first, then deleted
    response = batch.update_compute_environment(
        computeEnvironment=computeEnvironment,
        state='DISABLED',
    )
    print(response)
    # Give AWS Batch time to apply the state change
    time.sleep(10)
    response = batch.delete_compute_environment(
        computeEnvironment=computeEnvironment
    )
    return response

def delete_job_queue(name):
    # A job queue is disabled first, then deleted
    response = batch.update_job_queue(
        jobQueue=name,
        state='DISABLED'
    )
    print(response)
    # Give AWS Batch time to apply the state change
    time.sleep(10)
    response = batch.delete_job_queue(
        jobQueue=name
    )
    return response


In [None]:
resp = delete_job_queue(hp_queue)

In [None]:
resp = delete_job_queue(default_queue)

In [None]:
resp = delete_compute_environment(hp_ce)

In [None]:
resp = delete_compute_environment(default_ce)

In [None]:
response = cfn.delete_stack(StackName=stack_name)

In [None]:
print('waiting for stack deletion...')
waiter = cfn.get_waiter('stack_delete_complete')
waiter.wait(
    StackName=stack_name
)
print('stack deleted.')

In [None]:
response = ec2_client.delete_key_pair(KeyName=key_name)

In [None]:
workshop.vpc_cleanup(vpc_id)
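
The cleanup cells above do not remove the S3 bucket created at the start. If the bucket and its contents are no longer needed, it could be emptied and deleted along these lines. This is a sketch: `delete_bucket` is a hypothetical helper, `s3_resource` is expected to behave like `session.resource('s3')`, and it assumes versioning was never enabled on the bucket.

```python
def delete_bucket(s3_resource, bucket_name):
    """Delete every object in a bucket, then the bucket itself."""
    target = s3_resource.Bucket(bucket_name)
    # A bucket must be empty before S3 will delete it
    target.objects.all().delete()
    target.delete()
```

In the notebook this could be called as `delete_bucket(session.resource('s3'), bucket)`. The deletion cannot be undone, so any workflow outputs worth keeping should be copied elsewhere first.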