# 멀티 프로세스 앙상블

본 노트북에서는 여러분의 데이터를 다양한 모델을 이용하여 학습시킨 후 앙상블을 이용하여 가장 높은 성능을 나타내는 모델을 만든 방법에 대한 예제를 제공합니다.
본 노트북 실행을 위해서는 학습(train), 테스트(test), 검증(validation) 데이터셋을 준비되어 있어야 합니다. 
여러분은 SageMaker Search 기능을 이용하여 가장 높은 성능의 모델을 찾고 새로운 모델에 대하여 배치추론작업을 병렬로 진행하게 될 것입니다.

In [4]:
# !pip install sagemaker==1.72.0 -U

In [5]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import boto3
import os
from sagemaker.amazon.amazon_estimator import get_image_uri
import sagemaker
from sagemaker import get_execution_role
from sklearn.model_selection import train_test_split
import numpy as np

import sagemaker
from random import shuffle
import multiprocessing
from multiprocessing import Pool
import csv
import nltk
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

In [6]:
# put the name of your bucket here
bucket = sagemaker.Session().default_bucket()  # replace with an existing bucket if needed
prefix = 'sagemaker/DEMO-xgboost-dm'           # prefix used for all data stored within the bucket

sess = sagemaker.Session()
role = get_execution_role()
client = boto3.client('sagemaker')

### 1. 학습 데이터셋과 테스트 데이터셋 업로드

데이터셋의 첫번째 컬럼에 목표변수가 레이블 되어 있어야 합니다. 만약 학습데이터셋과 테스트데이터셋이 없다면 본 예제의 다음 노트북을 이용하여 생성합니다. 
- [xgboost_direct_marketing_sagemaker.ipynb](xgboost_direct_marketing_sagemaker.ipynb) 


위 실습에서 생성한 파일을 사용할 때 **SageMaker 노트북 환경에 따라 다음 로컬 경로를 확인합니다.**

In [60]:
# sagemaker notebook인 경우 아래 경로로 확인합니다.
# os.chdir('/home/ec2-user/SageMaker/xgboost/')

# sagemaker studio인 경우 아래 경로로 확인합니다.
# os.chdir('/root/xgboost/')

In [152]:
# !head train.csv

In [116]:
train = pd.read_csv('train.csv', names = list(range(89)))
validation = pd.read_csv('validation.csv', names = list(range(89)))
test = pd.read_csv('test.csv', names = list(range(89)))

In [117]:
train_labels = np.array(train[0]).astype("float32")
train_features = np.array(train.drop(0, axis=1)).astype("float32")
val_labels = np.array(validation[0]).astype("float32")
val_features  = np.array(validation.drop(0, axis=1)).astype("float32")

### 2. 학습을 위한 함수 정의

- 알고리즘을 입력받아서 SageMaker Estimator를 선언하고 리턴하는 함수 (base estimator로부터 알고리즘별 필요한 하이퍼파리미터를 함께 정의하여 리턴함)

In [23]:
def get_base_estimator(clf, sess, role):

    container = get_image_uri(boto3.Session().region_name, clf)

    est = sagemaker.estimator.Estimator(container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/{}/output'.format(bucket, clf),
                                    sagemaker_session=sess)
    return est

In [24]:
def get_estimator(clf, sess, role):
    container = get_image_uri(boto3.Session().region_name, clf)
    
    if clf == 'xgboost':
        est = get_base_estimator(clf, sess, role)
        est.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100)
        
    elif clf == 'linear-learner':
        est = sagemaker.LinearLearner(role=sagemaker.get_execution_role(),
                                               train_instance_count=1,
                                               train_instance_type='ml.m4.xlarge',
                                               predictor_type='binary_classifier',
                                               num_classes=2)

    elif clf == 'knn':
        est = sagemaker.KNN(role=sagemaker.get_execution_role(),
                                              k = 10,
                                               train_instance_count=1,
                                               train_instance_type='ml.m4.xlarge',
                                               predictor_type='classifier',
                                                sample_size = 200)
        
    elif clf == 'factorization-machines':
        est = sagemaker.FactorizationMachines(role=sagemaker.get_execution_role(),
                                               train_instance_count=1,
                                               train_instance_type='ml.m4.xlarge',
                                               predictor_type='binary_classifier',
                                               num_factors = 2)
        
    return est

- train, test, validation 데이터셋을 s3 업로드 (이전 랩에서 업로드한 동일한 위치)

In [146]:
def copy_to_s3(bucket):
    os.system('aws s3 cp train.csv s3://{}/{}/train/train.csv'.format(bucket, prefix))
    os.system('aws s3 cp validation.csv s3://{}/{}/validation/validation.csv'.format(bucket, prefix))
    os.system('aws s3 cp test.csv s3://{}/{}/test/test.csv'.format(bucket, prefix))
    os.system('aws s3 cp test_features.csv s3://{}/{}/test/test_features.csv'.format(bucket, prefix))
        
copy_to_s3(bucket)

- 알고리즘별 HPO 작업을 위한 tuner를 정의하고 리턴하는 함수

In [18]:
def get_tuner(clf, est):
        
    # this should search for the most recent hyperparameter tuning job, pull it in, and use for a warm start
        
    if clf == 'xgboost':
        objective_metric_name = 'validation:auc'

        hyperparameter_ranges = {'eta': ContinuousParameter(0, 1),
                        'min_child_weight': ContinuousParameter(1, 10),
                        'alpha': ContinuousParameter(0, 2),
                        'max_depth': IntegerParameter(1, 10)}
        
    elif clf == 'knn':
        
        objective_metric_name = 'test:accuracy'

        hyperparameter_ranges = {'k': IntegerParameter(1, 1024),
                        'sample_size': IntegerParameter(256, 20000000)}
        
    elif clf == 'linear-learner':
        objective_metric_name = 'test:recall'
        
        hyperparameter_ranges = {'l1': ContinuousParameter(0.0000001,1),
                            'use_bias': CategoricalParameter([True, False])}
        
    elif clf == 'factorization-machines':
        objective_metric_name = 'test:binary_classification_accuracy'
        
        hyperparameter_ranges = {'bias_wd': IntegerParameter(1, 1000)}
        
    tuner = HyperparameterTuner(est,
                    objective_metric_name,
                    hyperparameter_ranges,
                    max_jobs=30,
                    max_parallel_jobs=3)
    
    return tuner

- 알고리즘별로 하이퍼파라미터 튜닝작업을 설정하고 실행하는 함수

In [19]:
def run_training_job(clf):

    
    # this should loop through splits in k-fold cross validation
    
    # build the estimator
    est = get_estimator(clf, sess, role)

    # get the hyperparameter tuner config 
    if clf == 'xgboost':
        
        tuner = get_tuner(clf, est)
        
        
        tuner.fit({'train': s3_input_train, 'validation': s3_input_validation}) 

    else:
        # set the records
        train_records = est.record_set(train_features, train_labels, channel='train')
        test_records = est.record_set(val_features, val_labels, channel='validation')

        tuner = get_tuner(clf, est)
        
        tuner.fit([train_records, test_records])

- 알고리즘 리스트를 입력받고 병렬로 학습을 실행시크는 함수

In [20]:
def magic_loop(models_to_run):
    pool = Pool(processes=multiprocessing.cpu_count())
    transformed_rows = pool.map(run_training_job, models_to_run)
    pool.close() 
    pool.join()

- 학습과 검증용 데이터셋 정의

In [36]:
s3_input_train = sagemaker.s3_input(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')

s3_input_validation = sagemaker.s3_input(s3_data='s3://{}/{}/validation'.format(bucket, prefix), content_type='csv')

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


### 3. 알고리즘별 학습 실행

In [38]:
# clfs = ['xgboost', 'linear-learner', 'factorization-machines', 'knn']

clfs = [ 'xgboost']

magic_loop(clfs)

'get_image_uri' method will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.
There is a more up to date SageMaker XGBoost image. To use the newer image, please set 'repo_version'='1.0-1'. For example:
	get_image_uri(region, 'xgboost', '1.0-1').
'get_image_uri' method will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.
There is a more up to date SageMaker XGBoost image. To use the newer image, please set 'repo_version'='1.0-1'. For example:
	get_image_uri(region, 'xgboost', '1.0-1').
Parameter image_name will be renamed to image_uri in SageMaker Python SDK v2.


SageMaker 콘솔에서 Hyperparameter tuning jobs 메뉴를 통해 학습작업이 잘 진행되는지 확인합니다.

### 4. 최고 성능 모델의 선택

학습이 완료되면, SageMaker search 기능을 이용하여 방금 실행한 학습작업 중 가장 높은 성능을 내는 모델을 검색합니다.     



In [73]:
import boto3
smclient = boto3.client(service_name='sagemaker')
import datetime

# Search the training job by Amazon S3 location of model artifacts
search_params={
   "MaxResults": 100,
   "Resource": "TrainingJob",
   "SearchExpression": { 
      "Filters": [ 
         { 
            "Name": "InputDataConfig.DataSource.S3DataSource.S3Uri",
            "Operator": "Contains",
             
             # set this to have a word that is in your bucket name
            "Value": '{}/{}'.format(bucket, prefix)
         },
        { 
            "Name": "TrainingJobStatus",
            "Operator": "Equals",
            "Value": 'Completed'
         }, 
    ],
     
   },
    
    "SortBy": "Metrics.validation:auc",
    "SortOrder": "Descending"
}
results = smclient.search(**search_params)

- `Metrics.validation:auc`기준으로 정렬한 15개의 작업결과 리턴

In [83]:
from sagemaker.model import Model

def get_models(results):

    role = sagemaker.get_execution_role()
    models = []

    for each in results['Results']:
        job_name = each['TrainingJob']['TrainingJobName']
        artifact = each['TrainingJob']['ModelArtifacts']['S3ModelArtifacts']

        # get training image
        image =  each['TrainingJob']['AlgorithmSpecification']['TrainingImage']
        m = Model(artifact, image, role = role, sagemaker_session = sess, name = job_name)
        models.append(m)
        
    return models[:15]

models = get_models(results)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in Sag

### 5. 배치 추론 앙상블

이제 각 모델에 대하여 개별적으로 배치 추론작업을 실행합니다.


In [147]:
def run_batch_transform(model, bucket):

    transformer = model.transformer(
        instance_count=1,
        instance_type='ml.m4.xlarge',
        output_path='s3://{}/{}/batch_results/{}'.format(bucket, prefix, model.name)
    )

    transformer.transform(data='s3://{}/{}/test/test_features.csv'.format(bucket, prefix), content_type='text/csv')

    
for model in models:
    run_batch_transform(model, bucket)

Using already existing model: xgboost-201019-1005-021-5865b5f5
Using already existing model: xgboost-201019-1005-016-763f7a27
Using already existing model: xgboost-201019-1005-019-d7cc687a
Using already existing model: xgboost-2020-09-09-08-23-45-285
Using already existing model: xgboost-201019-1005-028-7681edf7
Using already existing model: xgboost-201019-1005-006-2738e46e
Using already existing model: xgboost-201019-1005-005-46f5941e
Using already existing model: xgboost-201019-1005-018-80900beb
Using already existing model: xgboost-201019-1005-024-9b43007a
Using already existing model: xgboost-201019-1005-004-cc50bc0b
Using already existing model: xgboost-201019-1005-029-cfd48ce3
Using already existing model: xgboost-201019-1005-015-bc230053
Using already existing model: xgboost-201019-1005-023-0948411e
Using already existing model: xgboost-201019-1005-012-be7c2277
Using already existing model: xgboost-201019-1005-020-30fb5f78


15개의 작업이 병렬로 수행되고 작업이 완료되기까지 수 분이 걸립니다.  
SageMaker 콘솔의 Inference > Batch Transofrm Jobs 메뉴에서 진행상태를 모니터링할 수 있습니다.

### 6. 배치추론 결과 합치기 

배치 추론작업이 완료되면 추론 결과를 취합합니다. 개별 결과 중 최대 confidence를 보이는 값을 예측 결과로 선택하고 단일 XGBoost 모델을 사용하는 것과 비교하여 얼마나 잘 수행되는지 비교해 보겠습니다. 

다음 셀은 S3에 저장된 배추 추론 결과를 로컬 노트북 환경으로 복사합니다. 


In [173]:
os.system('aws s3 sync s3://{}/{}/batch_results/ batch_results/'.format(bucket, prefix))


0

In [174]:
def get_dataframe():
    '''
    Loops through the directory on your local notebook instance where the batch results were stored, 
        and generates a dataframe where each column is the output from a different model.
    '''
    frames  = []
    
    for sub_dir in os.listdir('batch_results'):
        if '.ipynb' not in sub_dir and '.out' not in sub_dir:

            old_file = 'batch_results/{}/test.csv.out'.format(sub_dir)
            
            new_file = 'batch_results/{}/test.csv'.format(sub_dir)
            
            # remove the .out file formate
            os.system('cp {} {}'.format( old_file, new_file))
            
            df = pd.read_csv('batch_results/{}/test.csv'.format(sub_dir), names = [sub_dir])

            frames.append(df)
            
    df = pd.concat(frames, axis=1)
                
    return df

In [175]:
def consolidate_results(df):

    df['max'] = 0
    df['min'] = 0
    df['diff'] = 0

    for idx, row in df.iterrows():

        top = max(row)
        bottom = min(row)

        diff = top - bottom

        df.loc[idx, 'max'] = top
        df.loc[idx, 'min'] = bottom
        df.loc[idx, 'diff'] = diff

    return df

bare_df = get_dataframe()
consolidated_df = consolidate_results(bare_df)

- test.csv의 실제값으로부터 `y_true` 컬럼 추가

In [179]:
def add_label_to_results(df):
    test_data = pd.read_csv('test.csv', header=None)
    y_true = test_data[0].values.tolist()
    df['y_true'] = y_true
    return df
    
df = add_label_to_results(consolidated_df)

### 7. 혼돈행력(Confusion Matrix) 생성

마지막으로 모델의 성능을 비교해 보고 앙상블이 도움이 되었는지 확인해 봅니다.


In [181]:
def get_confusion_matrix(df, model_column, accuracy=None):
    
    mx = pd.crosstab(index=df['y_true'], columns=np.round(df[model_column]), rownames=['actuals'], colnames=['predictions'])

    # lower right corner
    tps = mx.iloc[1, 1]
    # upper right corner
    fps = mx.iloc[0, 1]
    # lower left corner
    fns = mx.iloc[1, 0]

    precision = np.round(tps / (tps + fns), 4) * 100
    recall = np.round(tps / (tps + fps), 4) * 100
    print ('Precision = {}%, Recall = {}%'.format(precision, recall))
    
    if accuracy:
        # upper left corner 
        tns = mx.iloc[0, 0]
        accuracy = (tps + tns) / (fns + fps + tps + tns) * 100
        print ('Overall binary classification accuracy = {}%'.format(accuracy))
        
    return mx

### 앙상블 적용전(단일 모델) 

In [193]:
one_model = df.columns[0]
one_model

'xgboost-201019-1005-006-2738e46e'

In [194]:
get_confusion_matrix(df,one_model, accuracy=True)

Precision = 21.95%, Recall = 67.95%
Overall binary classification accuracy = 89.63340616654529%


predictions,0.0,1.0
actuals,Unnamed: 1_level_1,Unnamed: 2_level_1
0,3586,50
1,377,106


### 앙상블 적용 후

In [195]:
get_confusion_matrix(df, 'max', accuracy=True)

Precision = 28.16%, Recall = 60.440000000000005%
Overall binary classification accuracy = 89.41490653071133%


predictions,0.0,1.0
actuals,Unnamed: 1_level_1,Unnamed: 2_level_1
0,3547,89
1,347,136


본 예제에서는 앙상블 적용 후 precision은 7% 정도 증가한 결과를 보인 반면 recall 은 7%정도 줄어들었습니다. 전체적인 분류성능은 크게 달라지지 않았습니다.  
여러분의 데이터셋으로 다른 머신러닝 모델과의 앙상블을 실행해보고 결과를 비교해 보십시오.
