# A/B Test using Production Variants
---

## Introduction
---

프로덕션 ML 워크플로에서 데이터 과학자와 머신 러닝 엔지니어는 데이터/모델/컨셉 드리프트에 따른 재훈련, 하이퍼파라메터 튜닝, 피쳐 선택 등과 같은 다양한 방법들을 통해 모델을 개선합니다. 이 때 이전 모델과 신규 모델 간의 A/B 테스트를 수행함으로써, 신규 모델에 대한 검증을 충분히 해야겠죠. 그렇다면 A/B 테스트를 위해 엔드포인트를 재배포하거나 2개의 엔드포인트를 배포해야 할까요? 그렇지 않습니다. 프로덕션 Variant 기능을 사용하면, 각 variant에 대해 동일한 엔드포인트 뒤에서 여러 모델 또는 모델 버전을 테스트할 수 있습니다. 

### Production Variants
프로덕션 Variant로 단일 SageMaker Endpoint에서 신규 모델을 테스트하고 배포할 수 있습니다. 예를 들어, 카나리 롤아웃(canary rollout) 및 블루/그린 배포(blue/green deployment)를 위해 엔드포인트의 모델 간에 트래픽을 이동할 수 있습니다. 물론, 초당 요청 수(requests per second)과 같은 지표를 기반으로 엔드포인트를 자동으로 확장하거나 축소하도록 오토스케일링 policy를 구성할 수도 있습니다.

본 실습에서는 아래와 같은 기능들을 체험해 봅니다.
- 2개의 프로덕션 variant들을 배포 (Variant1: CPU, Variant2: GPU)
- 트래픽 분포 변경 (50:50 -> 80:20 -> 100:0)
- Variant2 삭제


### References
- A/B Testing ML models in production using Amazon SageMaker: https://aws.amazon.com/ko/blogs/machine-learning/a-b-testing-ml-models-in-production-using-amazon-sagemaker/
- Example: https://sagemaker-examples.readthedocs.io/en/latest/sagemaker_endpoints/a_b_testing/a_b_testing.html

In [None]:
import os
import json
import sys
import logging
import boto3
import sagemaker
import time
from datetime import datetime, timedelta
from sagemaker.huggingface import HuggingFaceModel
from sagemaker import session
from transformers import ElectraConfig
from transformers import (
    ElectraModel, ElectraTokenizer, ElectraForSequenceClassification
)

logging.basicConfig(
    level=logging.INFO, 
    format='[{%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(filename='tmp.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sess = sagemaker.Session()
sm = boto3.Session().client("sagemaker")
sm_runtime = boto3.Session().client("sagemaker-runtime")

<br>

## 1. Deploy Models
---

사전 훈련된 한국어 자연어 처리 모델(네이버 감성 분류 긍정/부정 판별)을 배포합니다. 편의상 동일한 모델을 2개의 프로덕션 변형에 배포하지만, 다른 모델(예: 다른 하이퍼파라메터로 훈련된 모델)을 배포할 수 있습니다.

In [None]:
model_dir = 'model'

# Define the model repo
tokenizer_id = 'daekeun-ml/koelectra-small-v3-nsmc'
model_id = "daekeun-ml/koelectra-small-v3-nsmc"

# Download model and tokenizer
model = ElectraForSequenceClassification.from_pretrained(model_id)
tokenizer = ElectraTokenizer.from_pretrained(tokenizer_id)

os.makedirs(model_dir, exist_ok=True)
model.save_pretrained(model_dir)
tokenizer.save_pretrained(model_dir)

모델 파라메터 및 토크나이저를 `model.tar.gz`으로 압축합니다. 압축 파일명은 자유롭게 지정할 수 있으나, 반드시 `tar.gz`로 압축해야 합니다.

In [None]:
model_artifact_name = 'model.tar.gz'
!cd model && tar -czvf {model_artifact_name} *.*

압축한 모델 아티팩트를 Amazon S3로 복사합니다.

In [None]:
s3_prefix = 'ab-test/models/nsmc'
s3_model_path = f's3://{sess.default_bucket()}/{s3_prefix}'
s3_model_url = f'{s3_model_path}/{model_artifact_name}'
!aws s3 cp {model_dir}/{model_artifact_name} {s3_model_path}/{model_artifact_name}

### Create Models

In [None]:
ecr_uri_cpu = f'763104351884.dkr.ecr.{region}.amazonaws.com/huggingface-pytorch-inference:1.9.1-transformers4.12.3-cpu-py38-ubuntu20.04'
ecr_uri_gpu = f'763104351884.dkr.ecr.{region}.amazonaws.com/huggingface-pytorch-inference:1.9.1-transformers4.12.3-gpu-py38-cu111-ubuntu20.04'

In [None]:
model_name1 = f"model-kornlp-nsmc-cpu-{datetime.now():%Y-%m-%d-%H-%M-%S}"
model_name2 = f"model-kornlp-nsmc-gpu-{datetime.now():%Y-%m-%d-%H-%M-%S}"

sess.create_model(
    name=model_name1, role=role, container_defs={"Image": ecr_uri_cpu, "ModelDataUrl": s3_model_url}
)

sess.create_model(
    name=model_name2, role=role, container_defs={"Image": ecr_uri_gpu, "ModelDataUrl": s3_model_url}
)

### Create Variants

엔드포인트 설정에서 프로덕션 variant를 여러 개 생성할 수 있습니다. 우선 각 variant에 대해 `initial_weight`를 1로 설정합니다. 즉, 클라이언트 요청의 50%가 Variant1로 이동하고 나머지 50%가 Variant로 이동됨을 의미합니다.

본 예제에서는 최적의 레이턴시&비용 절충안을 찾기 위해 Variant1을 CPU 인스턴스로 설정하고 Variant2를 GPU 인스턴스로 설정했습니다.

In [None]:
from sagemaker.session import production_variant

variant1 = production_variant(
    model_name=model_name1,
    instance_type="ml.c5.xlarge",
    initial_instance_count=1,
    variant_name="Variant1",
    initial_weight=1,
)
variant2 = production_variant(
    model_name=model_name2,
    instance_type="ml.g4dn.xlarge",
    initial_instance_count=1,
    variant_name="Variant2",
    initial_weight=1,
)
(variant1, variant2)

### Create Production Variants

단일 엔드포인트에 2개의 프로덕션 Variant들을 생성합니다.

In [None]:
endpoint_name = f"endpoint-kornlp-nsmc-{datetime.now():%Y-%m-%d-%H-%M-%S}"
print(f"EndpointName={endpoint_name}")

sess.endpoint_from_production_variants(
    name=endpoint_name, production_variants=[variant1, variant2], wait=False
)

### Wait for the endpoint jobs to complete

엔드포인트가 생성될 때까지 기다립니다. 약 5-10분의 시간이 소요됩니다. 아래 코드 셀에서 출력되는 AWS 콘솔 링크로 접속해서 엔드포인트 배포 상태를 확인할 수 있습니다.

In [None]:
from IPython.core.display import display, HTML

def make_endpoint_link(region, endpoint_name, endpoint_task):
    endpoint_link = f'<b><a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={region}#/endpoints/{endpoint_name}">{endpoint_task} Review Endpoint</a></b>'   
    return endpoint_link 
        
endpoint_link = make_endpoint_link(region, endpoint_name, '[Deploy model from S3]')
display(HTML(endpoint_link))

In [None]:
sess.wait_for_endpoint(endpoint_name, poll=5)

<br>

## 2. Invoke Endpoint
----

엔드포인트가 배포되었습니다. 샘플 데이터로 직접 추론을 수행해 봅니다.

In [None]:
def invoke_endpoint(payload, endpoint_name, target_variant=None):
    start = time.time()
    if target_variant is not None:
        response = sm_runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType="application/json", 
            TargetVariant=target_variant,
            Body=payload,
        )        
    else:        
        response = sm_runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType="application/json", 
            Body=payload,
        )
    latency = (time.time() - start) * 1000
    variant = response["InvokedProductionVariant"]
    logger.info(f'[{variant}] Latency: {latency:.3f} ms')
    output = json.loads(response['Body'].read().decode())
    return output

In [None]:
payload = '{"inputs": ["불후의 명작입니다. 눈물이 앞을 가려요", "저런...5점 만점에 1점 주기도 힘들어요."]}'
invoke_endpoint(payload, endpoint_name)

Variant1와 Variant2가 고르게 호출됨을 확인할 수 있습니다.

In [None]:
for i in range(10):
    invoke_endpoint(payload, endpoint_name)

In [None]:
{
    variant["VariantName"]: variant["CurrentWeight"]
    for variant in sm.describe_endpoint(EndpointName=endpoint_name)["ProductionVariants"]
}

`VariantName`으로 Target Variant를 고정적으로 지정할 수도 있습니다.

In [None]:
invoke_endpoint(payload, endpoint_name, variant1['VariantName'])
invoke_endpoint(payload, endpoint_name, variant2['VariantName'])

<br>

## 3. Update Variant Traffic (Canary Rollouts and A/B Testing)
---

카나리 롤아웃은 신규 모델을 안전하게 배포하기 위해 사용되는 전략 중 하나입니다. 대분의 트래픽이 기존 모델로 이동하고 카나리 모델의 클러스터에 할당되는 트래픽은 상대적으로 작기 때문에 사용자 경험에 영향을 거의 주지 않습니다. SageMaker에서는 이를 위한 기능을 API로 제공하고 있으며, A/B 테스트 결과에 따라 트래픽을 특정 variant에 더 할당할 경우 굳이 호스팅 엔드포인트를 재배포하실 필요가 없습니다. `UpdateEndpointWeightsAndCapacities`를 사용하면 엔드포인트 중단 없이 각 variant에 할당된 가중치를 쉽게 수정할 수 있기 때문입니다.

In [None]:
import pandas as pd

cw = boto3.Session().client("cloudwatch")

def get_metrics_for_endpoint_variant(
    endpoint_name, 
    variant_name, 
    metric_name,
    statistic, 
    start_time, 
    end_time
):
    
    dimensions = [
        {"Name": "EndpointName", "Value": endpoint_name},
        {"Name": "VariantName", "Value": variant_name},
    ]

    metrics = cw.get_metric_statistics(
        Namespace="AWS/SageMaker",
        MetricName="Invocations",
        StartTime=start_time,
        EndTime=end_time,
        Period=60,
        Statistics=[statistic],
        Dimensions=dimensions
    )
    return (
        pd.DataFrame(metrics["Datapoints"])
        .sort_values("Timestamp")
        .set_index("Timestamp")
        .drop("Unit", axis=1)
        .rename(columns={statistic: variant_name})   
    )


def plot_endpoint_metrics(start_time=None):
    start_time = start_time or datetime.now() - timedelta(minutes=60)
    end_time = datetime.now()
    metric_name = "Invocations"
    statistic = 'Sum'
    metrics_variant1 = get_metrics_for_endpoint_variant(
        endpoint_name, variant1["VariantName"], metric_name, statistic, start_time, end_time
    )
    metrics_variant2 = get_metrics_for_endpoint_variant(
        endpoint_name, variant2["VariantName"], metric_name, statistic, start_time, end_time
    )
    metrics_variants = metrics_variant1.join(metrics_variant2, how="outer")
    metrics_variants.plot()
    return metrics_variants

### Variant 트래픽 테스트

약 2분여간 추론 요청들을 수행하면서 각 variant의 트래픽 분포를 확인해 봅니다. 현재는 50:50 가중치이므로 트래픽 분포가 고르게 이루어지고 있다는 것을 알 수 있습니다.

In [None]:
from datetime import datetime, timedelta
def invoke_endpoint_many(payload, endpoint_name, num_requests=250, sleep_secs=0.5):
    for i in range(num_requests):
        print(".", end="", flush=True)
        response = sm_runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType="application/json", 
            Body=payload,
        )
        output = json.loads(response['Body'].read().decode())
        time.sleep(sleep_secs)

payload = '{"inputs": ["불후의 명작입니다. 눈물이 앞을 가려요", "저런...5점 만점에 1점 주기도 힘들어요."]}'
invocation_start_time = datetime.now()
invoke_endpoint_many(payload, endpoint_name)
time.sleep(20)  # give metrics time to catch up
plot_endpoint_metrics(invocation_start_time)

### Variant 가중치 변경 (80:20)

이제 `UpdateEndpointWeightsAndCapacities`를 사용하여 각 variant의 가중치를 변경합니다. 트래픽의 80%를 variant1로 이동하고 나머지 트래픽을 variant2로 이동합니다. Variant 가중치 수정 후 곧바로 2분 정도 추론 요청을 연속적으로 수행해 보겠습니다.

In [None]:
sm.update_endpoint_weights_and_capacities(
    EndpointName=endpoint_name,
    DesiredWeightsAndCapacities=[
        {"DesiredWeight": 80, "VariantName": variant1["VariantName"]},
        {"DesiredWeight": 20, "VariantName": variant2["VariantName"]},
    ],
)

In [None]:
print("Waiting for update to complete")
while True:
    status = sm.describe_endpoint(EndpointName=endpoint_name)["EndpointStatus"]
    if status in ["InService", "Failed"]:
        print("Done")
        break
    print(".", end="", flush=True)
    time.sleep(1)

{
    variant["VariantName"]: variant["CurrentWeight"]
    for variant in sm.describe_endpoint(EndpointName=endpoint_name)["ProductionVariants"]
}

대부분의 추론 요청이 Variant1에서 처리되고 있으며, Variant2에서 처리된 추론 요청이 적다는 것을 볼 수 있습니다.

In [None]:
invoke_endpoint_many(payload, endpoint_name)
time.sleep(20)  # give metrics time to catch up
plot_endpoint_metrics(invocation_start_time)

### Variant 가중치 변경 (100:0)

Variant1의 퍼포먼스가 만족스럽다면 트래픽의 100%를 모두 variant1로 보내도록 라우팅할 수 있습니다. variant 가중치 수정 후 곧바로 2분 정도 추론 요청을 연속적으로 수행해 보겠습니다.

In [None]:
sm.update_endpoint_weights_and_capacities(
    EndpointName=endpoint_name,
    DesiredWeightsAndCapacities=[
        {"DesiredWeight": 1, "VariantName": variant1["VariantName"]},
        {"DesiredWeight": 0, "VariantName": variant2["VariantName"]},
    ],
)
print("Waiting for update to complete")
while True:
    status = sm.describe_endpoint(EndpointName=endpoint_name)["EndpointStatus"]
    if status in ["InService", "Failed"]:
        print("Done")
        break
    print(".", end="", flush=True)
    time.sleep(1)

{
    variant["VariantName"]: variant["CurrentWeight"]
    for variant in sm.describe_endpoint(EndpointName=endpoint_name)["ProductionVariants"]
}

모든 추론 요청이 Variant1에서 처리되고 있으며, Variant2에서 처리된 추론 요청이 없다는 것을 볼 수 있습니다.

In [None]:
invoke_endpoint_many(payload, endpoint_name)
time.sleep(20)  # give metrics time to catch up
plot_endpoint_metrics(invocation_start_time)

이슈가 없다면 곧바로 엔드포인트에서 Variant2를 삭제할 수 있습니다. 바로 아래 섹션에서 Variant2를 삭제해 보겠습니다. 물론, 프로덕션에서 새로운 테스트 환경이 필요할 때에는 엔드포인트에 신규 variant를 추가하고 신규 모델을 계속 테스트할 수 있습니다. 

In [None]:
endpoint_config_name = sm.describe_endpoint(EndpointName=endpoint_name)['EndpointConfigName']

<br>

## 4. Delete Variant
---

Variant를 여러 개 띄운다는 것은 모델 호스팅 클러스터를 여러 개 띄운다는 의미입니다. 이제, 불필요한 과금을 피하기 위해 Variant1만 사용하도록 엔드포인트 구성을 업데이트합니다. 엔드포인트 업데이트는 수 분이 소요되지만, 엔드포인트 업데이트 중에도 **다운타임이 발생하지 않는다는 점**을 주목해 주세요. (즉, `invoke_endpoint()`를 계속 수행할 수 있습니다.)


**[Tip]** 본 핸즈온에서는 빠른 실습을 위해 곧바로 Variant2의 클러스터를 삭제했지만, 실제 프로덕션에서는 이전 클러스터로 빠르게 롤백해야 하는 경우를 대비하여, Variant2를 일정 시간 동안 유휴 상태로 유지하는 것을 권장드립니다.

In [None]:
updated_endpoint_config_name = f"updated-endpoint-config-kornlp-nsmc-{datetime.now():%Y-%m-%d-%H-%M-%S}"
print(updated_endpoint_config_name)

updated_endpoint_config = sm.create_endpoint_config(
    EndpointConfigName=updated_endpoint_config_name,
    ProductionVariants=[
        {
         'VariantName': variant1["VariantName"],  # Only specify variant1 to remove variant2
         'ModelName': model_name1,
         'InstanceType':'ml.m5.xlarge',
         'InitialInstanceCount': 1,
         'InitialVariantWeight': 100
        }
    ])

In [None]:
sm.update_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=updated_endpoint_config_name
)

In [None]:
invoke_endpoint(payload, endpoint_name)

AWS 콘솔에서 엔드포인트 상태를 확인합니다. Updating 상태로 수 분의 시간이 경과 후 `InServce`로 변경됩니다.

In [None]:
endpoint_link = make_endpoint_link(region, endpoint_name, '[Deploy model from S3]')
display(HTML(endpoint_link))

In [None]:
sess.wait_for_endpoint(endpoint_name, poll=5)

<br>

## Clean up
---

In [None]:
sess.delete_endpoint(endpoint_name)

In [None]:
sess.delete_endpoint_config(endpoint_config_name)
sess.delete_endpoint_config(updated_endpoint_config_name)

In [None]:
sess.delete_model(model_name1)
sess.delete_model(model_name2)

In [None]:
!rm -rf {model_dir}