{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook demonstrates a prototype of a SageMaker processor that mimics SageMaker framework estimators:\n", "\n", "1. Zip and upload a `source_dir` (and `dependencies`) to S3. This is done by calling the estimator's\n", " implementation, hence it should inherit the same capabilities, e.g., `source_dir` or `dependencies`\n", " can be a git repository, etc. See the estimator docstring for more detail on the possible values\n", " for `source_dir`, `entry_point`, and `dependencies`.\n", "2. The processing job unpacks `sourcedir.tar.gz`, installs `requirements.txt`,\n", "3. and finally runs the Python entrypoint.\n", "\n", "Steps:\n", "- **Action**: click *Kernel* -> *Restart Kernel and Run All Cells...* \n", "- **Expected outcome**: no exception seen." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "%load_ext autoreload\n", "%autoreload 2\n", "%config InlineBackend.figure_format = 'retina'\n", "\n", "import sagemaker as sm\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.mxnet.estimator import MXNet\n", "from sagemaker.processing import ScriptProcessor\n", "\n", "import smconfig\n", "\n", "# Configuration of this screening test.\n", "sess = sm.Session()\n", "s3_bucket = 's3://my-bucket' # Change this (but make sure to start with 's3://')\n", "sm_kwargs = smconfig.SmKwargs(sm.get_execution_role())\n", "s3_input_path = f'{s3_bucket}/smproc-stopgap/entrypoint-input'\n", "s3_sagemaker_path = f'{s3_bucket}/smproc-stopgap/sagemaker'\n", "\n", "# Propagate to env vars of the whole notebook, for usage by ! 
or %%.\n", "%set_env BUCKET=$s3_bucket\n", "%set_env S3_INPUT_PATH=$s3_input_path\n", "%set_env S3_SAGEMAKER_PATH=$s3_sagemaker_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We want to reuse these capabilities of the framework estimator:\n", "- the logic to pack sourcedir + dependencies,\n", "- auto-detection of the container.\n", "\n", "In the next cell, we start by instantiating an `MXNet` estimator, as we will use\n", "the MXNet (training) container to run our processing job." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator = MXNet(\n", " entry_point='processing.py',\n", " source_dir='./sourcedir',\n", " framework_version='1.6.0',\n", " py_version='py3',\n", "\n", " # sourcedir.tar.gz and output use the pre-defined bucket.\n", " code_location=s3_sagemaker_path,\n", " output_path=s3_sagemaker_path,\n", "\n", " instance_count=1,\n", " instance_type='ml.m5.large',\n", " sagemaker_session=sess,\n", " **sm_kwargs.train,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Instantiate a `ScriptProcessor` and tell it to use the MXNet container. Then, immediately generate a job name. We'll use this\n", "job name to mimic a few of the estimator's niceties, in particular:\n", "- all code artifacts will be uploaded to `s3://...../jobname/...`,\n", "- job output will go to `s3://..../jobname/...`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define the code and runtime environment.\n", "processor = ScriptProcessor(\n", " image_uri=estimator.training_image_uri(),\n", " command=['/bin/bash'],\n", " instance_count=1,\n", " instance_type='ml.m5.large',\n", " sagemaker_session=sess,\n", " **sm_kwargs.processing,\n", ")\n", "\n", "# Generate job name and track it. 
We need to do this to set the S3 output path\n", "# to s3://mybucket/...../jobname/output/....\n", "#\n", "# See: https://github.com/aws/sagemaker-python-sdk/blob/570c67806f4f85f954d836d01c6bb06a24b939ee/src/sagemaker/processing.py#L315\n", "processing_job_name = processor._generate_current_job_name()\n", "%set_env PROCESSING_JOB_NAME=$processing_job_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can instruct the estimator to upload (sourcedir + dependencies) to `s3://.../jobname/source/sourcedir.tar.gz`. Note that the SageMaker SDK automatically uses `source/sourcedir.tar.gz`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Zip source_dir/ and upload to a specific area in S3.\n", "estimator._prepare_for_training(job_name=processing_job_name)\n", "print(f'Uploaded {estimator.source_dir} to', estimator._hyperparameters['sagemaker_submit_directory'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we manually upload the bootstrapping code to a specific S3 location `s3://.../jobname/source/...`;\n", "otherwise the SageMaker SDK always uploads to default_bucket() `s3://sagemaker-{}-{}/`. That default works fine for\n", "accounts with the create_s3_bucket permission, but not for restricted accounts." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Upload bootstrapping code to the same S3 directory as sourcedir.tar.gz\n", "!aws s3 cp ./runproc.sh $S3_SAGEMAKER_PATH/$PROCESSING_JOB_NAME/source/runproc.sh\n", "print('Uploaded bootstrapping code to the same directory as sagemaker_submit_directory')\n", "print(f'Bootstrapping script will run /opt/ml/input/code/{estimator._hyperparameters[\"sagemaker_program\"]}')\n", "\n", "# Environment variables to tell the bootstrapper (i.e., runproc.sh) the filename of the Python entrypoint.\n", "# runproc.sh needs only sagemaker_program, but we'll inject all sagemaker_* just in case.\n", "processor.env = {k: str(v) for k, v in estimator._hyperparameters.items() if k.startswith('sagemaker_')}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now it's time to submit a processing job." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create a dummy input file\n", "!echo \"Dummy input file\" | aws s3 cp - $S3_INPUT_PATH/input.txt\n", "\n", "# Submit a processing job.\n", "processor.run(\n", " # Bootstrapping code\n", " code=f'{s3_sagemaker_path}/{processing_job_name}/source/runproc.sh',\n", "\n", " job_name=processing_job_name,\n", " inputs=[\n", " ProcessingInput(source=s3_input_path, destination='/opt/ml/processing/input'),\n", " \n", " # ScriptProcessor job will download only s3://..../code/runproc.sh, hence we need to also\n", " # inject our s3://.../sourcedir.tar.gz.\n", " #\n", " # We'll follow the exact same mechanism that ScriptProcessor does, which is to inject the\n", " # S3 code artifact as a processing input with destination /opt/ml/processing/input/code/payload/.\n", " #\n", " # Unfortunately, as much as I'd like to put sourcedir.tar.gz in /opt/ml/processing/input/code/,\n", " # this cannot be done as this destination is already used by the ScriptProcessor for runproc.sh,\n", " # and the SDK requires each 
destination to be used by only one input.\n", " # - Note that the parameterized form of this path is available as ScriptProcessor._CODE_CONTAINER_BASE_PATH\n", " # and ScriptProcessor._CODE_CONTAINER_INPUT_NAME.\n", " # - See: https://github.com/aws/sagemaker-python-sdk/blob/a7399455f5386d83ddc5cb15c0db00c04bd518ec/src/sagemaker/processing.py#L425-L426\n", " ProcessingInput(source=estimator._hyperparameters['sagemaker_submit_directory'], destination='/opt/ml/processing/input/code/payload/')\n", " ],\n", " outputs=[ProcessingOutput(source='/opt/ml/processing/output', destination=f'{s3_sagemaker_path}/{processing_job_name}/output')],\n", " arguments=None,\n", " wait=True,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the job is done, let's probe the output..." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Probe output\n", "!aws s3 cp $S3_SAGEMAKER_PATH/$PROCESSING_JOB_NAME/output/processing.jsonl -" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" }, "toc-autonumbering": true }, "nbformat": 4, "nbformat_minor": 4 }