This notebook demonstrates a prototype of a SageMaker processor that mimics SageMaker framework estimators:

1. Zip and upload a `source_dir` (and `dependencies`) to S3. This is done by calling the estimator's
 implementation, so the processor inherits the same capabilities, e.g., `source_dir` or `dependencies`
 can be a git repository. See the estimator docstring for more detail on the possible values
 for `source_dir`, `entry_point`, and `dependencies`.
2. The processing job unpacks `sourcedir.tar.gz`, then installs `requirements.txt`.
3. Finally, the processing job runs the Python entrypoint.
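
Steps 2 and 3 can be sketched in Python as follows. This is only an illustration of the idea, not the contents of `runproc.sh` (which this notebook uploads but does not show); the function name `bootstrap`, its parameters, and the directory layout are all assumptions.

```python
import subprocess
import sys
import tarfile
from pathlib import Path


def bootstrap(tarball, workdir, entry_point, install_requirements=True):
    """Unpack a source tarball, optionally install requirements.txt, then run the entrypoint.

    Hypothetical helper: the name, arguments, and layout are illustrative only.
    Returns the exit code of the entrypoint.
    """
    workdir = Path(workdir)
    workdir.mkdir(parents=True, exist_ok=True)

    # Step 2a: unpack sourcedir.tar.gz into the working directory.
    with tarfile.open(tarball, "r:gz") as tar:
        tar.extractall(workdir)

    # Step 2b: install dependencies if the package ships a requirements.txt.
    requirements = workdir / "requirements.txt"
    if install_requirements and requirements.exists():
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "-r", str(requirements)],
            check=True,
        )

    # Step 3: run the Python entrypoint from inside the unpacked directory.
    completed = subprocess.run([sys.executable, entry_point], cwd=workdir)
    return completed.returncode
```

Running the entrypoint with `cwd=workdir` lets the script import sibling modules from the unpacked `source_dir`, which is one plausible reason to unpack before running rather than executing a single file.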

Steps:
- **Action**: click *Kernel* -> *Restart Kernel and Run All Cells...*
- **Expected outcome**: no exception is raised.

In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_format = 'retina'

import sagemaker as sm
from sagemaker.mxnet.estimator import MXNet
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

import smconfig

# Configuration of this screening test.
sess = sm.Session()
s3_bucket = 's3://my-bucket' # Change this (but make sure to start with 's3://')
sm_kwargs = smconfig.SmKwargs(sm.get_execution_role())
s3_input_path = f'{s3_bucket}/smproc-stopgap/entrypoint-input'
s3_sagemaker_path = f'{s3_bucket}/smproc-stopgap/sagemaker'

# Propagate to env vars of the whole notebook, for usage by ! or %%.
%set_env BUCKET=$s3_bucket
%set_env S3_INPUT_PATH=$s3_input_path
%set_env S3_SAGEMAKER_PATH=$s3_sagemaker_path

We want to reuse the framework estimator for two pieces of functionality:
- the logic to pack sourcedir + dependencies,
- auto-detection of the container image.

In the next cell, we start by instantiating an `MXNet` estimator, as we will use the
MXNet (training) container to run our processing job.

In [None]:
estimator = MXNet(
 entry_point='processing.py',
 source_dir='./sourcedir',
 framework_version='1.6.0',
 py_version='py3',

 # sourcedir.tar.gz and output use pre-defined bucket.
 code_location=s3_sagemaker_path,
 output_path=s3_sagemaker_path,

 instance_count=1,
 instance_type='ml.m5.large',
 sagemaker_session=sess,
 **sm_kwargs.train,
)

Instantiate a `ScriptProcessor` and tell it to use the MXNet container. Then, immediately generate a job name. We'll use this
job name to mimic a few of the estimator's niceties, in particular:
- all code artifacts are uploaded to `s3://...../jobname/...`,
- job output goes to `s3://..../jobname/...`.
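
The per-job layout can be spelled out with a small helper. The function `job_s3_paths` is hypothetical (it is not part of the SageMaker SDK); it only assembles the paths that the cells of this notebook use, with code under `<prefix>/<jobname>/source/` and output under `<prefix>/<jobname>/output`. The prefix and job name in the example are made up.

```python
def job_s3_paths(s3_prefix, job_name):
    """Return the per-job S3 locations used in this notebook (hypothetical helper)."""
    base = f"{s3_prefix.rstrip('/')}/{job_name}"
    return {
        "sourcedir": f"{base}/source/sourcedir.tar.gz",  # Uploaded by the estimator.
        "bootstrap": f"{base}/source/runproc.sh",        # Uploaded manually.
        "output": f"{base}/output",                      # Destination of the job output.
    }


# Example with made-up values.
paths = job_s3_paths("s3://my-bucket/smproc-stopgap/sagemaker", "example-job-name")
print(paths["output"])
```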

In [None]:
# Define the code and runtime environment.
processor = ScriptProcessor(
 image_uri=estimator.training_image_uri(),
 command=['/bin/bash'],
 instance_count=1,
 instance_type='ml.m5.large',
 sagemaker_session=sess,
 **sm_kwargs.processing,
)

# Generate job name and track it. We need to do this to set the S3 output path
# to s3://mybucket/...../jobname/output/....
#
# See: https://github.com/aws/sagemaker-python-sdk/blob/570c67806f4f85f954d836d01c6bb06a24b939ee/src/sagemaker/processing.py#L315
processing_job_name = processor._generate_current_job_name()
%set_env PROCESSING_JOB_NAME=$processing_job_name

Now we can instruct the estimator to upload (sourcedir + dependencies) to `s3://.../jobname/source/sourcedir.tar.gz`. Note that the SageMaker SDK chooses the `source/sourcedir.tar.gz` suffix automatically.

In [None]:
# Zip source_dir/ and upload to specific area in S3.
estimator._prepare_for_training(job_name=processing_job_name)
print(f'Uploaded {estimator.source_dir} to', estimator._hyperparameters['sagemaker_submit_directory'])

Next, we manually upload the bootstrapping code to a specific S3 location, `s3://.../jobname/source/...`;
otherwise the SageMaker SDK always uploads it to `default_bucket()`, i.e., `s3://sagemaker-{}-{}/`. The default works fine for
an account with the create_s3_bucket permission, but not for a restricted account.

In [None]:
# Upload bootstrapping code to the same S3 directory as sourcedir.tar.gz
!aws s3 cp ./runproc.sh $S3_SAGEMAKER_PATH/$PROCESSING_JOB_NAME/source/runproc.sh
print('Uploaded bootstrapping code to the same directory as sagemaker_submit_directory')
print(f'Bootstrapping script will run /opt/ml/input/code/{estimator._hyperparameters["sagemaker_program"]}')

# Environment variables that tell the bootstrapper (i.e., runproc.sh) the filename of the Python entrypoint.
# runproc.sh needs only sagemaker_program, but we inject all sagemaker_* entries just in case.
processor.env = {k: str(v) for k, v in estimator._hyperparameters.items() if k.startswith('sagemaker_')}
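
To make the filter above concrete, here is the same dictionary comprehension applied to a made-up hyperparameter dict. The keys and values below are illustrative assumptions, not output captured from the SDK; the real contents of `estimator._hyperparameters` may differ.

```python
# Hypothetical stand-in for estimator._hyperparameters (all values are invented).
hyperparameters = {
    "sagemaker_program": "processing.py",
    "sagemaker_submit_directory": "s3://my-bucket/example-job-name/source/sourcedir.tar.gz",
    "sagemaker_container_log_level": 20,
    "epochs": 10,  # A user hyperparameter; it must not leak into the env vars.
}

# Same filter as above: keep only sagemaker_* keys and stringify the values,
# because environment variables must be strings.
env = {k: str(v) for k, v in hyperparameters.items() if k.startswith("sagemaker_")}
print(sorted(env))
```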

Now it's time to submit a processing job.

In [None]:
# Create a dummy input file
!echo "Dummy input file" | aws s3 cp - $S3_INPUT_PATH/input.txt

# Submit a processing job.
processor.run(
 # Bootstrapping code
 code=f'{s3_sagemaker_path}/{processing_job_name}/source/runproc.sh',

 job_name=processing_job_name,
 inputs=[
 ProcessingInput(source=s3_input_path, destination='/opt/ml/processing/input'),
 
 # A ScriptProcessor job downloads only s3://..../code/runproc.sh, hence we also need to
 # inject our s3://.../sourcedir.tar.gz.
 #
 # We follow the same mechanism that ScriptProcessor itself uses, which is to inject the
 # S3 code artifact as a processing input, here with destination
 # /opt/ml/processing/input/code/payload/.
 #
 # Unfortunately, sourcedir.tar.gz cannot go directly to /opt/ml/processing/input/code/,
 # because ScriptProcessor already uses that destination for runproc.sh, and the SDK
 # requires each destination to be used by only one input.
 # - The parameterized form of this path is available as ScriptProcessor._CODE_CONTAINER_BASE_PATH
 #   and ScriptProcessor._CODE_CONTAINER_INPUT_NAME.
 # - See: https://github.com/aws/sagemaker-python-sdk/blob/a7399455f5386d83ddc5cb15c0db00c04bd518ec/src/sagemaker/processing.py#L425-L426
 ProcessingInput(source=estimator._hyperparameters['sagemaker_submit_directory'], destination='/opt/ml/processing/input/code/payload/')
 ],
 outputs=[ProcessingOutput(source='/opt/ml/processing/output', destination=f'{s3_sagemaker_path}/{processing_job_name}/output')],
 arguments=None,
 wait=True,
)

Once the job is done, let's probe the output...

In [None]:
# Probe output
!aws s3 cp $S3_SAGEMAKER_PATH/$PROCESSING_JOB_NAME/output/processing.jsonl -
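
If `processing.jsonl` is in JSON Lines format, as its extension suggests, it could also be parsed in Python rather than only printed. The sketch below assumes one JSON object per line; the sample records are invented, since this notebook does not show what `processing.py` writes.

```python
import io
import json


def read_jsonl(stream):
    """Parse a JSON Lines stream into a list of objects, skipping blank lines."""
    return [json.loads(line) for line in stream if line.strip()]


# Invented sample standing in for the downloaded processing.jsonl.
sample = io.StringIO(
    '{"file": "input.txt", "status": "ok"}\n'
    '\n'
    '{"file": "other.txt", "status": "ok"}\n'
)
records = read_jsonl(sample)
print(len(records))
```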