# Querying annotations and variants with HealthOmics Analytics

The goal of this notebook is to help you get started importing variant and annotation data into a variety of HealthOmics Analytics stores. We will cover:

1. VCF and gVCF inputs
2. Annotation data in VCF, TSV, and GFF formats
3. How to write queries on these data


## Prerequisites
### Python requirements
* Python >= 3.8
* Packages:
 * boto3 >= 1.26.19
 * botocore >= 1.29.19
 * [AWS Wrangler](https://pypi.org/project/awswrangler/) >= 2.17.0

### AWS requirements

#### AWS CLI
You will need the AWS CLI installed and configured in your environment. Supported AWS CLI versions are:

* AWS CLI v2 >= 2.9.3 (Recommended)
* AWS CLI v1 >= 1.27.19

#### AWS Region
AWS HealthOmics is currently available in Oregon (us-west-2), N. Virginia (us-east-1), Dublin (eu-west-1), London (eu-west-2), Frankfurt (eu-central-1), and Singapore (ap-southeast-1).

**================**

**AWS HealthOmics only supports importing data within the same region.**

This notebook works in all AWS HealthOmics supported regions, utilizing regional buckets. Data in the regional buckets originates from the following publicly available buckets:

* s3://1000genomes-dragen/data/precisionFDA/hg38-graph-based/
* s3://aws-genomics-datasets/omics-e2e/clinvar.vcf.gz

#### Output buckets
You will need a bucket in the same region you are running this tutorial in to store query outputs.

#### AWS HealthOmics references
You need to have the following AWS HealthOmics resources available:

* Reference store
* A reference in the Reference store named "GRCh38"

For more information on how to create these, see the AWS HealthOmics Storage tutorial.

## Set Up

### Execution role

To run this notebook in Amazon Sagemaker on a notebook instance you will need an execution role with the following permissions:

```json
{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Action": [
 "iam:GetUser",
 "iam:GetPolicy",
 "iam:CreatePolicy",
 "iam:GetRole",
 "iam:PassRole",
 "iam:CreateRole",
 "iam:AttachRolePolicy"
 ],
 "Resource": "*"
 },
 {
 "Effect": "Allow",
 "Action": [
 "omics:*"
 ],
 "Resource": "*"
 },
 {
 "Effect": "Allow",
 "Action": [
 "ram:AcceptResourceShareInvitation",
 "ram:GetResourceShareInvitations",
 "ram:ListResources",
 "ram:GetResourceShares"
 ],
 "Resource": "*"
 },
 {
 "Effect": "Allow",
 "Action": [
 "glue:CreateTable",
 "glue:DeleteTable",
 "glue:GetTable",
 "glue:UpdateTable",
 "glue:GetTables"
 ],
 "Resource": "*"
 },
 {
 "Effect": "Allow",
 "Action": [
 "athena:ListWorkGroups",
 "athena:CreateWorkGroup",
 "athena:GetWorkGroup",
 "athena:StartQueryExecution",
 "athena:GetQueryExecution",
 "athena:GetQueryResults"
 ],
 "Resource": [
 "*"
 ]
 },
 {
 "Effect": "Allow",
 "Action": [
 "lakeformation:GetDataAccess",
 "lakeformation:GrantPermissions",
 "lakeformation:GetDataLakeSettings",
 "lakeformation:PutDataLakeSettings"
 ],
 "Resource": [
 "*"
 ]
 },
 {
 "Effect": "Allow",
 "Action": [
 "s3:PutObject",
 "s3:GetObject",
 "s3:ListBucket",
 "s3:DeleteObject"
 ],
 "Resource": "arn:aws:s3:::*"
 }
 ]
}
```

### Python package imports

In [None]:
from datetime import datetime
import json
import os
import time
import urllib

import boto3
import botocore.exceptions


### Create a service IAM role

For the purposes of this tutorial, we will use the following policy and trust policy to demo the capabilities of AWS HealthOmics, you are free to customize permissions as required for your use case after going though this tutorial.

**NOTE:**
In this case we've defined rather permissive permissions (i.e. "\*" Resources). In particular, we are allowing read/write access to all S3 buckets available to the account for this tutorial. In a real world setting you will want to scope this down to only the minimally needed actions on necessary resources.

In [None]:
# set a timestamp
dt_fmt = '%Y%m%dT%H%M%S'
ts = datetime.now().strftime(dt_fmt)

In [None]:
demo_policy = {
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Action": [
 "omics:*"
 ],
 "Resource": "*"
 },
 {
 "Effect": "Allow",
 "Action": [
 "ram:AcceptResourceShareInvitation",
 "ram:GetResourceShareInvitations"
 ],
 "Resource": "*"
 },
 {
 "Effect": "Allow",
 "Action": [
 "s3:GetBucketLocation",
 "s3:PutObject",
 "s3:GetObject",
 "s3:ListBucket",
 "s3:AbortMultipartUpload",
 "s3:ListMultipartUploadParts",
 "s3:GetObjectAcl",
 "s3:PutObjectAcl"
 ],
 "Resource": "*"
 }
 ]
}

demo_trust_policy = {
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Principal": {
 "Service": "omics.amazonaws.com"
 },
 "Action": "sts:AssumeRole"
 }
 ]
}

In order to proceed we need to create a couple of resources the first is the role that you will be passing into AWS HealthOmics. If the role doesn't exist, we will need to create it and attach the policy and trust policy defined above.

In [None]:
# We will use this as the base name for our role and policy
omics_iam_name = f'OmicsTutorialRole-{ts}'

# Create the iam client
iam = boto3.resource('iam')

# Check if the role already exist if not create it
try:
 role = iam.Role(omics_iam_name)
 role.load()
 
except botocore.exceptions.ClientError as ex:
 if ex.response["Error"]["Code"] == "NoSuchEntity":
 #Create the role with the corresponding trust policy
 role = iam.create_role(
 RoleName=omics_iam_name, 
 AssumeRolePolicyDocument=json.dumps(demo_trust_policy))
 
 #Create policy
 policy = iam.create_policy(
 PolicyName='{}-policy'.format(omics_iam_name), 
 Description="Policy for AWS HealthOmicsics demo",
 PolicyDocument=json.dumps(demo_policy))
 
 #Attach the policy to the role
 policy.attach_role(RoleName=omics_iam_name)
 else:
 print('Something went wrong, please retry and check your account settings and permissions')
 print(ex)

Now that we know the role exist, lets create a helper function to help us retrieve the role arn which we will need to pass into the service API calls. The role arn will grant AWS HealthOmics the permissions it needs to access the resources it needs in your aws account.

In [None]:
def get_role_arn(role_name):
 try:
 iam = boto3.resource('iam')
 role = iam.Role(role_name)
 role.load() # calls GetRole to load attributes
 except ClientError:
 print("Couldn't get role named %s."%role_name)
 raise
 else:
 return role.arn

### Creating the AWS HealthOmics client

Here we create a client for AWS HealthOmics.

We'll also get the region that the notebook is running in. This is used to select the right regional data source.

In [None]:
omics = boto3.client('omics')

region = omics.meta.region_name
regional_bucket = f'aws-genomics-static-{region}'

## Variant Store

We'll be using the DRAGEN Thousand Genomes data set, which is available in the [Registry of Open Data on AWS](https://registry.opendata.aws/ilmn-dragen-1kgp/). 


In order to create a variant store, you will need to have uploaded a reference into omics storage, the following helper method will retrieve the reference id for a given reference imported into omics storage. 

In [None]:
def get_reference_arn(ref_name, client=None):
 if not client:
 client = boto3.client('omics')
 
 resp = client.list_reference_stores(maxResults=10)
 ref_stores = resp.get('referenceStores')
 
 # There can only be one reference store per account per region
 # if there is a store present, it is the first one
 ref_store = ref_stores[0] if ref_stores else None
 
 if not ref_store:
 raise RuntimeError("You have not created a reference store, please got to the AWS HealthOmics Storage tutorial to learn how to create one. Do note continue with this notebook")
 
 ref_arn = None
 resp = omics.list_references(referenceStoreId=ref_store['id'])
 ref_list = resp.get('references')
 
 for ref in resp.get('references'):
 if ref['name'] == ref_name:
 ref_arn = ref['arn']
 
 if ref_arn == None:
 raise RuntimeError(f"Could not find {ref_name}, please got to the AWS HealthOmics Storage tutorial to learn how to import one. Do note continue with this notebook")
 
 return ref_arn

In [None]:
omics.list_reference_stores()

### Step 1 - Prepare data

This tutorial will import a few VCFs that originated from the 1000 Genomes dataset which is available via the [Registry of Open Data on AWS](https://registry.opendata.aws/ilmn-dragen-1kgp/). The original data is located in `us-east-1`. Since HealthOmics does not allow for cross-region data access, the data have been replicated across all regions that HealthOmics is available in into S3 buckets used by this tutorial.

For demonstration purposes, we're only using 3 genomes worth of data. AWS HealthOmics can handle much more, and it is possible to import the entire 1000 Genomes dataset (3202 samples).

In [None]:
SOURCE_VARIANT_URI = f"s3://{regional_bucket}/omics-tutorials/data/variants"

In [None]:
source = urllib.parse.urlparse(SOURCE_VARIANT_URI)
bucket = source.netloc
prefix = source.path[1:]

s3r = boto3.resource('s3')

bucket = s3r.Bucket(bucket)
objects = bucket.objects.filter(Prefix=prefix, MaxKeys=10_000)
ext = 'hard-filtered.vcf.gz'

# this is a list comprehension for demo purposes.
# if you have a larger set of data a generator comprehension will be more performant here 
# as it effectively lazy-loads the list of object keys
vcf_list = [f"s3://{o.bucket_name}/{o.key}" for o in objects if o.key.endswith(ext)]

Let's take a look at the list of our source VCFs

In [None]:
vcf_list

### Step 2 - Create Variant Store

Now, let's create a variant store. Feel free to update `var_store_name`. Note that Variant store names need to match the pattern `[a-z]{1}[a-z0-9_]{254}` (i.e. alphanumeric lowercase with "_" up to 255 characters)

In [None]:
var_store_name = f'tutorial_variants_{ts.lower()}'
ref_name = 'GRCh38' ## Change this reference name to match one you have created if needed

In [None]:
response = omics.create_variant_store(
 name=var_store_name, 
 reference={"referenceArn": get_reference_arn(ref_name, omics)}
)

var_store = response
response

Creating a variant store takes up to 5 min to complete. We can use a `waiter` to tell us when the store is ready to use.

In [None]:
try:
 waiter = omics.get_waiter('variant_store_created')
 waiter.wait(name=var_store['name'])

 print(f"variant store {var_store['name']} ready for use")
except botocore.exceptions.WaiterError as e:
 print(f"variant store {var_store['name']} FAILED:")
 print(e)

var_store = omics.get_variant_store(name=var_store['name'])

After the store is created, navigate to Lake Formation and create resource links as described in the AWS HealthOmics [documentation](https://docs.aws.amazon.com/omics/latest/dev/creating-variant-stores.html#create-resource-links). These resource links will be needed for querying the data in Amazon Athena below.

In the meantime, you can continue importing data (these can be done in parallel).

### Step 3 - Import VCFs

In [None]:
response = omics.start_variant_import_job(
 destinationName=var_store['name'],
 roleArn=get_role_arn(omics_iam_name),
 
 # since we only have 3 vcfs to import we can use a simple list comprehension here
 # variant import jobs have a limit of 1000 sources per import
 items=[{"source": uri} for uri in vcf_list]
)
response

Variant import jobs can take up to 10min to complete, depending on the size of the data being imported. We can monitor the progress of our imports by periodically listing them. This will not block the kernel so we can do other things.

In [None]:
omics.list_variant_import_jobs(filter={"storeName": var_store['name']})

### Step 4 - Query in Athena

In order to query your datain Amazon Athena, you need to create resource links to your database using the [AWS LakeFormation Console](https://console.aws.amazon.com/lakeformation/home). You will also need to ensure that the IAM user running this notebook is a Data Lake Administrator. **Note** without both of these in place, the following queries will fail. To satisfy these prerequisites, refer to the instructions provided in the [AWS Lake Formation documention](https://docs.aws.amazon.com/lake-formation/latest/dg/getting-started-setup.html#create-data-lake-admin) and [AWS HealthOmics documentation](https://docs.aws.amazon.com/omics/latest/dev/creating-variant-stores.html#create-resource-links).

The following code will create resource links to the `default` database in the `AwsDataCatalog` in AWS Glue. It makes a few assumptions to do so - like IAM identity you are using to run this notebook is a Data Lake Administrator and has the permissions to create AWS Glue tables.

If you want to be fully sure you are making the correct resource links and providing access to them to the correct identities it is best to create them directly. Refer to the instructions in the AWS HealthOmics [documentation](https://docs.aws.amazon.com/omics/latest/dev/creating-variant-stores.html#create-resource-links) on how to do this.

We'll need to work with AWS RAM, AWS Glue, and AWS Lakeformation to setup resource links and grant database permissions.

In [None]:
ram = boto3.client('ram')
glue = boto3.client('glue')

caller_identity = boto3.client('sts').get_caller_identity()
AWS_ACCOUNT_ID = caller_identity['Account']
AWS_IDENITY_ARN = caller_identity['Arn']

First we'll list available shared resources from `OTHER-ACCOUNTS` in AWS RAM and look for the resource that matches the `id` of the Variant store we created above.

In [None]:
response = ram.list_resources(resourceOwner='OTHER-ACCOUNTS', resourceType='glue:Database')

if not response.get('resources'):
 print('no shared resources found. verify that you have successfully created an HealthOmics Analytics store')
else:
 variantstore_resources = [resource for resource in response['resources'] if var_store['id'] in resource['arn']]
 if not variantstore_resources:
 print(f"no shared resources matching variant store id {var_store['id']} found")
 else:
 variantstore_resource = variantstore_resources[0]

variantstore_resource

Next, we'll get the specific resource share the Variant store is associated with. This is so we can get the `owningAccountId` attribute for the share. (Note we could also do this by parsing the `resourceShareArn` for the resource above, but doing it this way is more explicit)

In [None]:
resource_share = ram.get_resource_shares(
 resourceOwner='OTHER-ACCOUNTS', 
 resourceShareArns=[variantstore_resource['resourceShareArn']])['resourceShares'][0]
resource_share

Now we'll create a table in AWS Glue for the variant store. This is the same as creating a resource link in AWS LakeFormation.

In [None]:
# this creates a resource link to the table for the variant store and adds it to the `default` database
glue.create_table(
 DatabaseName='default',
 TableInput = {
 "Name": var_store['name'],
 "TargetTable": {
 "CatalogId": resource_share['owningAccountId'],
 "DatabaseName": f"variant_{AWS_ACCOUNT_ID}_{var_store['id']}",
 "Name": var_store['name'],
 }
 }
)

For this section of the tutorial, the identity that runs this notebook either:

1. needs to be a Data lake administrator in AWS Lake Formation, or
2. must be granted access to the AWS RAM shared resources by an existing administrator.

The latter pattern is recommended. Both `DESCRIBE` and `SELECT` on the target table for the variant store is required and can be done via the "Grant on target" action on a resource link in the [AWS LakeFormation Console](https://docs.aws.amazon.com/lake-formation/latest/dg/granting-table-permissions.html). You can also do this with and admin identity via the SDK with code like:

```python
lfn = boto3.client('lakeformation')

lfn.grant_permissions(
 Principal={
 "DataLakePrincipalIdentifier": principal
 },
 Resource={
 "Table": {
 "CatalogId": resource_share['owningAccountId'],
 "DatabaseName": f"variant_{AWS_ACCOUNT_ID}_{var_store['id']}",
 "Name": var_store['name']
 }
 },
 Permissions=[ 'DESCRIBE' ],
 PermissionsWithGrantOption=[ 'DESCRIBE' ]
)

lfn.grant_permissions(
 Principal={
 "DataLakePrincipalIdentifier": principal
 },
 Resource={
 "TableWithColumns": {
 "CatalogId": resource_share['owningAccountId'],
 "DatabaseName": f"variant_{AWS_ACCOUNT_ID}_{var_store['id']}",
 "Name": var_store['name'],
 "ColumnWildcard": {}
 }
 },
 Permissions=[ 'SELECT' ],
 PermissionsWithGrantOption=[ 'SELECT' ]
)
```

To do the former, you can add the identity you are using for this notebook (e.g. if in Amazon Sagemaker, the notebook instance's execution role) as an AWS Lake Formation data lake administrator with the following code:

```python
lfn = boto3.client('lakeformation')

caller_arn = boto3.client('sts').get_caller_identity()['Arn']
caller_identity = caller_arn.split(':')[-1].split('/')

principal = caller_arn # if an IAM user
if caller_identity[0] == 'assumed-role':
 principal = 'role/'.join([caller_arn.replace('sts', 'iam').split('assumed-role')[0], caller_identity[1]])

dls = lfn.get_data_lake_settings()['DataLakeSettings']
if principal not in [admin['DataLakePrincipalIdentifier'] for admin in dls['DataLakeAdmins']]:
 dls['DataLakeAdmins'] += [{'DataLakePrincipalIdentifier': principal}]

lfn.put_data_lake_settings(DataLakeSettings=dls)
```

Now that we have resource links created, we can start quering the data using [Amazon Athena](https://console.aws.amazon.com/athena). You don't need to wait for all the import jobs to complete to start doing this. Queries can be made while data imports in the background.

To query AWS HealthOmics Analytics stores, you need to use Athena engine version 3. The following code checks if you have an existing Athena workgroup that satisfies this criteria. If not it will create one called `omics`.

In [None]:
athena = boto3.client('athena')

In [None]:
athena_workgroups = athena.list_work_groups()['WorkGroups']
athena_workgroups

In [None]:
athena_workgroups = athena.list_work_groups()['WorkGroups']

athena_workgroup = None
for wg in athena_workgroups:
 print(wg['EngineVersion']['EffectiveEngineVersion'])
 if wg['EngineVersion']['EffectiveEngineVersion'] == 'Athena engine version 3':
 print(f"Workgroup '{wg['Name']}' found using Athena engine version 3")
 athena_workgroup = wg
 break
else:
 print("No workgroups with Athena engine version 3 found. creating one")
 athena_workgroup = athena.create_work_group(
 Name='omics',
 Configuration={
 "EngineVersion": {
 "SelectedEngineVersion": "Athena engine version 3"
 }
 }
 )

athena_workgroup

Let's start writing queries!

For fun, let's calculate the TI/TV ratio for these samples. You can navigate to the Athena console or do this from your Jupyter Notebook. This example uses the workgroup `omics` and assumes you have made a resource link to your Variant store in your `default` database.

In [None]:
titv_query = f"""
WITH dataset AS (
 SELECT 
 referenceallele,
 alternatealleles,
 if (cardinality(calls) = 2 and element_at(calls,1)=element_at(calls, 2),2,1) as allelecount, 
 sampleid
 FROM "default"."{var_store['name']}"
 WHERE filters[1] = 'PASS'
) 
SELECT 
 sampleid, 
 CAST (sum(CASE WHEN
 referenceallele in ('C','T') and alternateallele in ('C','T') THEN allelecount
 WHEN referenceallele in ('A','G') and alternateallele in ('A','G') THEN allelecount ELSE 0 END) AS double )/ sum(CASE WHEN
 referenceallele in ('C','T') and alternateallele in ('A','G') THEN allelecount
 WHEN referenceallele in ('A','G') and alternateallele in ('C','T') THEN allelecount ELSE 0 END) as titv_ratio
FROM dataset
CROSS JOIN UNNEST(alternatealleles) as t(alternateallele)
WHERE 
 length(referenceallele) = 1 and length(alternateallele) = 1
GROUP BY 
 sampleid
"""


We can now use [AWS Wrangler](https://aws-sdk-pandas.readthedocs.io/en/stable/) to submit the query and return results as a Pandas Dataframe

If you don't have AWS Wrangler installed in your environment you can get it by running the following cell. Otherwise, you can skip to the next one.

In [None]:
!pip install awswrangler

In [None]:
import awswrangler as wr

In [None]:
df_titv = wr.athena.read_sql_query(
 titv_query, 
 database='default', 
 workgroup=athena_workgroup['Name'])

In [None]:
df_titv

## Annotation Store

Now, let's set up an Annotation store.

AWS HealthOmics Annotation stores support annotations in VCF, GFF, and TSV formats. In this tutorial, we import [ClinVar](https://www.ncbi.nlm.nih.gov/clinvar/) annotations which can be [downloaded](https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/) from the NCBI as a VCF file. Imports need to come from an S3 location in the same region, so we'll use a copy in a regional bucket for this tutorial.

In [None]:
SOURCE_ANNOTATION_URI = f"s3://{regional_bucket}/omics-tutorials/data/annotations/clinvar.vcf.gz"

### Creating, importing data into, and querying an Annotation store

The process of creating, importing data into, and querying an Annotation store is similar to the process you did above for the Variant store, so we'll be brief on the descriptions of each step.

#### Creating ...

In [None]:
ann_store_name = f'tutorial_annotations_{ts.lower()}'
ref_name = 'GRCh38' ## Change this reference name to match one you have created if needed

In [None]:
response = omics.create_annotation_store(
 name=ann_store_name, 
 reference={"referenceArn": get_reference_arn(ref_name, omics)},
 storeFormat='VCF' ## <<--- THIS IS UNIQUE TO ANNOTATION STORES
)

ann_store = response
response

In [None]:
try:
 waiter = omics.get_waiter('annotation_store_created')
 waiter.wait(name=ann_store['name'])

 print(f"annotation store {ann_store['name']} ready for use")
except botocore.exceptions.WaiterError as e:
 print(f"annotation store {ann_store['name']} FAILED:")
 print(e)

ann_store = omics.get_annotation_store(name=ann_store['name'])

#### Importing data ...

Since the ClinVar dataset relatively small, this should take under 2min to complete.

In [None]:
response = omics.start_annotation_import_job(
 destinationName=ann_store['name'],
 roleArn=get_role_arn(omics_iam_name),
 items=[{"source": SOURCE_ANNOTATION_URI}]
)
response

In [None]:
omics.list_annotation_import_jobs(filter={"storeName": ann_store['name']})

#### Querying ...

Creating a resource link to the Annotation store is the same as with the Variant store. We'll do this all in one cell below.

In [None]:
response = ram.list_resources(resourceOwner='OTHER-ACCOUNTS', resourceType='glue:Database')

if not response.get('resources'):
 print('no shared resources found. verify that you have successfully created an HealthOmics Analytics store')
else:
 annotationstore_resources = [resource for resource in response['resources'] if ann_store['id'] in resource['arn']]
 if not annotationstore_resources:
 print(f"no shared resources matching annotation store id {ann_store['id']} found")
 else:
 annotationstore_resource = annotationstore_resources[0]

 resource_share = ram.get_resource_shares(
 resourceOwner='OTHER-ACCOUNTS', 
 resourceShareArns=[annotationstore_resource['resourceShareArn']])['resourceShares'][0]
 
 # this creates a resource link to the table for the annotation store and adds it to the `default` database
 glue.create_table(
 DatabaseName='default',
 TableInput = {
 "Name": ann_store['name'],
 "TargetTable": {
 "CatalogId": resource_share['owningAccountId'],
 "DatabaseName": f"annotation_{AWS_ACCOUNT_ID}_{ann_store['id']}",
 "Name": ann_store['name'],
 }
 }
 )


Querying the data and returning a Pandas Dataframe:

In [None]:
# this query identifies single nucleotide variants associated 
# with Non-small cell lung cancer (SNOMED_CT 254637007)
# with "drug_response" clinical significance
ncsclc_query = f"""
SELECT 
 concat('chr', contigname) as contigname, 
 start, 
 referenceallele,
 alternatealleles,
 attributes['GENEINFO'] as gene_info,
 attributes['CLNSIG'] as clinical_significance, 
 regexp_extract_all(attributes['CLNDISDB'], 'SNOMED_CT:\d+') as snomed_ct, 
 attributes['CLNDN'] as disease_name
from "default"."{ann_store['name']}"
where 
 attributes['CLNDISDB'] like '%SNOMED_CT:254637007%'
 and length(referenceallele) = 1
 and cardinality(alternatealleles) = 1
 and length(alternatealleles[1]) = 1
"""

In [None]:
df_nsclc = wr.athena.read_sql_query(
 ncsclc_query, 
 database='default', 
 workgroup=athena_workgroup['Name'])

In [None]:
df_nsclc