# Snowflake as a Data Source for Training an ML Model with Amazon SageMaker
**_Use a Snowflake table as the data source for training a SageMaker model without staging the Snowflake data in S3_**

This notebook works well with the `conda_python3` kernel on a SageMaker Notebook `ml.t3.xlarge` instance.

---

## Contents

1. [Objective](#Objective)
1. [Background](#Background-(Problem-Description-and-Approach))
1. [Create a Training Script](#Create-a-Training-Script)
1. [Define `Model` Hyperparameters](#Define-Model-Hyperparameters)
1. [Launch a training job](#Launch-a-training-job)
1. [Conclusion](#Conclusion)

---

## Objective

This notebook illustrates how to retrieve data stored in a [Snowflake](https://www.snowflake.com/) table and use it for training an ML model using Amazon SageMaker _without having to first store the data in S3_. 

This example uses the [California Housing dataset (provided by Scikit-Learn)](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.fetch_california_housing.html) and trains an XGBoost model to predict median house values. A detailed description of the dataset can be found [here](https://inria.github.io/scikit-learn-mooc/python_scripts/datasets_california_housing.html).

To understand the code, you might also find it useful to refer to:

- *The guide on [Use XGBoost with the SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/frameworks/xgboost/using_xgboost.html#)*
- *[Docker Registry Paths and Example Code](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html)*
- *The [SageMaker reference for Boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#client) (The general AWS SDK for Python, including low-level bindings for SageMaker as well as many other AWS services)*

---

## Background (Problem Description and Approach)

- **Problem statement**: SageMaker training jobs require the training data to be in [Amazon S3, Amazon EFS, or Amazon FSx for Lustre](https://aws.amazon.com/blogs/machine-learning/choose-the-best-data-source-for-your-amazon-sagemaker-training-job/). To train a model on data stored outside these three storage services, the data must first be ingested into one of them (typically S3). This requires building a data pipeline (using tools such as [Amazon SageMaker Data Wrangler](https://aws.amazon.com/sagemaker/data-wrangler/)) to move the data into S3. However, this can create data management challenges in some situations (data lifecycle management, access control, etc.), so it may be desirable to make the data accessible to SageMaker _without_ staging it in S3. This notebook illustrates one way to do this, using Snowflake as a third-party data source.

- **Our approach**: Launch a SageMaker training job using the SageMaker SDK with a custom training script, and have the training script download the data from Snowflake directly into the instance that runs the training job, avoiding temporary storage of the data in S3. The [Snowflake Connector for Python](https://docs.snowflake.com/en/user-guide/python-connector.html) is used to connect to Snowflake and download the data. Once the data is on the training instance, the training script trains an XGBoost model (scikit-learn is used only to split the data into training and test sets). **Note that the data is assumed to already be available in Snowflake; see [`snowflake instructions`](./snowflake-instructions.md) for instructions on creating a database in Snowflake and ingesting the California Housing dataset as a table.**

- **Our tools**: [Amazon SageMaker SDK](https://sagemaker.readthedocs.io/en/stable/), [Snowflake Connector for Python](https://docs.snowflake.com/en/user-guide/python-connector.html) and [SageMaker XGBoost Estimator](https://sagemaker.readthedocs.io/en/stable/frameworks/xgboost/using_xgboost.html#).


---

## Overall Workflow

The overall workflow for this notebook is shown in the diagram below.

![sagemaker snowflake](img/sm-snowflake-1p.png)

Steps 1 and 2 are executed outside of this notebook. 

1. See [`snowflake instructions`](./snowflake-instructions.md) for instructions on creating a database in Snowflake and ingesting the California Housing dataset as a table.
1. See [`secrets manager instructions`](./secretsmanager-instructions.md) for instructions on storing Snowflake credentials.

The following sections of this notebook describe the remaining steps, i.e., downloading the data from Snowflake and training the XGBoost model.

In [None]:
# Create a directory for the code files (training script and helper modules);
# -p avoids an error if the directory already exists
!mkdir -p src

### Script for retrieving credentials

This script is written to a local file in a directory called `src`. The `src` directory is passed as the `source_dir` parameter to the `Estimator`, so all files written to `src` are made available to the training container on startup.

In [None]:
%%writefile src/snowflake_credentials.py

"""
Retrieve the Snowflake credentials (username and password) stored in AWS Secrets Manager
"""
import json
import boto3


def get_credentials(secret_id: str, region_name: str) -> dict:
    """Return the secret identified by secret_id, parsed from JSON into a dict."""
    client = boto3.client('secretsmanager', region_name=region_name)
    response = client.get_secret_value(SecretId=secret_id)
    secrets_value = json.loads(response['SecretString'])

    return secrets_value
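
`get_credentials` assumes the secret's `SecretString` is a JSON object with `username` and `password` keys, since those are the keys `connection.py` reads. A minimal sketch of checking that format locally, without calling AWS, might look like this; `parse_snowflake_secret` is a hypothetical helper and the values are placeholders.

```python
import json


def parse_snowflake_secret(secret_string: str) -> dict:
    """Parse a Secrets Manager SecretString and check that it holds Snowflake login fields.

    Assumption: the secret was stored as JSON with "username" and "password" keys,
    which are the keys connection.py reads.
    """
    secret = json.loads(secret_string)
    missing = [key for key in ("username", "password") if key not in secret]
    if missing:
        raise KeyError(f"secret is missing required keys: {missing}")
    return secret


# Placeholder SecretString in the assumed format (not real credentials)
example_secret = '{"username": "SAGEMAKER_USER", "password": "example-password"}'
creds = parse_snowflake_secret(example_secret)
print(creds["username"])  # prints SAGEMAKER_USER
```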


### `requirements.txt` for the packages our training script needs

We will create a `requirements.txt` file and list all the dependencies in this file. The SageMaker Training job will install these dependencies on startup.

In [None]:
%%writefile src/requirements.txt

boto3==1.26.44
sagemaker==2.127.0
snowflake-connector-python==2.9.0

---

## Train an XGBoost Regressor using SageMaker Training Jobs

Now we are ready to train our ML model using SageMaker Training Jobs. We do this in the following steps:

1. Create separate Python scripts for connecting to Snowflake, querying (downloading) the data, and preparing the data for ML, and finally a training script that ties everything together.

1. Provide the training script to the SageMaker SDK [Estimator](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) along with the source directory so that all the scripts we create can be provided to the training container when the training job is run using the [Estimator.fit](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.EstimatorBase.fit) method.

**NOTE: the data from Snowflake is downloaded directly into the training container instance and at no point is it stored in S3.**

### Create a Snowflake Connection Script

Use the [Snowflake Connector for Python](https://docs.snowflake.com/en/user-guide/python-connector.html) to connect to Snowflake. The querying script below uses the returned connection to download the data.

In [None]:
%%writefile src/connection.py

"""
Establish a connection to the Snowflake database that holds the HOUSING table
"""

import snowflake.connector
from snowflake_credentials import get_credentials


def connect(secret_id: str, account: str, warehouse: str, database: str, schema: str, protocol: str, region: str) -> snowflake.connector.SnowflakeConnection:

    # Retrieve the Snowflake username and password from AWS Secrets Manager
    secret_value = get_credentials(secret_id, region)
    sf_user = secret_value['username']
    sf_password = secret_value['password']

    # Log the connection parameters, masking the password
    print(f"sf_user={sf_user}, sf_password=****, sf_account={account}, sf_warehouse={warehouse}, "
          f"sf_database={database}, sf_schema={schema}, sf_protocol={protocol}")

    # Open the connection to Snowflake
    ctx = snowflake.connector.connect(user=sf_user,
                                      password=sf_password,
                                      account=account,
                                      warehouse=warehouse,
                                      database=database,
                                      schema=schema,
                                      protocol=protocol)
    print("\nSnowflake connection established...")

    return ctx

### Create a Snowflake Querying Script

This script queries the records in the Snowflake table and stores them in a pandas DataFrame.

**For distributed data parallel training, we download a random subset of the data into each training instance. Each training instance downloads approximately the same amount of data, roughly `total number of rows / total number of training hosts`; because the sample is random, the exact row count varies slightly from host to host.**
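
The per-host sampling query can be sketched as follows. It mirrors the logic in `data_pull`, but the helper names `sample_percent` and `build_sample_query` are hypothetical. Snowflake's `SAMPLE (p)` clause without an explicit method uses row-level (Bernoulli) sampling, keeping each row with probability p/100, so each host gets only approximately `total rows / hosts` rows. Rounding also means the percentages do not always add up to exactly 100 (3 hosts sample 33% each, 6 hosts sample 17% each).

```python
def sample_percent(hosts: int) -> int:
    """Percentage of the table each host samples, computed as in data_pull()."""
    if hosts < 1:
        raise ValueError("hosts must be at least 1")
    return round(100 / hosts)


def build_sample_query(table: str, hosts: int) -> str:
    """Build the per-host Snowflake sampling query, mirroring data_pull()."""
    return f"select * from {table} sample ({sample_percent(hosts)});"


for n_hosts in (1, 2, 3, 4):
    print(n_hosts, build_sample_query("CALIFORNIA_HOUSING", n_hosts))
# 1 select * from CALIFORNIA_HOUSING sample (100);
# 2 select * from CALIFORNIA_HOUSING sample (50);
# 3 select * from CALIFORNIA_HOUSING sample (33);
# 4 select * from CALIFORNIA_HOUSING sample (25);
```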


In [None]:
%%writefile src/query_snowflake.py

"""
Read the HOUSING table (the California Housing dataset used by this example)
"""
import pandas as pd
import snowflake.connector


def data_pull(ctx: snowflake.connector.SnowflakeConnection, table: str, hosts: int) -> pd.DataFrame:

    # Query the table for its total number of records
    sql_cnt = f"select count(*) from {table};"
    df_cnt = pd.read_sql(sql_cnt, ctx)
    tot_num_records = int(df_cnt.iloc[0, 0])
    print(f"table {table} has {tot_num_records} records")

    # Each host downloads a random sample of roughly 1/hosts of the table
    record_percent = str(round(100/hosts))
    print(f"going to download a random {record_percent}% sample of the data "
          f"(about {tot_num_records * int(record_percent) // 100} records)")
    sql = f"select * from {table} sample ({record_percent});"
    print(f"sql={sql}")

    # Read the sampled rows straight into a pandas DataFrame
    df = pd.read_sql(sql, ctx)
    print(f"read data into a dataframe of shape {df.shape}")

    # Prepare the data for ML by dropping rows with missing values
    df.dropna(inplace=True)

    print(f"final shape of dataframe to be used for training {df.shape}")
    return df

### Create a Data Preparation Script

The input dataframe is split into training and test datasets using [scikit-learn's `train_test_split` function](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html). The split datasets are then converted to [XGBoost `DMatrix` objects](https://xgboost.readthedocs.io/en/stable/python/python_api.html).
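
With `test_size=0.25`, scikit-learn rounds the test-set size up and gives the remaining rows to the training set. The sketch below reproduces that arithmetic for a single host. It assumes all 20,640 rows of the California Housing dataset survive `dropna()` and that a 2-host job samples exactly half of them (in practice the Bernoulli sample size varies slightly). `split_sizes` is a hypothetical helper.

```python
import math


def split_sizes(n_rows: int, test_size: float = 0.25) -> tuple:
    """Return (n_train, n_test) the way scikit-learn sizes a float test_size split.

    The test set is rounded up, n_test = ceil(test_size * n_rows),
    and the training set gets the remaining rows.
    """
    n_test = math.ceil(test_size * n_rows)
    return n_rows - n_test, n_test


# Assumption: 20,640 rows split evenly across 2 hosts
rows_per_host = 20640 // 2
print(split_sizes(rows_per_host))  # (7740, 2580)
```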

In [None]:
%%writefile src/data_preparation.py

"""
Preparation of training and test datasets for the XGBoost model
"""
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split


def prepare_data(df: pd.DataFrame) -> tuple:

    # Separate the features from the target (median house value)
    X = df.drop(['MEDHOUSEVAL'], axis=1)
    y = df['MEDHOUSEVAL']

    # Train-test split (75% training, 25% test)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

    print("building training and testing datasets")
    # Keep only the numeric feature columns
    features = X.select_dtypes('number').columns.tolist()
    print(f"features={features}")
    X_train = X_train[features]
    X_test = X_test[features]

    # Convert the datasets to XGBoost DMatrix objects for training and evaluation
    dtrain = xgb.DMatrix(X_train.values, label=y_train.values)
    dval = xgb.DMatrix(X_test.values, label=y_test.values)
    # Evaluation metrics are reported on both datasets during training
    watchlist = [(dtrain, 'train'), (dval, 'validation')]

    return dtrain, dval, watchlist

### Create a Training Script

The [SageMaker XGBoost framework container](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html) provides the runtime, and we provide a custom [`training script`](./src/train.py) that contains the actual ML training code. The folder containing the training script also contains a [`requirements.txt`](./src/requirements.txt) file that specifies any additional dependencies to install on the training instance.

You can find detailed guidance on writing a script-mode training script for this container in [Use XGBoost with the SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/frameworks/xgboost/using_xgboost.html#).
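
In script mode, SageMaker passes each entry of the `hyperparameters` dictionary to `train.py` as a command-line argument of the form `--name value`. This notebook supplies the values as strings, which is why `train.py` declares a `type` for each argument. The sketch below simulates that with a hand-built argument list; it parses only a subset of the arguments `train.py` defines, and the host names in the `SM_HOSTS`-style JSON are example values.

```python
import argparse
import json


def parse_hyperparameters(argv: list) -> argparse.Namespace:
    """Parse a subset of the command-line arguments that train.py defines."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--max_depth", type=int)
    parser.add_argument("--eta", type=float)
    parser.add_argument("--objective", type=str)
    parser.add_argument("--num_round", type=int)
    # Like train.py, ignore any arguments this parser does not know about
    known_args, _ = parser.parse_known_args(argv)
    return known_args


# Simulated command line built from string-valued hyperparameters (example values)
example_hyperparams = {"max_depth": "5", "eta": "0.2", "objective": "reg:squarederror",
                       "num_round": "50", "verbosity": "2"}
example_argv = [token for name, value in example_hyperparams.items()
                for token in (f"--{name}", value)]
example_args = parse_hyperparameters(example_argv)
print(example_args.max_depth + 1, example_args.eta)  # 6 0.2 (strings were converted)

# SM_HOSTS holds a JSON list of host names; train.py uses its length as the host count
example_hosts = json.loads('["algo-1", "algo-2"]')
print(len(example_hosts))  # 2
```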


In [None]:
%%writefile src/train.py

"""
Train a SageMaker XGBoost model on the California Housing dataset
"""

# Import dependencies
import os
import json
import logging
import argparse
import pickle as pkl
import xgboost as xgb
from connection import connect
from query_snowflake import data_pull
from data_preparation import prepare_data
from sagemaker_containers import entry_point
from sagemaker_xgboost_container import distributed


def _xgb_train(params: dict, dtrain: xgb.DMatrix, evals: list, num_boost_round: int, model_dir: str, is_master: bool) -> None:
    """Run xgb.train() on the given arguments (rabit is already initialized in distributed jobs).

    This is our rabit execution function.

    :param params: XGBoost training parameters (hyperparameters).
    :param dtrain: Training data as an XGBoost DMatrix.
    :param evals: List of (DMatrix, name) pairs evaluated during training.
    :param num_boost_round: Number of boosting rounds.
    :param model_dir: Directory where the trained model is saved.
    :param is_master: True if the current node is the master host in distributed training,
                      or if this is a single-node training job.
                      Note that rabit_run sets this argument.
    """
    booster = xgb.train(params=params,
                        dtrain=dtrain,
                        evals=evals,
                        num_boost_round=num_boost_round)

    # Only the master host saves the model
    if is_master:
        model_location = os.path.join(model_dir, 'xgboost-model')
        with open(model_location, 'wb') as f:
            pkl.dump(booster, f)
        logging.info("Stored trained model at {}".format(model_location))

    
if __name__ == "__main__":
    
    parser = argparse.ArgumentParser()

    # Hyperparameters are described here.
    parser.add_argument('--max_depth', type=int)
    parser.add_argument('--eta', type=float)
    parser.add_argument('--gamma', type=float)
    parser.add_argument('--min_child_weight', type=float)
    parser.add_argument('--subsample', type=float)
    parser.add_argument('--verbosity', type=int)
    parser.add_argument('--objective', type=str)
    parser.add_argument('--num_round', type=int)
    parser.add_argument('--tree_method', type=str, default="auto")
    parser.add_argument('--predictor', type=str, default="auto")

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output_data_dir', type=str, default=os.environ.get('SM_OUTPUT_DATA_DIR'))
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    parser.add_argument('--sm_hosts', type=str, default=os.environ.get('SM_HOSTS'))
    parser.add_argument('--sm_current_host', type=str, default=os.environ.get('SM_CURRENT_HOST'))

    args, _ = parser.parse_known_args()

    # Get SageMaker host information from runtime environment variables
    sm_hosts = json.loads(args.sm_hosts)
    sm_current_host = args.sm_current_host   
    
    # snowflake related params are read from environment variables
    secret_id = os.environ["SECRET_ID"]
    account = os.environ["SF_ACCOUNT"]
    warehouse = os.environ["SF_WAREHOUSE"]
    database = os.environ["SF_DATABASE"].upper()
    schema = os.environ["SF_SCHEMA"].upper()
    table = os.environ['SF_TABLE'].upper()
    region = os.environ["AWS_REGION"]
    
    protocol = "https"
    
    # Connect to Snowflake database table 
    ctx = connect(secret_id, account, warehouse, database, schema, protocol, region)
    
    # Query data from Snowflake database table
    # IMPORTANT: the data from Snowflake goes directly into a pandas DataFrame. It is NOT staged in an S3 bucket.
    df = data_pull(ctx, table, len(sm_hosts))
    
    # Preparation of training and test datasets
    dtrain, dval, watchlist = prepare_data(df)
    
    # Define training hyperparameters
    train_hp = {
        'max_depth': args.max_depth,
        'eta': args.eta,
        'gamma': args.gamma,
        'min_child_weight': args.min_child_weight,
        'subsample': args.subsample,
        'verbosity': args.verbosity,
        'objective': args.objective,
        'tree_method': args.tree_method,
        'predictor': args.predictor,
    }
    
    xgb_train_args = dict(
        params=train_hp,
        dtrain=dtrain,
        evals=watchlist,
        num_boost_round=args.num_round,
        model_dir=args.model_dir)

    if len(sm_hosts) > 1:
        # Wait until all hosts are able to find each other
        entry_point._wait_hostname_resolution()

        # Execute training function after initializing rabit.
        distributed.rabit_run(
            exec_fun=_xgb_train,
            args=xgb_train_args,
            include_in_training=(dtrain is not None),
            hosts=sm_hosts,
            current_host=sm_current_host,
            update_rabit_args=True
        )
    else:
        # If single node training, call training method directly.
        if dtrain is not None:
            xgb_train_args['is_master'] = True
            _xgb_train(**xgb_train_args)
        else:
            raise ValueError("Training channel must have data to train model.")


def model_fn(model_dir: str):
    """Deserialize and return the fitted model.

    Note that the file name must match the one used in the _xgb_train method.
    """
    model_file = 'xgboost-model'
    with open(os.path.join(model_dir, model_file), 'rb') as f:
        booster = pkl.load(f)
    return booster
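
Because `source_dir` is uploaded as-is, a missing helper module would only surface as an `ImportError` after the training job has started. A quick local pre-flight check, sketched below with a hypothetical `missing_source_files` helper, can catch that before calling `fit()`. The file list is simply the set of files this notebook writes to `src`.

```python
from pathlib import Path

# Files this notebook writes to ./src; train.py imports the helper modules at startup
EXPECTED_SOURCE_FILES = (
    "train.py",
    "connection.py",
    "query_snowflake.py",
    "data_preparation.py",
    "snowflake_credentials.py",
    "requirements.txt",
)


def missing_source_files(source_dir: str, expected: tuple = EXPECTED_SOURCE_FILES) -> list:
    """Return the expected file names that are not present in source_dir."""
    root = Path(source_dir)
    return [name for name in expected if not (root / name).is_file()]


missing = missing_source_files("./src")
print("missing files:", missing if missing else "none")
```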

#### Provide the Snowflake secret ID and connection details as environment variables to the training container

The training code retrieves the Snowflake credentials from AWS Secrets Manager, so we only need to provide the secret identifier as an environment variable.

We also need your Snowflake account identifier, which was stored when you ran the previous notebook (snowflake-load-dataset.ipynb).
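
`train.py` reads these settings with `os.environ[...]`, which fails on the first missing variable. If you want a clearer error, a helper along these lines could check every variable up front. `read_snowflake_env` is hypothetical, and the example values are placeholders rather than real account details.

```python
import os
from typing import Mapping

# Environment variable names that train.py reads
REQUIRED_ENV_VARS = ("SECRET_ID", "SF_ACCOUNT", "SF_WAREHOUSE",
                     "SF_DATABASE", "SF_SCHEMA", "SF_TABLE", "AWS_REGION")


def read_snowflake_env(environ: Mapping[str, str] = os.environ) -> dict:
    """Read the Snowflake settings, reporting every missing or empty variable at once."""
    missing = [name for name in REQUIRED_ENV_VARS if not environ.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    config = {name: environ[name] for name in REQUIRED_ENV_VARS}
    # train.py upper-cases the database, schema, and table names before use
    for name in ("SF_DATABASE", "SF_SCHEMA", "SF_TABLE"):
        config[name] = config[name].upper()
    return config


# Placeholder values for illustration only
example_env = {
    "SECRET_ID": "example-secret-id", "SF_ACCOUNT": "example-account",
    "SF_WAREHOUSE": "example_warehouse", "SF_DATABASE": "housing",
    "SF_SCHEMA": "housing_schema", "SF_TABLE": "california_housing",
    "AWS_REGION": "us-east-1",
}
print(read_snowflake_env(example_env)["SF_TABLE"])  # CALIFORNIA_HOUSING
```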

In [None]:
%store -r sf_account_id 
%store -r sf_secret_id 
print(f"sf_account_id={sf_account_id}, sf_secret_id={sf_secret_id}")

In [None]:
import boto3

# Do not change: these values must match the ones used in snowflake-load-dataset.ipynb
warehouse = "amazon_sagemake_w_snowflake_as_datasource"
database = "housing"
schema = "housing_schema"
table = "california_housing"
session = boto3.session.Session()
region = session.region_name
print(f"region={region}")

env = {"SECRET_ID": sf_secret_id, 
       "SF_ACCOUNT": sf_account_id,
       "SF_WAREHOUSE": warehouse,
       "SF_DATABASE": database,
       "SF_SCHEMA": schema,
       "SF_TABLE": table,
       "AWS_REGION": region}

### Define Model Hyperparameters

In [None]:
hyperparams = {
    "max_depth": "5",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.7",
    "objective": "reg:squarederror",
    "num_round": "50",
    "verbosity": "2",
}

### Distributed data parallel training

For distributed data parallel training, we set `instance_count > 1`, and each training instance downloads its own random subset of the data of approximately equal size.
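
Because each host runs its own `SAMPLE` query, the subsets are drawn independently (assuming no `SEED` clause), so they are not guaranteed to be disjoint or to cover the whole table. The arithmetic below is a sketch that assumes independent Bernoulli sampling with p = percent/100 on each host. It shows what that means per row: with 2 hosts at 50%, about a quarter of the rows reach neither host and about a quarter reach both. `coverage_probabilities` is a hypothetical helper.

```python
from math import comb


def coverage_probabilities(hosts: int, percent: float) -> dict:
    """Per-row probabilities when each host independently keeps a row with p = percent / 100.

    Returns the chance that a row lands on no host, on exactly one host,
    or on two or more hosts (i.e. it is duplicated across hosts).
    """
    p = percent / 100
    none = (1 - p) ** hosts
    exactly_one = comb(hosts, 1) * p * (1 - p) ** (hosts - 1)
    return {"none": none, "exactly_one": exactly_one, "duplicated": 1 - none - exactly_one}


# Two hosts at 50% each, the configuration used in this notebook
print(coverage_probabilities(2, 50))  # {'none': 0.25, 'exactly_one': 0.5, 'duplicated': 0.25}
```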

In [None]:
instance_type = "ml.m5.2xlarge"
instance_count = 2

### Launch a training job

With the training scripts written and the environment variables and hyperparameters defined, we are ready to configure the SageMaker training job.

In [None]:
import boto3
import sagemaker
from sagemaker import image_uris
from sagemaker import get_execution_role

role = get_execution_role()
sm_session = sagemaker.Session()
# Optionally specify your bucket here, e.g. 'mybucket-us-east-1';
# otherwise SageMaker uses the account's default bucket to store model artifacts
bucket = None
if bucket is None and sm_session is not None:
    bucket = sm_session.default_bucket()
print(f"bucket={bucket}, role={role}")
prefix = "sagemaker/sagemaker-snowflake-example"
output_path = "s3://{}/{}/{}/output".format(bucket, prefix, "housing-dist-xgb")

# Collect the default subnet IDs of the default VPC for the training job.
# Note: the SageMaker Python SDK only applies a VPC configuration when
# security_group_ids is also passed to the Estimator.
ec2_session = boto3.Session(region_name=region)
ec2_resource = ec2_session.resource("ec2")
subnet_ids = []
for vpc in ec2_resource.vpcs.all():
    # here you can choose which subnets to use based on the VPC or subnet ID
    if vpc.is_default:
        for subnet in vpc.subnets.all():
            if subnet.default_for_az:
                subnet_ids.append(subnet.id)

# Retrieve the URI of the SageMaker built-in XGBoost container image for this region
xgb_image_uri = image_uris.retrieve('xgboost', region, version='1.5-1')
print(f"\nusing XGBoost image: {xgb_image_uri}")

# Create the SageMaker Estimator (script mode: entry_point + source_dir)
xgb_script_mode_estimator = sagemaker.estimator.Estimator(
    image_uri=xgb_image_uri,
    role=role,
    instance_count=instance_count,
    instance_type=instance_type,
    output_path=output_path,
    sagemaker_session=sm_session,
    entry_point="train.py",
    source_dir="./src",
    hyperparameters=hyperparams,
    environment=env,
    subnets=subnet_ids,
)

In [None]:
# Estimator fitting
xgb_script_mode_estimator.fit()

In [None]:
print(f"the trained model is available in S3 -> {xgb_script_mode_estimator.model_data}")



Remember that this training job is very "light" because the dataset is very small. For a workload of this size, training locally on the notebook instance would likely finish faster than on SageMaker, which must first provision the training infrastructure. Because this example job is not very resource-intensive, the provisioning overhead is large compared to the training itself.

In real-world scenarios with large datasets, running on SageMaker can considerably speed up execution and help optimize costs, by keeping this interactive notebook environment modest and spinning up more powerful training resources on demand.

---

## Cleaning up

To avoid incurring future charges, delete the resources. You can do this by deleting the AWS CloudFormation stack that was used to create the IAM role and the Amazon SageMaker notebook instance.
![Cleaning Up](img/cfn-delete.png)

You will have to delete the Snowflake resources manually from the Snowflake console.

---

## Conclusion

In this notebook we saw how to download data stored in a Snowflake table directly into a SageMaker training job instance and train an XGBoost model using the SageMaker XGBoost container with a custom training script. **This approach allows us to integrate Snowflake directly as a data source for SageMaker training without staging the data in S3.**