{"cells": [{"cell_type": "markdown", "metadata": {}, "source": ["# Workflow\n", "\n", "The following notebook contains the step functions workflow definition for training and baseline jobs.\n", "\n", "This can be run after you have started the [mlops](mlops.ipynb) build and have stored `input_data`."]},
{"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# Import the latest sagemaker, stepfunctions and boto3 SDKs\n", "# NOTE: version bounds in a requirement must be comma-separated (PEP 440); \">=2.1.0<3\" is rejected by current pip\n", "import sys\n", "!{sys.executable} -m pip install --upgrade pip\n", "!{sys.executable} -m pip install -qU awscli boto3 \"sagemaker>=2.1.0,<3\"\n", "!{sys.executable} -m pip install -qU \"stepfunctions==2.0.0\"\n", "!{sys.executable} -m pip show sagemaker stepfunctions"]},
{"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["import boto3\n", "import json\n", "import os\n", "import time\n", "import uuid\n", "from botocore.exceptions import ClientError\n", "\n", "import sagemaker\n", "from sagemaker.image_uris import retrieve \n", "from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput\n", "from sagemaker.model_monitor.dataset_format import DatasetFormat\n", "\n", "import stepfunctions\n", "from stepfunctions import steps\n", "from stepfunctions.inputs import ExecutionInput\n", "from stepfunctions.workflow import Workflow"]},
{"cell_type": "markdown", "metadata": {}, "source": ["Load the input data from the `mlops.ipynb` notebook and print values"]},
{"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["%store -r input_data PROVISIONED_PRODUCT_NAME\n", "input_data, PROVISIONED_PRODUCT_NAME"]},
{"cell_type": "markdown", "metadata": {}, "source": ["Load configuration from the Service Catalog provisioned product outputs"]},
{"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["def get_config(provisioned_product_name):\n", "    \"\"\"Return the outputs of a Service Catalog provisioned product as a dict.\n", "\n", "    Args:\n", "        provisioned_product_name: Name of the provisioned product to query.\n", "\n", "    Returns:\n", "        dict mapping each output's OutputKey to its OutputValue.\n", "    \"\"\"\n", "    sc = boto3.client(\"servicecatalog\")\n", "    outputs = sc.get_provisioned_product_outputs(\n", "        ProvisionedProductName=provisioned_product_name\n", "    )[\"Outputs\"]\n", "    return {out[\"OutputKey\"]: out[\"OutputValue\"] for out in outputs}\n", "\n", "\n", "config = get_config(PROVISIONED_PRODUCT_NAME)\n", "region = config[\"Region\"]\n", "model_name = config[\"ModelName\"]\n", "role = config[\"SageMakerRoleARN\"]\n", "workflow_role_arn = config[\"WorkflowRoleARN\"]\n", "\n", "\n", "# Define the lambda function names for steps\n", "create_experiment_function_name = 'mlops-create-experiment'\n", "query_training_function_name = 'mlops-query-training'\n", "transform_header_function_name = 'mlops-add-transform-header'\n", "query_drift_function_name = 'mlops-query-drift'\n", "\n", "# Get the session and default bucket\n", "session = sagemaker.session.Session()\n", "bucket = session.default_bucket()\n", "\n", "print('region: {}'.format(region))\n", "print('bucket: {}'.format(bucket))\n", "print('sagemaker role: {}'.format(role))\n", "print('workflow role: {}'.format(workflow_role_arn))"]},
{"cell_type": "markdown", "metadata": {}, "source": ["Specify the training model and transform output base uri"]},
{"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["output_data = {\n", " 'ModelOutputUri': 's3://{}/{}/model'.format(bucket, model_name), \n", "}"]},
{"cell_type": "markdown", "metadata": {}, "source": ["## Define Training Resources\n", "\n", "### Input Schema\n", "\n", "Define the input schema for the step functions which can then be used as arguments to resources"]},
{"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["execution_input = ExecutionInput(\n", " schema={\n", " \"GitBranch\": str,\n", " \"GitCommitHash\": str,\n", " \"DataVersionId\": str,\n", " \"ExperimentName\": str,\n", " \"TrialName\": str,\n", " \"BaselineJobName\": str,\n", " \"BaselineOutputUri\": str,\n", " \"TrainingJobName\": 
str,\n", " \"ModelName\": str\n", " }\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Define the model monitor baseline\n", "\n", "Define the environment variables"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["dataset_format = DatasetFormat.csv()\n", "env = {\n", " \"dataset_format\": json.dumps(dataset_format),\n", " \"dataset_source\": \"/opt/ml/processing/input/baseline_dataset_input\",\n", " \"output_path\": \"/opt/ml/processing/output\",\n", " \"publish_cloudwatch_metrics\": \"Disabled\", # Have to be disabled from processing job?\n", "}"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Define the processing inputs and outputs "]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["inputs = [\n", " ProcessingInput(\n", " source=input_data['BaselineUri'],\n", " destination=\"/opt/ml/processing/input/baseline_dataset_input\",\n", " input_name=\"baseline_dataset_input\",\n", " ),\n", "]\n", "outputs = [\n", " ProcessingOutput(\n", " source=\"/opt/ml/processing/output\",\n", " destination=execution_input[\"BaselineOutputUri\"],\n", " output_name=\"monitoring_output\",\n", " ),\n", "]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Create the baseline processing job using the sagemaker [model monitor](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_monitoring.html) container."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# Get the default model monitor container\n", "monor_monitor_container_uri = retrieve(region=region, framework=\"model-monitor\", version=\"latest\")\n", "\n", "# Use the base processing where we pass through the \n", "monitor_analyzer = Processor(\n", " image_uri=monor_monitor_container_uri,\n", " role=role, \n", " instance_count=1,\n", " instance_type=\"ml.m5.xlarge\",\n", " max_runtime_in_seconds=1800,\n", " env=env\n", ")"]}, {"cell_type": "markdown", 
"metadata": {}, "source": ["Test the model baseline processing job by running inline"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# monitor_analyzer.run(inputs=inputs, outputs=outputs, wait=True)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Defining the Training Job\n", "\n", "Define the training job to run in parallel with the processing job"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["image_uri = sagemaker.image_uris.retrieve(region=region, framework=\"xgboost\", version=\"latest\")\n", "\n", "# Create the estimator\n", "xgb = sagemaker.estimator.Estimator(\n", " image_uri,\n", " role,\n", " instance_count=1,\n", " instance_type=\"ml.m4.xlarge\",\n", " output_path=output_data['ModelOutputUri'], # NOTE: Can't use execution_input here\n", ")\n", "\n", "# Set the hyperparameters overriding with any defaults\n", "hyperparameters = {\n", " \"max_depth\": \"9\",\n", " \"eta\": \"0.2\",\n", " \"gamma\": \"4\",\n", " \"min_child_weight\": \"300\",\n", " \"subsample\": \"0.8\",\n", " \"objective\": \"reg:linear\",\n", " \"early_stopping_rounds\": \"10\",\n", " \"num_round\": \"50\", # Don't stop too early or results are bad\n", "}\n", "xgb.set_hyperparameters(**hyperparameters)\n", "\n", "# Specify the data source\n", "s3_input_train = sagemaker.inputs.TrainingInput(s3_data=input_data['TrainingUri'], content_type=\"csv\")\n", "s3_input_val = sagemaker.inputs.TrainingInput(s3_data=input_data['ValidationUri'], content_type=\"csv\")\n", "data = {\"train\": s3_input_train, \"validation\": s3_input_val}"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Test the estimator directly in the notebook"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# xgb.fit(inputs=data)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Define Training Workflow\n", "\n", "### 1. 
Create the Experiment\n", "\n", "Define the create experiment lambda.\n", "\n", "In future add [ResultsPath](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-resultpath.html) to filter the results."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["create_experiment_step = steps.compute.LambdaStep(\n", " 'Create Experiment',\n", " parameters={ \n", " \"FunctionName\": create_experiment_function_name,\n", " 'Payload': {\n", " \"ExperimentName.$\": '$.ExperimentName',\n", " \"TrialName.$\": '$.TrialName',\n", " }\n", " },\n", " result_path='$.CreateTrialResults'\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### 2a. Run processing Job\n", "\n", "Define the processing job with a specific failure handling"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["baseline_step = steps.sagemaker.ProcessingStep(\n", " \"Baseline Job\",\n", " processor=monitor_analyzer,\n", " job_name=execution_input[\"BaselineJobName\"],\n", " inputs=inputs,\n", " outputs=outputs,\n", " experiment_config={\n", " 'ExperimentName': execution_input[\"ExperimentName\"], # '$.ExperimentName', \n", " 'TrialName': execution_input[\"TrialName\"],\n", " 'TrialComponentDisplayName': \"Baseline\",\n", " },\n", " tags={\n", " \"GitBranch\": execution_input[\"GitBranch\"],\n", " \"GitCommitHash\": execution_input[\"GitCommitHash\"],\n", " \"DataVersionId\": execution_input[\"DataVersionId\"],\n", " },\n", " result_path='$.BaselineJobResults'\n", ")\n", "\n", "baseline_step.add_catch(steps.states.Catch(\n", " error_equals=[\"States.TaskFailed\"],\n", " next_step=stepfunctions.steps.states.Fail(\n", " \"Baseline failed\", cause=\"SageMakerBaselineJobFailed\"\n", " ),\n", "))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### 2b. 
Run and query training Job\n", "\n", "Define the training job and add a validation step"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["training_step = steps.TrainingStep(\n", " \"Training Job\",\n", " estimator=xgb,\n", " data=data,\n", " job_name=execution_input[\"TrainingJobName\"],\n", " experiment_config={\n", " 'ExperimentName': execution_input[\"ExperimentName\"],\n", " 'TrialName': execution_input[\"TrialName\"],\n", " 'TrialComponentDisplayName': \"Training\",\n", " },\n", " tags={\n", " \"GitBranch\": execution_input[\"GitBranch\"],\n", " \"GitCommitHash\": execution_input[\"GitCommitHash\"],\n", " \"DataVersionId\": execution_input[\"DataVersionId\"],\n", " },\n", " result_path='$.TrainingResults'\n", ")\n", "\n", "training_step.add_catch(stepfunctions.steps.states.Catch(\n", " error_equals=[\"States.TaskFailed\"],\n", " next_step=stepfunctions.steps.states.Fail(\n", " \"Training failed\", cause=\"SageMakerTrainingJobFailed\"\n", " ),\n", "))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Create a model from the training job, note this must follow training to retrieve the expected model"]}, {"cell_type": "code", "execution_count": null, "metadata": {"scrolled": true}, "outputs": [], "source": ["# Must follow the training test\n", "model_step = steps.sagemaker.ModelStep(\n", " 'Save Model',\n", " input_path='$.TrainingResults',\n", " model=training_step.get_expected_model(),\n", " model_name=execution_input['ModelName'],\n", " result_path='$.ModelStepResults'\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Query training results, and validate that the RMSE error is within an acceptable range "]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["training_query_step = steps.compute.LambdaStep(\n", " 'Query Training Results',\n", " parameters={ \n", " \"FunctionName\": query_training_function_name,\n", " 'Payload':{\n", " \"TrainingJobName.$\": 
'$.TrainingJobName'\n", " }\n", " },\n", " result_path='$.QueryTrainingResults'\n", ")\n", "\n", "# Default branch of the choice below: reached when RMSE is NOT below the threshold, i.e. the model error is too high\n", "check_accuracy_fail_step = steps.states.Fail(\n", " 'Model Error Too High',\n", " comment='RMSE accuracy higher than threshold'\n", ")\n", "\n", "check_accuracy_succeed_step = steps.states.Succeed('Model Error Acceptable')\n", "\n", "# TODO: Update query method to query validation error using better result path\n", "threshold_rule = steps.choice_rule.ChoiceRule.NumericLessThan(\n", " variable=training_query_step.output()['QueryTrainingResults']['Payload']['results']['TrainingMetrics'][0]['Value'], value=10\n", ")\n", "\n", "check_accuracy_step = steps.states.Choice(\n", " 'RMSE < 10'\n", ")\n", "\n", "check_accuracy_step.add_choice(rule=threshold_rule, next_step=check_accuracy_succeed_step)\n", "check_accuracy_step.default_choice(next_step=check_accuracy_fail_step)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### 3. Add the Error handling in the workflow\n", "\n", "We will use the [Catch Block](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/states.html#stepfunctions.steps.states.Catch) to perform error handling. 
If the Processing Job Step or Training Step fails, the flow will go into failure state."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["sagemaker_jobs = steps.states.Parallel(\"SageMaker Jobs\")\n", "sagemaker_jobs.add_branch(baseline_step)\n", "sagemaker_jobs.add_branch(steps.states.Chain([training_step, model_step, training_query_step, check_accuracy_step]))\n", "\n", "# Do we need specific failure for the jobs for group?\n", "sagemaker_jobs.add_catch(stepfunctions.steps.states.Catch(\n", " error_equals=[\"States.TaskFailed\"],\n", " next_step=stepfunctions.steps.states.Fail(\n", " \"SageMaker Jobs failed\", cause=\"SageMakerJobsFailed\"\n", " ),\n", "))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Execute Training Workflow\n", "\n", "Create the training workflow."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["training_workflow_definition = steps.states.Chain([\n", " create_experiment_step,\n", " sagemaker_jobs\n", "])\n", "\n", "training_workflow_name = 'mlops-{}-training'.format(model_name)\n", "training_workflow = Workflow(training_workflow_name, training_workflow_definition, workflow_role_arn)\n", "training_workflow.create()\n", "training_workflow"]}, {"cell_type": "markdown", "metadata": {}, "source": ["We can also inspect the raw workflow definition and verify the execution variables are correctly passed in"]}, {"cell_type": "code", "execution_count": null, "metadata": {"scrolled": true}, "outputs": [], "source": ["print(training_workflow.definition.to_json(pretty=True))"]}, {"cell_type": "markdown", "metadata": {}, "source": [" Now we define the inputs for the workflow"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# Define some dummy job and git params\n", "job_id = uuid.uuid1().hex\n", "git_branch = 'main'\n", "git_commit_hash = 'xxx' \n", "data_verison_id = 'yyy'\n", "\n", "# Define the experiment 
and trial name based on model name and job id\n", "experiment_name = \"mlops-{}\".format(model_name)\n", "trial_name = \"mlops-{}-{}\".format(model_name, job_id)\n", "\n", "workflow_inputs = {\n", " \"ExperimentName\": experiment_name,\n", " \"TrialName\": trial_name,\n", " \"GitBranch\": git_branch,\n", " \"GitCommitHash\": git_commit_hash, \n", " \"DataVersionId\": data_verison_id, \n", " \"BaselineJobName\": trial_name, \n", " \"BaselineOutputUri\": f\"s3://{bucket}/{model_name}/monitoring/baseline/mlops-{model_name}-pbl-{job_id}\",\n", " \"TrainingJobName\": trial_name,\n", " \"ModelName\": trial_name,\n", "}\n", "print(json.dumps(workflow_inputs))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Then execute the workflow"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["execution = training_workflow.execute(\n", " inputs=workflow_inputs\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Wait for the execution to complete, and output the last step."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["execution_output = execution.get_output(wait=True)\n", "execution.list_events()[-1]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Use [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution."]}, {"cell_type": "code", "execution_count": null, "metadata": {"scrolled": true}, "outputs": [], "source": ["# execution.list_events(html=True) # Bug"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Execute Batch Transform\n", "\n", "Take the model we have trained and run a batch transform on the validation dataset.\n"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["execution_input = ExecutionInput(\n", " schema={\n", " \"GitBranch\": str,\n", " \"GitCommitHash\": str,\n", 
" \"DataVersionId\": str,\n", " \"ExperimentName\": str,\n", " \"TrialName\": str,\n", " \"ModelName\": str,\n", " \"TransformJobName\": str,\n", " \"MonitorJobName\": str,\n", " \"MonitorOutputUri\": str,\n", " }\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Define some new output paths for the transform and monitoring jobs"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["output_data['TransformOutputUri'] = f\"s3://{bucket}/{model_name}/transform/mlops-{model_name}-{job_id}\"\n", "output_data['MonitoringOutputUri'] = f\"s3://{bucket}/{model_name}/monitoring/mlops-{model_name}-{job_id}\"\n", "output_data['BaselineOutputUri'] = workflow_inputs['BaselineOutputUri']"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### 1. Run the Transform Job\n", "\n", "Define a transform job to take the test dataset as input. \n", "\n", "We can configure the batch transform to [associate prediction results](https://aws.amazon.com/blogs/machine-learning/associating-prediction-results-with-input-data-using-amazon-sagemaker-batch-transform/) with the input based on the `input_filter` and `output_filter` arguments."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["transform_step = steps.TransformStep(\n", " 'Transform Input Dataset',\n", " transformer=xgb.transformer(\n", " instance_count=1,\n", " instance_type='ml.m5.large',\n", " assemble_with='Line', \n", " accept = 'text/csv',\n", " output_path=output_data['TransformOutputUri'], # NOTE: Can't use execution_input here\n", " ),\n", " job_name=execution_input['TransformJobName'], # TEMP\n", " model_name=execution_input['ModelName'], \n", " data=input_data['TestUri'],\n", " content_type='text/csv',\n", " split_type='Line',\n", " input_filter='$[1:]', # Skip the first target column output_amount\n", " join_source='Input',\n", " output_filter='$[1:]', # Output all inputs excluding output_amount, followed by the 
predicted_output_amount\n", " result_path='$.TransformJobResults'\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### 2. Add the Transform Header\n", "\n", "The batch transform output does not include the header, so add this back to be able to run baseline."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["transform_file_name = 'test.csv'\n", "header = 'duration_minutes,passenger_count,trip_distance,total_amount'"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["transform_header_step = steps.compute.LambdaStep(\n", " 'Add Transform Header',\n", " parameters={ \n", " \"FunctionName\": transform_header_function_name,\n", " 'Payload': {\n", " \"TransformOutputUri\": output_data['TransformOutputUri'],\n", " \"FileName\": transform_file_name,\n", " \"Header\": header,\n", " }\n", " },\n", " result_path='$.TransformHeaderResults'\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### 3. 
Run the Model Monitor Processing Job\n", "\n", "Create a model monitor processing job that takes the output of the transform job.\n", "\n", "Reference the `constraints.json` and `statistics.json` from the output of the training baseline."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["dataset_format = DatasetFormat.csv()\n", "env = {\n", " \"dataset_format\": json.dumps(dataset_format),\n", " \"dataset_source\": \"/opt/ml/processing/input/baseline_dataset_input\",\n", " \"output_path\": \"/opt/ml/processing/output\",\n", " \"publish_cloudwatch_metrics\": \"Disabled\", # Have to be disabled from processing job?\n", " \"baseline_constraints\": \"/opt/ml/processing/baseline/constraints/constraints.json\",\n", " \"baseline_statistics\": \"/opt/ml/processing/baseline/stats/statistics.json\"\n", "}\n", "inputs = [\n", " ProcessingInput(\n", " source=os.path.join(output_data['TransformOutputUri'], transform_file_name), # Transform with header\n", " destination=\"/opt/ml/processing/input/baseline_dataset_input\",\n", " input_name=\"baseline_dataset_input\",\n", " ),\n", " ProcessingInput(\n", " source=os.path.join(output_data['BaselineOutputUri'], 'constraints.json'),\n", " destination=\"/opt/ml/processing/baseline/constraints\",\n", " input_name=\"constraints\",\n", " ),\n", " ProcessingInput(\n", " source=os.path.join(output_data['BaselineOutputUri'], 'statistics.json'),\n", " destination=\"/opt/ml/processing/baseline/stats\",\n", " input_name=\"baseline\",\n", " ),\n", "]\n", "outputs = [\n", " ProcessingOutput(\n", " source=\"/opt/ml/processing/output\",\n", " destination=output_data['MonitoringOutputUri'],\n", " output_name=\"monitoring_output\",\n", " ),\n", "]\n", "\n", "# Get the default model monitor container\n", "monor_monitor_container_uri = retrieve(region=region, framework=\"model-monitor\", version=\"latest\")\n", "\n", "# Use the base processing where we pass through the \n", "monitor_analyzer = Processor(\n", 
" image_uri=monor_monitor_container_uri,\n", " role=role, \n", " instance_count=1,\n", " instance_type=\"ml.m5.xlarge\",\n", " max_runtime_in_seconds=1800,\n", " env=env\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Test the monitor baseline"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# monitor_analyzer.run(inputs=inputs, outputs=outputs, wait=True)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Add the monitor step"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["monitor_step = steps.sagemaker.ProcessingStep(\n", " \"Monitor Job\",\n", " processor=monitor_analyzer,\n", " job_name=execution_input[\"MonitorJobName\"],\n", " inputs=inputs,\n", " outputs=outputs,\n", " experiment_config={\n", " 'ExperimentName': execution_input[\"ExperimentName\"],\n", " 'TrialName': execution_input[\"TrialName\"],\n", " 'TrialComponentDisplayName': \"Baseline\",\n", " },\n", " tags={\n", " \"GitBranch\": execution_input[\"GitBranch\"],\n", " \"GitCommitHash\": execution_input[\"GitCommitHash\"],\n", " \"DataVersionId\": execution_input[\"DataVersionId\"],\n", " },\n", " result_path='$.MonitorJobResults'\n", ")\n", "\n", "monitor_step.add_catch(stepfunctions.steps.states.Catch(\n", " error_equals=[\"States.TaskFailed\"],\n", " next_step=stepfunctions.steps.states.Fail(\n", " \"Monitor failed\", cause=\"SageMakerMonitorJobFailed\"\n", " ),\n", "))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Add the lambda step to query for violations in the processing job."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["monitor_query_step = steps.compute.LambdaStep(\n", " 'Query Monitoring Results',\n", " parameters={ \n", " \"FunctionName\": query_drift_function_name,\n", " 'Payload':{\n", " \"ProcessingJobName.$\": '$.MonitorJobName'\n", " }\n", " },\n", " result_path='$.QueryMonitorResults'\n", ")\n", "\n", 
"check_violations_fail_step = steps.states.Fail(\n", " 'Completed with Violations',\n", " comment='Processing job completed with violations'\n", ")\n", "\n", "check_violations_succeed_step = steps.states.Succeed('Completed')\n", "\n", "# TODO: Check specific drift in violations\n", "status_rule = steps.choice_rule.ChoiceRule.StringEquals(\n", " variable=monitor_query_step.output()['QueryMonitorResults']['Payload']['results']['ProcessingJobStatus'], value='Completed'\n", ")\n", "\n", "check_violations_step = steps.states.Choice(\n", " 'Check Violations'\n", ")\n", "\n", "check_violations_step.add_choice(rule=status_rule, next_step=check_violations_succeed_step)\n", "check_violations_step.default_choice(next_step=check_violations_fail_step)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Create the transform workflow definition"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["transform_workflow_definition = steps.states.Chain([\n", " transform_step,\n", " transform_header_step,\n", " monitor_step, \n", " monitor_query_step, \n", " check_violations_step\n", "])\n", "\n", "transform_workflow_name = 'mlops-{}-transform'.format(model_name)\n", "transform_workflow = Workflow(transform_workflow_name, transform_workflow_definition, workflow_role_arn)\n", "transform_workflow.create()\n", "transform_workflow"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Define the workflow inputs based on the previous training run"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# Define unique names for the transform and monitor baseline jobs\n", "transform_job_name = \"mlops-{}-trn-{}\".format(model_name, job_id)\n", "monitor_job_name = \"mlops-{}-mbl-{}\".format(model_name, job_id)\n", "\n", "workflow_inputs = {\n", " \"ExperimentName\": experiment_name,\n", " \"TrialName\": trial_name,\n", " \"GitBranch\": git_branch,\n", " \"GitCommitHash\": git_commit_hash, \n", " 
\"DataVersionId\": data_verison_id, \n", " \"ModelName\": trial_name,\n", " \"TransformJobName\": transform_job_name, \n", " \"MonitorJobName\": monitor_job_name,\n", "}\n", "print(json.dumps(workflow_inputs))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Execute the workflow and render the progress. "]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["execution = transform_workflow.execute(\n", " inputs=workflow_inputs\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Wait for the execution to finish and list the last event."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["execution_output = execution.get_output(wait=True)\n", "execution.list_events()[-1]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Inspect Transform Results\n", "\n", "Verify that we can load the transform output with header"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from io import StringIO\n", "import pandas as pd\n", "from sagemaker.s3 import S3Downloader\n", "\n", "# Get the output, and add header\n", "transform_output_uri = os.path.join(output_data['TransformOutputUri'], transform_file_name)\n", "transform_body = S3Downloader.read_file(transform_output_uri)\n", "pred_df = pd.read_csv(StringIO(transform_body), sep=\",\")\n", "pred_df.head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Query monitoring output\n", "\n", "If this completed with violations, let's inspect the output to see why that is the case."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["violiations_uri = os.path.join(output_data['MonitoringOutputUri'], 'constraint_violations.json')\n", "violiations = json.loads(S3Downloader.read_file(violiations_uri))\n", "violiations"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Cleanup\n", "\n", "Delete the workflows that we created as 
part of this notebook"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["training_workflow.delete()\n", "transform_workflow.delete()"]}], "metadata": {"instance_type": "ml.t3.medium", "kernelspec": {"display_name": "conda_python3", "language": "python", "name": "conda_python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10"}}, "nbformat": 4, "nbformat_minor": 4}