{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# SageMaker 모델 빌드 파이프라인을 이용한 모델 빌드 오케스트레이션\n", "\n", "Amazon SageMaker Model building pipeline은 머신러닝 애플리케이션을 개발하는 개발자와 운영 엔지니어들에게 SageMaker작업과 재생산가능한 머신러닝 파이프라인을 오케스트레이션하는 기능을 제공합니다. 또한 커스텀빌드된 모델을 저지연(low latency) 실시간 추론환경이나 배치변환을 통한 추론 실행환경으로 배포하거나, 생성된 아티팩트의 계보(lineage)를 추적하는 기능을 제공합니다. 이 기능들을 통해 모델 아티팩트를 배포하고, 업무환경에서의 워크플로우를 배포/모니터링하고, 간단한 인터페이스를 통해 아티팩트의 계보 추적하고, 머신러닝 애플리케이션 개발의 베스트 프렉티스를 도입하여, 보다 안정적인 머신러닝 애플리케이션 운영환경을 구현할 수 있습니다. \n", "\n", "SageMaker pipeline 서비스는 JSON 선언으로 구현된 SageMaker Pipeline DSL(Domain Specific Language, 도메인종속언어)를 지원합니다. 이 DSL은 파이프라인 파라마터와 SageMaker 작업단계의 DAG(Directed Acyclic Graph)를 정의합니다. SageMaker Python SDK를 이용하면 이 파이프라인 DSL의 생성을 보다 간편하게 할 수 있습니다. \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SageMaker 파이프라인 소개\n", "\n", "SageMaker 파이프라인은 다음 기능을 지원하며 본 노트북에서 하나씩 다루게 됩니다. \n", "\n", "* Pipelines - Workflow DAG. SageMaker 작업과 리소스 생성을 조율하는 단계와 조건을 가진다. \n", "* Processing job steps - 데이터처러 워크로드를 실행하기 위한 SageMaker의 관리형 기능. Feature engineering, 데이터 검증, 모델 평가, 모델 해석 등에 주로 사용됨 \n", "* Training job steps - 학습작업. 모델에게 학습데이터셋을 이용하여 모델에게 예측을 하도록 학습시키는 반복적인 작업 \n", "* Conditional execution steps - 조건별 실행분기. 파이프라인을 분기시키는 역할.\n", "* Register model steps - 학습이 완료된 모델패키지 리소스를 이후 배포를 위한 모델 레지스트리에 등록하기 \n", "* Create model steps - 추론 엔드포인트 또는 배치 추론을 위한 모델의 생성 \n", "* Transform job steps - 배치추론 작업. 배치작업을 이용하여 노이즈, bias의 제거 등 데이터셋을 전처리하고 대량데이터에 대해 추론을 실행하는 단계\n", "* Parametrized Pipeline executions - 특정 파라미터에 따라 파이프라인 실행방식을 변화시키기 \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 노트북 오버뷰\n", "\n", "본 노트북을 통해 다음 내용을 확인할 수 있습니다. :\n", "\n", "* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.\n", "* Define a Processing step that performs cleaning, feature engineering, and splitting the input data into train and test data sets.\n", "* Define a Training step that trains a model on the preprocessed train data set.\n", "* Define a Processing step that evaluates the trained model's performance on the test dataset.\n", "* Define a Create Model step that creates a model from the model artifacts used in training.\n", "* Define a Transform step that performs batch transformation based on the model that was created.\n", "* Define a Register Model step that creates a model package from the estimator and model artifacts used to train the model.\n", "* Define a Conditional step that measures a condition based on output from prior steps and conditionally executes other steps.\n", "* Define and create a Pipeline definition in a DAG, with the defined parameters and steps.\n", "* Start a Pipeline execution and wait for execution to complete.\n", "* Download the model evaluation report from the S3 bucket for examination.\n", "* Start a second Pipeline execution." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SageMaker 파이프라인 \n", "\n", "데이터전처리, 학습, 평가, 모델생성, 배치변환 추론, 모델의 등록으로 이루어지는 전형적인 머신러닝 애플리케이션 패턴을 위한 파이프라인을 생성해 보겠습니다.\n", "\n", "![A typical ML Application pipeline](img/pipeline-full.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 데이터셋\n", "\n", "본 노트북에서는 [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1] 데이터셋을 사용합니다. 이 작업의 목표는 전복의 물리적인 몇가지 측정을 이용하여 전복의 나이를 결정하는 것이며, 기본적으로 회귀문제입니다.\n", "\n", "데이터셋은 다음 몇가지 특성을 가집니다: length(조개껍질에서 가장 긴 길이), diameter(length에 직교하는 길이), height(조개에서 고기부분의 높이), whole_weight(전체 전복의 무게), shucked_weight(고기부분의 무게), viscera_weight(피를 뺀 내장 무게), shell_weight(건조된 후의 껍질 무게), sex(성별, 'M', 'F', 'I', I는 Infant), ring(목표변수, 정수값)\n", "\n", "링의 개수는 나이에 대한 매우 정확한 추측입니다. (나이 = 링개수 + 1.5) 하지만, 이 숫자를 얻기 위해서는 조개껍질을 뿔면을 따라 잘라서 단면을 염색하고, 현미경을 통하여 링의 수를 세어야 합니다. 이 과정은 매우 시간이 많이 소요되므로 다른 물리적 특성을 통해 나이를 측정할 수 있다면 매우 효과적일 것입니다. 우리는 이들 물리 특성들과 함께 측정된 데이터를 이용하여 링의 개수를 예측할 수 있는 모델을 만들고자 합니다. \n", "\n", "데이터셋을 S3버킷에 업로드하기 전에, 본 노트북에서 계속 사용할 상수값을 정의하겠습니다.\n", "\n", "[1] Dua, D. and Graff, C. (2019). [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml). Irvine, CA: University of California, School of Information and Computer Science." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import sagemaker\n", "\n", "\n", "region = boto3.Session().region_name\n", "sagemaker_session = sagemaker.session.Session()\n", "role = sagemaker.get_execution_role()\n", "default_bucket = sagemaker_session.default_bucket()\n", "model_package_group_name = f\"AbaloneModelPackageGroupName\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "이제 데이터를 디폴트 버킷으로 업로드합니다. `input_data_uri` 변수를 통해 데이터셋의 위치를 저장하였습니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir -p data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_path = \"data/abalone-dataset.csv\"\n", "\n", "s3 = boto3.resource(\"s3\")\n", "s3.Bucket(f\"sagemaker-servicecatalog-seedcode-{region}\").download_file(\n", " \"dataset/abalone-dataset.csv\",\n", " local_path\n", ")\n", "\n", "base_uri = f\"s3://{default_bucket}/abalone\"\n", "input_data_uri = sagemaker.s3.S3Uploader.upload(\n", " local_path=local_path, \n", " desired_s3_uri=base_uri,\n", ")\n", "print(input_data_uri)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "모델 생성 후 배치변환 추론을 실행할 때 사용할 두번째 데이터셋을 다운로드 합니다. 본 파일의 경로를 `batch_data_uri`에 저장합니다. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_path = \"data/abalone-dataset-batch\"\n", "\n", "s3 = boto3.resource(\"s3\")\n", "s3.Bucket(f\"sagemaker-servicecatalog-seedcode-{region}\").download_file(\n", " \"dataset/abalone-dataset-batch\",\n", " local_path\n", ")\n", "\n", "base_uri = f\"s3://{default_bucket}/abalone\"\n", "batch_data_uri = sagemaker.s3.S3Uploader.upload(\n", " local_path=local_path, \n", " desired_s3_uri=base_uri,\n", ")\n", "print(batch_data_uri)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 파이프라인 실행을 위한 파라미터 정의 \n", "\n", "파이프라인에서 사용할 파이프라인 파라미터를 정의합니다. 파이프라인을 스케줄하고 실행할 때 파라미터를 이용하여 실행조건을 커스마이징할 수 있습니다. 파라미터를 이용하면 파이프라인 실행시마다 매번 파이프라인 정의를 수정하지 않아도 됩니다.\n", "\n", "지원되는 파라미터 타입은 다음과 같습니다:\n", "\n", "* `ParameterString` - 파이썬 타입에서 `str` \n", "* `ParameterInteger` - 파이썬 타입에서 `int` \n", "* `ParameterFloat` - 파이썬 타입에서 `float` \n", "\n", "이들 파라미터를 정의할 때 디폴트 값을 지정할 수 있으며 파이프라인 실행시 재지정할 수도 있습니다. 지정하는 디폴트 값은 파라미터 타입과 일치하여야 합니다.\n", "\n", "본 노트북에서 사용하는 파라미터는 다음과 같습니다.\n", "\n", "* `processing_instance_type` - 프로세싱 작업에서 사용할 `ml.*` 인스턴스 타입 \n", "* `processing_instance_count` - 프로세싱 작업에서 사용할 인스턴스 개수 \n", "* `training_instance_type` - 학습작업에서 사용할 `ml.*` 인스턴스 타입\n", "* `model_approval_status` - 학습된 모델을 CI/CD를 목적으로 등록할 때의 승인 상태 (디폴트는 \"PendingManualApproval\")\n", "* `input_data` - 입력데이터에 대한 S3 버킷 URI\n", "* `batch_data` - 배치작업용 데이터에 대한 S3 버킷 URI" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "\n", "\n", "processing_instance_count = ParameterInteger(\n", " name=\"ProcessingInstanceCount\",\n", " default_value=1\n", ")\n", "processing_instance_type = ParameterString(\n", " name=\"ProcessingInstanceType\",\n", " default_value=\"ml.m5.xlarge\"\n", ")\n", "training_instance_type = ParameterString(\n", " name=\"TrainingInstanceType\",\n", " default_value=\"ml.m5.xlarge\"\n", ")\n", "model_approval_status = ParameterString(\n", " name=\"ModelApprovalStatus\",\n", " default_value=\"PendingManualApproval\"\n", ")\n", "input_data = ParameterString(\n", " name=\"InputData\",\n", " default_value=input_data_uri,\n", ")\n", "batch_data = ParameterString(\n", " name=\"BatchData\",\n", " default_value=batch_data_uri,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Define Parameters](img/pipeline-1.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 특성공학(Feature Engineering)을 위한 프로세싱 단계 정의 \n", "\n", "먼저 프로세싱 단계에서 사용할 전처리 스크립트를 작성합니다.\n", "\n", "다음 노트북 셀은 `abalone`디렉토리를 만들고 그 위치에 `preprocessing.py`파일을 생성합니다. 셀에서 스크립트 내용을 수정한 후 셀을 실행하면 파일이 업데이트될 것입니다. 제공된 스크립트는 `scikit-learn`를 이용하여 다음 작업을 실행합니다:\n", "\n", "* sex 카테고리 데이터의 결측값을 채우고 학습에 적합한 형태로 인코딩합니다.\n", "* sex와 ring을 제외한 수치형 변수를 스케일링하고 정규화합니다. \n", "* 데이터를 학습, 검증, 테스트셋으로 나눕니다.\n", "\n", "프로세싱 단계에서 이 스크립트를 이용하여 입력데이터에 대한 처리를 실행할 것입니다. 그리고 프로세싱 단계의 결과 중 학습데이터는 학습단계에서 모델을 학습할 때 사용될 것입니다. 테스트 데이터셋은 평가 단계에서 이용될 것이며 학습된 모델을 이용하여 모델 평가를 위한 지표를 생성하게 됩니다. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir -p abalone" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile abalone/preprocessing.py\n", "import argparse\n", "import os\n", "import requests\n", "import tempfile\n", "\n", "import numpy as np\n", "import pandas as pd\n", "\n", "from sklearn.compose import ColumnTransformer\n", "from sklearn.impute import SimpleImputer\n", "from sklearn.pipeline import Pipeline\n", "from sklearn.preprocessing import StandardScaler, OneHotEncoder\n", "\n", "\n", "# Since we get a headerless CSV file we specify the column names here.\n", "feature_columns_names = [\n", " \"sex\",\n", " \"length\",\n", " \"diameter\",\n", " \"height\",\n", " \"whole_weight\",\n", " \"shucked_weight\",\n", " \"viscera_weight\",\n", " \"shell_weight\",\n", "]\n", "label_column = \"rings\"\n", "\n", "feature_columns_dtype = {\n", " \"sex\": str,\n", " \"length\": np.float64,\n", " \"diameter\": np.float64,\n", " \"height\": np.float64,\n", " \"whole_weight\": np.float64,\n", " \"shucked_weight\": np.float64,\n", " \"viscera_weight\": np.float64,\n", " \"shell_weight\": np.float64\n", "}\n", "label_column_dtype = {\"rings\": np.float64}\n", "\n", "\n", "def merge_two_dicts(x, y):\n", " z = x.copy()\n", " z.update(y)\n", " return z\n", "\n", "\n", "if __name__ == \"__main__\":\n", " base_dir = \"/opt/ml/processing\"\n", "\n", " df = pd.read_csv(\n", " f\"{base_dir}/input/abalone-dataset.csv\",\n", " header=None, \n", " names=feature_columns_names + [label_column],\n", " dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)\n", " )\n", " numeric_features = list(feature_columns_names)\n", " numeric_features.remove(\"sex\")\n", " numeric_transformer = Pipeline(\n", " steps=[\n", " (\"imputer\", SimpleImputer(strategy=\"median\")),\n", " (\"scaler\", StandardScaler())\n", " ]\n", " )\n", "\n", " categorical_features = [\"sex\"]\n", " categorical_transformer = Pipeline(\n", " steps=[\n", " (\"imputer\", SimpleImputer(strategy=\"constant\", fill_value=\"missing\")),\n", " (\"onehot\", OneHotEncoder(handle_unknown=\"ignore\"))\n", " ]\n", " )\n", "\n", " preprocess = ColumnTransformer(\n", " transformers=[\n", " (\"num\", numeric_transformer, numeric_features),\n", " (\"cat\", categorical_transformer, categorical_features)\n", " ]\n", " )\n", " \n", " y = df.pop(\"rings\")\n", " X_pre = preprocess.fit_transform(df)\n", " y_pre = y.to_numpy().reshape(len(y), 1)\n", " \n", " X = np.concatenate((y_pre, X_pre), axis=1)\n", " \n", " np.random.shuffle(X)\n", " train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])\n", "\n", " \n", " pd.DataFrame(train).to_csv(f\"{base_dir}/train/train.csv\", header=False, index=False)\n", " pd.DataFrame(validation).to_csv(f\"{base_dir}/validation/validation.csv\", header=False, index=False)\n", " pd.DataFrame(test).to_csv(f\"{base_dir}/test/test.csv\", header=False, index=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "다음으로 프로세싱을 위한 `SKLearnProcessor`의 인스턴스를 생성합니다. 이 인스턴스는 `ProcessingStep`에서 사용합니다.\n", "\n", "본 노트북에서 계속 사용할 `framework_version`값을 지정합니다.\n", "\n", "`sklearn_processor` 인스턴스의 생성시 `processing_instance_type`과 `processing_instance_count` 파라미터가 사용된 것을 확인합니다.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.sklearn.processing import SKLearnProcessor\n", "\n", "\n", "framework_version = \"0.23-1\"\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=framework_version,\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count,\n", " base_job_name=\"sklearn-abalone-process\",\n", " role=role,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "프로세싱 단계의 마지막으로, 방금 생성한 프로세서 인스턴스와 입출력 채널을 지정하고 `ProcessingStep`을 생성합니다. `code`값을 통해 앞서 지정한 파일을 전달하고 있습니다. 파이프라인이 실행되면 해당 파이썬 스크립트가 실행될 것입니다. 이 단계는 SageMaker 파이썬 SDK에서 Processor 인스턴스의 `run`명령을 실행하는 것과 유사합니다.\n", "\n", "`ProcessingStep`생성시 사용할 입력데이터 지정을 위해 `input_data`파라미터를 이용하는 것을 확인합니다. 인스턴스 실행시 이 파라미터에 지정된 데이터를 이용하여 처리가 실행될 것입니다.\n", "\n", "또, `\"train\"`, `\"validation\"`, `\"test\"` 채널을 통해 프로세싱 작업의 출력이 설정된 것을 확인합니다. 이어지는 단계에서 `properties`속성을 통해 이 값을 참조하고 사용하게 됩니다. 구체적으로, 학습단계를 정의하는 시점에 사용될 것입니다.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.workflow.steps import ProcessingStep\n", " \n", "\n", "step_process = ProcessingStep(\n", " name=\"AbaloneProcess\",\n", " processor=sklearn_processor,\n", " inputs=[\n", " ProcessingInput(source=input_data, destination=\"/opt/ml/processing/input\"), \n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\"),\n", " ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\"),\n", " ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\")\n", " ],\n", " code=\"abalone/preprocessing.py\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Define a Processing Step for Feature Engineering](img/pipeline-2.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 모델 학습을 위한 학습단계 정의 \n", "\n", "본 단계에서는 SageMaker의 [XGBoost](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) 알고리즘을 이용하여 학습을 진행할 것입니다. XGBoost 알고리즘을 이용하도록 Estimator를 구성합니다. 보편적인 학습스크립트를 이용하여 입력 채널에서 정의한 학습데이터를 로드하고, 하이퍼파라미터 설정을 통해 학습을 설정하고, 모델을 학습한 후 `model_dir`경로에 학습된 모델을 저장합니다. 저장된 모델은 이후 호스팅을 위해 사용됩니다. \n", "\n", "학습된 모델이 추출되어 저장될 경로 또한 명시되었습니다. \n", "\n", "`training_instance_type`파라미터가 사용된 것을 확인합니다. 이 값은 본 예제의 파이프라인에서 여러번 사용됩니다. 본 단계에서는 estimator를 선언할 때 전달되었습니다. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.estimator import Estimator\n", "\n", "\n", "model_path = f\"s3://{default_bucket}/AbaloneTrain\"\n", "image_uri = sagemaker.image_uris.retrieve(\n", " framework=\"xgboost\",\n", " region=region,\n", " version=\"1.0-1\",\n", " py_version=\"py3\",\n", "# instance_type=training_instance_type,\n", ")\n", "xgb_train = Estimator(\n", " image_uri=image_uri,\n", " instance_type=training_instance_type,\n", " instance_count=1,\n", " output_path=model_path,\n", " role=role,\n", ")\n", "xgb_train.set_hyperparameters(\n", " objective=\"reg:linear\",\n", " num_round=50,\n", " max_depth=5,\n", " eta=0.2,\n", " gamma=4,\n", " min_child_weight=6,\n", " subsample=0.7,\n", " silent=0\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "학습단계의 마지막으로 방금 정의한 estimator를 이용하여 `TrainingStep`을 생성합니다. `TrainingStep`의 입력데이터 채널을 정의할 때 `ProcessingStep`단계의 `properties`를 이용하여 값을 가져오고 있습니다. 해당 값은 파이프라인의 실행요청이 발생할 때 처리될 것입니다. 이 단계는 SageMaker 파이썬 SDK의 estimator에서 `fit`명령과 유사합니다.\n", "\n", "파이프라인 설정시 `\"train\"` 출력 채널의 `S3Uri`를 이용하여 `TrainingStep`에 데이터를 지정하였습니다. 마찬가지로 `\"test\"`출력 채널의 값을 이용하여 모델 검증용 데이터를 지정하였습니다. 각 파이프라인 단계에서 `properties` 속성은 참조값으로 설정 후 실행단계에서 지정되며, 각각 파이썬 SDK상에서 대응되는 오브젝트의 describe호출의 응답의 속성과 동일합니다. 예를 들어 본 단계에서 `ProcessingStep`의 `properties` 속성은 [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) API의 응답 결과와 동일합니다.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.inputs import TrainingInput\n", "from sagemaker.workflow.steps import TrainingStep\n", "\n", "\n", "step_train = TrainingStep(\n", " name=\"AbaloneTrain\",\n", " estimator=xgb_train,\n", " inputs={\n", " \"train\": TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", " \"train\"\n", " ].S3Output.S3Uri,\n", " content_type=\"text/csv\"\n", " ),\n", " \"validation\": TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", " \"validation\"\n", " ].S3Output.S3Uri,\n", " content_type=\"text/csv\"\n", " )\n", " },\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Define a Training Step to Train a Model](img/pipeline-3.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 학습모델을 평가하기 위한 모델 평가단계 정의 \n", "\n", "먼저 모델 평가용 프로세싱 단계에서 질행할 평가용 스크립트를 작성합니다. \n", "\n", "파이프라인 실행 후 `evaluation.json`파일을 통해 평가결과를 확인할 수 있습니다.\n", "\n", "평가 스크립트는 `xgboost`를 사용하고 다음을 실행합니다.\n", "\n", "* 모델을 로드합니다. \n", "* 테스트 데이터를 읽습니다. \n", "* 테스트 데이터에 대한 예측을 실행합니다. \n", "* 정확도, ROC곡선 등을 포함하는 분류보고서를 작성합니다. \n", "* 평가 디렉토리에 평가보고서를 저장합니다. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile abalone/evaluation.py\n", "import json\n", "import pathlib\n", "import pickle\n", "import tarfile\n", "\n", "import joblib\n", "import numpy as np\n", "import pandas as pd\n", "import xgboost\n", "\n", "from sklearn.metrics import mean_squared_error\n", "\n", "\n", "if __name__ == \"__main__\":\n", " model_path = f\"/opt/ml/processing/model/model.tar.gz\"\n", " with tarfile.open(model_path) as tar:\n", " tar.extractall(path=\".\")\n", " \n", " model = pickle.load(open(\"xgboost-model\", \"rb\"))\n", "\n", " test_path = \"/opt/ml/processing/test/test.csv\"\n", " df = pd.read_csv(test_path, header=None)\n", " \n", " y_test = df.iloc[:, 0].to_numpy()\n", " df.drop(df.columns[0], axis=1, inplace=True)\n", " \n", " X_test = xgboost.DMatrix(df.values)\n", " \n", " predictions = model.predict(X_test)\n", "\n", " mse = mean_squared_error(y_test, predictions)\n", " std = np.std(y_test - predictions)\n", " report_dict = {\n", " \"regression_metrics\": {\n", " \"mse\": {\n", " \"value\": mse,\n", " \"standard_deviation\": std\n", " },\n", " },\n", " }\n", "\n", " output_dir = \"/opt/ml/processing/evaluation\"\n", " pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)\n", " \n", " evaluation_path = f\"{output_dir}/evaluation.json\"\n", " with open(evaluation_path, \"w\") as f:\n", " f.write(json.dumps(report_dict))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "다음으로, `ScriptProcessor` 프로세서의 인스턴스를 생성합니다. 이 인스턴스는 `ProcessingStep`에서 사용할 것입니다.\n", "\n", "프로세서 정의시 `processing_instance_type` 파라미터를 이용하는 것을 확인합니다. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ScriptProcessor\n", "\n", "\n", "script_eval = ScriptProcessor(\n", " image_uri=image_uri,\n", " command=[\"python3\"],\n", " instance_type=processing_instance_type,\n", " instance_count=1,\n", " base_job_name=\"script-abalone-eval\",\n", " role=role,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "방금 생성한 processor 인스턴스를 이용하여 `ProcessingStep`을 생성합니다. 프로세싱의 입력 및 출력 채널과 파이프라인 실행시 호출될 스크립트 코드를 정의합니다. 이 단계는 SageMaker 파이썬 SDK에서 프로세싱 인스턴스의 `run`명령을 실행하는 것과 유사합니다.\n", "\n", "`step_train`의 `properties`속성 중 `S3ModelArtifacts`와 `step_process`의 `properties` 속성 중 `\"test\"` 출력 채널을 이용하여 프로세싱 단계의 입력을 정의하였습니다. `TrainingStep`과 `ProcessingStep`의 `properties` 속성은 각각 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) API와 [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) API의 응답 오브젝트와 동일합니다.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.properties import PropertyFile\n", "\n", "\n", "evaluation_report = PropertyFile(\n", " name=\"EvaluationReport\",\n", " output_name=\"evaluation\",\n", " path=\"evaluation.json\"\n", ")\n", "step_eval = ProcessingStep(\n", " name=\"AbaloneEval\",\n", " processor=script_eval,\n", " inputs=[\n", " ProcessingInput(\n", " source=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " destination=\"/opt/ml/processing/model\"\n", " ),\n", " ProcessingInput(\n", " source=step_process.properties.ProcessingOutputConfig.Outputs[\n", " \"test\"\n", " ].S3Output.S3Uri,\n", " destination=\"/opt/ml/processing/test\"\n", " )\n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name=\"evaluation\", source=\"/opt/ml/processing/evaluation\"),\n", " ],\n", " code=\"abalone/evaluation.py\",\n", " property_files=[evaluation_report],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Define a Model Evaluation Step to Evaluate the Trained Model](img/pipeline-4.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 추론 실행에 사용할 모델 정의 \n", "\n", "모델을 사용하는 배치변환작업을 실행하기 위해 SageMaker Model을 생성합니다.\n", "\n", "구체적으로, `TrainingStep` 인스턴스인 `step_train`의 속성 중 `S3ModelArtifacts` 정보를 활용하게 됩니다. `TrainingStep`의 `properties` 속성은 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) API의 응답결과와 동일합니다.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model import Model\n", "\n", "\n", "model = Model(\n", " image_uri=image_uri,\n", " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " sagemaker_session=sagemaker_session,\n", " role=role,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`instance_type`과 `accelerator_type`을 이용하여 Model input을 정의합니다. 이 값은 SageMaker 모델의 추론환경 배포를 위해 사용될 것입니다. 그 다음 앞서 정의한 model 인스턴스를 사용하여`CreateModelStep`을 생성합니다. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.inputs import CreateModelInput\n", "from sagemaker.workflow.steps import CreateModelStep\n", "\n", "\n", "inputs = CreateModelInput(\n", " instance_type=\"ml.m5.large\",\n", " accelerator_type=\"ml.eia1.medium\",\n", ")\n", "step_create_model = CreateModelStep(\n", " name=\"AbaloneCreateModel\",\n", " model=model,\n", " inputs=inputs,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 배치추론 실행을 위해 배치 변환단계 정의 \n", "\n", "이제 모델 인스턴스가 정의되었습니다. 다음으로 `Transformer`인스턴스를 정의합니다. 적절한 모델 타입과 실행할 인스턴스타입, 출력이 저장될 S3 URI를 지정합니다. \n", "\n", "\n", "구체적으로는 `CreateModelStep`단계의 `step_create_model` 인스턴스의 속성 중 `ModelName`을 이용하여 모델명을 전달하였습니다. `CreateModelStep`의 `properties`속성은 [DescribeModel](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeModel.html) API의 응답 오브젝트와 대응됩니다. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.transformer import Transformer\n", "\n", "\n", "transformer = Transformer(\n", " model_name=step_create_model.properties.ModelName,\n", " instance_type=\"ml.m5.xlarge\",\n", " instance_count=1,\n", " output_path=f\"s3://{default_bucket}/AbaloneTransform\"\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "transformer 인스턴스와 이전에 정의한 `batch_data`파라미터를 사용하는 `TransformInput` 입력채널을 이용하여 배치변형 추론단계를 생성합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.inputs import TransformInput\n", "from sagemaker.workflow.steps import TransformStep\n", "\n", "\n", "step_transform = TransformStep(\n", " name=\"AbaloneTransform\",\n", " transformer=transformer,\n", " inputs=TransformInput(data=batch_data)\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 모델 패키지 생성하고 모델 등록하기 \n", "\n", "`RegisterModel`인스턴스를 생성하기 위해 학습단계에서 선언한 Estimator 인스턴스를 사용합니다. 모델 패키지는 재사용가능한 모델 아티팩트의 추상화이며 추론을 위해 필요한 모든 정보를 포함하고 있습니다. 예를 들어, 모델의 Weight가 있는 위치나 추론이미지 등의 설정이 이에 포함됩니다. \n", "\n", "모델 패키지 그룹은 모델 패키지의 집합입니다. 특정 머신러닝 문제를 해결하기 위해 모델 패키지 그룹을 생성하고, 새로운 버전의 모델패키지를 추가할 수 있습니다. 일반적으로 SageMaker pipeline작업이 실행될 때 마다 새로운 버전의 모델패키지를 생성하고 모델패키지그룹에 추가하게 됩니다. \n", "\n", "`RegisterModel`은 Python SDK에서 Estimator 인스턴스의 `register` 메소드와 유사합니다.\n", "\n", "구체적으로, `TrainingStep`으로 정의한 `step_train`에서 `S3ModelArtifacts`를 속성을 이용하여 모델을 전달합니다. `TrainingStep`의 `properties`에서 활용할 수 있는 속성들은 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 의 응답 오브젝트에 대응됩니다. \n", "\n", "본 노트북에서 사용된 모델패키지 그룹이름은 SageMaker Project와 함께 모델 레지스트리와 CI/CD 작업에서 직접 활용될 수 있습니다. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model_metrics import MetricsSource, ModelMetrics \n", "from sagemaker.workflow.step_collections import RegisterModel\n", "\n", "\n", "model_metrics = ModelMetrics(\n", " model_statistics=MetricsSource(\n", " s3_uri=\"{}/evaluation.json\".format(\n", " step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", " ),\n", " content_type=\"application/json\"\n", " )\n", ")\n", "step_register = RegisterModel(\n", " name=\"AbaloneRegisterModel\",\n", " estimator=xgb_train,\n", " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " content_types=[\"text/csv\"],\n", " response_types=[\"text/csv\"],\n", " inference_instances=[\"ml.t2.medium\", \"ml.m5.xlarge\"],\n", " transform_instances=[\"ml.m5.xlarge\"],\n", " model_package_group_name=model_package_group_name,\n", " approval_status=model_approval_status,\n", " model_metrics=model_metrics,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Define a Create Model Step and Batch Transform to Process Data in Batch at Scale](img/pipeline-5.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 정확도를 체크하고 결과에 따라 모델 생성과 배치추론, 모델 등록을 실행하는 조건단계 정의하기 \n", "\n", "\n", "본 과정에서 `step_eval` 평가 단계의 평가 결과로 측정된 모델의 정확도가 특정 값을 넘어서는 경우에만 등록되도록 구성되었습니다. 파이프라인의 `ConditionStep`은 DAG에서 해당 단계의 속성 조건에 따라 조건부로 실행을 기능을 지원합니다.\n", "\n", "이어지는 셀에서 실행하는 기능은 다음과 같습니다.\n", "* 평가 단계 `step_eval`의 출력 정확도에 대하여 `ConditionLessThanOrEqualTo` 조건 설정\n", "* 생성한 조건을 `ConditionStep`에서 분기를 위한 조건속성으로 설정\n", "* `ConditionStep`의 `if_steps`을 이용하여 조건값이 `True`일 경우에만 `CreateModelStep`, `TransformStep`, `RegisterModel` 단계가 진행되도록 설정\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo\n", "from sagemaker.workflow.condition_step import (\n", " ConditionStep,\n", " JsonGet,\n", ")\n", "\n", "\n", "cond_lte = ConditionLessThanOrEqualTo(\n", " left=JsonGet(\n", " step=step_eval,\n", " property_file=evaluation_report,\n", " json_path=\"regression_metrics.mse.value\",\n", " ),\n", " right=6.0\n", ")\n", "\n", "step_cond = ConditionStep(\n", " name=\"AbaloneMSECond\",\n", " conditions=[cond_lte],\n", " if_steps=[step_register, step_create_model, step_transform],\n", " else_steps=[], \n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Define a Condition Step to Check Accuracy and Conditionally Execute Steps](img/pipeline-6.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의\n", "\n", "이제 지금까지 생성한 단계들을 하나의 파이프라인으로 조합하고 실행하도록 하겠습니다.\n", "\n", "파이프라인은 `name`, `parameters`, `steps` 속성이 필요합니다. 여기서 파이프라인의 이름은 `(account, region)` 조합에 대하여 유일(unique))해야 합니다.\n", "\n", "주의:\n", "\n", "* 정의에 사용한 모든 파라미터가 존재해야 합니다. All of the parameters used in the definitions must be present.\n", "* 파이프라인으로 전달된 단계(step)들은 실행순서와는 무관합니다. SageMaker Pipeline은 단계가 실행되고 완료될 수 있도록 의존관계를를 해석합니다.\n", "* 단계들은 반드시 파이프라인내 단계 리스트와 컨디션의 if/esle 조건에서 유일해야 합니다.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.pipeline import Pipeline\n", "\n", "\n", "pipeline_name = f\"AbalonePipeline\"\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " processing_instance_type, \n", " processing_instance_count,\n", " training_instance_type,\n", " model_approval_status,\n", " input_data,\n", " batch_data,\n", " ],\n", " steps=[step_process, step_train, step_eval, step_cond],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Define a Pipeline of Parameters, Steps, and Conditions](img/pipeline-7.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### (선택) 파이프라인 정의 확인 \n", "\n", "파이프라인을 정의하는 JSON을 생성하고 파이프라인 내에서 사용하는 파라미터와 단계별 속성들이 잘 정의되었는지 확인할 수 있습니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "\n", "definition = json.loads(pipeline.definition())\n", "definition" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 파이프라인을 SageMaker에 제출하고 실행하기 \n", "\n", "파이프라인 정의를 파이프라인 서비스에 제출합니다. 함께 전달되는 역할(role)을 이용하여 AWS에서 파이프라인을 생성하고 작업의 각 단계를 실행할 것입니다. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.upsert(role_arn=role)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "디폴트값을 이용하여 파이프라인을 샐행합니다. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution = pipeline.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 파이프라인 운영: 파이프라인 대기 및 실행상태 확인\n", "\n", "워크플로우의 실행상황을 살펴봅니다. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "실행이 완료될 때까지 기다립니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# execution.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "실행된 단계들을 리스트업합니다. 파이프라인의 단계실행 서비스에 의해 시작되거나 완료된 단계를 보여줍니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.list_steps()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 평가결과 확인 \n", "\n", "파이프라인의 실행이 완료된 후 생성된 결과 모델의 평가결과를 확인합니다. `evaluation.json`파일을 S3로부터 다운로드하고 내용을 살펴봅니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pprint import pprint\n", "\n", "\n", "evaluation_json = sagemaker.s3.S3Downloader.read_file(\"{}/evaluation.json\".format(\n", " step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", "))\n", "pprint(json.loads(evaluation_json))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 계보(Lineage)\n", "\n", "파이프라인에 의해 생성된 아티팩트의 계보를 살펴봅니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from sagemaker.lineage.visualizer import LineageTableVisualizer\n", "\n", "\n", "viz = LineageTableVisualizer(sagemaker.session.Session())\n", "for execution_step in reversed(execution.list_steps()):\n", " print(execution_step)\n", " display(viz.show(pipeline_execution_step=execution_step))\n", " time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 파라미터기반 실행 \n", "\n", "생성한 파이프라인을 다른 파라미터값을 이용하여 다시 실행할 수 있습니다. 파라미터정보는 딕셔너리 형태로 파라미터이름과 값을 지정하여 전달하면 디폴트값을 오버라이드하게 됩니다. \n", "\n", "모델의 성능에 따라 이번에는 컴퓨팅최적화된 인스턴스 타입을 이용하여 파이프라인을 실행하고 승인 상태를 자동으로 \"Approved\"로 설정하고 싶다면 다음 셀의 코드를 실행할 수 있습니다. 모델의 승인상태가 \"Approved\"라는 의미는 `RegisterModel` 단계에서 패키지버전이 등록될 때 자동으로 CI/CD 파이프라인에 의해 배포가능한 상태가 된다는 것을 의미합니다. 이후 배포파이프라인 프로세스는 SageMaker project를 통하여 자동화할 수 있습니다. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution = pipeline.start(\n", " parameters=dict(\n", " ProcessingInstanceType=\"ml.c5.xlarge\",\n", " ModelApprovalStatus=\"Approved\",\n", " )\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# execution.wait()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.list_steps()" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-2:429704687514:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }