{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Building your own algorithm container for Causal Inference\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"\n",
"This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. \n",
"\n",
"\n",
"\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"With Amazon SageMaker, you can package your own algorithms that can than be trained and deployed in the SageMaker environment. This notebook will guide you through an example that shows you how to build a Docker container for SageMaker that hosts a Causal model, and how can you use it for training, inference and interventions of the model.\n",
"\n",
"This example shows how to build a container for using the [Causal Inference library](https://causalnex.readthedocs.io/en/latest/) using as a base the following tutorial [building your own algorithm container tutorial](https://sagemaker-examples.readthedocs.io/en/latest/advanced_functionality/scikit_bring_your_own/scikit_bring_your_own.html). We are going to use Conda Python 3 kernel in this notebook.\n",
"\n",
"## Permissions\n",
"\n",
"Running this notebook requires permissions in addition to the normal `SageMakerFullAccess` permissions. This is because we will be creating new repositories on Amazon ECR. The easiest way to add these permissions is simply to add the managed policy `AmazonEC2ContainerRegistryFullAccess` to the role that you used to start your notebook instance. There's no need to restart your notebook instance when you do this, the new permissions will be available immediately."
]
},
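{
"cell_type": "markdown",
"metadata": {},
"source": [
"If your role is missing the policy and your current credentials allow `iam:AttachRolePolicy`, the following is a minimal sketch of attaching it programmatically. On many managed notebook instances the execution role cannot modify IAM, in which case the IAM console is the more common path."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: attach the managed ECR policy to the notebook execution role.\n",
"# Assumes the caller is allowed to call iam:AttachRolePolicy; otherwise use the console.\n",
"import boto3\n",
"from sagemaker import get_execution_role\n",
"\n",
"role_name = get_execution_role().split(\"/\")[-1]  # the role name is the last ARN segment\n",
"\n",
"iam = boto3.client(\"iam\")\n",
"iam.attach_role_policy(\n",
"    RoleName=role_name,\n",
"    PolicyArn=\"arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryFullAccess\",\n",
")"
]
},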
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Installing prerequisites"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install causalnex"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### The parts of the sample container\n",
"\n",
"In the `container` directory are all the components you need to package the sample algorithm for Amazon SageMaker:\n",
"\n",
" . \n",
" |-- Dockerfile\n",
" |-- build_and_push.sh\n",
" `-- causal_nex\n",
" |-- nginx.conf\n",
" |-- predictor.py\n",
" |-- serve\n",
" |-- train\n",
" `-- wsgi.py\n"
]
},
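{
"cell_type": "markdown",
"metadata": {},
"source": [
"Briefly, following the conventions of the base tutorial:\n",
"\n",
"* __Dockerfile__ describes how to build the Docker container image.\n",
"* __build_and_push.sh__ builds the image and pushes it to Amazon ECR.\n",
"* __causal_nex__ contains the files that are installed in the container:\n",
"  * __train__ is the program invoked when the container is run for training.\n",
"  * __serve__ is the program started when the container is run for hosting.\n",
"  * __predictor.py__ implements the Flask app that handles inference requests.\n",
"  * __nginx.conf__ and __wsgi.py__ wire together the web-serving stack (nginx, gunicorn, Flask)."
]
},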
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### The Dockerfile"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!cat container/Dockerfile"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Building and registering the container\n",
"\n",
"The following shell code shows how to build the container image using `docker build` and push the container image to ECR using `docker push`. This code is also available as the shell script `container/build-and-push.sh`, which you can run as `build-and-push.sh causal-nex-container` to build the image `causal-nex-container`. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%sh\n",
"\n",
"# The name of our algorithm\n",
"algorithm_name=causal-nex-container\n",
"\n",
"cd container\n",
"\n",
"chmod +x causal_nex/train\n",
"chmod +x causal_nex/serve\n",
"\n",
"account=$(aws sts get-caller-identity --query Account --output text)\n",
"\n",
"# Get the region defined in the current configuration (default to us-west-1 if none defined)\n",
"region=$(aws configure get region)\n",
"# region=${region:-eu-west-1}\n",
"\n",
"fullname=\"${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest\"\n",
"\n",
"# If the repository doesn't exist in ECR, create it.\n",
"aws ecr describe-repositories --repository-names \"${algorithm_name}\" > /dev/null 2>&1\n",
"\n",
"if [ $? -ne 0 ]\n",
"then\n",
" aws ecr create-repository --repository-name \"${algorithm_name}\" > /dev/null\n",
"fi\n",
"\n",
"# Get the login command from ECR and execute it directly\n",
"$(aws ecr get-login --region $region --registry-ids $account --no-include-email)\n",
"\n",
"# Build the docker image locally with the image name and then push it to ECR\n",
"# with the full name.\n",
"\n",
"docker build -t ${algorithm_name} .\n",
"docker tag ${algorithm_name} ${fullname}\n",
"\n",
"docker push ${fullname}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Using CausalNex in Amazon SageMaker\n",
"\n",
"Once you have your container packaged, you can use it to train models and use the model for hosting or batch transforms. Let's do that with the algorithm we made above. However, we have an additional bit, we can do model interventions as well, a common feature of Causal models.\n",
"\n",
"## Set up the environment and create the session\n",
"\n",
"Here we specify a bucket to use and the role that will be used for working with SageMaker. The session remembers our connection parameters to SageMaker."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import re\n",
"\n",
"import os\n",
"import numpy as np\n",
"import pandas as pd\n",
"from sagemaker import get_execution_role\n",
"\n",
"import sagemaker as sage\n",
"from time import gmtime, strftime\n",
"\n",
"role = get_execution_role()\n",
"sess = sage.Session()\n",
"region = boto3.Session().region_name\n",
"s3_client = boto3.client(\"s3\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Upload the data for training\n",
"\n",
"When training large models with huge amounts of data, you'll typically use big data tools, like Amazon Athena, AWS Glue, or Amazon EMR, to create your data in S3. For the purposes of this example, we are using a heart failure dataset of 299 patients. \n",
"\n",
"Davide Chicco, Giuseppe Jurman: \u201cMachine learning can predict survival of patients with heart failure from serum creatinine and ejection fraction alone\u201d. BMC Medical Informatics and Decision Making 20, 16 (2020).\n",
"[Web Link](\n",
"https://bmcmedinformdecismak.biomedcentral.com/articles/10.1186/s12911-020-1023-5)\n",
"\n",
"Let's download it from the public bucket and then upload it to our default sagemaker bucket:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"! mkdir data\n",
"\n",
"# S3 bucket where the training data is located.\n",
"data_bucket = f\"sagemaker-sample-files\"\n",
"data_prefix = \"datasets/tabular/uci_heart_failure/\"\n",
"data_bucket_path = f\"s3://{data_bucket}\"\n",
"\n",
"# S3 prefix for saving code and model artifacts.\n",
"prefix = \"DEMO-causal-nex\"\n",
"WORK_DIRECTORY = \"data/\"\n",
"\n",
"s3_client.download_file(\n",
" data_bucket,\n",
" data_prefix + \"heart_failure_clinical_records_dataset.csv\",\n",
" WORK_DIRECTORY + \"heart_failure_clinical_records_dataset.csv\",\n",
")\n",
"data_location = sess.upload_data(WORK_DIRECTORY, key_prefix=prefix)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Intro to causal modeling\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[Causal models](https://plato.stanford.edu/entries/causal-models/) are mathematical models representing [causal relationships](https://towardsdatascience.com/introduction-to-causality-in-machine-learning-4cee9467f06f). They facilitate inferences about causal relationships from statistical data. They can teach us a good deal about the causation, and about the relationship between causation and probability. We will walk through how to modify the training script which is located in container/causal_nex/train. Let's take a look in details:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pygmentize -g container/causal_nex/train"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For demo purposes, we use a [dataset](\n",
"https://bmcmedinformdecismak.biomedcentral.com/articles/10.1186/s12911-020-1023-5) of 299 patients with heart failure collected in 2015 that contains thirteen clinical features:\n",
"\n",
"- age: age of the patient (years)\n",
"- anemia: decrease of red blood cells or hemoglobin (boolean)\n",
"- high blood pressure: if the patient has hypertension (boolean)\n",
"- creatinine phosphokinase (CPK): level of the CPK enzyme in the blood (mcg/L)\n",
"- diabetes: if the patient has diabetes (boolean)\n",
"- ejection fraction: percentage of blood leaving the heart at each contraction (percentage)\n",
"- platelets: platelets in the blood (kilo platelets/mL)\n",
"- sex: woman or man (binary)\n",
"- serum creatinine: level of serum creatinine in the blood (mg/dL)\n",
"- serum sodium: level of serum sodium in the blood (mEq/L)\n",
"- smoking: if the patient smokes or not (boolean)\n",
"- time: follow-up period (days)\n",
"- [target] death event: if the patient deceased during the follow-up period (boolean) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" In this [paper](https://bmcmedinformdecismak.biomedcentral.com/articles/10.1186/s12911-020-1023-5/tables/6), the authors define two most important features - `serum creatinine` and `ejection fraction`.\n",
" \n",
" Bayesian Networks in [CausalNex](https://causalnex.readthedocs.io/en/0.4.2/03_tutorial/03_tutorial.html) support only discrete distributions. Any continuous features, or features with a large number of categories, should be discretized prior to fitting the Bayesian Network. Models containing variables with many possible values will typically be badly fit, and exhibit poor performance.\n",
" \n",
" As a first step, let's do the data discretization. CausalNex provides a helper class causalnex.discretiser.Discretiser, which supports several discretization methods. For our data the fixed method will be applied, providing static values that define the bucket boundaries. For example, the splitting can be done by using statistical quartiles (a type of quantile which divides the number of data points into four parts, or quarters, of more-or-less equal size), or by using [statistical quantitative description of the numeric features](https://bmcmedinformdecismak.biomedcentral.com/articles/10.1186/s12911-020-1023-5/tables/3). Therefore, ejection_fraction will be discretized into the buckets < 30, from 31 till 38, from 39 till 42, and >=42. Each bucket will be labelled as an integer from zero.\n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from causalnex.discretiser import Discretiser\n",
"import pandas as pd\n",
"\n",
"initial_df = pd.read_csv(WORK_DIRECTORY + \"/heart_failure_clinical_records_dataset.csv\")\n",
"\n",
"initial_df[\"age\"] = Discretiser(method=\"fixed\", numeric_split_points=[60]).transform(\n",
" initial_df[\"age\"].values\n",
")\n",
"initial_df[\"serum_sodium\"] = Discretiser(method=\"fixed\", numeric_split_points=[136]).transform(\n",
" initial_df[\"serum_sodium\"].values\n",
")\n",
"initial_df[\"serum_creatinine\"] = Discretiser(\n",
" method=\"fixed\", numeric_split_points=[1.1, 1.4]\n",
").transform(initial_df[\"serum_sodium\"].values)\n",
"\n",
"initial_df[\"ejection_fraction\"] = Discretiser(\n",
" method=\"fixed\", numeric_split_points=[30, 38, 42]\n",
").transform(initial_df[\"ejection_fraction\"].values)\n",
"\n",
"initial_df[\"creatinine_phosphokinase\"] = Discretiser(\n",
" method=\"fixed\", numeric_split_points=[120, 540, 670]\n",
").transform(initial_df[\"creatinine_phosphokinase\"].values)\n",
"\n",
"initial_df[\"platelets\"] = Discretiser(method=\"fixed\", numeric_split_points=[263358]).transform(\n",
" initial_df[\"platelets\"].values\n",
")\n",
"initial_df.head()"
]
},
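{
"cell_type": "markdown",
"metadata": {},
"source": [
"For comparison, below is a minimal sketch of the quartile-based alternative mentioned above, using the `quantile` method of `Discretiser` to learn bucket boundaries from the data instead of supplying fixed split points. We re-read the raw CSV because `initial_df` was already discretized in place."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: let Discretiser derive quartile boundaries instead of using fixed split points\n",
"raw_df = pd.read_csv(WORK_DIRECTORY + \"/heart_failure_clinical_records_dataset.csv\")\n",
"\n",
"quartile_disc = Discretiser(method=\"quantile\", num_buckets=4)\n",
"quartile_disc.fit(raw_df[\"ejection_fraction\"].values)\n",
"\n",
"# Compare the first few quartile labels with the fixed-bucket labels used above\n",
"print(quartile_disc.transform(raw_df[\"ejection_fraction\"].values)[:10])\n",
"print(initial_df[\"ejection_fraction\"].values[:10])"
]
},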
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can manually define a structure model(SM) (or it can be created by a domain expert) by specifying the relationships between different features. Defining appropriate SM is a key to a causal analysis. For example, in our case a relationship that is defined (\"ejection_fraction\", \"DEATH_EVENT\") can be translated as \"ejection_fraction\" node causes \"DEATH_EVENT\"."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import networkx as nx\n",
"\n",
"causal_graph = nx.DiGraph(\n",
" [\n",
" (\"ejection_fraction\", \"DEATH_EVENT\"),\n",
" (\"creatinine_phosphokinase\", \"DEATH_EVENT\"),\n",
" (\"age\", \"DEATH_EVENT\"),\n",
" (\"serum_sodium\", \"DEATH_EVENT\"),\n",
" (\"high_blood_pressure\", \"DEATH_EVENT\"),\n",
" (\"anaemia\", \"DEATH_EVENT\"),\n",
" (\"creatinine_phosphokinase\", \"DEATH_EVENT\"),\n",
" (\"smoking\", \"DEATH_EVENT\"),\n",
" ]\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can visualize the statistical dependencies between these variables using a graph:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"nx.draw_networkx(causal_graph, arrows=True)\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create an estimator and fit the model\n",
"\n",
"In order to use SageMaker to fit our algorithm, we'll create an `Estimator` that defines how to use the container to train. This includes the configuration we need to invoke SageMaker training:\n",
"\n",
"* The __container name__. This is constructed as in the shell commands [above](#building_container).\n",
"* The __role__. As defined [above](#set_up_env).\n",
"* The __instance count__ which is the number of machines to use for training.\n",
"* The __instance type__ which is the type of machine to use for training.\n",
"* The __output path__ determines where the model artifact will be written.\n",
"* The __session__ is the SageMaker session object that we defined above.\n",
"\n",
"Then we use fit() on the estimator to train against the data that we uploaded above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"account = sess.boto_session.client(\"sts\").get_caller_identity()[\"Account\"]\n",
"region = sess.boto_session.region_name\n",
"image = \"{}.dkr.ecr.{}.amazonaws.com/causal-nex-container:latest\".format(account, region)\n",
"\n",
"bn = sage.estimator.Estimator(\n",
" image_uri=image,\n",
" role=role,\n",
" instance_count=1,\n",
" instance_type=\"ml.c4.2xlarge\",\n",
" output_path=\"s3://{}/output\".format(sess.default_bucket()),\n",
" sagemaker_session=sess,\n",
")\n",
"\n",
"bn.fit(data_location)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Deploy the model\n",
"\n",
"Deploying the model to SageMaker hosting just requires a `deploy` call on the fitted model. This call takes an instance count, instance type, and optionally serializer and deserializer functions. These are used when the resulting predictor is created on the endpoint."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sagemaker.predictor import csv_serializer\n",
"from datetime import datetime\n",
"\n",
"# to create unique endpoint\n",
"now = datetime.now()\n",
"dt_string = now.strftime(\"-%d-%m-%Y-%H-%M-%S\")\n",
"\n",
"endpoint_name = \"causal-endpoint\" + dt_string\n",
"predictor = bn.deploy(1, \"ml.m5d.xlarge\", endpoint_name=endpoint_name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Likelihood Estimation and Predictions\n",
"\n",
"When the graph has been determined and parameter estimation for [Conditional probability distribution](https://en.wikipedia.org/wiki/Conditional_probability_distribution) (using Maximum likelihood or Bayesian parameter, in our case its done by calling .fit()) was performed, they can be used to predict the node states and corresponding probabilities. Conditional probabilities calculate the chance that a specific value for a random variable will occur given that another random variable has already taken a value. For more details on how to use CausalNex you can refer to [this article](https://causalnex.readthedocs.io/en/latest/04_user_guide/04_user_guide.html).\n",
"\n",
"### Choose some data and use it for a prediction\n",
"\n",
"In order to do some predictions, we'll create a dictionary with 2 test cases - feel free to add more! The only variable that we change is age to see how that impacts the survival outcomes (remember that we discretized variables before)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import json\n",
"\n",
"client = boto3.client(\"sagemaker-runtime\")\n",
"\n",
"test_cases = [\n",
" {\n",
" \"age\": 1,\n",
" \"anaemia\": 0,\n",
" \"creatinine_phosphokinase\": 2,\n",
" \"diabetes\": 0,\n",
" \"ejection_fraction\": 0,\n",
" \"high_blood_pressure\": 1,\n",
" \"platelets\": 1,\n",
" \"serum_creatinine\": 0,\n",
" \"serum_sodium\": 0,\n",
" \"sex\": 1,\n",
" \"smoking\": 0,\n",
" \"time\": 4,\n",
" },\n",
" {\n",
" \"age\": 0,\n",
" \"anaemia\": 0,\n",
" \"creatinine_phosphokinase\": 2,\n",
" \"diabetes\": 0,\n",
" \"ejection_fraction\": 0,\n",
" \"high_blood_pressure\": 1,\n",
" \"platelets\": 1,\n",
" \"serum_creatinine\": 0,\n",
" \"serum_sodium\": 0,\n",
" \"sex\": 1,\n",
" \"smoking\": 1,\n",
" \"time\": 4,\n",
" },\n",
"]\n",
"\n",
"target_node = \"DEATH_EVENT\"\n",
"payload = json.dumps({\"data\": test_cases, \"pred_type\": \"prediction\", \"target_node\": target_node})\n",
"\n",
"# invoke endpoint\n",
"response = client.invoke_endpoint(\n",
" EndpointName=endpoint_name, ContentType=\"application/json\", Body=payload\n",
")\n",
"\n",
"# decode the endpoint response\n",
"response_body = response[\"Body\"]\n",
"response_str = response_body.read().decode(\"utf-8\")\n",
"\n",
"# print the prediction state\n",
"print(\"DEATH_EVENT Predictions:\")\n",
"print(response_str)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Making interventions\n",
"To explore the effect of actions on the target variable, and examine the effect of such intervention, Do-Calculus on the Bayesian Network can be used. One of the goals of causal analysis is not only to understand exactly what causes a specific effect, but rather to be able to intervene in the process and control what the outcome is and to answer questions of the form: \n",
"- Does treatment X help to cure the disease?\n",
"- What happens if we change the type Y?\n",
"\n",
"Actually doing the intervention might be unfeasible or unethical \u2014 side-stepping actual interventions and still getting at causal effects is the whole point of this approach to [causal inference](https://fabiandablander.com/r/Causal-Inference.html).\n",
"To read more about interventions [go here](https://medium.data4sci.com/causal-inference-part-ix-interventions-c3f94190191d) or [here](https://www.cmu.edu/dietrich/philosophy/docs/scheines/PSA2006.pdf). For the deeper understanding we recommend reading \"Book of Why\" by Judea Pearl.\n",
"\n",
"Some example questions that can be answered with Causal Analysis are:\n",
"- Does the treatment X helps to cure the disease?\n",
"- What happens if we change type of detail Y in the production line?\n",
"- What is an effect of new route on the revenue of item Z?\n",
"\n",
"Let's examine the effect of intervention on the ejection_fraction node by changing its states."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"\n",
"client = boto3.client(\"sagemaker-runtime\")\n",
"node = \"ejection_fraction\"\n",
"target_node = \"DEATH_EVENT\"\n",
"# pass the json file for intervention with node on which the intervention will be performed, states and target node\n",
"payload = json.dumps(\n",
" {\n",
" \"data\": {\n",
" \"node\": node,\n",
" \"states\": [\n",
" {0: 1.0, 1: 0.0, 2: 0.0, 3: 0.0},\n",
" {0: 0.0, 1: 1.0, 2: 0.0, 3: 0.0},\n",
" {0: 0.0, 1: 0.0, 2: 1.0, 3: 0.0},\n",
" {0: 0.0, 1: 0.0, 2: 0.0, 3: 1.0},\n",
" ],\n",
" \"target_node\": target_node,\n",
" },\n",
" \"pred_type\": \"intervention\",\n",
" }\n",
")\n",
"\n",
"\n",
"response = client.invoke_endpoint(\n",
" EndpointName=endpoint_name, ContentType=\"application/json\", Body=payload\n",
")\n",
"# decode output\n",
"response_body = response[\"Body\"]\n",
"response_str = response_body.read().decode(\"utf-8\")\n",
"\n",
"\n",
"# show output\n",
"print(target_node, \"prediction with intervention/s on\", node, \"node:\\n\", response_str)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We used discretizer to have ejection_fraction values binned into 4 buckets < 30, from 31 till 38, from 39 till 42, and >=42. Each bucket is labelled as an integer from zero. Therefore, \"states\": [{0: 1.0, 1: 0.0, 2: 0.0, 3: 0.0}] means that we want to examine if the target_node \"DEATH_EVENT\" will be changed if we set(intervene) node ejection_fraction to be <30. "
]
},
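{
"cell_type": "markdown",
"metadata": {},
"source": [
"For intuition, here is a minimal local sketch of what the endpoint does server-side, assuming the container fits a CausalNex `BayesianNetwork` on the same discretized data (the actual training code lives in `container/causal_nex/train`). It runs entirely in the notebook and is for illustration only."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Local sketch of the hosted intervention, assuming the same graph and data as the container\n",
"from causalnex.structure import StructureModel\n",
"from causalnex.network import BayesianNetwork\n",
"from causalnex.inference import InferenceEngine\n",
"\n",
"# Build a StructureModel from the edges we defined earlier\n",
"sm = StructureModel()\n",
"sm.add_edges_from(causal_graph.edges)\n",
"\n",
"# Fit node states and conditional probability distributions on the discretized data\n",
"bn_local = BayesianNetwork(sm)\n",
"bn_local = bn_local.fit_node_states(initial_df)\n",
"bn_local = bn_local.fit_cpds(initial_df, method=\"BayesianEstimator\", bayes_prior=\"K2\")\n",
"\n",
"# Intervene: force ejection_fraction into its lowest bucket (<30) and query the target\n",
"ie = InferenceEngine(bn_local)\n",
"ie.do_intervention(\"ejection_fraction\", {0: 1.0, 1: 0.0, 2: 0.0, 3: 0.0})\n",
"print(ie.query()[\"DEATH_EVENT\"])\n",
"ie.reset_do(\"ejection_fraction\")"
]
},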
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Optional cleanup\n",
"When you're done with the endpoint, you'll want to clean it up."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"predictor.delete_model()\n",
"predictor.delete_endpoint()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Notebook CI Test Results\n",
"\n",
"This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.13"
}
},
"nbformat": 4,
"nbformat_minor": 4
}