{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import os\n", "import sys\n", "import time\n", "import sagemaker\n", "import random\n", "import boto3\n", "import numpy as np\n", "import math\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from time import gmtime, strftime\n", "from generate_synthetic_housing_data import *" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "notebook_dir = os.getcwd()\n", "package_dir = '/' + '/'.join(notebook_dir.split('/')[1:-1])\n", "sys.path.append(package_dir)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load_ext autoreload\n", "%autoreload 2\n", "from feature_store import *\n", "from ml_lineage_helper import *\n", "from ml_lineage_helper.query_lineage import *" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Session variables**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "role_arn = sagemaker.get_execution_role()\n", "try:\n", " role_name=role_arn.split('/')[2]\n", "except:\n", " role_name=role_arn.split('/')[1]\n", "\n", "sagemaker_session = SageMakerSession()\n", "\n", "feature_group_name = 'synthetic-housing-data-2'\n", "feature_group_description = 'Synthetic housing Feature Group'\n", "s3_prefix = 'ml-lineage-synthetic-housing-2'\n", "model_name = 'pytorch-hosted-model-v9'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Process data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create local directory structure." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random.seed(113)\n", "\n", "data_dir = os.path.join(os.getcwd(), 'data')\n", "os.makedirs(data_dir, exist_ok=True)\n", "\n", "train_dir = os.path.join(os.getcwd(), 'data/train')\n", "os.makedirs(train_dir, exist_ok=True)\n", "\n", "test_dir = os.path.join(os.getcwd(), 'data/test')\n", "os.makedirs(test_dir, exist_ok=True)\n", "\n", "raw_dir = os.path.join(os.getcwd(), 'data/raw')\n", "os.makedirs(raw_dir, exist_ok=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load data locally and upload data to S3." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = generate_houses(1506)\n", "\n", "# Get training columns\n", "train_cols = list(df.columns)\n", "del train_cols[-1]\n", "train_cols\n", "\n", "# Split data\n", "training_index = math.floor(0.8 * df.shape[0])\n", "x_train, y_train = df[train_cols][:training_index], df.PRICE[:training_index]\n", "x_test, y_test = df[train_cols][training_index:], df.PRICE[training_index:]\n", "\n", "# Scale price\n", "y_train = y_train / 100000\n", "y_test = y_test / 100000\n", "\n", "# Save locally\n", "np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)\n", "np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)\n", "np.save(os.path.join(train_dir, 'y_train.npy'), y_train)\n", "np.save(os.path.join(test_dir, 'y_test.npy'), y_test)\n", "\n", "# Upload to S3\n", "rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)\n", "raw_s3 = sagemaker_session.session.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Process data with SageMaker Processing." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile preprocessing.py\n", "\n", "import glob\n", "import numpy as np\n", "import os\n", "from sklearn.preprocessing import StandardScaler\n", "\n", "if __name__=='__main__':\n", " \n", " input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))\n", " print('\\nINPUT FILE LIST: \\n{}\\n'.format(input_files))\n", " scaler = StandardScaler()\n", " for file in input_files:\n", " raw = np.load(file)\n", " transformed = scaler.fit_transform(raw)\n", " if 'train' in file:\n", " output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')\n", " np.save(output_path, transformed)\n", " print('SAVED TRANSFORMED TRAINING DATA FILE\\n')\n", " else:\n", " output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')\n", " np.save(output_path, transformed)\n", " print('SAVED TRANSFORMED TEST DATA FILE\\n')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sklearn_processor = SKLearnProcessor(framework_version='0.20.0',\n", " role=sagemaker_session.role_arn,\n", " instance_type='ml.m5.xlarge',\n", " instance_count=2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "processing_job_name = \"{}-preprocessing-{}\".format(s3_prefix, strftime(\"%d-%H-%M-%S\", gmtime()))\n", "output_destination = 's3://{}/{}/data'.format(sagemaker_session.bucket_name, s3_prefix)\n", "\n", "# code=can be a s3 uri for the input script\n", "sklearn_processor.run(code='preprocessing.py',\n", " job_name=processing_job_name,\n", " inputs=[ProcessingInput(\n", " source=raw_s3,\n", " destination='/opt/ml/processing/input',\n", " s3_data_distribution_type='ShardedByS3Key')],\n", " outputs=[ProcessingOutput(output_name='train',\n", " destination='{}/train'.format(output_destination),\n", " source='/opt/ml/processing/train'),\n", " ProcessingOutput(output_name='test',\n", " destination='{}/test'.format(output_destination),\n", " source='/opt/ml/processing/test')])\n", "\n", "preprocessing_job_description = sklearn_processor.jobs[-1].describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Getting data into a Feature Store" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So we've got data that's already been processed. In our case, it's a synthetic housing dataset and it's been standardized." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_in_s3 = '{}/train/x_train.npy'.format(output_destination)\n", "test_in_s3 = '{}/test/x_test.npy'.format(output_destination)\n", "!aws s3 cp {train_in_s3} ./data/train/x_train.npy\n", "!aws s3 cp {test_in_s3} ./data/test/x_test.npy" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "x_train = np.load('./data/train/x_train.npy')\n", "x_test = np.load('./data/test/x_test.npy')\n", "\n", "# Convert to Pandas and standardize\n", "train_df = pd.DataFrame(data=x_train)\n", "train_df['target'] = y_train\n", "first_col = train_df.pop('target')\n", "train_df.insert(0, 'target', first_col)\n", "\n", "test_df = pd.DataFrame(data=x_test)\n", "test_df['target'] = y_test.reset_index(drop=True)\n", "first_col = test_df.pop('target')\n", "test_df.insert(0, 'target', first_col)\n", "\n", "# Add train/test indicator variable\n", "train_df['train'] = 1\n", "test_df['train'] = 0\n", "\n", "# Add column names\n", "column_names = list(df.columns)\n", "column_names.remove('PRICE')\n", "column_names.insert(0, 'target')\n", "column_names.append('train')\n", "column_rename_dict = {}\n", "for i, v in enumerate(train_df.columns):\n", " column_rename_dict[v] = column_names[i]\n", "train_df.rename(columns=column_rename_dict, inplace=True)\n", "test_df.rename(columns=column_rename_dict, inplace=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But we want to put this data into a Feature Store so that other data scientists can use this data for building models without having to go through the pre-processing steps again.\n", "\n", "So let's get our DataFrame into the Feature Store by creating a new Feature Group and ingesting the data from the dataframe into that Feature Group. By default, ingestion in turned on, but you can turn it off by passing in the parameter `ingest=False`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "offline_prefix = feature_group_name.replace('-', '_')\n", "feature_store = FeatureStore(feature_group_name, sagemaker_session)\n", "feature_group = feature_store.create_feature_group(train_df,\n", " feature_group_description,\n", " f'{sagemaker_session.bucket_s3_uri}/{offline_prefix}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ingest the test data into the Feature Group as well." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group = feature_store.create_feature_group(test_df,\n", " feature_group_description,\n", " f'{sagemaker_session.bucket_s3_uri}/{offline_prefix}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get training and test data from the Feature Store" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get training data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait for online FS data to be replicated to offline FS\n", "time.sleep(60*5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_store = FeatureStore(feature_group_name, sagemaker_session)\n", "\n", "query = \"\"\"\n", "select *\n", "from \"{0}\"\n", "where train=1\n", "\"\"\".format(feature_store.table_name)\n", "\n", "train_df, athena_query = feature_store.query_feature_group(query)\n", "train_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get test data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = \"\"\"\n", "select *\n", "from \"{0}\"\n", "where train=0\n", "\"\"\".format(feature_store.table_name)\n", "\n", "test_df, athena_query = feature_store.query_feature_group(query)\n", "test_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Only select features we care about and ignore the metadata columns\n", "train_df = train_df.iloc[:,:-6]\n", "test_df = test_df.iloc[:,:-6]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_s3 = f's3://{sagemaker_session.bucket_name}/{s3_prefix}/train.npy'\n", "test_s3 = f's3://{sagemaker_session.bucket_name}/{s3_prefix}/test.npy'\n", "upload_df_to_s3(train_df,\n", " train_s3,\n", " sagemaker_session,\n", " csv=False)\n", "upload_df_to_s3(test_df,\n", " test_s3,\n", " sagemaker_session,\n", " csv=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ML Lineage Tracking for Training and Deployment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.pytorch import PyTorch\n", "\n", "\n", "inputs = {'train': train_s3, 'test': test_s3}\n", "#inputs = {'train': 'file://data/train.npy', 'test': 'file://data/test.npy'}\n", "\n", "hyperparameters = {'epochs': 30, 'batch_size': 128, 'learning_rate': 0.01}\n", "\n", "# Metrics to be captured from logs.\n", "metric_definitions = [{'Name': 'loss',\n", " 'Regex': ' loss: ([0-9\\\\.]+)'},\n", " {'Name': 'val_loss',\n", " 'Regex': 'Test MSE: ([0-9\\\\.]+)'}]\n", "\n", "instance_type = 'ml.c5.xlarge'\n", "estimator_parameters = {'source_dir': 'pytorch-model',\n", " 'entry_point':'train_deploy.py',\n", " 'instance_type' : instance_type,\n", " 'instance_count': 1,\n", " 'hyperparameters': hyperparameters,\n", " 'role' : sagemaker_session.role_arn,\n", " 'base_job_name':'pytorch-hosted-model',\n", " 'framework_version':'1.5.0',\n", " 'py_version':'py3',\n", " 'metric_definitions':metric_definitions}\n", "\n", "estimator = PyTorch(**estimator_parameters)\n", "\n", "estimator.fit(inputs=inputs)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create model if you haven't already used it to deploy a real-time endpoint\n", "# or do a Batch Transform job\n", "from sagemaker.pytorch import PyTorchModel\n", "\n", "model = PyTorchModel(entry_point='train_deploy.py', source_dir='pytorch-model',\n", " model_data=estimator.model_data, role=sagemaker_session.role_arn,\n", " framework_version='1.5.0', py_version='py3', name=model_name,\n", " sagemaker_session=sagemaker.Session())\n", "model._create_sagemaker_model(instance_type=instance_type)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get repo links to processing and training code\n", "processing_code_repo_url = get_repo_link(os.getcwd(), 'processing.py')\n", "training_code_repo_url = get_repo_link(os.getcwd(), 'pytorch-model/train_deploy.py', processing_code=False)\n", "repo_links = [processing_code_repo_url, training_code_repo_url]\n", "\n", "ml_lineage = MLLineageHelper()\n", "lineage = ml_lineage.create_ml_lineage(estimator, model_name=model_name,\n", " query=query,\n", " sagemaker_processing_job_description=preprocessing_job_description,\n", " feature_group_names=[feature_group_name],\n", " repo_links=repo_links)\n", "lineage" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ml_lineage.graph()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you want to get the lineage of any SageMaker model, you can use the following snippet of code." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lineage = MLLineageHelper(sagemaker_model_name_or_model_s3_uri=model_name)\n", "lineage.df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you have a data source, you can find associated Feature Groups by providing the data source's S3 URI or Artifact ARN:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query_lineage = QueryLineage()\n", "query_lineage.get_feature_groups_from_data_source(train_s3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can also start with a Feature Group, and find associated data sources:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query_lineage.get_data_sources_from_feature_group(feature_group.describe()['FeatureGroupArn'],\n", " max_depth=3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Given a Feature Group, you can also find associated models:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query_lineage.get_models_from_feature_group(feature_group.describe()['FeatureGroupArn'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Given a SageMaker model name or artifact ARN, you can find associated Feature Groups." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query_lineage.get_feature_groups_from_model(model_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Cleanup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dest_arns = lineage.df['Artifact Destination ARN'].values\n", "for arn in dest_arns:\n", " ml_lineage.delete_associations(arn)" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-2:236514542706:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 5 }