# MLOps with SageMaker Pipelines
---

## Prerequisites
---

본 모듈은 여러분이 SageMaker와 SageMaker Pipelines에 대한 기본 컨셉을 알고 있다고 가정합니다. 만약 기본 컨셉에 대한 이해와 step-by-step 핸즈온이 필요하면 아래 링크들을 통해 세션 시청 후, 핸즈온을 해 보시는 것을 권장드립니다.

- SageMaker Pipelines 세션 (AWS Builders 300)
    - Part 1: https://www.youtube.com/watch?v=7IL_0-OjZWk
    - Part 2: https://www.youtube.com/watch?v=z_l2aNJswWQ
- SageMaker Pipelines Step-by-step 핸즈온
    - 입문 과정: https://github.com/gonsoomoon-ml/SageMaker-Pipelines-Step-By-Step
    - (optionally) 고급 과정 1: https://github.com/gonsoomoon-ml/SageMaker-Pipelines-Step-By-Step/tree/main/phase01
    - (optionally) 고급 과정 2: https://github.com/gonsoomoon-ml/SageMaker-Pipelines-Step-By-Step/tree/main/phase02


## Introduction
---

본 모듈에서는 SageMaker Pipelines를 사용하여 간단한 머신 러닝 파이프라인을 구축합니다. SageMaker Pipelines은 re:Invent 2020 서비스 런칭 이후 지속적으로 업데이트되고 있으며, 2021년 8월 업데이트된 주요 기능인 Lambda Step을 사용하면 호스팅 엔드포인트 모델 배포를 비롯한 서버리스 작업들을 쉽게 수행할 수 있습니다. 또한 캐싱(caching) 기능을 사용하면 모든 파이프라인을 처음부터 재시작할 필요 없이 변경된 파라메터에 대해서만 빠르게 실험해볼 수 있습니다. Lambda Step과 캐싱에 대한 자세한 내용은 아래 링크들을 참조해 주세요.

### References 
- SageMaker Pipelines SDK: https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html
- Caching Pipeline Steps: https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html
- AWS AIML Blog: Use a SageMaker Pipeline Lambda step for lightweight model deployments: https://aws.amazon.com/de/blogs/machine-learning/use-a-sagemaker-pipeline-lambda-step-for-lightweight-model-deployments/

### Note
- 본 노트북을 실행하려면 `AmazonSageMakerFullAccess`와 `AmazonSageMakerPipelinesIntegrations` policy를 추가해야 합니다.
- 빠른 핸즈온을 위해 1,000건의 샘플 데이터와 1 epoch으로 전처리 및 훈련을 수행합니다. 사전에 이미 파인튜닝이 완료된 모델을 훈련하므로 높은 정확도를 보입니다.

In [None]:
import boto3
import os
import numpy as np
import sagemaker
import sys
import time

import sagemaker
import sagemaker.huggingface
from sagemaker.huggingface import HuggingFace, HuggingFaceModel

from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString

from sagemaker.lambda_helper import Lambda

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.huggingface.processing import HuggingFaceProcessor

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import CacheConfig, ProcessingStep

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import CreateModelStep, RegisterModel

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo,ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.pipeline import Pipeline, PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables

In [None]:
sess = sagemaker.Session()
region = sess.boto_region_name

# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sagemaker_session.default_bucket()}")
print(f"sagemaker session region: {sagemaker_session.boto_region_name}")


<br>

## 1. Defining the Pipeline
---

### 1.1. Pipeline parameters

기본적인 파이프라인 파라메터들을 정의합니다. 자세한 내용은 아래 링크를 참조해 주세요.

References: 
- 개발자 가이드: https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html

In [None]:
# S3 prefix where every assets will be stored
s3_prefix = "hf-kornlp-mlops-demo"

# s3 bucket used for storing assets and artifacts
bucket = sagemaker_session.default_bucket()

# aws region used
region = sagemaker_session.boto_region_name

# base name prefix for sagemaker jobs (training, processing, inference)
base_job_prefix = s3_prefix

# Cache configuration for workflow
cache_config = CacheConfig(enable_caching=True, expire_after="7d")

# package versions
transformers_version = "4.11.0"
pytorch_version = "1.9.0"
py_version = "py38"

model_id_ = "daekeun-ml/koelectra-small-v3-nsmc"
tokenizer_id_ = "daekeun-ml/koelectra-small-v3-nsmc"
dataset_name_ = "nsmc"

model_id = ParameterString(name="ModelId", default_value=model_id_)
tokenizer_id = ParameterString(name="TokenizerId", default_value=tokenizer_id_)
dataset_name = ParameterString(name="DatasetName", default_value=dataset_name_)

### 1.2. Processing Step

빌트인 `SKLearnProcessor`를 통해 전처리 스텝을 정의합니다. 

최근 PyTorch, TensorFlow, MXNet, XGBoost, Hugging Face도 빌트인으로 지원되기 시작했습니다. `HuggingFaceProcessor` 사용 예시는 아래 코드 snippet을 참조해 주세요. 단, `HuggingFaceProcessor`는 현 시점(2022년 1월)에서는 GPU 인스턴스만 지원하기 때문에 GPU 리소스가 필요하지 않은 경우는 `SKLearnProcessor` 사용을 권장드립니다.


```python
from sagemaker.huggingface.processing import HuggingFaceProcessor

hf_processor = HuggingFaceProcessor(
    instance_type=processing_instance_type, 
    instance_count=processing_instance_count,
    pytorch_version=pytorch_version,
    transformers_version=transformers_version,
    py_version=py_version,
    base_job_name=base_job_prefix + "-preprocessing",
    sagemaker_session=sagemaker_session,    
    role=role
)

```

References: 
- AWS AIML Blog: https://aws.amazon.com/ko/blogs/machine-learning/use-deep-learning-frameworks-natively-in-amazon-sagemaker-processing/
- 개발자 가이드: https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing

In [None]:
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.c5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_script = ParameterString(name="ProcessingScript", default_value="./pipeline_src/processing_sklearn.py")

In [None]:
processing_output_destination = f"s3://{bucket}/{s3_prefix}/data"

sklearn_processor = SKLearnProcessor(
    instance_type=processing_instance_type, 
    instance_count=processing_instance_count,
    framework_version="0.23-1",    
    base_job_name=base_job_prefix + "-preprocessing",
    sagemaker_session=sagemaker_session,    
    role=role
)

step_process = ProcessingStep(
    name="ProcessDataForTraining",
    cache_config=cache_config,
    processor=sklearn_processor,
    job_arguments=["--model_id", model_id_,
                   "--tokenizer_id", tokenizer_id_,
                   "--dataset_name", dataset_name_,
                   "--transformers_version", transformers_version,
                   "--pytorch_version", pytorch_version
                  ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            destination=f"{processing_output_destination}/train",
            source="/opt/ml/processing/train",
        ),
        ProcessingOutput(
            output_name="validation",
            destination=f"{processing_output_destination}/test",
            source="/opt/ml/processing/validation",
        ),
        ProcessingOutput(
            output_name="test",
            destination=f"{processing_output_destination}/test",
            source="/opt/ml/processing/test",
        )        
    ],
    code=processing_script
)

### 1.3. Model Training Step

이전 랩에서 진행한 훈련 스크립트를 그대로 활용하여 훈련 스텝을 정의합니다. SageMaker Pipelines에 적용하기 위해 워크플로 파라메터(`ParameterInteger, ParameterFloat, ParameterString`)도 같이 정의합니다.

훈련, 검증 및 테스트 데이터에 대한 S3 경로는 이전 랩처럼 수동으로 지정하는 것이 아니라 체인으로 연결되는 개념이기에, 아래 예시처럼 전처리 스텝 결괏값(`step_process`)의 프로퍼티(`properties`)를 참조하여 지정해야 합니다.
```python
"train": TrainingInput(
    s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
)
```

#### Training Parameter

In [None]:
# training step parameters
training_entry_point = ParameterString(name="TrainingEntryPoint", default_value="train.py")
training_source_dir = ParameterString(name="TrainingSourceDir", default_value="./pipeline_src")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.p3.2xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# hyperparameters, which are passed into the training job
n_gpus = ParameterString(name="NumGPUs", default_value="1")
epochs = ParameterString(name="Epochs", default_value="1")
seed = ParameterString(name="Seed", default_value="42")
train_batch_size = ParameterString(name="TrainBatchSize", default_value="32")
eval_batch_size = ParameterString(name="EvalBatchSize", default_value="64")           
learning_rate = ParameterString(name="LearningRate", default_value="5e-5") 

# model_id = ParameterString(name="ModelId", default_value=model_id_)
# tokenizer_id = ParameterString(name="TokenizerId", default_value=tokenizer_id_)
# dataset_name = ParameterString(name="DatasetName", default_value=dataset_name_)

In [None]:
hyperparameters = {
    'n_gpus': n_gpus,                       # number of GPUs per instance
    'epochs': epochs,                       # number of training epochs
    'seed': seed,                           # seed
    'train_batch_size': train_batch_size,   # batch size for training
    'eval_batch_size': eval_batch_size,     # batch size for evaluation
    'warmup_steps': 0,                      # warmup steps
    'learning_rate': learning_rate,         # learning rate used during training
    'tokenizer_id': model_id,               # pre-trained tokenizer
    'model_id': tokenizer_id                # pre-trained model
}

chkpt_s3_path = f's3://{bucket}/{s3_prefix}/sm-processing/checkpoints'

In [None]:
huggingface_estimator = HuggingFace(
    entry_point=training_entry_point,
    source_dir=training_source_dir,
    base_job_name=base_job_prefix + "-training",
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=role,
    transformers_version=transformers_version,
    pytorch_version=pytorch_version,
    py_version=py_version,
    hyperparameters=hyperparameters,
    sagemaker_session=sagemaker_session,    
    disable_profiler=True,
    debugger_hook_config=False,
    checkpoint_s3_uri=chkpt_s3_path,
    checkpoint_local_path='/opt/ml/checkpoints'
)

step_train = TrainingStep(
    name="TrainHuggingFaceModel",
    estimator=huggingface_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri
        ),
    },
    cache_config=cache_config,
)

### 1.4. Model evaluation Step

훈련된 모델의 성능을 평가하기 위해 추가 `ProcessingStep`을 정의합니다. 평가 결과에 따라 모델이 생성, 등록 및 배포되거나 파이프라인이 중단됩니다.
평가 결과는 `PropertyFile`에 복사되며, 이는 이후 `ConditionStep`에서 사용됩니다.

#### Evaluation Parameter

In [None]:
evaluation_script = ParameterString(name="EvaluationScript", default_value="./pipeline_src/evaluate.py")
evaluation_instance_type = ParameterString(name="EvaluationInstanceType", default_value="ml.m5.xlarge")
evaluation_instance_count = ParameterInteger(name="EvaluationInstanceCount", default_value=1)

#### Evaluator

In [None]:
!pygmentize ./pipeline_src/evaluate.py

In [None]:
script_eval = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=evaluation_instance_type,
    instance_count=evaluation_instance_count,
    base_job_name=base_job_prefix + "-evaluation",
    sagemaker_session=sagemaker_session,
    role=role,
)

evaluation_report = PropertyFile(
    name="HuggingFaceEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="HuggingfaceEvalLoss",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{bucket}/{s3_prefix}/evaluation_report",
        ),
    ],
    code=evaluation_script,
    property_files=[evaluation_report],
    cache_config=cache_config,
)

### 1.5. Register the model

훈련된 모델은 모델 패키지 그룹(Model Package Group)의 모델 레지스트리(Model Registry)에 등록됩니다. 모델 레지스트리는 SageMaker Pipelines에서 소개된 개념으로, 기존 SageMaker 모델과 다르게 모델 버전 관리가 가능하며 승인 여부를 지정할 수 있습니다. 모델 승인은 `ConditionStep`의 조건을 만족할 때에만 가능하게 할 수 있습니다. (예: 정확도가 80% 이상인 경우에만 모델 배포)

In [None]:
model = HuggingFaceModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    transformers_version=transformers_version,
    pytorch_version=pytorch_version,
    py_version=py_version,
    sagemaker_session=sagemaker_session,
)
model_package_group_name = "HuggingFaceModelPackageGroup"
step_register = RegisterModel(
    name="HuggingFaceRegisterModel",
    model=model,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.m5.xlarge", "ml.g4dn.xlarge"],
    transform_instances=["ml.m5.xlarge", "ml.g4dn.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status="Approved",
)

### 1.6. Model Deployment


`LambdaStep`에서 파생된 커스텀 단계 `ModelDeployment`를 생성합니다. LambdaStep에서 정의한 Lambda 함수를 통해 호스팅 리얼타임 엔드포인트를 배포합니다.

In [None]:
!pygmentize pipeline_utils/deploy_step.py

In [None]:
# custom Helper Step for ModelDeployment
from pipeline_utils.deploy_step import ModelDeployment

# we will use the iam role from the notebook session for the created endpoint
# this role will be attached to our endpoint and need permissions, e.g. to download assets from s3
sagemaker_endpoint_role=sagemaker.get_execution_role()
model_name = f"{model_id_.split('/')[-1]}-{time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime())}"

step_deployment = ModelDeployment(
    model_name=model_name,
    registered_model=step_register.steps[0],
    endpoint_instance_type="ml.m5.xlarge",
    sagemaker_endpoint_role=sagemaker_endpoint_role,
    autoscaling_policy=None,
)

### 1.7. Condition for deployment

`ConditionStep`을 통해 모델 평가 결과를 검사합니다. 정확도가 일정 이상일 때(accuracy > 0.8) 모델 등록 및 배포 파이프라인을 진행합니다.

#### Condition Parameter

In [None]:
threshold_accuracy = ParameterFloat(name="ThresholdAccuracy", default_value=0.8)

#### Condition

In [None]:
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="eval_accuracy",
    ),
    right=threshold_accuracy,
)

step_cond = ConditionStep(
    name="CheckHuggingfaceEvalAccuracy",
    conditions=[cond_gte],
    if_steps=[step_register, step_deployment],
    else_steps=[],
)

<br>

## 2. Pipeline definition and execution

---

모든 스텝을 정의하였다면 파이프라인을 정의합니다. 

파이프라인 인스턴스는 이름(`name`), 파라메터(`parameters`), 및 스텝(`steps`)으로 구성됩니다. 
- 파이프라인 이름: (AWS 계정, 리전) 쌍 내에서 고유해야 합니다 
- 파라메터: 스텝 정의에 사용했던 모든 파라메터들을 파이프라인에서 정의해야 합니다. 
- 스텝: 리스트 형태로 이전 스텝들을 정의합니다. 내부적으로 데이터 종속성을 사용하여 각 스텝 간의 관계를 DAG으로 정의하기 때문에 실행 순서대로 나열할 필요는 없습니다.

In [None]:
pipeline = Pipeline(
    name=f"HuggingFaceDemoPipeline",
    parameters=[
        model_id,
        tokenizer_id,        
        dataset_name,
        processing_instance_type,
        processing_instance_count,
        processing_script,
        training_entry_point,
        training_source_dir,
        training_instance_type,
        training_instance_count,
        evaluation_script,
        evaluation_instance_type,
        evaluation_instance_count,
        threshold_accuracy,
        n_gpus,
        epochs,
        seed,
        eval_batch_size,
        train_batch_size,
        learning_rate,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=sagemaker_session,
)

#### Check the pipeline definition


In [None]:
import json

definition = json.loads(pipeline.definition())
definition

In [None]:
pipeline.upsert(role_arn=role)

#### Run the pipeline

파이프라인을 실행합니다.

In [None]:
execution = pipeline.start()

In [None]:
execution.describe()

파이프라인 실행이 완료될 때까지 기다립니다. SageMaker Studio 콘솔을 통해 진행 상황을 확인할 수도 있습니다.

In [None]:
execution.wait()

실행된 스텝들을 리스트업합니다.

In [None]:
execution.list_steps()

In [None]:
time.sleep(30)  # give time to catch up

<br>

## 3. Getting predictions from the endpoint
---

파이프라인의 모든 단계가 정상적으로 실행되었다면 배포된 엔드포인트를 통해 실시간 추론을 수행할 수 있습니다.

In [None]:
from sagemaker.huggingface import HuggingFacePredictor
from IPython.core.display import display, HTML

endpoint_name = model_name
hf_predictor = HuggingFacePredictor(endpoint_name,sagemaker_session=sagemaker_session)

def make_endpoint_link(region, endpoint_name, endpoint_task):
    endpoint_link = f'<b><a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={region}#/endpoints/{endpoint_name}">{endpoint_task} Review Endpoint</a></b>'   
    return endpoint_link 

# check if endpoint is up and running
endpoint_link = make_endpoint_link(region, endpoint_name, '[Deploy model using SageMaker Pipelines]')
display(HTML(endpoint_link))

In [None]:
# example request, you always need to define "inputs"
data = {
   "inputs": [
       "정말 재미있습니다. 세 번 봐도 질리지 않아요.",
       "시간이 아깝습니다. 다른 영화를 보세요."
   ]
}
hf_predictor.predict(data)

In [None]:
data = {
   "inputs": [
       "10점 만점에 1점만 줄께요.",
       "내용이 너무 아른거려서 잠을 이룰 수가 없었어요. 감동의 향연!",
       "액션광이기에 내용을 기대했지만 앙꼬없는 찐빵이다"
   ]
}
hf_predictor.predict(data)

<br>

## Clean up
---

과금을 방지하기 위해 사용하지 않는 리소스를 삭제합니다. 아래 코드셀은 Lambda 함수와 엔드포인트를 삭제합니다. 

In [None]:
sm_client = boto3.client("sagemaker")

# Delete the Lambda function
step_deployment.func.delete()

# Delete the endpoint
hf_predictor.delete_endpoint()