{ "cells": [ { "cell_type": "markdown", "source": [ "# 90-day COVID Diagnosis Prediction with Synthea\n", "---\n", "\n", "By the end of this tutorial, you should be able to create a patient cohort, build a CNN model with Tensorflow, and perform SageMaker batch transform.\n", "\n", "We use Synthea data (synthetic) data to demonstrate the modeling technique. However, readers can choose your own data sets to try. \n", "\n", "## Contents\n", "\n", "1. [Background](#1_Background)\n", "1. [Setup](#2_Setup)\n", "1. [Data](#3_Data)\n", " 1. [Citation](#3_A_Citation)\n", " 1. [Cohort Creation](#3_B_CohortCreation)\n", " 1. [Data Cleaning](#3_C_DataCleaning)\n", "1. [Train](#4_Train)\n", "1. [Testing](#5_Testing)\n", "1. [Evaluation](#6_Evaluation)\n", "1. [Extensions](#7_Extensions)" ], "metadata": {} }, { "cell_type": "markdown", "source": [ "## 1_Background\n", "In this tutorial, you will see an end-to-end example of how to prepare a open dataset and build a convolutional neural network model to predict patient outcomes.\n", "\n", "Healthcare data can be challenging to work with and AWS customers have been looking for solutions to solve certain business challenges with the help of data and machine learning (ML) techniques. Some of the data is structured, such as birthday, gender, and marital status, but most of the data is unstructured, such as diagnosis codes or physician’s notes. This data is designed for human beings to understand, but not for computers to comprehend. The key challenges of using healthcare data are as follows:\n", "\n", "* How to effectively use both structured and unstructured data to get a complete view of the data\n", "* How to intuitively interpret the prediction results\n", "\n", "With the rise of AI/ML technologies, solving these challenges became possible." ], "metadata": {} }, { "cell_type": "markdown", "source": [ "\n", "## 2_Setup\n", "\n", "_This notebook was created and tested on an ml.m5.2xlarge notebook instance._\n", "\n", "Let's start by specifying:\n", "\n", "- The S3 bucket and prefix that you want to use for training, validation and testing data, as well as a bucket to store Athena intermediate results. This should be within the same region as the Notebook Instance.\n", "- The IAM role arn used to give training and hosting access to your data. The `get_execution_role()` function will return the role you configured for this notebook instance. As we will use both Glue catalog and S3, make sure you configured these access permissions in the role. " ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "! pip install -r requirements.txt" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "bucket = 'pop-modeling-'\n", "prefix = 'sagemaker/synthea'\n", "\n", "RANDOM_STATE = 2021\n", "\n", "# Define IAM role\n", "import boto3\n", "import re\n", "from sagemaker import get_execution_role\n", "import sagemaker\n", "sagemaker_session = sagemaker.Session()\n", "\n", "role = get_execution_role()\n", "s3 = boto3.resource('s3')\n", "my_session = boto3.session.Session()" ], "outputs": [], "metadata": { "isConfigCell": true, "tags": [ "parameters" ] } }, { "cell_type": "markdown", "source": [ "Next, we'll import the Python libraries we'll need for the remainder of the tutorial." 
], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "import pandas as pd\n", "import numpy as np\n", "import json\n", "import os\n", "import sys\n", "import sagemaker\n", "import boto3\n", "from botocore.client import ClientError\n", "from sagemaker.predictor import csv_serializer\n", "from sklearn.preprocessing import StandardScaler\n", "from sklearn.impute import SimpleImputer\n", "from sklearn.metrics import roc_curve, auc\n", "from time import strftime, gmtime\n", "from matplotlib import pyplot as plt" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "---\n", "## 3_Data\n" ], "metadata": {} }, { "cell_type": "markdown", "source": [ "## 3_A_Citation\n", "\n", "SyntheaTM is a Synthetic Patient Population Simulator. The goal is to output synthetic, realistic (but not real), patient data and associated health records in a variety of formats.\n", "\n", "Copyright 2017-2021 The MITRE Corporation\n", "\n", "Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at\n", "\n", "http://www.apache.org/licenses/LICENSE-2.0\n", "Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." ], "metadata": {} }, { "cell_type": "markdown", "source": [ "## 3_B_CohortCreation\n", "After you get access to the dataset, you can read and issue SQL queries via Athena. You can also save the results as a CSV file for feature use. \n", "\n", "In order to get a seamless experience, we will be using PyAthena library, and issue the queries directly from SageMaker notebook. If you have not installed PyAthena, simple do `!pip install pyathena`" ], "metadata": {} }, { "cell_type": "markdown", "source": [ "Now, we have successfully created a table called `pop_main`. Next, we will read the data from the created table, and prepare the data for modeling." 
], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "from pyathena import connect\n", "s3_staging_dir='s3://'+bucket+'/athena/temp'\n", "conn = connect(s3_staging_dir=s3_staging_dir, region_name=my_session.region_name)\n", "query_get_all = \"\"\"SELECT * FROM healthlake_synthea.pop_main\"\"\"\n", "data = pd.read_sql_query(query_get_all, conn)\n", "conn.close()" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "from ml.data_augument import process_hl_data\n", "hl_data = process_hl_data('data')\n", "hl_data.shape" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "data['events'] = data['cond_cd'].str[1:-1].str.replace(',', '').replace('null', '').str.strip() + ' ' +\\\n", " data['proc_cd'].str[1:-1].str.replace(',', '').replace('null', '').str.strip() + ' ' +\\\n", " data['rxnorm'].str[1:-1].str.replace(',', '').replace('null', '').str.strip() + ' ' +\\\n", " data['loinc_cd'].str[1:-1].str.replace(',', '').replace('null', '').str.strip()\n", "target_col='covid90'\n", "data = data[['patient_id', 'age', 'gender', 'if_married', 'events', target_col]].dropna()" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "df = pd.merge(data, hl_data, how='left', on='patient_id')\n", "df['events'] = df['events']+df['code_value'].fillna('').str[1:-1].str.replace(',', '').str.strip()\n", "df = df.drop(['code_value', 'code_description'], axis=1).dropna()\n", "df['events'] = df['events'].apply(lambda x: ' '.join(x.split()))\n", "df = df[df['events']!=''].reset_index(drop=True)" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "## 3_C_DataCleaning " ], "metadata": {} }, { "cell_type": "markdown", "source": [ "The final dataset contains 455 records from 45 unique sample patients.\n", "\n", "The last attribute, `covid90`, is the target attribute–the attribute that we want the ML model to predict. Because the target attribute is binary, our model will be performing binary prediction (yes/no), also known as binary classification. 
" ], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Train-Test Split" ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "# Each patient has more than 1 record\n", "df.shape[0]==df.patient_id.nunique()" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "ids = df['patient_id'].drop_duplicates().reset_index(drop=True)\n", "# Split on patient_id\n", "train_id, test_id = np.split(ids.sample(frac=1, random_state=RANDOM_STATE), [int(0.8*len(ids))])\n", "\n", "train_id.to_csv('data/train_id.csv', index=False)\n", "test_id.to_csv('data/test_id.csv', index=False)\n", "train_df = df.loc[df['patient_id'].isin(train_id), :]\n", "test_df = df.loc[df['patient_id'].isin(test_id), :]\n", "train_df.to_csv('data/train_df.csv', index=False)\n", "test_df.to_csv('data/test_df.csv', index=False)" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "Upload the training/testing id to S3" ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "sagemaker_session.upload_data(path='data/train_id.csv', bucket=bucket, key_prefix=prefix)\n", "sagemaker_session.upload_data(path='data/test_id.csv', bucket=bucket, key_prefix=prefix)\n", "sagemaker_session.upload_data(path='data/train_df.csv', bucket=bucket, key_prefix=prefix)\n", "sagemaker_session.upload_data(path='data/test_df.csv', bucket=bucket, key_prefix=prefix)" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Emedding\n", "Hyperparameters that we can specify during the W2V training:\n", "- `sg` - Training algorithm: 1 for skip-gram; otherwise CBOW.\n", "- `size` - Dimensionality of the word vectors.\n", "- `window` – Maximum distance between the current and predicted word within a sentence.\n", "- `min_count` – Ignores all words with total frequency lower than this.\n", "- `workers` – Use these many worker threads to train the model.\n", "- `iter` – Number of epochs over the corpus.\n", "- `alpha` - Initial learning rate." 
], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "import pandas as pd\n", "import gensim\n", "from gensim.models import Word2Vec\n", "import sys\n", "sys.path.append('ml')\n", "from data import EpochLogger\n", "\n", "\n", "event_list = [i.split(' ') for i in train_df['events'].values]\n", "event_list = [i for i in event_list if i!=['']]\n", "epoch_logger = EpochLogger()\n", "\n", "EMBEDDING_DIM = 100\n", "WINDOW = 100\n", "EPOCHS = 10\n", "model = Word2Vec(\n", " event_list,\n", " sg = 1,\n", " vector_size = EMBEDDING_DIM,\n", " window = WINDOW,\n", " min_count= 1,\n", " workers = 2, \n", " alpha = 0.05,\n", " epochs = EPOCHS,\n", " callbacks=[epoch_logger]\n", ")" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "model.save('data/W2V_event_dim' + str(EMBEDDING_DIM) + '_win' + str(WINDOW) + '.model')\n", "sagemaker_session.upload_data(path='data/W2V_event_dim' + str(EMBEDDING_DIM) + '_win' + str(WINDOW) + '.model', bucket=bucket, key_prefix='{}/{}'.format(prefix,'sagemaker/train'))" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### PreProcessing\n", "Scale, Imputation, convert to embeddings" ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "%load_ext autoreload\n", "%autoreload 2\n", "\n", "import sys\n", "sys.path.append('ml')\n", "\n", "# Import custom lib\n", "from data import read_csv_s3, load_embedding_matrix_trained_model, EpochLogger, df2tensor, get_csv_output_from_s3\n", "from evaluation import plot_metrics, plot_test_auc\n", "from evaluation_tf import f1, METRICS, early_stopping\n", "from config import RANDOM_STATE, MAX_LENGTH, EPOCHS, BATCH_SIZE, EMBEDDING_DIM, filters, kernel_size\n", "\n", "# Import tensorflow lib\n", "import tensorflow as tf\n", "from tensorflow.keras.preprocessing.sequence import pad_sequences\n", "from tensorflow.keras.layers import Dense, Dropout, Embedding, Flatten, Conv1D, GlobalMaxPool1D, concatenate, MaxPooling1D, Lambda\n", "from tensorflow.keras.models import Sequential\n", "from tensorflow import keras\n", "from tensorflow.keras import backend as k\n", "from tensorflow.keras.optimizers import Adam\n", "\n", "# Import pandas, numpy, plot\n", "import pandas as pd\n", "import numpy as np\n", "from gensim.models import Word2Vec\n", "import matplotlib as mpl\n", "from matplotlib import pyplot as plt\n", "\n", "# Import sklearn lib\n", "from sklearn.model_selection import train_test_split\n", "from sklearn.impute import SimpleImputer\n", "from sklearn.metrics import classification_report\n", "from sklearn.pipeline import Pipeline\n", "from sklearn.preprocessing import StandardScaler\n", "from sklearn.metrics import confusion_matrix\n", "import seaborn as sns" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "embedding_matrix, idx_to_code_map, code_to_idx_map, unkown_idx = load_embedding_matrix_trained_model(\n", " model_name='data/W2V_event_dim100_win100.model', embedding_dim=EMBEDDING_DIM)\n", "\n", "labels = df[target_col]\n", "feature_static = df[['age', 'gender', 'if_married']]\n", "static_feature_names = [i for i in feature_static.columns]\n", "STATIC_F_DIM = feature_static.shape[1]\n", "\n", "indices_train = df['patient_id'].isin(train_id)\n", "indices_test = df['patient_id'].isin(test_id)\n", "train_df = df[indices_train].reset_index(drop=True)\n", "y_train = labels[indices_train].reset_index(drop=True)\n", "test_df = df[indices_test].reset_index(drop=True)\n", 
"y_test = labels[indices_test].reset_index(drop=True)\n", "\n", "static_train = feature_static.loc[indices_train, :]\n", "static_test = feature_static.loc[indices_test, :]\n", "\n", "# Impute median for static features\n", "pipe = Pipeline(steps=[\n", " ('impute', SimpleImputer(strategy='median')),\n", " ('scaler', StandardScaler())\n", "])\n", "pipe.fit(static_train)\n", "static_train_imputed = pipe.transform(static_train)\n", "static_test_imputed = pipe.transform(static_test)\n", "\n", "train_processed_df = pd.concat([pd.DataFrame(static_train_imputed, columns=feature_static.columns), \n", " train_df['events'], train_df[target_col]], axis=1)\n", "test_processed_df = pd.concat([pd.DataFrame(static_test_imputed, columns=feature_static.columns), \n", " test_df['events'], test_df[target_col]], axis=1)" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "df_train, df_validation = train_test_split(train_processed_df, test_size=0.2, random_state=RANDOM_STATE)\n", "df_test = test_processed_df\n", "df_train.to_csv('data/train.csv', index=False)\n", "df_train[static_feature_names].to_csv('data/train-static.csv', index=False)\n", "df_validation.to_csv('data/validation.csv', index=False)\n", "df_validation[static_feature_names].to_csv('data/valid-static.csv', index=False)\n", "df_test.to_csv('data/test.csv', index=False)\n", "df_test[static_feature_names].to_csv('data/test-static.csv', index=False)\n", "\n", "train_input = sagemaker_session.upload_data(path='data/train.csv', bucket=bucket, key_prefix='{}/{}/{}'.format(prefix, 'sagemaker','train'))\n", "train_static_input = sagemaker_session.upload_data(path='data/train-static.csv', bucket=bucket,key_prefix='{}/{}/{}'.format(prefix, 'sagemaker','train'))\n", "valid_input = sagemaker_session.upload_data(path='data/validation.csv', bucket=bucket,key_prefix='{}/{}/{}'.format(prefix, 'sagemaker','validation'))\n", "valid_static_input = sagemaker_session.upload_data(path='data/valid-static.csv', bucket=bucket,key_prefix='{}/{}/{}'.format(prefix, 'sagemaker','validation'))\n", "test_input = sagemaker_session.upload_data(path='data/test.csv', bucket=bucket, key_prefix='{}/{}/{}'.format(prefix, 'sagemaker','test'))\n", "test_static_input = sagemaker_session.upload_data(path='data/test-static.csv', bucket=bucket, key_prefix='{}/{}/{}'.format(prefix, 'sagemaker','test'))" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "---\n", "## 4_Train\n", "\n", "Let's take the preprocessed training data and fit a customized TensorFlow Model. Sagemaker provides prebuilt TensorFlow containers that can be used with the Python SDK. The previous Scikit-learn job preprocessed the raw input data and made it ready for machine learning models. Now, we call the `tf_model.py` script and use the transformed data to train a model.\n", "\n", "Some common parameters we need to configer here are:\n", "* `entry_point`: defines the entry point file name\n", "* `source_dir`: defines the directory where entry point file and requirements.txt file are\n", "* `role`: Role ARN\n", "* `train_instance_count`: number of training instance for the job \n", "* `train_instance_type`: type of training instance. 
For large datasets with deep learning, GPU-enabled instances can significantly improve the training speed.\n", "* `code_location`: the S3 location of the sourcedir.tar.gz file that contains the scripts and configurations\n", "* `output_path`: the S3 location of the model artifact model.tar.gz\n", "* `framework_version`: the TensorFlow framework version\n", "* `py_version`: the Python version\n", "* `script_mode`: whether to use script mode for the training job\n", "* `base_job_name`: a prefix SageMaker uses to group your training jobs, giving a more organized view on the SageMaker console\n", "* `hyperparameters`: the hyperparameters that you want to pass to the training script\n", "* `metric_definitions`: the metrics that you want to emit to the SageMaker console and on to CloudWatch" ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "from sagemaker.tensorflow import TensorFlow\n", "tf_estimator = TensorFlow(entry_point='tf_model.py', \n", " source_dir='ml',\n", " role=role,\n", " instance_count=1, \n", " instance_type='ml.m5.xlarge',\n", " code_location='s3://{}/{}/{}'.format(bucket, prefix, 'sagemaker/tf-model-script-mode'),\n", " output_path='s3://{}/{}/{}'.format(bucket, prefix, 'sagemaker/output'),\n", " framework_version='1.15', \n", " py_version='py3',\n", " script_mode=True,\n", " base_job_name='POP-CNN',\n", " hyperparameters={'epochs': 10,\n", " 'learning-rate': 1e-3,\n", " 'batch-size':32, \n", " 'max-length':MAX_LENGTH,\n", " 'static-f-dim': len(static_feature_names),\n", " 'target-col': target_col},\n", " metric_definitions=[{\n", " \"Name\": \"f1\",\n", " \"Regex\": \"val_f1_m: (.*?)$\"\n", " },{\n", " \"Name\": \"loss\",\n", " \"Regex\": \"val_loss: (.*?) -\"\n", " },{\n", " \"Name\": \"precision\",\n", " \"Regex\": \"val_precision: (.*?) -\"\n", " },{\n", " \"Name\": \"recall\",\n", " \"Regex\": \"val_recall: (.*?) -\"\n", " }]\n", " )\n", "tf_estimator.fit({'training': f's3://{bucket}/{prefix}/sagemaker/train',\n", " 'validation': f's3://{bucket}/{prefix}/sagemaker/validation'})" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "---\n", "## 5_Testing\n", "\n", "Let's prepare the test data in the format accepted by our model, and upload it to S3 for later use. " ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "input_tensors = df2tensor(test_input, test_static_input, maxlen=MAX_LENGTH, model_name='data/W2V_event_dim100_win100.model')\n", "with open(\"data/test.jsonl\", \"w\") as text_file:\n", " text_file.write(input_tensors)\n", "test_s3 = sagemaker_session.upload_data(path='data/test.jsonl', bucket=bucket, key_prefix='{}/{}/{}'.format(prefix, 'sagemaker','test'))" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Batch Transform\n", "\n", "We use SageMaker batch transform to generate predictions for the whole test set at once. The results are written to `test.jsonl.out` under the configured `output_path`, and we read them back in the evaluation step below. 
" ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "tf_transformer = tf_estimator.transformer(\n", " instance_count=1, \n", " instance_type='ml.m5.xlarge',\n", " assemble_with='Line',\n", " accept='text/csv',\n", " output_path='s3://{}/{}/{}'.format(bucket, prefix, 'sagemaker/batch-transform'), \n", " max_payload=100)\n", "\n", "# Make batch transform on test input\n", "tf_transformer.transform(test_s3, content_type='application/jsonlines')\n", "print(\"Waiting for transform job: \" + tf_transformer.latest_transform_job.job_name)\n", "tf_transformer.wait()" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "## 6_Evaluation\n", "\n", "Please keep in mind, the Synthea data is artificially synthesized. The modeling results are only provided as a general evaluation step, and should not be the focus here." ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "output = get_csv_output_from_s3(tf_transformer.output_path, 'test.jsonl.out')\n", "y_prob = np.array(output).squeeze()\n", "y_pred = (y_prob>0.5).astype(int)\n", "test_df = read_csv_s3(bucket, 'sagemaker/synthea/sagemaker/test', 'test.csv')\n", "y_test = test_df['covid90'].values" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "print(classification_report(y_test, y_pred))" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "plot_test_auc(y_test, y_prob)" ], "outputs": [], "metadata": {} } ], "metadata": { "celltoolbar": "Tags", "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" }, "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." }, "nbformat": 4, "nbformat_minor": 2 }