# SageMaker Batch Transform Inference job with Hugging Face Transformers
---



## Introduction
---

SageMaker 리얼타임 엔드포인트(SageMaker real-time endpoint)는 실시간으로 추론 결괏값을 빠른 응답속도 내에 전송받을 수 있지만, **호스팅 서버가 최소 1대 이상 구동**되어야 하므로 비용적인 측면에서 부담이 됩니다. 이런 경우 아래의 유즈케이스들에 해당하면 SageMaker 배치 변환(batch transform) 기능을 사용해 훈련 인스턴스처럼 배치 변환을 수행하는 때에만 컴퓨팅 인스턴스를 사용하여 비용을 절감할 수 있습니다.

- 일/주/월 단위 정기적인 마케팅 캠페인이나 실시간 추천이 필요 없는 경우 전체 데이터셋에 대한 추론 결괏값 계산
- 일부 추론 결괏값을 데이터베이스나 스토리지에 저장
- SageMaker 호스팅 엔드포인트가 제공하는 1초 미만의 대기 시간이 필요하지 않은 경우

자세한 내용은 아래 웹페이지를 참조해 주세요.
- Amazon SageMaker Batch Transform: (https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-batch.html) 
- API docs: https://sagemaker.readthedocs.io/en/stable/overview.html#sagemaker-batch-transform

In [None]:
import csv
import json
import sys
import numpy as np
import logging
import sagemaker
from sagemaker.s3 import S3Uploader, s3_path_join
from sagemaker.huggingface import HuggingFaceModel

logging.basicConfig(
    level=logging.INFO, 
    format='[{%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(filename='tmp.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

sess = sagemaker.Session()
role = sagemaker.get_execution_role()

# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

<br>

## 1. Run Batch Transform after training a model 
---

추론을 수행할 데이터셋에 대한 S3 uri를 지정하고 transformer job을 실행하면, SageMaker는 배치 변환을 위한 컴퓨팅 인스턴스를 프로비저닝 후 S3에 저장된 데이터셋을 다운로드하고 추론을 수행한 후, 결과를 S3에 업로드합니다.


```python
batch_job = huggingface_estimator.transformer(
    instance_count=1,
    instance_type='ml.c5.2xlarge',
    strategy='SingleRecord')

batch_job.transform(
    data='s3://s3-uri-to-batch-data',
    content_type='application/json',    
    split_type='Line')
```

### Data Pre-Processing

배치 변환을 위해 https://github.com/e9t/nsmc/ 에 공개된 네이버 영화 리뷰 테스트 데이터셋을 다운로드합니다. 테스트 데이터셋은 총 5만건입니다.

In [None]:
!wget https://raw.githubusercontent.com/e9t/nsmc/master/ratings_test.txt

In [None]:
num_lines = sum(1 for line in open('ratings_test.txt')) - 1
y_true = np.zeros((num_lines))

Hugging Face 추론 컨테이너는 `{'input' : '입력 데이터'}` 포맷으로 된 요청을 인식하기에 테스트 데이터셋을 아래와 같은 포맷으로 변환해야 합니다.

```json
{'inputs': '뭐야 이 평점들은.... 나쁘진 않지만 10점 짜리는 더더욱 아니잖아'}
{'inputs': '지루하지는 않은데 완전 막장임... 돈주고 보기에는....'}
{'inputs': '3D만 아니었어도 별 다섯 개 줬을텐데.. 왜 3D로 나와서 제 심기를 불편하게 하죠??'}
{'inputs': '음악이 주가 된, 최고의 음악영화'}
...
```

In [None]:
dataset_csv_file = 'ratings_test.txt'
dataset_jsonl_file="./ratings_test.jsonl"
with open(dataset_csv_file, "r+") as infile, open(dataset_jsonl_file, "w+") as outfile:
    reader = csv.DictReader(infile, delimiter="\t")
    for idx, row in enumerate(reader):
        row_dict = {'inputs': row['document']}

        y_true[idx] = row['label']
        json.dump(row_dict, outfile)
        outfile.write('\n')

### Upload dataset to S3

In [None]:
# uploads a given file to S3.
input_s3_path = s3_path_join("s3://", sagemaker_session_bucket, "batch_transform/input")
output_s3_path = s3_path_join("s3://", sagemaker_session_bucket, "batch_transform/output")
s3_file_uri = S3Uploader.upload(dataset_jsonl_file, input_s3_path)

print(f"{dataset_jsonl_file} uploaded to {s3_file_uri}")

### Create Inference Transformer to run the batch job

In [None]:
# Hub Model configuration. <https://huggingface.co/models>
hub = {
    'HF_MODEL_ID':'daekeun-ml/koelectra-small-v3-nsmc', # model_id from hf.co/models
    'HF_TASK':'text-classification' # NLP task you want to use for predictions
}

# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
    env=hub,                        # configuration for loading model from Hub
    role=role,                      # iam role with permissions to create an Endpoint
    transformers_version="4.12.3",  # transformers version used
    pytorch_version="1.9.1",        # pytorch version used
    py_version='py38',              # python version used
)

# create Transformer to run our batch job
batch_job = huggingface_model.transformer(
    instance_count=1,              # number of instances used for running the batch job
    instance_type='ml.g4dn.xlarge',# instance type for the batch job
    output_path=output_s3_path,    # we are using the same s3 path to save the output with the input
    strategy='SingleRecord')       # How we are sending the "requests" to the endpoint


In [None]:
# starts batch transform job and uses s3 data as input
batch_job.transform(
    data=s3_file_uri,               # preprocessed file location on s3 
    content_type='application/json',# mime-type of the file    
    split_type='Line', # how the datapoints are split, here lines since it is `.jsonl`
    wait=False
)             

### Wait for the batch transform jobs to complete

배치 추론 작업이 완료될 때까지 기다립니다. 약 15분의 시간이 소요됩니다.

In [None]:
batch_job.wait(logs=True)

<br>

## 2. Access Prediction file
---


배치 변환 작업이 성공적으로 완료되면 `.out` 확장자의 출력 파일이 S3에 저장됩니다. `Transformer`에서 `join_source` 매개변수를 사용해서 입력 파일과 출력 파일을 병합하는 것도 가능하며, 자세한 내용은 (https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html 를 참조해 주세요.

In [None]:
import os
import json
from sagemaker.s3 import S3Downloader
from ast import literal_eval
# creating s3 uri for result file -> input file + .out
batch_transform_dir = './batch'
!rm -rf {batch_transform_dir}
os.makedirs(batch_transform_dir, exist_ok=True)

output_file = f"{dataset_jsonl_file}.out"
local_output_path = f"{batch_transform_dir}/{output_file}"
output_s3_filepath = s3_path_join(output_s3_path, output_file)

logger.info(output_s3_filepath)

# download file
S3Downloader.download(output_s3_filepath, batch_transform_dir)

### Processing data

예측 레이블 및 확률값을 받아옵니다.

In [None]:
# Read in the file
with open(local_output_path, 'r') as file :
    filedata = file.read()

# Replace the target string
filedata = filedata.replace('][', '\n').replace('[', '').replace(']', '')

# Write the file out again
with open(f'{batch_transform_dir}/file.txt', 'w') as file:
    file.write(filedata)

In [None]:
import numpy as np
y_pred = np.zeros((num_lines), dtype='int')
y_score = np.zeros((num_lines), dtype='float32')

In [None]:
batch_transform_result = []
with open(f'{batch_transform_dir}/file.txt') as f:
    for idx, line in enumerate(f):
        result = literal_eval(line)
        y_pred[idx] = result['label']
        y_score[idx] = result['score']

### Confusion Matrix

In [None]:
def plot_confusion_matrix(cm, target_names=None, cmap=None, normalize=True, labels=True, title='Confusion matrix'):
    import itertools
    import matplotlib.pyplot as plt
    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        
    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    
    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names)
        plt.yticks(tick_marks, target_names)
    
    if labels:
        for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
            if normalize:
                plt.text(j, i, "{:0.4f}".format(cm[i, j]),
                         horizontalalignment="center",
                         color="white" if cm[i, j] > thresh else "black")
            else:
                plt.text(j, i, "{:,}".format(cm[i, j]),
                         horizontalalignment="center",
                         color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    plt.show()

In [None]:
from sklearn.metrics import confusion_matrix

cf = confusion_matrix(y_true, y_pred)
plot_confusion_matrix(cf, normalize=False)

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
print(classification_report(y_true, y_pred, target_names=['0','1']))