{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c74b826e-39b9-4490-a8a2-9d99564a87bf",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Import necessary libraries\n",
    "\n",
    "# Standard library\n",
    "import json\n",
    "import os\n",
    "import random\n",
    "import string\n",
    "import time\n",
    "from datetime import datetime\n",
    "\n",
    "# Third-party\n",
    "import boto3\n",
    "import kfp\n",
    "import kfp.components as comp\n",
    "import yaml\n",
    "from kfp import compiler\n",
    "from kfp import components\n",
    "from kfp import dsl\n",
    "from kubeflow.training import PyTorchJobClient\n",
    "from kubeflow.training.utils import utils"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d06d13fe-f145-460d-b11c-54967b8f4c6f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read PyTorch Operator master and worker specs from the YAML files.\n",
    "# Pipeline specs are created in notebook '1_submit_pytorchdist_k8s.ipynb'. Please ensure you have run it. Alternatively, you can create the specs manually.\n",
    "\n",
    "with open(\"pipeline_yaml_specifications/pipeline_master_spec.yml\", 'r') as master_stream:\n",
    "    master_spec_loaded = yaml.safe_load(master_stream)\n",
    "\n",
    "with open(\"pipeline_yaml_specifications/pipeline_worker_spec.yml\", 'r') as worker_stream:\n",
    "    worker_spec_loaded = yaml.safe_load(worker_stream)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f665a050-218f-46f0-91a3-a0e40f62423b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Replace the image_url placeholder in the KServe layout component with your ECR image URL.\n",
    "# Change the URL below to your own, e.g. '458473390725.dkr.ecr.us-west-2.amazonaws.com\\/kserve_layout:latest' (keep the backslash before each '/', sed needs it).\n",
    "# sed edits the file in place, so once the placeholder has been replaced re-running this cell changes nothing.\n",
    "# NOTE: do not define a Python variable named image_url in this notebook; IPython would substitute it into the command below.\n",
    "!sed -i \"s/{image_url}/'458473390725.dkr.ecr.us-west-2.amazonaws.com\\/kserve_layout:latest'/g\" pipeline_components/kserve_layout_component.yaml"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d884987d-ea54-4369-9570-6a6497ce1ff8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Initialize global variables\n",
    "user_namespace = utils.get_default_target_namespace()\n",
    "\n",
    "# Load the PyTorch Training Operator and KServe model layout components from local files,\n",
    "# and the KServe component from the Kubeflow Pipelines repository.\n",
    "pytorch_job_op = components.load_component_from_file('pipeline_components/pytorch_component.yaml')\n",
    "unix_model_layout_op = components.load_component_from_file('pipeline_components/kserve_layout_component.yaml')\n",
    "# NOTE(review): this URL tracks the 'master' branch, so the component definition is not pinned; consider a commit-specific URL.\n",
    "kserve_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kserve/component.yaml')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "95fdaa93-e347-4f7e-9700-bcb51d1094c3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Same bucket name that we used in notebook 1_submit_pytorchdist_k8s.ipynb\n",
    "s3_bucket_name = 'kserve-model-20230527042622'\n",
    "\n",
    "# Set KServe InferenceEndpoint name; the timestamp suffix has one-second resolution\n",
    "timestamp = datetime.now().strftime('%Y%m%d%H%M%S')\n",
    "kserve_model_endpoint_name = 'image-classify-' + timestamp\n",
    "\n",
    "kserve_model_endpoint_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ea4154b6-5c34-41f8-a92a-b9f9ee1edda4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create job name for tracking the Kubernetes PyTorchJob custom resource\n",
    "pytorch_distributed_jobname = f'pytorch-cnn-dist-job-{time.strftime(\"%Y-%m-%d-%H-%M-%S-%j\", time.gmtime())}'\n",
    "\n",
    "\n",
    "# Pipeline: PyTorch training job -> KServe model layout -> KServe deployment\n",
    "@dsl.pipeline(name=\"PyTorch Training pipeline\", description=\"Sample training job test\")\n",
    "def pytorch_cnn_pipeline(action='apply',\n",
    "                         model_name=kserve_model_endpoint_name,\n",
    "                         model_uri=f's3://{s3_bucket_name}',\n",
    "                         framework='pytorch',\n",
    "                         region='us-west-2',\n",
    "                         training_job_name=pytorch_distributed_jobname,\n",
    "                         namespace=user_namespace,\n",
    "                         ):\n",
    "    \"\"\"Run a PyTorch training job, lay out the model for KServe, then deploy it with KServe.\n",
    "\n",
    "    The three steps run sequentially; each one is chained to the previous with `.after(...)`.\n",
    "\n",
    "    Args:\n",
    "        action: Action passed to the KServe component.\n",
    "        model_name: Model name passed to the KServe component.\n",
    "        model_uri: Model location passed to the KServe component; defaults to the S3 bucket root.\n",
    "        framework: Framework passed to the KServe component.\n",
    "        region: Not referenced by any step; kept so the pipeline interface stays unchanged.\n",
    "        training_job_name: Name given to the PyTorch training job step.\n",
    "        namespace: Namespace passed to both the training job step and the KServe component.\n",
    "    \"\"\"\n",
    "\n",
    "    train_task = pytorch_job_op(\n",
    "        name=training_job_name,\n",
    "        namespace=namespace,  # use the pipeline parameter so an override applies to every step\n",
    "        master_spec=json.dumps(master_spec_loaded),  # Please refer file at pipeline_yaml_specifications/pipeline_master_spec.yml\n",
    "        worker_spec=json.dumps(worker_spec_loaded),  # Please refer file at pipeline_yaml_specifications/pipeline_worker_spec.yml\n",
    "        delete_after_done=False\n",
    "    )\n",
    "\n",
    "    unix_model_layout = unix_model_layout_op(\n",
    "        bucket=s3_bucket_name,\n",
    "        model_input='model_kserve.pth',\n",
    "        model_archive_name='cifar').after(train_task)\n",
    "\n",
    "    kserve_deploy = kserve_op(action=action,\n",
    "                              model_name=model_name,\n",
    "                              model_uri=model_uri,\n",
    "                              namespace=namespace,\n",
    "                              framework=framework).after(unix_model_layout)\n",
    "\n",
    "    # Disable pipeline cache for every step\n",
    "    for task in (train_task, unix_model_layout, kserve_deploy):\n",
    "        task.execution_options.caching_strategy.max_cache_staleness = \"P0D\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3f977eda-3e71-40f7-baed-745fcf4f4e84",
   "metadata": {},
   "outputs": [],
   "source": [
    "# DSL Compiler that compiles pipeline functions into workflow yaml.\n",
    "\n",
    "kfp.compiler.Compiler().compile(pytorch_cnn_pipeline, \"pytorch_cnn_pipeline.yaml\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2fcb0cb7-f4d7-4238-8a50-f74d0ca39dd4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Connect to Kubeflow Pipelines using the Kubeflow Pipelines SDK client\n",
    "client = kfp.Client()\n",
    "\n",
    "experiment = client.create_experiment(name=\"kubeflow\")\n",
    "\n",
    "# Run a specified pipeline\n",
    "my_run = client.run_pipeline(experiment.id, \"pytorch_cnn_pipeline\", \"pytorch_cnn_pipeline.yaml\")\n",
    "\n",
    "# Please click the \"Run details\" link generated below this cell to view your pipeline. You can click every pipeline step to see logs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "047b07db-6378-4691-98f5-10f006234429",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}