In [1]:
# https://docs.gdc.cancer.gov/API/Users_Guide/Search_and_Retrieval/

import requests
import json
import boto3
import re
import gzip
import pandas as pd
import dask
from dask.distributed import Client

# GDC REST endpoints (see the API guide linked above) plus the CRDC indexd
# service, which is used below to resolve a GDC file UUID to its storage URLs.
data_endpt = 'https://api.gdc.cancer.gov/data'
cases_endpt = 'https://api.gdc.cancer.gov/cases'
files_endpt = 'https://api.gdc.cancer.gov/files'
# NOTE: this URL already ends with '/', so append the UUID directly.
indexd_endpt = 'https://nci-crdc.datacommons.io/index/index/'


In [2]:
## Query Settings

project_id = "TCGA-BRCA"
data_type = "Gene Expression Quantification"  # RNA-Seq
workflow_type = "HTSeq - Counts"
size = 2000  # maximum number of hits returned by a single request

# GDC expects 'fields' as one comma-separated string of field names.
fields = ",".join([
    "file_name",
    "cases.primary_site",
    "cases.case_id",
    "cases.project.project_id",
    "cases.days_to_lost_to_followup",
    "cases.submitter_id",
    "cases.samples.submitter_id",
    "cases.samples.sample_id",
])

# Equivalent GDC query:
#   cases.project.project_id in [project_id]
#   and files.data_type in [data_type]
#   and files.analysis.workflow_type in [workflow_type]
filters = {
    "op": "and",
    "content": [
        {"op": "in", "content": {"field": field_name, "value": [field_value]}}
        for field_name, field_value in (
            ("cases.project.project_id", project_id),
            ("files.data_type", data_type),
            ("files.analysis.workflow_type", workflow_type),
        )
    ],
}

# A GET request carries the filters as a JSON-encoded string parameter.
params = {
    "filters": json.dumps(filters),
    "fields": fields,
    "format": "JSON",
    "size": size,
}



In [3]:
## Get Files

# Query the GDC files endpoint for every file matching `filters`.
query_response = requests.get(files_endpt, params=params, timeout=120)
# Surface HTTP failures directly instead of as a KeyError on "data" below.
query_response.raise_for_status()
response_data = query_response.json()["data"]

json_response = response_data["hits"]
files_json = json_response

# GDC paginates results: if more files match than `size` allows, the rest
# would be silently dropped, so stop here rather than analyse a partial set.
total_hits = response_data.get("pagination", {}).get("total", len(files_json))
if total_hits > len(files_json):
    raise RuntimeError(
        "GDC returned {} of {} matching files; increase `size` in the query "
        "settings".format(len(files_json), total_hits)
    )

print(len(files_json))

1222


In [4]:
## Scale out Dask Cluster
# The Dask workers run as the ECS service 'Dask-Worker'. Raise its desired
# task count, then block until ECS reports the service as stable.
ecs = boto3.client('ecs')
resp = ecs.list_clusters()
clusters = resp['clusterArns']
if not clusters:
    # Previously this surfaced as an opaque IndexError on clusters[0].
    raise RuntimeError("No ECS clusters found; cannot scale the Dask-Worker service")
cluster = clusters[0]
if len(clusters) > 1:
    # Keep the original best-effort behaviour (use the first cluster), but
    # say which one was actually picked.
    print("Please manually select your cluster (defaulting to " + cluster + ")")

numWorkers = 10
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])

In [5]:
# Connect to the Dask scheduler of the ECS-hosted cluster.
scheduler_address = 'Dask-Scheduler.local-dask:8786'
client = Client(scheduler_address)
client


+---------+----------------+---------------+---------------+
| Package | client         | scheduler     | workers       |
+---------+----------------+---------------+---------------+
| python  | 3.6.10.final.0 | 3.7.4.final.0 | 3.7.4.final.0 |
+---------+----------------+---------------+---------------+
Notes: 
-  python: Variation is sometimes ok, sometimes not. It depends on your workloads


0,1
Client  Scheduler: tcp://Dask-Scheduler.local-dask:8786  Dashboard: http://Dask-Scheduler.local-dask:8787/status,Cluster  Workers: 10  Cores: 10  Memory: 70.00 GB


In [6]:
@dask.delayed
def get_data(uuid, sample_submitter_id):
    """Fetch one counts file from S3 and return it as a single-column DataFrame.

    The file's S3 location is resolved through the indexd service. The file is
    then read as a gzipped, tab-separated table with no header row.

    Parameters
    ----------
    uuid : str
        GDC file id (the ``id`` field of a files-endpoint hit).
    sample_submitter_id : str
        Sample barcode used as the name of the returned column.

    Returns
    -------
    pandas.DataFrame
        Indexed by the file's first column (the feature ids), with one column
        named ``sample_submitter_id``. Values are kept as strings (dtype=str).

    Raises
    ------
    requests.HTTPError
        If the indexd lookup fails.
    ValueError
        If indexd lists no ``s3://`` URL for ``uuid``.
    """
    # indexd_endpt already ends with '/'; strip it so the URL has no '//'.
    query_response = requests.get(indexd_endpt.rstrip("/") + "/" + uuid, timeout=60)
    query_response.raise_for_status()
    s3_urls = [x for x in query_response.json()["urls"] if x.startswith("s3://")]
    if not s3_urls:
        raise ValueError("No s3:// URL found for UUID " + uuid)
    if len(s3_urls) > 1:
        # Best effort, as before: report the ambiguity and use the first URL.
        print("Something weird with UUID " + uuid + " returned " + str(s3_urls))
    # index_col=0 makes the first column the index (named 0), replacing the
    # manual index/column shuffling; the remaining column becomes the sample.
    content = pd.read_csv(s3_urls[0], compression='gzip', header=None,
                          dtype=str, sep="\t", index_col=0)
    content.columns = [sample_submitter_id]
    return content

# One lazy task per file; the first sample's barcode becomes the column name.
delayed_results = [
    get_data(file_entry["id"], file_entry["cases"][0]["samples"][0]["submitter_id"])
    for file_entry in files_json
]

In [7]:
%%time

df = pd.concat(dask.compute(*delayed_results), axis=1, join="outer")
df

CPU times: user 30.3 s, sys: 4.16 s, total: 34.5 s
Wall time: 57.5 s


Unnamed: 0_level_0,TCGA-E9-A1RI-11A,TCGA-C8-A8HQ-01A,TCGA-BH-A0BJ-11A,TCGA-OL-A66O-01A,TCGA-A7-A13E-11A,TCGA-PL-A8LX-01A,TCGA-A8-A09V-01A,TCGA-AR-A5QM-01A,TCGA-BH-A0AY-11A,TCGA-A7-A2KD-01A,...,TCGA-E9-A226-01A,TCGA-AR-A0U0-01A,TCGA-AO-A129-01A,TCGA-C8-A26Y-01A,TCGA-AR-A2LQ-01A,TCGA-D8-A1XD-01A,TCGA-E2-A570-01A,TCGA-E2-A574-01A,TCGA-D8-A73W-01A,TCGA-B6-A2IU-01A
0,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
ENSG00000000003.13,7639,1505,5050,2796,3400,982,4952,1268,4451,4883,...,3253,1599,3205,2240,3107,2846,4394,4610,1538,570
ENSG00000000005.5,3541,3,154,6,992,0,2,44,660,3,...,1,0,31,0,50,0,1,4,4,24
ENSG00000000419.11,1999,1803,1577,1890,1242,1721,1919,1375,1717,8033,...,1788,2079,4173,4838,1109,2802,2196,3899,1853,603
ENSG00000000457.12,1287,1763,2007,1174,931,759,4909,1490,1343,2769,...,3167,1034,1237,1345,1619,1310,2831,643,2096,2349
ENSG00000000460.15,290,952,319,625,259,182,1647,478,314,1359,...,1990,658,1101,635,394,448,677,668,313,615
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
__no_feature,5349982,4816279,3100857,3337177,2634237,2462158,10968241,2830905,3348946,5958795,...,4836012,2733435,3332673,5664496,2753546,3919188,2826989,2007457,4368255,2230344
__ambiguous,2732915,2862191,2153519,3728715,1877124,2213731,3061277,2389458,2337298,3105025,...,2483817,2712071,2919876,3011731,1890224,1856044,2007640,2516161,2235132,1295034
__too_low_aQual,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
__not_aligned,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [8]:
# Scale the Dask worker service back down to zero tasks now that the data has
# been gathered, and wait until ECS reports the service as stable.
worker_service = 'Dask-Worker'
numWorkers = 0
ecs.update_service(cluster=cluster, service=worker_service, desiredCount=numWorkers)
stable_waiter = ecs.get_waiter('services_stable')
stable_waiter.wait(cluster=cluster, services=[worker_service])