{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook demonstrates a prototype of a SageMaker processor that mimics SageMaker framework estimators:\n",
    "\n",
    "1. zip and upload a `source_dir` (and `dependencies`) to S3. This is done by calling the estimator's\n",
    "   implementation, hence it should inherit the same capabilities, e.g., `source_dir` or `dependencies`\n",
    "   can be a git repository, etc. See estimator docstring for more detail on the possible values\n",
    "   for `source_dir`, `entry_point`, and `dependencies`.\n",
    "2. processing job can unpack the `sourcedir.tar.gz`, then install `requirements.txt`,\n",
    "3. and finally run the python entrypoint.\n",
    "\n",
    "Steps:\n",
    "- **Action**: click *Kernel* -> *Restart Kernel and Run All Cells...* \n",
    "- **Expected outcome**: no exception seen."
   ]
  },
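  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The bootstrap script `runproc.sh` sits next to this notebook and is uploaded to S3 in a later cell;\n",
    "its contents are not shown here. Below is a minimal sketch of what such a bootstrapper might look like.\n",
    "The paths and the `sagemaker_program` environment variable are assumptions derived from the\n",
    "`ProcessingInput` destinations and the `processor.env` set later in this notebook; the actual script may differ.\n",
    "\n",
    "```bash\n",
    "#!/bin/bash\n",
    "# Hypothetical bootstrapper -- the real runproc.sh may differ.\n",
    "set -euo pipefail\n",
    "\n",
    "# sourcedir.tar.gz is injected as a ProcessingInput (see the run cell below).\n",
    "cd /opt/ml/processing/input/code/payload/\n",
    "tar -xzf sourcedir.tar.gz\n",
    "\n",
    "# Install extra dependencies, if the payload ships a requirements.txt.\n",
    "[[ -f requirements.txt ]] && pip install -r requirements.txt\n",
    "\n",
    "# sagemaker_program is injected via processor.env in a later cell.\n",
    "python \"$sagemaker_program\"\n",
    "```"
   ]
  },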
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "%load_ext autoreload\n",
    "%autoreload 2\n",
    "%config InlineBackend.figure_format = 'retina'\n",
    "\n",
    "import sagemaker as sm\n",
    "from sagemaker.processing import ProcessingInput, ProcessingOutput\n",
    "from sagemaker.mxnet.estimator import MXNet\n",
    "from sagemaker.processing import ScriptProcessor\n",
    "\n",
    "import smconfig\n",
    "\n",
    "# Configuration of this screening test.\n",
    "sess = sm.Session()\n",
    "s3_bucket = 's3://my-bucket'  # Change this (but make sure to start with 's3://')\n",
    "sm_kwargs = smconfig.SmKwargs(sm.get_execution_role())\n",
    "s3_input_path = f'{s3_bucket}/smproc-stopgap/entrypoint-input'\n",
    "s3_sagemaker_path = f'{s3_bucket}/smproc-stopgap/sagemaker'\n",
    "\n",
    "# Propagate to env vars of the whole notebook, for usage by ! or %%.\n",
    "%set_env BUCKET=$s3_bucket\n",
    "%set_env S3_INPUT_PATH=$s3_input_path\n",
    "%set_env S3_SAGEMAKER_PATH=$s3_sagemaker_path"
   ]
  },
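  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`smconfig` is a local helper module that is not shown in this notebook. Below is a hypothetical sketch\n",
    "that is consistent with how it is used here (`SmKwargs(role)` exposing `.train` and `.processing` dicts\n",
    "of extra keyword arguments); the real module may carry more settings (e.g., tags or subnets).\n",
    "\n",
    "```python\n",
    "# Hypothetical sketch of smconfig -- the real module may differ.\n",
    "from dataclasses import dataclass, field\n",
    "from typing import Any, Dict\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class SmKwargs:\n",
    "    \"\"\"Extra kwargs for estimators and processors, keyed off one IAM role.\"\"\"\n",
    "    role: str\n",
    "    train: Dict[str, Any] = field(init=False)\n",
    "    processing: Dict[str, Any] = field(init=False)\n",
    "\n",
    "    def __post_init__(self):\n",
    "        self.train = {'role': self.role}\n",
    "        self.processing = {'role': self.role}\n",
    "```"
   ]
  },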
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We want to reuse the framework estimator for these functionalities:\n",
    "- logic to pack sourcedir + dependencies,\n",
    "- auto-detect container.\n",
    "\n",
    "In the next cell, we start by instantiating an `MXNet` estimator class, as we will use\n",
    "mxnet (training) container to run our processing job."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator = MXNet(\n",
    "    entry_point='processing.py',\n",
    "    source_dir='./sourcedir',\n",
    "    framework_version='1.6.0',\n",
    "    py_version='py3',\n",
    "\n",
    "    # sourcedir.tar.gz and output use pre-defined bucket.\n",
    "    code_location=s3_sagemaker_path,\n",
    "    output_path=s3_sagemaker_path,\n",
    "\n",
    "    instance_count=1,\n",
    "    instance_type='ml.m5.large',\n",
    "    sagemaker_session=sess,\n",
    "    **sm_kwargs.train,\n",
    ")"
   ]
  },
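  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The entrypoint `./sourcedir/processing.py` is not shown in this notebook either. Below is a hypothetical\n",
    "minimal entrypoint consistent with the I/O channels declared later: it reads `/opt/ml/processing/input/`\n",
    "and writes `/opt/ml/processing/output/processing.jsonl`, which the last cell of this notebook probes.\n",
    "\n",
    "```python\n",
    "# Hypothetical sketch of sourcedir/processing.py -- the real entrypoint may differ.\n",
    "import json\n",
    "from pathlib import Path\n",
    "\n",
    "input_dir = Path('/opt/ml/processing/input')\n",
    "output_dir = Path('/opt/ml/processing/output')\n",
    "output_dir.mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "# Echo each input file as one JSON line.\n",
    "with (output_dir / 'processing.jsonl').open('w') as f:\n",
    "    for p in sorted(input_dir.glob('*')):\n",
    "        f.write(json.dumps({'file': p.name, 'content': p.read_text()}) + '\\n')\n",
    "```"
   ]
  },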
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Instantiate a `ScriptProcessor` and tells it to use the MXNet container. Then, immediately specify a job name. We'll use this\n",
    "jobname to mimic a few estimator's niceities, in particular:\n",
    "- all code artifacts will be uploaded to `s3://...../jobname/...`.\n",
    "- job output to `s3://..../jobname/...`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define the code and runtime environment.\n",
    "processor = ScriptProcessor(\n",
    "    image_uri=estimator.training_image_uri(),\n",
    "    command=['/bin/bash'],\n",
    "    instance_count=1,\n",
    "    instance_type='ml.m5.large',\n",
    "    sagemaker_session=sess,\n",
    "    **sm_kwargs.processing,\n",
    ")\n",
    "\n",
    "# Generate job name and track it. We need to do this to set the S3 output path\n",
    "# to s3://mybucket/...../jobname/output/....\n",
    "#\n",
    "# See: https://github.com/aws/sagemaker-python-sdk/blob/570c67806f4f85f954d836d01c6bb06a24b939ee/src/sagemaker/processing.py#L315\n",
    "processing_job_name = processor._generate_current_job_name()\n",
    "%set_env PROCESSING_JOB_NAME=$processing_job_name"
   ]
  },
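  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the intended layout concrete, the next cell prints the two jobname-scoped prefixes that the\n",
    "rest of this notebook implements."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The jobname-scoped S3 layout that subsequent cells implement.\n",
    "print('Code artifacts:', f'{s3_sagemaker_path}/{processing_job_name}/source/')\n",
    "print('Job output    :', f'{s3_sagemaker_path}/{processing_job_name}/output/')"
   ]
  },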
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we can instruct the estimator to upload (sourcedir + dependencies) to `s3://.../jobname/source/sourcedir.tar.gz`. Note that SageMaker SDK automatically uses `source/sourcedir.tar.gz`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Zip source_dir/ and upload to specific area in S3.\n",
    "estimator._prepare_for_training(job_name=processing_job_name)\n",
    "print(f'Uploaded {estimator.source_dir} to', estimator._hyperparameters['sagemaker_submit_directory'])"
   ]
  },
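  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A quick sanity check, assuming the SDK keeps its `{code_location}/{job_name}/source/sourcedir.tar.gz`\n",
    "convention (the value is normalized in case the SDK stored it JSON-quoted):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sanity check: the tarball should land under the jobname-scoped prefix.\n",
    "expected = f'{s3_sagemaker_path}/{processing_job_name}/source/sourcedir.tar.gz'\n",
    "actual = str(estimator._hyperparameters['sagemaker_submit_directory']).strip('\"')\n",
    "assert actual == expected, (actual, expected)"
   ]
  },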
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we manually upload the bootstrapping code to a specific S3 bucket `s3://.../jobname/source/...`,\n",
    "otherwise SageMaker SDK always uploads to default_bucket() `s3://sagemaker-{}-{}/`. This works fine for\n",
    "account with create_s3_bucket permission, but not for restricted account."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Upload bootstrapping code to the same S3 directory as sourcedir.tar.gz\n",
    "!aws s3 cp ./runproc.sh $S3_SAGEMAKER_PATH/$PROCESSING_JOB_NAME/source/runproc.sh\n",
    "print('Uploaded bootstrapping code to the same directory as sagemaker_submit_directory')\n",
    "print(f'Bootstrapping script will run /opt/ml/input/code/{estimator._hyperparameters[\"sagemaker_program\"]}')\n",
    "\n",
    "# Environment variables to tell the bootstrapper (i.e., runproc.sh) the filename of python entrypoint.\n",
    "# The runproc.sh needs only sagemaker_program, but we'll inject all sagemaker_* just in case.\n",
    "processor.env = {k: str(v) for k, v in estimator._hyperparameters.items() if k.startswith('sagemaker_')}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now it's time to submit a processing job."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create a dummy input file\n",
    "!echo \"Dummy input file\" | aws s3 cp - $S3_INPUT_PATH/input.txt\n",
    "\n",
    "# Submit a processing job.\n",
    "processor.run(\n",
    "    # Bootstrapping code\n",
    "    code=f'{s3_sagemaker_path}/{processing_job_name}/source/runproc.sh',\n",
    "\n",
    "    job_name=processing_job_name,\n",
    "    inputs=[\n",
    "        ProcessingInput(source=s3_input_path, destination='/opt/ml/processing/input'),\n",
    "        \n",
    "        # ScriptProcessor job will download only s3://..../code/runproc.sh, hence we need to also\n",
    "        # inject our s3://.../sourcedir.tar.gz.\n",
    "        #\n",
    "        # We'll follow the exact same mechanism that ScriptProcessor does, which is to inject the\n",
    "        # S3 code artifact as a processing input with destination /opt/ml/processing/input/code/payload/.\n",
    "        #\n",
    "        # Unfortunately, as much as I'd like to put sourcedir.tar.gz to /opt/ml/processing/input/code/,\n",
    "        # this cannot be done as this destination is already used by the ScriptProcessor for runproc.sh,\n",
    "        # and the SDK requires each destination used by one input.\n",
    "        # - Note that the parameterized form of this path is available as ScriptProcessor._CODE_CONTAINER_BASE_PATH\n",
    "        #   and ScriptProcessor._CODE_CONTAINER_INPUT_NAME.\n",
    "        # - See: https://github.com/aws/sagemaker-python-sdk/blob/a7399455f5386d83ddc5cb15c0db00c04bd518ec/src/sagemaker/processing.py#L425-L426)\n",
    "        ProcessingInput(source=estimator._hyperparameters['sagemaker_submit_directory'], destination='/opt/ml/processing/input/code/payload/')\n",
    "    ],\n",
    "    outputs=[ProcessingOutput(source='/opt/ml/processing/output', destination=f'{s3_sagemaker_path}/{processing_job_name}/output')],\n",
    "    arguments=None,\n",
    "    wait=True,\n",
    ")"
   ]
  },
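  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With `wait=True`, `run()` returns once the job reaches a terminal state. As a small sanity check, we\n",
    "confirm the job actually completed (`latest_job.describe()` wraps the `DescribeProcessingJob` API):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Double-check the terminal status of the processing job.\n",
    "desc = processor.latest_job.describe()\n",
    "print('Status:', desc['ProcessingJobStatus'])\n",
    "assert desc['ProcessingJobStatus'] == 'Completed', desc.get('FailureReason')"
   ]
  },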
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once the job is done, let's probe the output..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Probe output\n",
    "!aws s3 cp $S3_SAGEMAKER_PATH/$PROCESSING_JOB_NAME/output/processing.jsonl -"
   ]
  },
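  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optionally, list every artifact this job left under its jobname-scoped prefix (code and output):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List all jobname-scoped artifacts (runproc.sh, sourcedir.tar.gz, and the job output).\n",
    "!aws s3 ls --recursive $S3_SAGEMAKER_PATH/$PROCESSING_JOB_NAME/"
   ]
  }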
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.10"
  },
  "toc-autonumbering": true
 },
 "nbformat": 4,
 "nbformat_minor": 4
}