{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import base64\n", "import ujson as json\n", "import pandas as pd\n", "import boto3\n", "from sagemaker import get_execution_role" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "role = get_execution_role()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def extract_description(x):\n", " description = []\n", " extract = x[0]['attachment']['data']\n", " myString = base64.b64decode(extract)\n", " test_list=myString.decode(\"unicode_escape\").replace('
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "aws_acct_id = boto3.client('sts').get_caller_identity().get('Account')\n", "your_bucket_export = f'hl-synthea-export-{aws_acct_id}'" ] },
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "s3_resource = boto3.resource('s3')\n",
   "s3_client = boto3.client('s3')\n",
   "bucket = s3_resource.Bucket(name=your_bucket_export)\n",
   "\n",
   "# Locate the exported DocumentReference NDJSON file in the export bucket.\n",
   "for obj in bucket.objects.all():\n",
   "    if 'DocumentReference-0.ndjson' in obj.key:\n",
   "        key_path = obj.key" ] },
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_resource.Bucket(your_bucket_export).download_file(key_path, 'DocumentReference-0.ndjson')" ] },
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "# Each line of the NDJSON export is one DocumentReference resource.\n",
   "with open('/home/ec2-user/SageMaker/DocumentReference-0.ndjson') as f:\n",
   "    df = pd.DataFrame.from_records(map(json.loads, f))" ] },
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head(1)" ] },
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "df['description'] = df['content'].apply(extract_description)\n",
   "df['patient_id'] = df['subject'].apply(extract_patient_id)\n",
   "# One text file per document: <patient_id>___<resource_id>.txt\n",
   "df['unique_id'] = df['patient_id'] + '___' + df['id'] + '.txt'" ] },
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_test_v1 = df[['unique_id', 'description']]\n", "df_test_v1.head()" ] },
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "your_bucket = f'hl-synthea-source-{aws_acct_id}'" ] },
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "# Write each note to a local text file, upload it under the source/ prefix,\n",
   "# then remove the local copy.\n",
   "for _, row in df.iterrows():\n",
   "    with open(row['unique_id'], 'w') as f:\n",
   "        f.writelines(line + '\\n' for line in row['description'])\n",
   "    boto3.Session().resource('s3').Bucket(your_bucket).Object(os.path.join('source/', row['unique_id'])).upload_file(row['unique_id'])\n",
   "    os.remove(row['unique_id'])" ] },
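  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "# Optional spot check (assumes the uploads above succeeded): list a few of\n",
   "# the note files just written under the source/ prefix.\n",
   "resp = s3_client.list_objects_v2(Bucket=your_bucket, Prefix='source/', MaxKeys=5)\n",
   "[o['Key'] for o in resp.get('Contents', [])]" ] },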
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "pathtosave = os.path.abspath(os.getcwd())\n",
   "\n",
   "# Each DocumentReference carries Comprehend Medical results as nested FHIR\n",
   "# extensions. Walk them, collect one row per detected entity or concept, keep\n",
   "# the highest-scoring row per (Type, Text), and write one CSV per document.\n",
   "icd10_base = 'http://healthlake.amazonaws.com/aws-cm/infer-icd10/aws-cm-icd10-entity'\n",
   "de_base = 'http://healthlake.amazonaws.com/aws-cm/detect-entities/aws-cm-de-entity'\n",
   "rxnorm_base = 'http://healthlake.amazonaws.com/aws-cm/infer-rxnorm/aws-cm-rxnorm-entity'\n",
   "\n",
   "for eachRow in range(len(df)):\n",
   "    file_ref = 'source/' + df['unique_id'][eachRow]\n",
   "    rows = []\n",
   "    test = df['extension'][eachRow][0]['extension']\n",
   "    for diffEnt in range(len(test)):\n",
   "        setEnt = test[diffEnt]['extension'][2:]\n",
   "        for overall in range(len(setEnt)):\n",
   "            # Each entry looks like {'url': <entity type>, 'extension': [<attributes>]}.\n",
   "            ld = list(setEnt[overall].items())\n",
   "            url = ld[0][1]\n",
   "            if url == icd10_base:\n",
   "                attrs = ld[1][1]\n",
   "                tempDict = {'File': file_ref}\n",
   "                for attr in attrs:\n",
   "                    key, value = list(attr.values())[0], list(attr.values())[1]\n",
   "                    if key == icd10_base + '-text':\n",
   "                        tempDict['Text'] = value\n",
   "                    elif key == icd10_base + '-score':\n",
   "                        tempDict['Score'] = value\n",
   "                    elif key == icd10_base + '-begin-offset':\n",
   "                        tempDict['Type'] = 'ICD10Text'\n",
   "                    elif key == icd10_base + '-ConceptList':\n",
   "                        # One row each for the concept's code and description.\n",
   "                        for concept in value:\n",
   "                            Code = DescriptionText = Score = None\n",
   "                            for part in concept['extension']:\n",
   "                                pkey, pval = list(part.values())[0], list(part.values())[1]\n",
   "                                if pkey == icd10_base + '-Concept-Code':\n",
   "                                    Code = pval\n",
   "                                elif pkey == icd10_base + '-Concept-Description':\n",
   "                                    DescriptionText = pval\n",
   "                                elif pkey == icd10_base + '-Concept-Score':\n",
   "                                    Score = pval\n",
   "                            rows.append({'Score': Score, 'Type': 'ICD10Code', 'Text': Code, 'File': file_ref})\n",
   "                            rows.append({'Score': Score, 'Type': 'ICD10description', 'Text': DescriptionText, 'File': file_ref})\n",
   "                rows.append(tempDict)\n",
   "            elif de_base in url:\n",
   "                attrs = ld[1][1]\n",
   "                tempDict = {'File': file_ref}\n",
   "                for attr in attrs:\n",
   "                    key, value = list(attr.values())[0], list(attr.values())[1]\n",
   "                    if key == de_base + '-text':\n",
   "                        tempDict['Text'] = value\n",
   "                    elif key == de_base + '-score':\n",
   "                        tempDict['Score'] = value\n",
   "                    elif key == de_base + '-type':\n",
   "                        tempDict['Type'] = value\n",
   "                rows.append(tempDict)\n",
   "            elif url == rxnorm_base:\n",
   "                attrs = ld[1][1]\n",
   "                tempDict = {'File': file_ref}\n",
   "                for attr in attrs:\n",
   "                    key, value = list(attr.values())[0], list(attr.values())[1]\n",
   "                    if key == rxnorm_base + '-text':\n",
   "                        tempDict['Text'] = value\n",
   "                    elif key == rxnorm_base + '-score':\n",
   "                        tempDict['Score'] = value\n",
   "                    elif key == rxnorm_base + '-begin-offset':\n",
   "                        tempDict['Type'] = 'RxNorm'\n",
   "                    elif key == rxnorm_base + '-ConceptList':\n",
   "                        for concept in value:\n",
   "                            Code = DescriptionText = Score = None\n",
   "                            for part in concept['extension']:\n",
   "                                pkey, pval = list(part.values())[0], list(part.values())[1]\n",
   "                                if pkey == rxnorm_base + '-Concepts-Code':\n",
   "                                    Code = pval\n",
   "                                elif pkey == rxnorm_base + '-Concepts-Description':\n",
   "                                    DescriptionText = pval\n",
   "                                elif pkey == rxnorm_base + '-Concepts-Score':\n",
   "                                    Score = pval\n",
   "                            rows.append({'Score': Score, 'Type': 'RxnormCode', 'Text': Code, 'File': file_ref})\n",
   "                            rows.append({'Score': Score, 'Type': 'RxnormDescription', 'Text': DescriptionText, 'File': file_ref})\n",
   "                rows.append(tempDict)\n",
   "    dftestCol = pd.DataFrame(rows, columns=['Score', 'Type', 'Text', 'File'])\n",
   "    # Keep only the highest-scoring row for each (Type, Text) pair.\n",
   "    if not dftestCol.empty:\n",
   "        c_maxes = dftestCol.groupby(['Type', 'Text']).Score.transform('max')\n",
   "        dftestCol = dftestCol.loc[dftestCol.Score == c_maxes]\n",
   "    csv_name = df['patient_id'][eachRow] + '___' + df['id'][eachRow] + '.csv'\n",
   "    dftestCol.to_csv(os.path.join(pathtosave, csv_name), index=False)\n",
   "    boto3.Session().resource('s3').Bucket(your_bucket).Object(os.path.join('stdized-data/comprehend_results/csv/', csv_name)).upload_file(csv_name)\n",
   "    os.remove(csv_name)" ] },
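  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "# Optional: distribution of extracted entity types for the last processed\n",
   "# document (dftestCol only holds the final loop iteration's results).\n",
   "dftestCol.groupby('Type').size()" ] },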
  { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dftestCol" ] }
 ],
 "metadata": {
  "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" },
  "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}