{ "cells": [ { "cell_type": "markdown", "id": "81a17362-e01b-4d09-99ac-ef7abadb8194", "metadata": {}, "source": [ "# Part One: Download data for Stable Diffuion with SageMaker Training\n", "In this notebook you'll use job parallelism to download more than 100GB of data from the Laion-5B dataset used with Stable Diffusion. This is broken into a few parts:\n", "1. Download the parquet files, these will be sent to S3 directly from your SageMaker job.\n", "2. Inspect the parquet files locally. You will want to upgrade your Studio instance for this.\n", "3. Use job parallelism to run many instances in parallel, each downloading all of the image/text pairs for one parquet file.\n", "\n", "\n", "Special note, this notebook is designed to work nicely with SageMaker Studio. You'll want to get comfortable upgrading and downgrading your instances here." ] }, { "cell_type": "markdown", "id": "cff80fae-7f0b-46be-9278-258a659a7da7", "metadata": {}, "source": [ "### Step 0. Update AWS botocore to enable SM Warm Pools" ] }, { "cell_type": "code", "execution_count": null, "id": "2da71f2e-dbc3-4cb7-b09b-33adb9544768", "metadata": { "tags": [] }, "outputs": [], "source": [ "%pip install --upgrade sagemaker\n", "%pip install boto3 --upgrade\n", "%pip install botocore --upgrade" ] }, { "cell_type": "markdown", "id": "1ff863d9-32de-41b4-a7a6-03bc02d2a7d5", "metadata": {}, "source": [ "### Step 1. Write a script to download the parquet files" ] }, { "cell_type": "markdown", "id": "5a204990-bdd2-4aa9-bbc2-593be25a1f59", "metadata": { "jp-MarkdownHeadingCollapsed": true, "tags": [] }, "source": [ "Using commands suggested [by Romain's original package here](https://github.com/rom1504/img2dataset/blob/main/dataset_examples/laion5B.md#normal). " ] }, { "cell_type": "code", "execution_count": null, "id": "c3bd7527-533f-4fa1-bab0-7d4a1b9dcf66", "metadata": {}, "outputs": [], "source": [ "!mkdir bootcamp_scripts" ] }, { "cell_type": "code", "execution_count": null, "id": "67ed82a7-2ae2-4359-b43e-18a1edde6d07", "metadata": {}, "outputs": [], "source": [ "%%writefile bootcamp_scripts/parquet_download.py\n", "\n", "import argparse\n", "import os\n", "\n", "def parse_args():\n", " \n", " parser = argparse.ArgumentParser() \n", " \n", " parser.add_argument(\"--bucket\", type=str, default=os.environ[\"SM_HP_BUCKET\"])\n", " \n", " parser.add_argument(\"--num_files\", type=int, default=os.environ[\"SM_HP_NUM_FILES\"])\n", "\n", " args = parser.parse_args()\n", " \n", " return args\n", "\n", "def get_part_ids(num_files):\n", "\n", " part_ids = []\n", " \n", " if num_files > 127:\n", " print ('error, currently Laion-5B only has 127 parquet files')\n", " return []\n", " \n", " for idx in range(0, num_files):\n", " part_id = '{}'.format(idx).zfill(5)\n", " part_ids.append(part_id)\n", "\n", " return part_ids\n", "\n", "def download_parquet(bucket, num_files):\n", "\n", " part_ids = get_part_ids(num_files)\n", "\n", " for p_id in part_ids:\n", "\n", " cmd = 'wget https://huggingface.co/datasets/laion/laion2B-en-joined/resolve/main/part-{}-4cfd6e30-f032-46ee-9105-8696034a8373-c000.snappy.parquet -O - | aws s3 cp - s3://{}/metadata/laion2B-en-joined/part-{}-4cfd6e30-f032-46ee-9105-8696034a8373-c000.snappy.parquet'.format(p_id, bucket, p_id)\n", "\n", " os.system(cmd)\n", "\n", "if __name__ == \"__main__\":\n", " \n", " args = parse_args()\n", " \n", " download_parquet(args.bucket, args.num_files)\n", " " ] }, { "cell_type": "markdown", "id": "550355b6-04b0-4c65-aa54-f55ec53712a4", "metadata": {}, "source": [ "### Step 2. 
{ "cell_type": "markdown", "id": "550355b6-04b0-4c65-aa54-f55ec53712a4", "metadata": {}, "source": [ "### Step 2. Run on SageMaker Training" ] },
{ "cell_type": "code", "execution_count": null, "id": "05266ab3-fced-459b-a76d-6ce7f989655d", "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "from sagemaker.pytorch import PyTorch\n", "\n", "sess = sagemaker.Session()\n", "role = sagemaker.get_execution_role()\n", "\n", "bucket = sess.default_bucket()\n", "\n", "hyperparameters = {\"bucket\": bucket, \"num_files\": 10}\n", "\n", "estimator = PyTorch(\n", "    entry_point=\"parquet_download.py\",\n", "    base_job_name=\"sd-parquet-download\",\n", "    role=role,\n", "    source_dir=\"bootcamp_scripts\",\n", "    # configures the SageMaker training resources; increase these as you need\n", "    instance_count=1,\n", "    instance_type=\"ml.m5.large\",\n", "    py_version=\"py38\",\n", "    framework_version=\"1.10\",\n", "    sagemaker_session=sess,\n", "    debugger_hook_config=False,\n", "    hyperparameters=hyperparameters,\n", "    # keep a warm pool alive for 60 minutes, useful for fast iteration while debugging\n", "    keep_alive_period_in_seconds=60 * 60)" ] },
{ "cell_type": "code", "execution_count": null, "id": "5348713b-8468-407d-9bc5-8de517ed8cee", "metadata": {}, "outputs": [], "source": [ "estimator.fit(wait=False)" ] },
{ "cell_type": "markdown", "id": "93cd50e2-0f47-4eba-99cb-d8623dc37a6d", "metadata": {}, "source": [ "### Step 3. Analyze the parquet files\n", "You can check your S3 bucket to watch the parquet files come in. Once you have at least part-00000 downloaded, you can proceed to analyze it here." ] },
{ "cell_type": "code", "execution_count": null, "id": "08b8034a-fce5-4399-b785-1e97c28f24df", "metadata": {}, "outputs": [], "source": [ "path = 's3://{}/metadata/laion2B-en-joined/part-00000-4cfd6e30-f032-46ee-9105-8696034a8373-c000.snappy.parquet'.format(bucket)" ] },
{ "cell_type": "code", "execution_count": null, "id": "6821aaed-3265-4ad0-b809-1cb0d15aba6b", "metadata": {}, "outputs": [], "source": [ "!mkdir parquet\n", "!aws s3 cp {path} parquet/" ] },
{ "cell_type": "code", "execution_count": null, "id": "67e96cf0-572e-433b-89ed-575f1060e1f6", "metadata": { "tags": [] }, "outputs": [], "source": [ "import pandas as pd\n", "\n", "parquet_file = 'parquet/part-00000-4cfd6e30-f032-46ee-9105-8696034a8373-c000.snappy.parquet'\n", "\n", "# please make sure you are using a larger instance for your notebook here, as the parquet file is quite large\n", "# if your kernel dies, it's because you need to upgrade to an instance with more memory\n", "# I believe the smallest instance you can use here is the ml.m5.2xlarge\n", "df = pd.read_parquet(parquet_file)" ] },
{ "cell_type": "code", "execution_count": null, "id": "d32d7f70-4e61-4163-9dd9-8ba6f4268f4c", "metadata": { "tags": [] }, "outputs": [], "source": [ "df.head()" ] },
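{ "cell_type": "markdown", "id": "added-parquet-columns-md", "metadata": {}, "source": [ "Before launching the full download, it is worth confirming that the metadata has the columns the download script in Step 4 points `img2dataset` at. The optional cell below is a quick check, assuming the column names are `URL` and `TEXT` as used later in this notebook." ] },
{ "cell_type": "code", "execution_count": null, "id": "added-parquet-columns-code", "metadata": {}, "outputs": [], "source": [ "# optional: confirm the columns the img2dataset step expects are present\n", "print('rows in part-00000: {}'.format(len(df)))\n", "print('columns: {}'.format(list(df.columns)))\n", "print('has URL column: {}'.format('URL' in df.columns))\n", "print('has TEXT column: {}'.format('TEXT' in df.columns))" ] },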
" ] }, { "cell_type": "code", "execution_count": null, "id": "0dd29360-35b2-4c14-b1e6-254451d477f9", "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "\n", "sess = sagemaker.Session()\n", "\n", "bucket = sess.default_bucket()\n", "\n", "# this should point to the parent S3 directory with all of your parquet files\n", "parquet_path = 's3://{}/metadata/laion2B-en-joined/'.format(bucket)\n", "\n", "!aws s3 ls {parquet_path} >> parquet_list.txt" ] }, { "cell_type": "code", "execution_count": null, "id": "a8822209-e25c-410a-83e3-352c33bb0f97", "metadata": {}, "outputs": [], "source": [ "parquet_list = []\n", "\n", "with open ('parquet_list.txt') as f:\n", " \n", " for row in f.readlines():\n", " r = row.strip()\n", " parquet_list.append(r.split(' ')[-1])" ] }, { "cell_type": "code", "execution_count": null, "id": "5a981420-9526-46d1-aaac-b424e9283606", "metadata": { "tags": [] }, "outputs": [], "source": [ "# take a look at this list and make sure all the parquet files seem valid. each of these will serve as an input to its own SageMaker job\n", "parquet_list" ] }, { "cell_type": "code", "execution_count": null, "id": "e278ca8c-2d5d-48e7-9fa2-b18b3258ff7a", "metadata": { "tags": [] }, "outputs": [], "source": [ "num_files = len(parquet_list)\n", "\n", "print ('About to run {} SM jobs to download all of your parquet files'.format(num_files))" ] }, { "cell_type": "code", "execution_count": null, "id": "102d8a4c-34af-48db-8ab6-7e3adcbea265", "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "def is_open(s3_path):\n", " # checks to see if there is anything in the specific S3 path\n", " # returns True if nothing is there\n", " cmd = 'aws s3 ls {}'.format(s3_path)\n", " res = os.system(cmd)\n", " if res == 256:\n", " return True\n", " else:\n", " return False" ] }, { "cell_type": "markdown", "id": "8d8e28d2-72ef-4827-bca8-493a198b5361", "metadata": {}, "source": [ "#### Define job parameters" ] }, { "cell_type": "code", "execution_count": null, "id": "0ac8d2e6-a456-4fa1-bdd7-629ea0eb7ec8", "metadata": {}, "outputs": [], "source": [ "%%writefile bootcamp_scripts/requirements.txt\n", "img2dataset\n", "s3fs" ] }, { "cell_type": "code", "execution_count": null, "id": "7d6f2ba5-2c93-4916-9f4b-4c0f3b911c40", "metadata": {}, "outputs": [], "source": [ "%%writefile bootcamp_scripts/download_data.py\n", "\n", "from img2dataset import download\n", "import shutil\n", "import os\n", "import multiprocessing\n", "import threading\n", "import argparse\n", "\n", "def parse_args():\n", " \n", " parser = argparse.ArgumentParser()\n", "\n", " # parser.add_argument(\"--model-dir\", type=str, default=os.environ[\"SM_MODEL_DIR\"])\n", " \n", " parser.add_argument(\"--cores\", type=int, default=multiprocessing.cpu_count())\n", "\n", " parser.add_argument(\"--threads\", type=int, default=threading.active_count())\n", " \n", " parser.add_argument(\"--parquet\", type=str, default=os.environ[\"SM_CHANNEL_PARQUET\"])\n", " \n", " parser.add_argument(\"--file_name\", type=str, default=os.environ[\"SM_HP_FILE_NAME\"])\n", " \n", " parser.add_argument(\"--bucket\", type=str, default=os.environ[\"SM_MODULE_DIR\"].split('/')[2])\n", " \n", " args = parser.parse_args()\n", " \n", " return args\n", "\n", "def prep_system():\n", " \n", " args = parse_args()\n", " \n", " # send joint path and file name\n", " url_list = \"{}/{}\".format(args.parquet, args.file_name)\n", " \n", " part_number = args.file_name.split('-')[1]\n", "\n", " # point to output path in S3\n", " s3_output = 
\"s3://{}/data/part-{}/\".format(args.bucket, part_number)\n", " \n", " return args, url_list, s3_output\n", "\n", " \n", "if __name__ == \"__main__\":\n", " \n", " args, url_list, s3_output = prep_system()\n", " \n", " download(\n", " processes_count=args.cores,\n", " thread_count=args.threads,\n", " # takes a single parquet file\n", " url_list=url_list,\n", " image_size=256,\n", " # copies to S3 directly, bypassing local disk\n", " output_folder=s3_output,\n", " # each image / caption pair is a tarball\n", " output_format=\"webdataset\",\n", " input_format=\"parquet\",\n", " url_col=\"URL\",\n", " caption_col=\"TEXT\",\n", " enable_wandb=False,\n", " number_sample_per_shard=1000,\n", " distributor=\"multiprocessing\",\n", " )\n", " " ] }, { "cell_type": "markdown", "id": "481082b9-c0c0-4b81-8010-76cfd2d2e96a", "metadata": {}, "source": [ "#### Loop through parquet files in S3 and run SageMaker training jobs" ] }, { "cell_type": "code", "execution_count": null, "id": "b4517436-fda8-4d7a-81c8-5186b0999591", "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "from sagemaker.pytorch import PyTorch\n", "\n", "sagemaker_session = sagemaker.Session()\n", "role = sagemaker.get_execution_role()\n", "\n", "bucket = sess.default_bucket()" ] }, { "cell_type": "code", "execution_count": null, "id": "210f25dc-5ede-440b-9445-bd2493450e5a", "metadata": { "tags": [] }, "outputs": [], "source": [ "def get_estimator(part_number, p_file, output_dir):\n", " \n", " # this passes the name of your parquet file as an input to the job\n", " hyperparameters = {\"file_name\": p_file}\n", "\n", " estimator = PyTorch(entry_point=\"download_data.py\",\n", " base_job_name=\"laion-part-{}\".format(part_number),\n", " role=role,\n", " source_dir=\"bootcamp_scripts\",\n", " # configures the SageMaker training resource, you can increase as you need\n", " instance_count=1,\n", " instance_type=\"ml.c5.18xlarge\",\n", " py_version=\"py36\",\n", " framework_version = '1.8',\n", " sagemaker_session=sagemaker_session,\n", " volume_size = 250,\n", " debugger_hook_config=False,\n", " hyperparameters=hyperparameters,\n", " output_path = output_dir)\n", " return estimator\n", "\n", "for p_file in parquet_list:\n", " \n", " part_number = p_file.split('-')[1]\n", "\n", " output_dir = \"s3://{}/data/part-{}/\".format(bucket, part_number)\n", "\n", " if is_open(output_dir):\n", "\n", " est = get_estimator(part_number, p_file, output_dir)\n", "\n", " est.fit({\"parquet\":\"s3://{}/metadata/laion2B-en-joined/{}\".format(bucket, p_file)}, wait=False)\n" ] }, { "cell_type": "markdown", "id": "463907af-705f-4f1b-8d57-a113fd691476", "metadata": {}, "source": [ "### Conclusion and next steps\n", "And that's a wrap! In this notebook you downloaded the metadata for the Laion-5B dataset, and then used job parallelism on SageMaker to run a full job for each parquet file.\n", "\n", "Your next task is to configure FSx for Lustre, and ensure your training script works nicely with this and SageMaker." 
] }, { "cell_type": "code", "execution_count": null, "id": "f6b4f188-ca43-4dbd-b359-37a7b09e6a5c", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.m5.2xlarge", "kernelspec": { "display_name": "Python 3 (PyTorch 1.10 Python 3.8 CPU Optimized)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/pytorch-1.10-cpu-py38" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 5 }