{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import base64\n",
"import ujson as json\n",
"import pandas as pd\n",
"import boto3\n",
"from sagemaker import get_execution_role"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"role = get_execution_role()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def extract_description(x):\n",
" description = []\n",
" extract = x[0]['attachment']['data']\n",
" myString = base64.b64decode(extract)\n",
" test_list=myString.decode(\"unicode_escape\").replace('
', '').replace('', '').replace('', '').splitlines()\n",
" description = list(test_list)\n",
" return description\n",
" \n",
"def extract_patient_id(x):\n",
" x = x['reference']\n",
" x = x.split(\"/\")[-1]\n",
" x = x.split(\"}\")[0]\n",
" return x\n",
"\n",
"def extract_id(x):\n",
" x = x['id']\n",
" return x"
]
},
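{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check of the helpers above on a minimal synthetic record shaped like a FHIR `DocumentReference` (the values are made up for illustration):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical mini-record, for illustration only.\n",
"sample_content = [{'attachment': {'data': base64.b64encode(b'Line one.\\nLine two.').decode()}}]\n",
"sample_subject = {'reference': 'Patient/1234abcd'}\n",
"\n",
"print(extract_description(sample_content))  # ['Line one.', 'Line two.']\n",
"print(extract_patient_id(sample_subject))   # 1234abcd"
]
},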
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"aws_acct_id=boto3.client('sts').get_caller_identity().get('Account')\n",
"your_bucket_export=f'hl-synthea-export-{aws_acct_id}'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"s3_resource = boto3.resource('s3')\n",
"\n",
"s3_client = boto3.client('s3')\n",
"bucket = s3_resource.Bucket(name=your_bucket_export)\n",
"\n",
"for obj in bucket.objects.all():\n",
" if 'DocumentReference-0.ndjson' in obj.key:\n",
" key_path=obj.key"
]
},
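{
"cell_type": "markdown",
"metadata": {},
"source": [
"If no object matches, `key_path` is never assigned and the download below fails with a `NameError`; a small guard makes that failure explicit:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Fail fast with a clear message if the export file was not found.\n",
"assert 'key_path' in globals(), f'DocumentReference-0.ndjson not found in {your_bucket_export}'\n",
"key_path"
]
},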
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"s3_resource.Bucket(your_bucket_export).download_file(key_path, 'DocumentReference-0.ndjson')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"records = map(json.loads, open('/home/ec2-user/SageMaker/DocumentReference-0.ndjson'))\n",
"df = pd.DataFrame.from_records(records)"
]
},
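{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a side note, pandas can read newline-delimited JSON directly. A roughly equivalent load (dtype inference may differ slightly from `json.loads`, so the frame built above is kept as the working copy):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Alternative load using pandas' built-in NDJSON support.\n",
"df_alt = pd.read_json('DocumentReference-0.ndjson', lines=True)\n",
"df_alt.shape"
]
},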
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df.head(1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df['description'] = df['content'].apply(extract_description)\n",
"df['patient_id'] = df['subject'].apply(extract_patient_id)\n",
"df['unique_id'] = df['patient_id']+'___'+df['id']+'.txt'"
]
},
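{
"cell_type": "markdown",
"metadata": {},
"source": [
"`unique_id` becomes the S3 object name for each note, so a quick look at the derived columns confirms the `<patient_id>___<id>.txt` pattern:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df[['patient_id', 'id', 'unique_id']].head(3)"
]
},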
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_test_v1 = df[['unique_id', 'description']]\n",
"df_test_v1.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"your_bucket = f'hl-synthea-source-{aws_acct_id}'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for i, row in df.iterrows():\n",
" if i > len(df):\n",
" break\n",
" else:\n",
" f = open(row['unique_id'], 'w')\n",
" l1=map(lambda x:x+'\\n', row['description'])\n",
" f.writelines(l1)\n",
" f.close()\n",
" boto3.Session().resource('s3').Bucket(your_bucket).Object(os.path.join('source/', f.name)).upload_file(f.name)\n",
" os.remove(f'/home/ec2-user/SageMaker/{f.name}') \n",
" i+=1"
]
},
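{
"cell_type": "markdown",
"metadata": {},
"source": [
"A spot-check that the notes landed under the `source/` prefix:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Count the uploaded note files.\n",
"len(list(s3_resource.Bucket(your_bucket).objects.filter(Prefix='source/')))"
]
},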
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pathtosave=os.path.abspath(os.getcwd())\n",
"\n",
"for eachRow in range(0, len(df)):\n",
" column_names = ['Score', 'Type', 'Text', 'File']\n",
" dftestCol = pd.DataFrame(columns = column_names)\n",
" test=df['extension'][eachRow][0]['extension']\n",
" for diffEnt in range(0, len(test)):\n",
" setEnt=test[diffEnt]['extension'][2:]\n",
" for overall in range(0, len(setEnt)):\n",
" dict_pairs = setEnt[overall].items()\n",
" ld = list(dict_pairs) \n",
" if \"http://healthlake.amazonaws.com/aws-cm/infer-icd10/aws-cm-icd10-entity\" == ld[0][1]:\n",
" parseSecondLayer=ld[1][1]\n",
" tempDict = {}\n",
" for i in range(0, len(parseSecondLayer)):\n",
" tempDict.update({'File': 'source/'+df['unique_id'][eachRow]})\n",
" if 'http://healthlake.amazonaws.com/aws-cm/infer-icd10/aws-cm-icd10-entity-text' in list(parseSecondLayer[i].values())[0]:\n",
" tempDict.update({'Text': list(parseSecondLayer[i].values())[1]})\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-icd10/aws-cm-icd10-entity-score' == list(parseSecondLayer[i].values())[0]:\n",
" tempDict.update({'Score': list(parseSecondLayer[i].values())[1]})\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-icd10/aws-cm-icd10-entity-begin-offset' == list(parseSecondLayer[i].values())[0]:\n",
" tempDict.update({'Type': 'ICD10Text'})\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-icd10/aws-cm-icd10-entity-ConceptList' == list(parseSecondLayer[i].values())[0]:\n",
" additional_extension=list(parseSecondLayer[i].values())[1]\n",
" for j in range(0, len(additional_extension)):\n",
" for k in range(0, len(additional_extension[j]['extension'])):\n",
" if 'http://healthlake.amazonaws.com/aws-cm/infer-icd10/aws-cm-icd10-entity-Concept-Code' == list(additional_extension[0]['extension'][k].values())[0]:\n",
" Code = list(additional_extension[j]['extension'][k].values())[1]\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-icd10/aws-cm-icd10-entity-Concept-Description' == list(additional_extension[0]['extension'][k].values())[0]:\n",
" DescriptionText = list(additional_extension[j]['extension'][k].values())[1]\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-icd10/aws-cm-icd10-entity-Concept-Score' == list(additional_extension[0]['extension'][k].values())[0]:\n",
" Score = list(additional_extension[j]['extension'][k].values())[1]\n",
" dftestCol = dftestCol.append(pd.DataFrame({'Score': Score, 'Type': 'ICD10Code', 'Text': Code, 'File': 'source/'+df['unique_id'][eachRow]}, index=[0]), ignore_index=True)\n",
" dftestCol = dftestCol.append(pd.DataFrame({'Score': Score, 'Type': 'ICD10description', 'Text': DescriptionText, 'File': 'source/'+df['unique_id'][eachRow]}, index=[0]), ignore_index=True)\n",
" c_maxes = dftestCol.groupby(['Type', 'Text']).Score.transform(max)\n",
" dftestCol = dftestCol.loc[dftestCol.Score == c_maxes]\n",
" dftestCol = dftestCol.append(pd.DataFrame(tempDict, index=[0]), ignore_index=True)\n",
" c_maxes = dftestCol.groupby(['Type', 'Text']).Score.transform(max)\n",
" dftestCol = dftestCol.loc[dftestCol.Score == c_maxes]\n",
" elif \"http://healthlake.amazonaws.com/aws-cm/detect-entities/aws-cm-de-entity\" in ld[0][1]:\n",
" parseSecondLayer=ld[1][1]\n",
" tempDict = {}\n",
" for i in range(0, len(parseSecondLayer)):\n",
" if 'http://healthlake.amazonaws.com/aws-cm/detect-entities/aws-cm-de-entity-text' == list(parseSecondLayer[i].values())[0]:\n",
" tempDict.update({'Text': list(parseSecondLayer[i].values())[1]})\n",
" tempDict.update({'File': 'source/'+df['unique_id'][eachRow]})\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/detect-entities/aws-cm-de-entity-score' == list(parseSecondLayer[i].values())[0]:\n",
" tempDict.update({'Score': list(parseSecondLayer[i].values())[1]})\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/detect-entities/aws-cm-de-entity-type' == list(parseSecondLayer[i].values())[0]:\n",
" tempDict.update({'Type': list(parseSecondLayer[i].values())[1]})\n",
" dftestCol = dftestCol.append(pd.DataFrame(tempDict, index=[0]), ignore_index=True)\n",
" c_maxes = dftestCol.groupby(['Type', 'Text']).Score.transform(max)\n",
" dftestCol = dftestCol.loc[dftestCol.Score == c_maxes]\n",
" elif \"http://healthlake.amazonaws.com/aws-cm/infer-rxnorm/aws-cm-rxnorm-entity\" == ld[0][1]:\n",
" parseSecondLayer=ld[1][1]\n",
" tempDict = {}\n",
" for i in range(0, len(parseSecondLayer)):\n",
" tempDict.update({'File': 'source/'+df['unique_id'][eachRow]})\n",
" if 'http://healthlake.amazonaws.com/aws-cm/infer-rxnorm/aws-cm-rxnorm-entity-text' in list(parseSecondLayer[i].values())[0]:\n",
" tempDict.update({'Text': list(parseSecondLayer[i].values())[1]})\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-rxnorm/aws-cm-rxnorm-entity-score' == list(parseSecondLayer[i].values())[0]:\n",
" tempDict.update({'Score': list(parseSecondLayer[i].values())[1]})\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-rxnorm/aws-cm-rxnorm-entity-begin-offset' == list(parseSecondLayer[i].values())[0]:\n",
" tempDict.update({'Type': 'RxNorm'})\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-rxnorm/aws-cm-rxnorm-entity-ConceptList' == list(parseSecondLayer[i].values())[0]:\n",
" additional_extension=list(parseSecondLayer[i].values())[1]\n",
" for j in range(0, len(additional_extension)):\n",
" for k in range(0, len(additional_extension[j]['extension'])):\n",
" if 'http://healthlake.amazonaws.com/aws-cm/infer-rxnorm/aws-cm-rxnorm-entity-Concepts-Code' == list(additional_extension[0]['extension'][k].values())[0]:\n",
" Code = list(additional_extension[j]['extension'][k].values())[1]\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-rxnorm/aws-cm-rxnorm-entity-Concepts-Description' == list(additional_extension[0]['extension'][k].values())[0]:\n",
" DescriptionText = list(additional_extension[j]['extension'][k].values())[1]\n",
" elif 'http://healthlake.amazonaws.com/aws-cm/infer-rxnorm/aws-cm-rxnorm-entity-Concepts-Score' == list(additional_extension[0]['extension'][k].values())[0]:\n",
" Score = list(additional_extension[j]['extension'][k].values())[1]\n",
" dftestCol = dftestCol.append(pd.DataFrame({'Score': Score, 'Type': 'RxnormCode', 'Text': Code, 'File': 'source/'+df['unique_id'][eachRow]}, index=[0]), ignore_index=True)\n",
" dftestCol = dftestCol.append(pd.DataFrame({'Score': Score, 'Type': 'RxnormDescription', 'Text': DescriptionText, 'File': 'source/'+df['unique_id'][eachRow]}, index=[0]), ignore_index=True)\n",
" c_maxes = dftestCol.groupby(['Type', 'Text']).Score.transform(max)\n",
" dftestCol = dftestCol.loc[dftestCol.Score == c_maxes]\n",
" dftestCol = dftestCol.append(pd.DataFrame(tempDict, index=[0]), ignore_index=True)\n",
" c_maxes = dftestCol.groupby(['Type', 'Text']).Score.transform(max)\n",
" dftestCol = dftestCol.loc[dftestCol.Score == c_maxes]\n",
" dftestCol.to_csv(os.path.join(pathtosave, df['patient_id'][eachRow]+'___'+df['id'][eachRow]+'.csv'), index = False)\n",
" boto3.Session().resource('s3').Bucket(your_bucket).Object(os.path.join('stdized-data/comprehend_results/csv/', df['patient_id'][eachRow]+'___'+df['id'][eachRow]+'.csv')).upload_file(df['patient_id'][eachRow]+'___'+df['id'][eachRow]+'.csv')\n",
" os.remove(df['patient_id'][eachRow]+'___'+df['id'][eachRow]+'.csv')"
]
},
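{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that `dftestCol` below holds only the entities of the last document processed; every document's results were uploaded as a separate CSV. A quick count of what landed in S3:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Count the per-document entity CSVs uploaded above.\n",
"len(list(s3_resource.Bucket(your_bucket).objects.filter(Prefix='stdized-data/comprehend_results/csv/')))"
]
},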
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dftestCol"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.13"
}
},
"nbformat": 4,
"nbformat_minor": 4
}