{ "cells": [ { "cell_type": "code", "execution_count": 259, "id": "95344a61", "metadata": {}, "outputs": [], "source": [ "## This script is used to read and preprocess clinical data (in tabular format) from S3 and store features in SageMaker FeatureStore" ] }, { "cell_type": "code", "execution_count": 260, "id": "efa5b476", "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import numpy as np\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "import io, os\n", "from time import gmtime, strftime, sleep\n", "import time\n", "import sagemaker\n", "from sagemaker.session import Session\n", "from sagemaker import get_execution_role\n", "from sagemaker.feature_store.feature_group import FeatureGroup" ] }, { "cell_type": "markdown", "id": "d3e657f8", "metadata": {}, "source": [ "## Set up SageMaker FeatureStore" ] }, { "cell_type": "code", "execution_count": 261, "id": "da137d82", "metadata": {}, "outputs": [], "source": [ "region = boto3.Session().region_name\n", "\n", "boto_session = boto3.Session(region_name=region)\n", "sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)\n", "featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)\n", "\n", "feature_store_session = Session(\n", " boto_session=boto_session,\n", " sagemaker_client=sagemaker_client,\n", " sagemaker_featurestore_runtime_client=featurestore_runtime\n", ")\n", "\n", "role = get_execution_role()\n", "s3_client = boto3.client('s3', region_name=region)\n", "\n", "default_s3_bucket_name = feature_store_session.default_bucket()\n", "prefix = 'sagemaker-featurestore-demo'" ] }, { "cell_type": "markdown", "id": "78ef8a21", "metadata": {}, "source": [ "## Get data from S3" ] }, { "cell_type": "code", "execution_count": 262, "id": "04fa869d", "metadata": {}, "outputs": [], "source": [ "# Get data from S3 \n", "bucket_clin = 'nsclc-clinical-genomic-data'\n", "#bucket_clin = \n", "\n", "# Clinical data \n", "#data_key_clin = 'Clinical-data-119patients.csv'\n", "data_key_clin = 'NSCLCR01Radiogenomic_DATA_LABELS_2018-05-22_1500-shifted.csv'\n", "#data_key_clin = \n", "\n", "data_location_clin = 's3://{}/{}'.format(bucket_clin, data_key_clin)\n", "data_clinical = pd.read_csv(data_location_clin)" ] }, { "cell_type": "markdown", "id": "e80ed14b", "metadata": {}, "source": [ "## Preprocess Data" ] }, { "cell_type": "code", "execution_count": 264, "id": "d6131af6", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(147, 89)" ] }, "execution_count": 264, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Keep samples starting with \"R01-*\" as these IDs have corresponding medical imaging data. Delete samples with Case IDs \"AMC-*\". \n", "data_clinical = data_clinical[~data_clinical[\"Case ID\"].str.contains(\"AMC\")]\n", "\n", "# Delete columns with ID and dates\n", "list_delete_cols = ['Quit Smoking Year', 'Date of Recurrence', 'Date of Last Known Alive', 'Date of Death', 'CT Date', 'PET Date']\n", "data_clinical.drop(list_delete_cols, axis=1, inplace=True)\n", "\n", "# List of features with catergorical value\n", "list_encode_cols = [\"Patient affiliation\", \"Gender\", \"Ethnicity\", \"Smoking status\", \"%GG\", \"Tumor Location (choice=RUL)\", \"Tumor Location (choice=RML)\", \"Tumor Location (choice=RLL)\", \"Tumor Location (choice=LUL)\", \"Tumor Location (choice=LLL)\", \"Tumor Location (choice=L Lingula)\", \"Tumor Location (choice=Unknown)\", \"Histology \", \"Pathological T stage\", \"Pathological N stage\", \"Pathological M stage\", \"Histopathological Grade\", \"Lymphovascular invasion\", \"Pleural invasion (elastic, visceral, or parietal)\", \"EGFR mutation status\", \"KRAS mutation status\", \"ALK translocation status\", \"Adjuvant Treatment\", \"Chemotherapy\", \"Radiation\", \"Recurrence\", \"Recurrence Location\"]\n", "\n", "# List of features with numeric value\n", "list_nonenc_cols = [\"Case ID\", \"Age at Histological Diagnosis\", \"Weight (lbs)\", \"Pack Years\", \"Time to Death (days)\", \"Days between CT and surgery\", \"Survival Status\"]\n", "\n", "# One-hot encoding of features with categorical value\n", "data_clinical_enc = pd.get_dummies(data_clinical[list_encode_cols])\n", "\n", "data_clinical_nonenc = data_clinical[list_nonenc_cols]\n", "\n", "# Combine all features\n", "data_clin = pd.concat([data_clinical_enc, data_clinical_nonenc], axis=1)\n", "\n", "# Feature names inside FeatureStore should not have special chars and should be < 64 chars long\n", "# Update feature names accordingly\n", "\n", "l_char = ['-',' ','%','/','<','>','(',')','=',',',':']\n", "\n", "for col in (data_clin.columns):\n", "\n", " if (col == \"Case ID\"):\n", " data_clin.rename(columns={col: col.replace(' ','_')}, inplace = True)\n", " continue\n", "\n", " for char in l_char:\n", " if char in col:\n", " data_clin.rename(columns={col: col.replace(char,'')}, inplace = True)\n", " col = col.replace(char,'')\n", " \n", " if (len(col)>=64):\n", " data_clin.rename(columns={col: col[:60]}, inplace = True)\n", " \n", "# Change label (survival status) \"Dead\"=1 and \"Alive\"=0 \n", "data_clin[\"SurvivalStatus\"].replace({\"Dead\": \"1\", \"Alive\": \"0\"}, inplace=True)\n", "\n", "\n", "# Drop samples with missing values. \n", "# Fill NaN with 0. For eg. PackYears for non-smokers is \"NA\". Change it to 0.\n", "data_clin = data_clin[data_clin['Weightlbs'] != \"Not Collected\"]\n", "data_clin = data_clin[data_clin['PackYears'] != \"Not Collected\"]\n", "data_clin.fillna(0)" ] }, { "cell_type": "markdown", "id": "33d68a8a", "metadata": {}, "source": [ "## Ingest data into FeatureStore" ] }, { "cell_type": "code", "execution_count": 266, "id": "a03327c5", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Patientaffiliation_Stanford\n", "Patientaffiliation_VA\n", "Gender_Female\n", "Gender_Male\n", "Ethnicity_AfricanAmerican\n", "Ethnicity_Asian\n", "Ethnicity_Caucasian\n", "Ethnicity_HispanicLatino\n", "Ethnicity_NativeHawaiianPacificIslander\n", "Smokingstatus_Current\n", "Smokingstatus_Former\n", "Smokingstatus_Nonsmoker\n", "GG_0\n", "GG_100\n", "GG_2550\n", "GG_5075\n", "GG_75100\n", "GG_025\n", "GG_NotAssessed\n", "TumorLocationchoiceRUL_Checked\n", "TumorLocationchoiceRUL_Unchecked\n", "TumorLocationchoiceRML_Checked\n", "TumorLocationchoiceRML_Unchecked\n", "TumorLocationchoiceRLL_Checked\n", "TumorLocationchoiceRLL_Unchecked\n", "TumorLocationchoiceLUL_Checked\n", "TumorLocationchoiceLUL_Unchecked\n", "TumorLocationchoiceLLL_Checked\n", "TumorLocationchoiceLLL_Unchecked\n", "TumorLocationchoiceLLingula_Checked\n", "TumorLocationchoiceLLingula_Unchecked\n", "TumorLocationchoiceUnknown_Unchecked\n", "Histology_Adenocarcinoma\n", "Histology_NSCLCNOSnototherwisespecified\n", "Histology_Squamouscellcarcinoma\n", "PathologicalTstage_T1a\n", "PathologicalTstage_T1b\n", "PathologicalTstage_T2a\n", "PathologicalTstage_T2b\n", "PathologicalTstage_T3\n", "PathologicalTstage_T4\n", "PathologicalTstage_Tis\n", "PathologicalNstage_N0\n", "PathologicalNstage_N1\n", "PathologicalNstage_N2\n", "PathologicalMstage_M0\n", "PathologicalMstage_M1a\n", "PathologicalMstage_M1b\n", "HistopathologicalGrade_G1Welldifferentiated\n", "HistopathologicalGrade_G2Moderatelydifferentiated\n", "HistopathologicalGrade_G3Poorlydifferentiated\n", "HistopathologicalGrade_OtherTypeIWelltomoderatelydifferentiated\n", "HistopathologicalGrade_OtherTypeIIModeratelytopoorlydifferen\n", "Lymphovascularinvasion_Absent\n", "Lymphovascularinvasion_NotCollected\n", "Lymphovascularinvasion_Present\n", "Pleuralinvasionelasticvisceralorparietal_No\n", "Pleuralinvasionelasticvisceralorparietal_Notcollected\n", "Pleuralinvasionelasticvisceralorparietal_Yes\n", "EGFRmutationstatus_Mutant\n", "EGFRmutationstatus_Notcollected\n", "EGFRmutationstatus_Unknown\n", "EGFRmutationstatus_Wildtype\n", "KRASmutationstatus_Mutant\n", "KRASmutationstatus_Notcollected\n", "KRASmutationstatus_Unknown\n", "KRASmutationstatus_Wildtype\n", "ALKtranslocationstatus_Notcollected\n", "ALKtranslocationstatus_Translocated\n", "ALKtranslocationstatus_Unknown\n", "ALKtranslocationstatus_Wildtype\n", "AdjuvantTreatment_No\n", "AdjuvantTreatment_Yes\n", "Chemotherapy_No\n", "Chemotherapy_Yes\n", "Radiation_No\n", "Radiation_Yes\n", "Recurrence_no\n", "Recurrence_yes\n", "RecurrenceLocation_distant\n", "RecurrenceLocation_local\n", "RecurrenceLocation_regional\n", "Case_ID\n", "AgeatHistologicalDiagnosis\n", "Weightlbs\n", "PackYears\n", "TimetoDeathdays\n", "DaysbetweenCTandsurgery\n", "SurvivalStatus\n", "Waiting for Feature Group Creation\n", "Waiting for Feature Group Creation\n", "FeatureGroup clinical-feature-group-29-00-11-19 successfully created.\n" ] }, { "data": { "text/plain": [ "IngestionManagerPandas(feature_group_name='clinical-feature-group-29-00-11-19', sagemaker_fs_runtime_client_config=, max_workers=3, max_processes=1, _async_result=, _processing_pool=, _failed_indices=[])" ] }, "execution_count": 266, "metadata": {}, "output_type": "execute_result" } ], "source": [ "clinical_feature_group_name = 'clinical-feature-group-' + strftime('%d-%H-%M-%S', gmtime())\n", "clinical_feature_group = FeatureGroup(name=clinical_feature_group_name, sagemaker_session=feature_store_session)\n", "\n", "current_time_sec = int(round(time.time()))\n", "\n", "def cast_object_to_string(data_frame):\n", " for label in data_frame.columns:\n", " print (label)\n", " if data_frame.dtypes[label] == 'object':\n", " data_frame[label] = data_frame[label].astype(\"str\").astype(\"string\")\n", "\n", "# Cast object dtype to string. SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.\n", "cast_object_to_string(data_clin)\n", "\n", "# Record identifier and event time feature names\n", "record_identifier_feature_name = \"Case_ID\"\n", "event_time_feature_name = \"EventTime\"\n", "\n", "# Append EventTime feature\n", "data_clin[event_time_feature_name] = pd.Series([current_time_sec]*len(data_clin), dtype=\"float64\")\n", "\n", "## If event time generates NaN\n", "data_clin[event_time_feature_name] = data_clin[event_time_feature_name].fillna(0)\n", "\n", "# Load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.\n", "clinical_feature_group.load_feature_definitions(data_frame=data_clin); # output is suppressed\n", "\n", "\n", "def wait_for_feature_group_creation_complete(feature_group):\n", " status = feature_group.describe().get(\"FeatureGroupStatus\")\n", " while status == \"Creating\":\n", " print(\"Waiting for Feature Group Creation\")\n", " time.sleep(5)\n", " status = feature_group.describe().get(\"FeatureGroupStatus\")\n", " if status != \"Created\":\n", " raise RuntimeError(f\"Failed to create feature group {feature_group.name}\")\n", " print(f\"FeatureGroup {feature_group.name} successfully created.\")\n", "\n", "clinical_feature_group.create(\n", " s3_uri=f\"s3://{default_s3_bucket_name}/{prefix}\",\n", " record_identifier_name=record_identifier_feature_name,\n", " event_time_feature_name=event_time_feature_name,\n", " role_arn=role,\n", " enable_online_store=True\n", ")\n", "\n", "wait_for_feature_group_creation_complete(feature_group=clinical_feature_group)\n", "\n", "clinical_feature_group.ingest(\n", " data_frame=data_clin, max_workers=3, wait=True\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "2ac04c7a", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 5 }