# SageMaker기반으로 도커컨테이너를 직접 빌드하여 머신러닝 분류기 학습/배포하기

본 예제코드는 아래 소스에서 code-pipeline 부분을 제외하고 재구성하였습니다.
- 소스 : https://github.com/awslabs/amazon-sagemaker-mlops-workshop (lab 01)

본 예제에서는 도커이미지를 직접 생성하여 학습과 모델 배포를 진행할 것입니다. 실습에 사용할 모델은 scikit-learn(https://scikit-learn.org/)의 **Random Forest Tree** 를 이용하며, 붓꽃(iris)의 품종을 분류하는 기초적인 문제입니다. 실험에 사용할 데이터셋은 Iris (http://archive.ics.uci.edu/ml/datasets/iris)를 이용합니다. 모델 코드는 매우 간단한 예제이므로 실행환경과 진행방식에 보다 집중해 주십시오. 

Scikit-lean 컨테이너는 SageMaker에서 관리형으로 제공하고 있습니다. (https://docs.aws.amazon.com/sagemaker/latest/dg/sklearn.html) 본 예제는 컨테이너를 이용하는 방법을 보여주기 위한 예제입니다. 


## PART 1 - 도커 이미지 빌드 

### 1.1 학습용 스크립트 작성

Scikit-learn을 사용하는 간단한 예제입니다.

In [None]:
%%writefile train.py
import os
import sys
import pandas as pd
import re
import joblib
import json
from sklearn.ensemble import RandomForestClassifier

def load_dataset(path):
 # Take the set of files and read them all into a single pandas dataframe
 files = [ os.path.join(path, file) for file in os.listdir(path) ]
 
 if len(files) == 0:
 raise ValueError("Invalid # of files in dir: {}".format(path))

 raw_data = [ pd.read_csv(file, sep=",", header=None ) for file in files ]
 data = pd.concat(raw_data)

 # labels are in the first column
 y = data.iloc[:,0]
 X = data.iloc[:,1:]
 return X,y
 
def start(args):
 print("Training mode")

 try:
 X_train, y_train = load_dataset(args.train)
 X_test, y_test = load_dataset(args.validation)
 
 hyperparameters = {
 "max_depth": args.max_depth,
 "verbose": 1, # show all logs
 "n_jobs": args.n_jobs,
 "n_estimators": args.n_estimators
 }
 print("Training the classifier")
 model = RandomForestClassifier()
 model.set_params(**hyperparameters)
 model.fit(X_train, y_train)
 print("Score: {}".format( model.score(X_test, y_test)) )
 joblib.dump(model, open(os.path.join(args.model_dir, "iris_model.pkl"), "wb"))
 
 except Exception as e:
 # Write out an error file. This will be returned as the failureReason in the
 # DescribeTrainingJob result.
 trc = traceback.format_exc()
 with open(os.path.join(args.output_dir, "failure"), "w") as s:
 s.write("Exception during training: " + str(e) + "\\n" + trc)
 
 # Printing this causes the exception to be in the training job logs, as well.
 print("Exception during training: " + str(e) + "\\n" + trc, file=sys.stderr)
 
 # A non-zero exit code causes the training job to be marked as Failed.
 sys.exit(255)

### 1.2 추론용 핸들러(handler)를 만듭니다. 

SageMaker 추론 툴킷을 이용합니다. SageMaker 추론 툴킷은 SageMaker에서 머신러닝의 추론코드를 보다 쉽게 작성할 수 있도록 도와줍니다. 

- SageMaker 추론 툴킷 참조 : https://github.com/aws/sagemaker-inference-toolkit

In [None]:
%%writefile handler.py
import os
import sys
import joblib
from sagemaker_inference.default_inference_handler import DefaultInferenceHandler
from sagemaker_inference.default_handler_service import DefaultHandlerService
from sagemaker_inference import content_types, errors, transformer, encoder, decoder

class HandlerService(DefaultHandlerService, DefaultInferenceHandler):
 def __init__(self):
 op = transformer.Transformer(default_inference_handler=self)
 super(HandlerService, self).__init__(transformer=op)
 
 ## Loads the model from the disk
 def default_model_fn(self, model_dir):
 model_filename = os.path.join(model_dir, "iris_model.pkl")
 return joblib.load(open(model_filename, "rb"))
 
 ## Parse and check the format of the input data
 def default_input_fn(self, input_data, content_type):
 if content_type != "text/csv":
 raise Exception("Invalid content-type: %s" % content_type)
 return decoder.decode(input_data, content_type).reshape(1,-1)
 
 ## Run our model and do the prediction
 def default_predict_fn(self, payload, model):
 return model.predict( payload ).tolist()
 
 ## Gets the prediction output and format it to be returned to the user
 def default_output_fn(self, prediction, accept):
 if accept != "text/csv":
 raise Exception("Invalid accept: %s" % accept)
 return encoder.encode(prediction, accept)

### 1.3 컨테이너의 entrypoint 스크립트를 작성합니다. 

main함수에서 매개변수 설정을 위해 **SageMaker 학습 툴킷**(https://github.com/aws/sagemaker-training-toolkit)을 사용하였으며 추론 서빙을 위해서 **SageMaker 추론 툴킷**을 이용하고 있습니다. 이들을 사용하면 보다 간결하게 코드를 작성할 수 있습니다. 

In [None]:
%%writefile main.py
import train
import argparse
import sys
import os
import traceback
from sagemaker_inference import model_server
from sagemaker_training import environment

if __name__ == "__main__":
 if len(sys.argv) < 2 or ( not sys.argv[1] in [ "serve", "train" ] ):
 raise Exception("Invalid argument: you must inform 'train' for training mode or 'serve' predicting mode") 
 
 if sys.argv[1] == "train":
 
 env = environment.Environment()
 
 parser = argparse.ArgumentParser()
 # https://github.com/aws/sagemaker-training-toolkit/blob/master/ENVIRONMENT_VARIABLES.md
 parser.add_argument("--max-depth", type=int, default=10)
 parser.add_argument("--n-jobs", type=int, default=env.num_cpus)
 parser.add_argument("--n-estimators", type=int, default=120)
 
 # reads input channels training and testing from the environment variables
 parser.add_argument("--train", type=str, default=env.channel_input_dirs["train"])
 parser.add_argument("--validation", type=str, default=env.channel_input_dirs["validation"])

 parser.add_argument("--model-dir", type=str, default=env.model_dir)
 parser.add_argument("--output-dir", type=str, default=env.output_dir)
 
 args,unknown = parser.parse_known_args()
 train.start(args)
 else:
 model_server.start_model_server(handler_service="serving.handler")

### 1.4 Dockerfile을 작성합니다.

컨테이너에 설치하는 패키지에 주목하십시오. 컨테이너를 학습/추론용으로 모두 사용하기 위해 **SageMaker Inference Toolkit** (https://github.com/aws/sagemaker-inference-toolkit) 과 **SageMaker Training Toolkit** (https://github.com/aws/sagemaker-training-toolkit)을 설치하고 있습니다. 추론을 위해서는 multi-model-server를 이용하며 API 호출을 처리할 수 있는 웹서비스 형태로 모델을 서빙하게 됩니다. 

In [None]:
%%writefile Dockerfile
FROM python:3.7-buster

# Set a docker label to advertise multi-model support on the container
LABEL com.amazonaws.sagemaker.capabilities.multi-models=false
# Set a docker label to enable container to use SAGEMAKER_BIND_TO_PORT environment variable if present
LABEL com.amazonaws.sagemaker.capabilities.accept-bind-to-port=true

RUN apt-get update -y && apt-get -y install --no-install-recommends default-jdk
RUN rm -rf /var/lib/apt/lists/*

RUN pip --no-cache-dir install multi-model-server sagemaker-inference sagemaker-training
RUN pip --no-cache-dir install pandas numpy scipy scikit-learn

ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PYTHONPATH="/opt/ml/code:${PATH}"

COPY main.py /opt/ml/code/main.py
COPY train.py /opt/ml/code/train.py
COPY handler.py /opt/ml/code/serving/handler.py

ENTRYPOINT ["python", "/opt/ml/code/main.py"]

## PART 2 - 로컬 테스트: 로컬에서 이미지를 빌드하고 테스트를 수행합니다. 
### 2.1 로컬에서 이미지 빌드

SageMaker Jupyter 노트북은 이미 **도커** 환경이 설치되어 있어 별도 작업없이 바로 사용가능합니다. 

조금전 준비한 Dockerfile을 이용하여 도커이미지를 빌드합니다.


In [None]:
!docker build -f Dockerfile -t iris_model:1.0 .

### 2.2 이제 학습과 모델 배포를 위한 알고리즘 이미지가 준비되었습니다. 

### 다음은 데이터셋을 준비하겠습니다.

iris 데이터셋을 학습과 검증용으로 나누고 csv파일로 저장합니다. 이 파일들은 SageMaker환경과의 공유를 위해 S3버킷으로 업로드될 것입니다.


In [None]:
!rm -rf input
!mkdir -p input/data/train
!mkdir -p input/data/validation

import pandas as pd
import numpy as np

from sklearn import datasets
from sklearn.model_selection import train_test_split

iris = datasets.load_iris()

dataset = np.insert(iris.data, 0, iris.target,axis=1)

df = pd.DataFrame(data=dataset, columns=["iris_id"] + iris.feature_names)
X = df.iloc[:,1:]
y = df.iloc[:,0]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

train_df = X_train.copy()
train_df.insert(0, "iris_id", y_train)
train_df.to_csv("input/data/train/training.csv", sep=",", header=None, index=None)

test_df = X_test.copy()
test_df.insert(0, "iris_id", y_test)
test_df.to_csv("input/data/validation/testing.csv", sep=",", header=None, index=None)

df.head()

### 2.3 준비한 도커이미지와 데이처를 이용하여 로컬에서 학습작업을 실행해 봅니다. 

다음 코드들은 로컬에서 SageMaker와 유사한 환경을 시뮬레이션합니다. docker run 실행시 파일 경로의 매핑에 주의합니다.

In [None]:
!rm -rf input/config && mkdir -p input/config

In [None]:
%%writefile input/config/hyperparameters.json
{"max_depth": 20, "n_jobs": 4, "n_estimators": 120}

In [None]:
%%writefile input/config/resourceconfig.json
{"current_host": "localhost", "hosts": ["algo-1-kipw9"]}

In [None]:
%%writefile input/config/inputdataconfig.json
{"train": {"TrainingInputMode": "File"}, "validation": {"TrainingInputMode": "File"}}

In [None]:
%%time
!rm -rf model/
!mkdir -p model

print( "Training...")
!docker run --rm --name "my_model" \
 -v "$PWD/model:/opt/ml/model" \
 -v "$PWD/output:/opt/ml/output" \
 -v "$PWD/input:/opt/ml/input" iris_model:1.0 train

### 2.4 다음은 추론 서빙을 테스트해봅니다. 

아래 코드는 SageMaker에 의해 생성되는 엔드포인트를 시뮬레이션합니다. 

다음 셀의 코드 실행하면 API서비스가 실행되면서 Jupyter notebook의 동작이 멈출 것입니다. 웹서비스는 8080 포트를 사용합니다. 

In [None]:
!docker run --rm --name "my_model" \
 -p 8080:8080 \
 -v "$PWD/model:/opt/ml/model" \
 -v "$PWD/input:/opt/ml/input" iris_model:1.0 serve

> 위 셀을 실행한 후 두번째 노트북 [TEST NOTEBOOK](02_Testing%20our%20local%20model%20server.ipynb)을 오픈하고 테스트를 실행합니다.

> 테스트가 끝나면 노트북 위쪽 메뉴에서 **STOP** 버튼을 클릭합니다.

## 이제 SageMaker 환경에서 이 컨테이너를 이용하여 학습과 배포를 진행해 봅니다.

AWS CodePipeline과 CodeBuild, CodeCommit을 이용하여 형상관리를 gkr 프로세스를 자동화하는 과정으로 진행하시려면 아래 원본 소스를 참고하십시오.
- https://github.com/awslabs/amazon-sagemaker-mlops-workshop 

이번에는 ECR에 배포된 알고리즘 이미지를 이용하여, SageMaker Estimator(https://sagemaker.readthedocs.io/en/stable/estimators.html)로 학습을 실행해 보겠습니다. 로컬에서 먼저 테스트를 진행한 다음 SageMaker환경에서 학습을 실행해 보겠습니다.


In [None]:
import sagemaker
import json
from sagemaker import get_execution_role

role = get_execution_role()
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
prefix='mlops/iris'

iris-model을 ECR로 배포합니다.

In [None]:
!chmod +x build_and_push.sh
!./build_and_push.sh iris-model

## 데이터셋 업로드

앞단계에서 학습과 검증 데이터셋을 생성하였습니다. 이 파일을 S3로 업로드합니다.


In [None]:
train_path = sagemaker_session.upload_data(path='input/data/train', key_prefix='iris-model/input/train')
test_path = sagemaker_session.upload_data(path='input/data/validation', key_prefix='iris-model/input/validation')
print("Train: %s\nValidation: %s" % (train_path, test_path) )


## 이제 Sagemaker Estimator를 생성하고 학습과 배포를 실행해 봅니다.

In [None]:
# Create the estimator
# iris-model:test is the name of the container created in the previous notebook
# By the local codebuild test. An image with that name:tag was pushed to the ECR.
iris = sagemaker.estimator.Estimator('iris-model:latest',
 role,
 instance_count=1, 
 instance_type='local',
 output_path='s3://{}/{}/output'.format(bucket, prefix))
hyperparameters = {
 'max_depth': 20,
 'n_jobs': 4,
 'n_estimators': 120
}

print(hyperparameters)
iris.set_hyperparameters(**hyperparameters)

`instance_type`을 `local`로 지정했으므로 `.fit()`을 호출시 SageMaker가 아닌 *로컬 도커 데몬* 환경에서 새로운 학습작업이 실행될 것입니다.

In [None]:
iris.fit({'train': train_path, 'validation': test_path })

다음 명령은 로컬 도커 데몬에 새로운 추론용 컨테이너를 실행하고 이를 테스트할 수 있도록 predictor를 리턴합니다.

In [None]:
iris_predictor = iris.deploy(initial_instance_count=1, instance_type='local')

이제 로컬에 배포된 predictor(https://sagemaker.readthedocs.io/en/stable/predictors.html)를 이용하여 추론 테스트를 진행합니다.

In [None]:
import pandas as pd
import random
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer

# configure the predictor to do everything for us
iris_predictor.serializer = CSVSerializer()
iris_predictor.deserializer = CSVDeserializer()

# load the testing data from the validation csv
validation = pd.read_csv('input/data/validation/testing.csv', header=None)
idx = random.randint(0,len(validation)-5)
req = validation.iloc[idx:idx+5].values

# cut a sample with 5 lines from our dataset and then split the label from the features.
X = req[:,1:].tolist()
y = req[:,0].tolist()

# call the local endpoint
for features,label in zip(X,y):
 prediction = iris_predictor.predict(features)

 # compare the results
 print("RESULT: {} == {} ? {}".format( label, prediction[0][0], str(label) == str(prediction[0][0])) )

## 테스트가 완료되면 엔드포인트를 삭제합니다.

In [None]:
iris_predictor.delete_endpoint()


## (Local이 아닌) 클라우드 환경에서 Sagemaker Estimator를 생성하고 학습과 배포를 실행해 봅니다.

Estimator의 첫번째 인자인 `image_uri`를 full ecr name으로 지정하고 `instance_type`을 `local`이 아닌 `ml.c4.xlarge`로 바꾸어서 실행합니다. 나머지 설정은 동일합니다.

In [None]:
import boto3

client = boto3.client("sts")
account = client.get_caller_identity()["Account"]

my_session = boto3.session.Session()
region = my_session.region_name

algorithm_name = "iris-model"

ecr_image = "{}.dkr.ecr.{}.amazonaws.com/{}:latest".format(account, region, algorithm_name)

print(ecr_image)

In [None]:
# Create the estimator
# iris-model:test is the name of the container created in the previous notebook
# By the local codebuild test. An image with that name:tag was pushed to the ECR.
iris = sagemaker.estimator.Estimator(ecr_image,
 role,
 instance_count=1, 
 instance_type='ml.c4.xlarge',
 output_path='s3://{}/{}/output'.format(bucket, prefix))
hyperparameters = {
 'max_depth': 20,
 'n_jobs': 4,
 'n_estimators': 120
}

print(hyperparameters)
iris.set_hyperparameters(**hyperparameters)

In [None]:
iris.fit({'train': train_path, 'validation': test_path })

클라우드로 배포하고 엔드포인트를 생성합니다. `instance_type`을 `local`이 아닌 `ml.c4.xlarge`로 변경하였습니다.

In [None]:
iris_predictor = iris.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge')

클라우드에 배포된 predictor(https://sagemaker.readthedocs.io/en/stable/predictors.html)를 이용하여 추론 테스트를 진행합니다.

In [None]:
import pandas as pd
import random
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer

# configure the predictor to do everything for us
iris_predictor.serializer = CSVSerializer()
iris_predictor.deserializer = CSVDeserializer()

# load the testing data from the validation csv
validation = pd.read_csv('input/data/validation/testing.csv', header=None)
idx = random.randint(0,len(validation)-5)
req = validation.iloc[idx:idx+5].values

# cut a sample with 5 lines from our dataset and then split the label from the features.
X = req[:,1:].tolist()
y = req[:,0].tolist()

# call the local endpoint
for features,label in zip(X,y):
 prediction = iris_predictor.predict(features)

 # compare the results
 print("RESULT: {} == {} ? {}".format( label, prediction[0][0], str(label) == str(prediction[0][0])) )

## 테스트가 완료되면 엔드포인트를 삭제합니다.

In [None]:
iris_predictor.delete_endpoint()


## 클라우드에서 Batch Transform 작업을 실행해 봅니다.



In [None]:
pd.DataFrame(X).to_csv('test.csv', header=None, index=None)

In [None]:
!head input/data/validation/testing.csv

In [None]:
!aws s3 cp data/validation/testing.csv s3://leonkang-datalake-tokyo/iris-testin/test.csv

In [None]:
input_path = 's3://leonkang-datalake-tokyo/iris-testin/'
output_path = 's3://leonkang-datalake-tokyo/iris-testout/'

In [None]:
output_path = 's3://{}/iris-model/output/batchout'.format(bucket)
print(test_path)
print(output_path)

In [None]:
transformer = iris.transformer(instance_count=1,
 instance_type='ml.m4.xlarge',
 output_path=output_path,
 assemble_with='Line',
 accept='text/csv',
 strategy='SingleRecord'
 )

In [None]:
transformer.transform(test_path, content_type='text/csv', split_type='Line', input_filter='$[1:]')

In [None]:
!aws s3 ls {output_path}/

In [None]:
!aws s3 cp {output_path}/testing.csv.out test_out.csv

In [None]:
!cat test_out.csv