{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook is developed using ml.t3.medium instance with `Python 3 (Data Science)` kernel on SageMaker Studio." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import SageMaker SDK and Create a Session" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import sagemaker\n", "import time\n", "from time import gmtime, strftime\n", "\n", "session = sagemaker.Session()\n", "role = sagemaker.get_execution_role()\n", "aws_region = session.boto_region_name\n", "\n", "# Project Bucket\n", "bucket = session.default_bucket()\n", "dataset_prefix = 'medical-imaging/dataset'\n", "scaled_dataset_prefix = 'medical-imaging/scaled_dataset'\n", "scaled_zipped_dataset_prefix = 'medical-imaging/scaled_zipped_dataset'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### VPC setup: subnets/SGs options" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# please specify valid vpc subnet ID and security group ID to train within your\n", "# VPC of choice. This is required for model training with Amazon FSx for lustre.\n", "vpc_subnet_ids = ['subnet-xxxxxxxxxx']\n", "security_group_ids = ['sg-xxxxxxxxxx']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define a SageMaker `PyTorch Estimator`" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from sagemaker.pytorch import PyTorch\n", "def get_pytorch_estimator(entry_point, hyperparameters, instance_type, \n", " instance_count, output_prefix, \n", " dist_training_config=None, volume_size=10, \n", " subnets=None, security_group_ids=None):\n", " pt_estimator = PyTorch(\n", " role=role,\n", " sagemaker_session=session,\n", " subnets=subnets,\n", " security_group_ids=security_group_ids,\n", "\n", " source_dir='src',\n", " entry_point=entry_point,\n", " hyperparameters=hyperparameters,\n", " py_version='py36',\n", " framework_version='1.6.0',\n", "\n", " instance_count=instance_count,\n", " instance_type=instance_type,\n", " volume_size=volume_size,\n", "\n", " enable_sagemaker_metrics=True,\n", " metric_definitions=metric_def,\n", "\n", " debugger_hook_config=False,\n", " disable_profiler=True,\n", " distribution=dist_training_config,\n", "\n", " code_location=f's3://{bucket}/{output_prefix}/output',\n", " output_path=f's3://{bucket}/{output_prefix}/output',\n", " max_run=432000 # Max runtime of of 5 days\n", " )\n", " \n", " return pt_estimator\n", "\n", "# Training loop metrics to persist\n", "metric_def = [\n", " {\n", " \"Name\": \"train_loss\",\n", " \"Regex\": \"train_loss: (.*?)$\",\n", " },\n", " {\n", " \"Name\": \"average_loss\",\n", " \"Regex\": \"average loss: (.*?)$\",\n", " },\n", " {\n", " \"Name\": \"mean_dice\",\n", " \"Regex\": \"current mean dice: (.*?) \",\n", " },\n", " {\n", " \"Name\": \"time_per_epoch\",\n", " \"Regex\": \"secs_time_per_epoch: (.*?)$\",\n", " },\n", " {\n", " \"Name\": \"dice_tc\",\n", " \"Regex\": \"tc: (.*?) \",\n", " },\n", " {\n", " \"Name\": \"dice_wt\",\n", " \"Regex\": \"wt: (.*?) \",\n", " },\n", " {\n", " \"Name\": \"dice_et\",\n", " \"Regex\": \"et: (.*?)$\",\n", " },\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Single GPU Device Experiments - Original Dataset 484 training pairs (4.65 GB)\n", "\n", "Run training for three of MONAI's dataset classes:\n", "1. `Dataset`: standard data loading\n", "2. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Single GPU Device Experiments - Original Dataset: 484 training pairs (4.65 GB)\n", "\n", "Run training for three of MONAI's dataset classes:\n", "1. `Dataset`: standard data loading\n", "2. `PersistentDataset`: persist processed data on disk\n", "3. `CacheDataset`: persist processed data in CPU memory" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "training_data_on_s3 = \"s3://{}/{}/Task01_BrainTumour\".format(bucket, dataset_prefix)\n", "\n", "hyperparameters = {\n", " 'torch_dataset_type': \"Dataset\",\n", " 'lr': 5e-3,\n", " 'epochs': 10,\n", " 'batch_size': 16,\n", " 'num_workers': 4\n", "}\n", " \n", " \n", "for dataset_type in ['Dataset', 'PersistentDataset', 'CacheDataset']:\n", "\n", " hyperparameters[\"torch_dataset_type\"] = dataset_type\n", " \n", " # Instantiate a training container with the PyTorch image\n", " WORKFLOW_DATE_TIME = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", " output_prefix = \"brats_ebs/{}/{}/sagemaker\".format(WORKFLOW_DATE_TIME, dataset_type)\n", " pt_estimator = get_pytorch_estimator('single_gpu_training.py', \n", " hyperparameters, \n", " 'ml.p3.2xlarge', \n", " 1, \n", " output_prefix, \n", " dist_training_config=None, \n", " volume_size=100)\n", "\n", "\n", " # Launch training job\n", " pt_estimator.fit(\n", " job_name='monai-1gpu-{}-{}'.format(dataset_type, WORKFLOW_DATE_TIME),\n", " inputs={'train': training_data_on_s3},\n", " wait=False\n", " )\n", " time.sleep(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Results:\n", "The runs above should launch three training jobs. Visit the SageMaker training jobs console for details on each. The `CacheDataset` run should be the fastest, followed by `PersistentDataset` and then `Dataset`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Single GPU Device Experiment - Synthetically Scaled Dataset: 48,400 training pairs (~450 GB compressed, ~7 TB decompressed)\n", "4. `Dataset`: standard data loading, but on the large dataset\n", "\n", "Please make sure your account has a 3,000 GB quota for **[Size of EBS volume for an instance](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html)** for a SageMaker training job. Visit the [AWS service quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html) page to request a quota increase." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "scaled_training_data_on_s3 = \"s3://{}/{}/\".format(bucket, scaled_dataset_prefix)\n", "\n", "hyperparameters = {\n", " 'torch_dataset_type': \"Dataset\",\n", " 'lr': 5e-3,\n", " 'epochs': 10,\n", " 'batch_size': 16,\n", " 'num_workers': 4\n", "}\n", "\n", "# Instantiate a training container with the PyTorch image\n", "WORKFLOW_DATE_TIME = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "output_prefix = \"brats_ebs/{}/{}/sagemaker\".format(WORKFLOW_DATE_TIME, \"ScaledDataset\")\n", "pt_estimator = get_pytorch_estimator('single_gpu_training.py', \n", " hyperparameters, \n", " 'ml.p3.2xlarge', \n", " 1, \n", " output_prefix, \n", " dist_training_config=None, \n", " volume_size=3000)\n", "\n", "# Launch training job\n", "pt_estimator.fit(\n", " job_name='monai-1gpu-ScaledDataset-{}'.format(WORKFLOW_DATE_TIME),\n", " inputs={'train': scaled_training_data_on_s3},\n", " wait=False\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Results:\n", "The run above should launch one SageMaker training job. Visit the SageMaker training jobs console for details.\n", "\n", "The average epoch should take around 31,000 seconds, or 8 hours 37 minutes. **The entire 10-epoch training job should take close to four days to finish.** Moreover, we run into disk and memory limits if we try to use `PersistentDataset` or `CacheDataset`." ] },
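{ "cell_type": "markdown", "metadata": {}, "source": [ "As a quick sanity check on the numbers above (illustrative only), the total run time follows directly from the per-epoch time:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Back-of-the-envelope estimate based on the ~31,000 s average epoch time reported above\n", "secs_per_epoch = 31000\n", "epochs = 10\n", "total_secs = secs_per_epoch * epochs\n", "print('~{:.1f} hours, ~{:.1f} days'.format(total_secs / 3600, total_secs / 86400))" ] },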
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Solution:\n", "[SageMaker Distributed Data Parallel Training Library](https://docs.aws.amazon.com/sagemaker/latest/dg/data-parallel.html) + SageMaker Processing + Amazon FSx for Lustre" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 1: SageMaker-managed distributed image pre-processing\n", "Define the data processing infrastructure" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ScriptProcessor\n", "script_processor = ScriptProcessor(\n", " # Note: this image URI is for us-east-1; use the PyTorch training image for your own region\n", " image_uri=\"763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.8.1-cpu-py36-ubuntu18.04\",\n", " instance_count=20,\n", " instance_type='ml.c4.8xlarge',\n", " volume_size_in_gb=1024,\n", " role=sagemaker.get_execution_role(),\n", " command=['python3']\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define input/output paths" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "PROCESSING_JOB_DATETIME = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "transformed_data_prefix = \"medical-imaging/transformed_scaled_dataset/\"+PROCESSING_JOB_DATETIME+\"/train\"\n", "processing_s3_output_path = \"s3://{}/{}\".format(bucket, transformed_data_prefix)\n", "\n", "\n", "ScriptProcessorOutput = [\n", " ProcessingOutput(\n", " output_name='train',\n", " destination=processing_s3_output_path,\n", " source='/opt/ml/processing/train'\n", " )\n", "]\n", "\n", "ScriptProcessorInput = [\n", " ProcessingInput(\n", " source=\"s3://{}/{}/\".format(bucket, scaled_zipped_dataset_prefix),\n", " destination='/opt/ml/processing/input',\n", " s3_data_distribution_type='ShardedByS3Key'\n", " )\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the processing job" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "script_processor.run(job_name=\"brats-sharded-preprocessing-{}\".format(PROCESSING_JOB_DATETIME),\n", " code='src/sharded_image_preprocessing.py',\n", " inputs=ScriptProcessorInput,\n", " outputs=ScriptProcessorOutput,\n", " wait=True\n", " )" ] },
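{ "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, once the processing job finishes, spot-check that the transformed files landed under the output prefix (a quick illustrative check, not required for the rest of the workflow):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# List a few of the transformed objects written by the processing job\n", "s3_client = boto3.client('s3')\n", "response = s3_client.list_objects_v2(Bucket=bucket, Prefix=transformed_data_prefix, MaxKeys=10)\n", "for obj in response.get('Contents', []):\n", "    print(obj['Key'], obj['Size'])" ] },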
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Step 2: FSx for Lustre\n", "Upon completion, the SageMaker distributed data processing job writes the transformed data to `processing_s3_output_path`. To expedite data transfer from S3 to the training hosts, we create a high-performance file system using Amazon FSx for Lustre. *Note: make sure your execution role has the proper permissions. Please attach [AmazonFSxFullAccess](https://us-east-1.console.aws.amazon.com/iam/home#/policies/arn:aws:iam::aws:policy/AmazonFSxFullAccess) and follow [Add permissions to use data repositories in Amazon S3](https://docs.aws.amazon.com/fsx/latest/LustreGuide/setting-up.html#fsx-adding-permissions-s3).*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fsx_client = boto3.client(\"fsx\")\n", "\n", "fsx_response = fsx_client.create_file_system(\n", " FileSystemType='LUSTRE',\n", " StorageCapacity=2400,\n", " StorageType='SSD',\n", " SubnetIds=[vpc_subnet_ids[0]],\n", " SecurityGroupIds=security_group_ids,\n", " LustreConfiguration={\n", " 'ImportPath': processing_s3_output_path+\"/\",\n", " 'DeploymentType': 'PERSISTENT_1',\n", " 'PerUnitStorageThroughput': 200\n", " }\n", ")\n", "\n", "\n", "# Poll until the file system leaves the CREATING state\n", "fsx_status = \"CREATING\"\n", "while fsx_status == \"CREATING\":\n", " time.sleep(60)\n", " fsx_describe = fsx_client.describe_file_systems(\n", " FileSystemIds=[fsx_response[\"FileSystem\"][\"FileSystemId\"]]\n", " )\n", " fsx_status = fsx_describe[\"FileSystems\"][0][\"Lifecycle\"]\n", " print(fsx_status)" ] },
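{ "cell_type": "markdown", "metadata": {}, "source": [ "Once the file system reaches `AVAILABLE`, you can print its identifiers for reference (an optional check; the fields come from the `describe_file_systems` response polled above):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Quick reference: identifiers of the newly created file system\n", "fs_info = fsx_describe['FileSystems'][0]\n", "print('FileSystemId:', fs_info['FileSystemId'])\n", "print('DNSName:', fs_info['DNSName'])\n", "print('MountName:', fs_info['LustreConfiguration']['MountName'])" ] },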
\n", "# Directory must be mounted 'ro'(read-only).\n", "file_system_access_mode = 'ro'\n", "\n", "# Specify your file system type.\n", "file_system_type = 'FSxLustre'\n", "\n", "from sagemaker.inputs import FileSystemInput\n", "train = FileSystemInput(\n", " file_system_id=file_system_id,\n", " file_system_type=file_system_type,\n", " directory_path=file_system_directory_path,\n", " file_system_access_mode=file_system_access_mode\n", ")\n", "\n", "data_channels = {'train': train}\n", "print(data_channels)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create SageMaker PyTorch Estimator" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# output path: model artifacts and source code\n", "TRAINING_JOB_DATETIME = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "output_prefix = \"brats_fsx/{}/sagemaker\".format(TRAINING_JOB_DATETIME)\n", "\n", "# compute resources\n", "instance_type = 'ml.p3.16xlarge'\n", "instance_count = 2\n", "world_size = instance_count * 8\n", "num_vcpu = 64\n", "num_workers = 16 \n", "\n", "# network hyperparameters\n", "hyperparameters = {'lr': 1e-4 * world_size,\n", " 'batch_size': 4 * world_size,\n", " 'epochs': 10,\n", " 'num_workers': num_workers\n", " }\n", "\n", "dist_config = {'smdistributed':\n", " {'dataparallel':{'enabled': True}}\n", " }\n", "\n", "pt_estimator = get_pytorch_estimator('multi_gpu_training.py',\n", " hyperparameters,\n", " instance_type,\n", " instance_count,\n", " output_prefix,\n", " dist_training_config=dist_config,\n", " subnets=vpc_subnet_ids,\n", " security_group_ids=security_group_ids)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Launch training job" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "pt_estimator.fit(\n", " job_name='brats-2p316-fsx-64batch-16worker-{}'.format(TRAINING_JOB_DATETIME),\n", " inputs=data_channels,\n", " wait=True\n", ")" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" } }, "nbformat": 4, "nbformat_minor": 4 }