# Using HealthOmics Storage with genomic references and readsets

The goal of this notebook is to get you acquainted with HealthOmics Storage. <br>If you complete this notebook you will have:
1. Created a Reference Store
2. Imported a Reference Genome
3. Created a Sequence Store
4. Imported several FASTQ files
5. Imported a CRAM file
6. Downloaded a ReadSet using the HealthOmics Transfer Manager.

## Prerequisites

### Python requirements
* Python >= 3.8
* Packages:
  * boto3 >= 1.26.19
  * botocore >= 1.29.19

### AWS requirements

#### AWS CLI
You will need the AWS CLI installed and configured in your environment. <br>Supported AWS CLI versions are:

* AWS CLI v2 >= 2.9.3 (Recommended)
* AWS CLI v1 >= 1.27.19

#### AWS Region
AWS HealthOmics is currently available in Oregon (us-west-2), N. Virginia (us-east-1), Dublin (eu-west-1), London (eu-west-2), Frankfurt (eu-central-1), and Singapore (ap-southeast-1).

**================**

**AWS HealthOmics only allows importing data within the same region.** 

This notebook works in all AWS HealthOmics supported regions, utilizing regional buckets.
Data in the regional buckets originate from the following, publicaly available buckets:

* s3://1000genomes-dragen-v3.7.6/references/fasta/hg38.fa
* s3://1000genomes/1000G_2504_high_coverage/additional_698_related/data/ERR3988761/HG00405.final.cram*
* s3://giab/data/NA12878/NIST_NA12878_HG001_HiSeq_300x/140407_D00360_0016_AH948VADXX/Project_RM8398/Sample_U0a/U0a_CGATGT_L001_R{1,2}_*.fastq.gz

## Policy Set Up

This notebook runs under the role that was created/selected when the notebook instance was created.<br>
This role is a very specific role, for running notebooks in Sagemaker.

Prior to executing this notebook, verify that the specific role has the below policies attached:

## Environment Set Up
We execute a reset of resources, in case we re-run the tutorial.

In [None]:
%reset -f

We set up our environment, by importing required libraries

In [None]:
from datetime import datetime
import itertools as it
import json
import gzip
import os
from pprint import pprint
import time
import urllib

import boto3
import botocore.exceptions
import requests


### Create a service IAM role

For the purposes of this tutorial, we will use the following policy and trust policy to demo the capabilities of AWS HealthOmics, you are free to customize permissions as required for your use case after going though this tutorial.

**NOTE:**
In this case we've defined rather permissive permissions (i.e. "\*" Resources). In particular, we are allowing read/write access to all S3 buckets available to the account for this tutorial. In a real world setting you will want to scope this down to only the minimally needed actions on necessary resources.

In [None]:
# set a timestamp
dt_fmt = '%Y%m%dT%H%M%S'
ts = datetime.now().strftime(dt_fmt)

In [None]:
demo_policy = {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "omics:*"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ram:AcceptResourceShareInvitation",
        "ram:GetResourceShareInvitations"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketLocation",
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:GetObjectAcl",
        "s3:PutObjectAcl"
      ],
      "Resource": "*"
    }
  ]
}

demo_trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "omics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

In order to proceed we need to create a couple of resources. <br>The first is the role that you will be passing into AWS HealthOmics. If the role doesn't exist, we will need to create it and attach the policy and trust policy defined above.

In [None]:
# We will use this as the base name for our role and policy
omics_iam_name = f'OmicsTutorialRole-{ts}'

# Create the iam client
iam = boto3.resource('iam')

# Check if the role already exist if not create it
try:
    role = iam.Role(omics_iam_name)
    role.load()
    
except botocore.exceptions.ClientError as ex:
    if ex.response["Error"]["Code"] == "NoSuchEntity":
        #Create the role with the corresponding trust policy
        role = iam.create_role(
            RoleName=omics_iam_name, 
            AssumeRolePolicyDocument=json.dumps(demo_trust_policy))
        
        #Create policy
        policy = iam.create_policy(
            PolicyName='{}-policy'.format(omics_iam_name), 
            Description="Policy for AWS HealthOmicsics demo",
            PolicyDocument=json.dumps(demo_policy))
        
        #Attach the policy to the role
        policy.attach_role(RoleName=omics_iam_name)
    else:
        print('Somthing went wrong, please retry and check your account settings and permissions')

Now that we know the role exists, lets create a helper function to help us retrieve the role arn which we will need to pass into the service API calls. 
<br>The role arn will grant AWS HealthOmics the proper permissions to access the resources it needs in your AWS account.

In [None]:
def get_role_arn(role_name):
    try:
        iam = boto3.resource('iam')
        role = iam.Role(role_name)
        role.load()  # calls GetRole to load attributes
    except botocore.exceptions.ClientError:
        print("Couldn't get role named %s."%role_name)
        raise
    else:
        return role.arn

Let's get the region in which we are running our notebook. 

In [None]:
region = boto3.session.Session().region_name

We are using region information to create a regional  dictionary of datasources we'll be using in this tutorial

In [None]:
regional_bucket = f'aws-genomics-static-{region}'

SOURCE_S3_URI_TPL = {
    "reads": {
        "fastq": "s3://{regional_bucket}/omics-tutorials/data/fastq/NA12878/Sample_U0a/U0a_CGATGT_L001_R{read}_{part:03}.fastq.gz",
    }
}

SOURCE_S3_URIS = {
    "reference": f's3://{regional_bucket}/omics-tutorials/data/references/hg38/Homo_sapiens_assembly38.fasta',
    "reads": {
        "fastq": [
            SOURCE_S3_URI_TPL['reads']['fastq'].format(regional_bucket=regional_bucket,read=read, part=part) 
            for read, part, in list(it.product([1,2], [1,2,3,4]))],
        "cram": [
            f's3://{regional_bucket}/omics-tutorials/data/cram/ERR3988761/HG00405.final.cram',
            f's3://{regional_bucket}/omics-tutorials/data/cram/ERR3988762/HG00408.final.cram',
            f's3://{regional_bucket}/omics-tutorials/data/cram/ERR3988763/HG00418.final.cram',
            f's3://{regional_bucket}/omics-tutorials/data/cram/ERR3988764/HG00420.final.cram'
        ]
    }
}

### Creating the AWS HealthOmics client

In [None]:
omics = boto3.client('omics', region_name=region)

## Reference stores

We are first going to import a genome reference. This reference will be taken from 1000 genomes, which will also be the source of the reads. This import has two steps. First, if this is the first time you've created a reference, you first must create a reference store.

Reference Stores (and Seqeuence Stores) also support CMKs; however, we'll use AWS-owned encryption keys throughout.

To start lets create a helper method to retrieve the reference store id and have it return empty if it doesn't exist. There should only be one reference store per region per account.



In [None]:
def get_ref_store_id(client=None):
    if not client:
        client = boto3.client('omics')
    
    resp = client.list_reference_stores(maxResults=10)
    list_of_stores = resp.get('referenceStores')
    store_id = None
    
    if list_of_stores != None:
        # As mentioned above there can only be one store per region
        # if there is a store present is the first one
        store_id = list_of_stores[0].get('id')
    
    return store_id

In [None]:
print(f"Checking for a reference store in region: {omics.meta.region_name}")
if get_ref_store_id(omics) == None:
    response = omics.create_reference_store(name='myReferenceStore')
    print(response)
else:
    print("Congratulations, you have an existing reference store!")

### Importing references

We'll now import a reference using the `start_reference_import_job` API call.

This will use the reference store we created (or found) and the IAM role we created above. All references in a Reference store must have a unique name. So, we're also going to apply a timestamp to the reference name to ensure that it is unique.

Also note that the IAM role used should have the ability to read the object in the S3 bucket that your reference is sourced from.

In [None]:
ref_name = f'tutorial-1kg-grch38-{ts}'

ref_import_job = omics.start_reference_import_job(
    referenceStoreId=get_ref_store_id(omics), 
    roleArn=get_role_arn(omics_iam_name),
    sources=[{
        'sourceFile': SOURCE_S3_URIS["reference"],
        'name': ref_name,
        'tags': {'SourceLocation': '1kg'}
    }])

You can check the status here either using `omics.get_reference_import_job` or by navigating to the [Console](https://console.aws.amazon.com/omics/home#/reference/referenceStoreDetail)

In [None]:
ref_import_job = omics.get_reference_import_job(
    referenceStoreId=get_ref_store_id(omics), 
    id=ref_import_job['id'])
ref_import_job

The import can take up to 5 minutes to complete. We can wait for it to complete using a `waiter`.

In [None]:
print(f"waiting for job {ref_import_job['id']} to complete")
try:
    waiter = omics.get_waiter('reference_import_job_completed')
    waiter.wait(referenceStoreId=ref_import_job['referenceStoreId'], id=ref_import_job['id'])

    print(f"job {ref_import_job['id']} complete")
except botocore.exceptions.WaiterError as e:
    print(f"job {ref_import_job['id']} FAILED:")
    print(e)

Once the import job is complete, we should be able to see our reference in the list of available references in the Reference store.

In [None]:
resp = omics.list_references(referenceStoreId=get_ref_store_id(omics), filter={"name": ref_name})

ref_list = resp
pprint(resp)

And now we can get more details for our imported reference with the following:

In [None]:
# storing this specific reference as we'll be using it later
ref = omics.get_reference_metadata(
    referenceStoreId=get_ref_store_id(omics), 
    id=ref_list['references'][0]['id'])
ref

Now that you've imported your reference, let's import our first set of genomics reads.

## Sequence Stores

We will now create our first sequence store. A sequence store is similar to an S3 bucket; it holds a set of objects, known as read sets. 

The easiest way to think about a read set is it an abstraction of a set of genomics file types (FASTQ, BAM, CRAM) that store reads from a sequencer. When you get a read set back from a sequence store and process it, the data you get back is semantically identical to what you put in.

Note that you can use customer managed CMKs with HealthOmics storage; these are managed at the store level.

### Creating sequence stores

Let's create a sequence store and retrieve its details.

In [None]:
sequence_store_name = f'tutorial-seq-store-{ts}'
response = omics.create_sequence_store(name=sequence_store_name)

response

In [None]:
seqstore = response
omics.get_sequence_store(id=seqstore['id'])

### Import sequence data

Now, let's import some data in the form of FASTQs and CRAMs. AWS HealthOmics allows you to provide additional metadata for your reads using import manifests. Let's look at how to generate manifests for CRAM and FASTQ data.

#### CRAM manifests
CRAM manifests are relatively simple since all the read data is typically in one source file.

In [None]:
role_arn = get_role_arn(omics_iam_name)

In [None]:
# create a list of cram sources
cram_sources = []
for source_uri in SOURCE_S3_URIS['reads']['cram']:
    source = urllib.parse.urlparse(source_uri)
    subject_id = source.path[1:].split('/')[-2]  # accession, e.g. ERR1234567
    sample_id = source.path[1:].split('/')[-1].split('.')[0] # basename of object, e.g. HG12345
    
    cram_sources += [{
        'subjectId': subject_id,
        'sampleId': sample_id,
        'sourceFileType': 'CRAM',
        'sourceFiles': { 'source1': source_uri },
        'referenceArn': ref['arn'],  # we stashed this earlier
        'generatedFrom': source_uri,
        'description': f'{subject_id}/{sample_id} from {source.netloc}',
        'tags': {'SourceLocation': source.netloc}
    }]
cram_sources

#### FASTQ manifests
FASTQs can be a bit more complicated since read data can come from one or two source files. Additionally, sources for multiple paired-end reads may exist in the same directory.

In [None]:
# the set of source URIs is evenly matched with two reads {R1, R2} and for parts each
# so we can just sort and divide the list into two to get matched lists of paired reads
source_uris = zip(
    sorted(SOURCE_S3_URIS['reads']['fastq'])[:len(SOURCE_S3_URIS['reads']['fastq'])//2],
    sorted(SOURCE_S3_URIS['reads']['fastq'])[len(SOURCE_S3_URIS['reads']['fastq'])//2:]
)

fastq_sources = []
for reads in source_uris:
    source = urllib.parse.urlparse(reads[0])
    subject_id = source.path[1:].split('/')[-3]  # NA12878
    sample_id = source.path[1:].split('/')[-2]  # Sample_U0a
    
    fastq_sources += [{
        'subjectId': subject_id,
        'sampleId': sample_id,
        'sourceFileType': 'FASTQ',
        'sourceFiles': { 
            'source1': reads[0],
            'source2': reads[1]
        },
        'referenceArn': ref['arn'],  # we stashed this earlier
        'generatedFrom': f'{subject_id}/{sample_id} fastq',
        'description': f'{subject_id}/{sample_id} from {source.netloc}',
        'tags': {'SourceLocation': source.netloc}
    }]
fastq_sources

## Importing ReadSets

Now, we'll make a combined set of import sources for our store. You can mix and match read sets of different types with imports. 

In [None]:
import_sources = fastq_sources + cram_sources
import_sources

In [None]:
readset_import_job = omics.start_read_set_import_job(
    roleArn=get_role_arn(omics_iam_name),
    sequenceStoreId=seqstore['id'], 
    sources=import_sources)
readset_import_job

You can monitor this progress with `omics.get_read_set_import_job`

In [None]:
readset_import_job = omics.get_read_set_import_job(
    sequenceStoreId=seqstore['id'], 
    id=readset_import_job['id'])

readset_import_job

Depending on the size of the source data, ReadSet import jobs can take up to 30min for each source to complete.

The import of an individual Read Set into a sequence store will take longer than a traditional S3 copy. This is because AWS HealthOmics will validate files, calculate metadata, and organize your ReadSets to make it easier for you to consume and share.

AWS HealthOmics supports high parallelism across imports. For example, if the import of a FASTQ takes X minutes, the import of many FASTQs of the same size should also take approximately X minutes. Each import job supports a maximum of 1000 read sets. So we could import all of 1000 genomes with only 4 API calls. For higher parallelism, you can split larger datastes into multiple ReadSet import jobs.

## Copying References and ReadSets to a local file system

Many existing tools for genomics data processing expect to read files on the local filesystem.

AWS HealthOmics supports multipart downloads much like S3, but specifically designed for genomic datatypes. After importing your references and readsets into AWS HealthOmics Storage, you can easily retrieve them and their supporting files (e.g. index files like `*.fai` for references, or `*.crai` for reads).

In [None]:
ref_list = omics.list_references(referenceStoreId=get_ref_store_id(omics))
readset_list = omics.list_read_sets(sequenceStoreId=seqstore['id'])

pprint(ref_list)
pprint(readset_list)

AWS HealthOmics provides a number of basic methods to retrieve data. For efficiency, references and read sets are stored in multiple parts. We can see this if we look at one of the references we imported earlier:

In [None]:
ref_metadata = omics.get_reference_metadata(referenceStoreId=ref['referenceStoreId'], id=ref['id'])
ref_metadata

Notice the `files` property:

In [None]:
ref_metadata['files']

Here we can see the `source` file has several parts. There is also a smaller `index` file which will typically have only one part.

The AWS HealthOmics API `get_reference` lets you download one part at a time. Let's use it to download pieces of the reference and its associated index.

In [None]:
# download a part of the reference
response = omics.get_reference(referenceStoreId=ref['referenceStoreId'], id=ref['id'], partNumber=1, file='SOURCE')
with open('reference.part.fa', 'wb') as f:
    f.write(response['payload'].read())

# download a part of the reference index
response = omics.get_reference(referenceStoreId=ref['referenceStoreId'], id=ref['id'], partNumber=1, file='INDEX')
with open('reference.part.fai', 'wb') as f:
    f.write(response['payload'].read())

There are similar APIs for downloading ReadSets. One notable difference is that ReadSets can have more than one source - e.g. `SOURCE1` and `SOURCE2` - and no index if they originate from FASTQ, or only a single source and an accompanying index if they come from an aligned format like BAM or CRAM.

> **Note** the following cells will only work if you have an available readset. The first cell below will check for one. Subsequent cells thereafter assume a readset was found.

In [None]:
# check if there are readsets available
readset_import_jobs = omics.list_read_set_import_jobs(sequenceStoreId=seqstore['id'], filter={'status': 'IN_PROGRESS'})
readset_list = omics.list_read_sets(sequenceStoreId=seqstore['id'])
if not readset_list.get('readSets'):
    print(f"no active readsets available in {seqstore['id']}.")
    if readset_import_jobs.get('importJobs'):
        print(f"{len(readset_import_jobs.get('importJobs'))} in progress readset import jobs found. Readsets should be available in a few minutes.")
else:
    readset = readset_list['readSets'][0]
    pprint(readset)

In [None]:
readset_metadata = omics.get_read_set_metadata(sequenceStoreId=readset['sequenceStoreId'], id=readset['id'])
readset_metadata

In [None]:
response = omics.get_read_set(sequenceStoreId=readset['sequenceStoreId'], id=readset['id'], partNumber=1, file='SOURCE1')

In [None]:
# download a part of the readset
response = omics.get_read_set(sequenceStoreId=readset['sequenceStoreId'], id=readset['id'], partNumber=1, file='SOURCE1')
with open(f"reads1.part.{readset_metadata['fileType'].lower()}", 'wb') as f:
    f.write(response['payload'].read())