# [모듈 3.2] 전처리 스텝 개발 (SageMaker Model Building Pipeline 전처리 스텝)

이 노트북은 "모델 전처리" 스텝을 정의하고, 모델 빌딩 파이프라인을 생성하여 실행하는 노트북 입니다.
아래의 목차와 같이 노트북 실행이 될 예정이고
전체를 모두 실행시에 완료 시간은 약 5분-10분 소요 됩니다.

- 1. 전처리 개요 (SageMaker Processing 이용)
- 2. 기본 라이브러리 로딩
- 3. 원본 데이터 파일 확인 및 전처리 코드 로직 확인
- 4. 모델 빌딩 파이프라인 의 스텝(Step) 생성
- 5. 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의 및 실행
- 6. 세이지 메이커 스튜디오에서 확인하기
- 7. 전처리 파일 경로 추출

---
## SageMaker 파이프라인 소개

![mdp_how_it_works.png](img/mdp_how_it_works.png)



SageMaker 파이프라인은 다음 기능을 지원하며 본 lab_03_pipelinie 에서 일부를 다루게 됩니다. 

* Processing job steps - 데이터처러 워크로드를 실행하기 위한 SageMaker의 관리형 기능. Feature engineering, 데이터 검증, 모델 평가, 모델 해석 등에 주로 사용됨 
* Training job steps - 학습작업. 모델에게 학습데이터셋을 이용하여 모델에게 예측을 하도록 학습시키는 작업 
* Conditional execution steps - 조건별 실행분기. 파이프라인을 분기시키는 역할.
* Register model steps - 학습이 완료된 모델패키지 리소스를 이후 배포를 위한 모델 레지스트리에 등록하기 
* Create model steps - 추론 엔드포인트 또는 배치 추론을 위한 모델의 생성 
* Transform job steps - 배치추론 작업. 배치작업을 이용하여 노이즈, bias의 제거 등 데이터셋을 전처리하고 대량데이터에 대해 추론을 실행하는 단계
* Pipelines - Workflow DAG. SageMaker 작업과 리소스 생성을 조율하는 단계와 조건을 가짐
* Parametrized Pipeline executions - 특정 파라미터에 따라 파이프라인 실행방식을 변화시키기 


- 상세한 개발자 가이드는 아래 참조 하세요.
    - [세이지 메이커 모델 빌딩 파이프라인의 개발자 가이드](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html)

---
### 노트북 커널
- 이 워크샵은 노트북 커널이 `conda_python3` 를 사용합니다. 다른 커널일 경우 변경 해주세요.
---


# 1. 전처리 개요 (SageMaker Processing 이용)

이 노트북은 세이지 메이커의 Processing Job을 통해서 데이터 전처리를 합니다. <br>
상세한 사항은 개발자 가이드를 참조 하세요. -->  [SageMaker Processing](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/processing-job.html)


![Processing-1.png](img/Processing-1.png)
- 일반적으로 크게 아래 4가지의 스텝으로 진행이 됩니다.

    - (1) S3에 입력 파일 준비
    - (2) 전처리를 수행하는 코드 준비
    - (3) Projcessing Job을 생성시에 아래와 같은 항목을 제공합니다.
        - Projcessing Job을 실행할 EC2(예: ml.m4.2xlarge) 기술
        - EC2에서 로딩할 다커 이미지의 이름 기술
        - S3 입력 파일 경로
        - 전처리 코드 경로
        - S3 출력 파일 경로
    - (4) EC2에서 전치리 실행 하여 S3 출력 위치에 저장



## 1.2 프로세싱 스텝 
- 프로세싱 단계의 개발자 가이드 
    - [프로세싱 스텝](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing)



# 2. 기본 라이브러리 로딩

세이지 메이커 관련 라이브러리를 로딩 합니다.

In [22]:
import boto3
import sagemaker
import pandas as pd
from IPython.display import display as dp

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()

## 2.1 노트북 변수 로딩


저장된 변수를 확인 합니다.

In [23]:
%store

Stored variables and their in-db values:
bucket                         -> 'sagemaker-us-east-1-585843180719'
claims_data_uri                -> 's3://sagemaker-us-east-1-585843180719/sagemaker-w
customers_data_uri             -> 's3://sagemaker-us-east-1-585843180719/sagemaker-w
input_data_uri                 -> 's3://sagemaker-us-east-1-585843180719/sagemaker-w
preprocessing_code             -> 'src/preprocessing.py'
project_prefix                 -> 'sagemaker-webinar-pipeline-base'


In [24]:
%store -r

# 3. 원본 데이터 파일 확인 및 전처리 코드 로직 확인


## 3.1. 전처리에서 사용할 원본 데이터를 확인
- 고객 데어터
- 보험 청구 데이터

In [25]:
import os

data_dir = '../data/raw'
local_claim_data_path = f"{data_dir}/claims.csv"
local_customers_data_path = f"{data_dir}/customers.csv"


In [26]:
customers_data_df = pd.read_csv(local_customers_data_path)
customers_data_df.head()

Unnamed: 0,policy_id,customer_age,months_as_customer,num_claims_past_year,num_insurers_past_5_years,policy_state,policy_deductable,policy_annual_premium,policy_liability,customer_zip,customer_gender,customer_education,auto_year
0,1,54,94,0,1,WA,750,3000,25/50,99207,Unkown,Associate,2006
1,2,41,165,0,1,CA,750,2950,15/30,95632,Male,Bachelor,2012
2,3,57,155,0,1,CA,750,3000,15/30,93203,Female,Bachelor,2017
3,4,39,80,0,1,AZ,750,3000,30/60,85208,Female,Advanced Degree,2020
4,5,39,60,0,1,CA,750,3000,15/30,91792,Female,High School,2018


In [27]:
claims_data_df = pd.read_csv(local_claim_data_path)
claims_data_df.head()

Unnamed: 0,policy_id,driver_relationship,incident_type,collision_type,incident_severity,authorities_contacted,num_vehicles_involved,num_injuries,num_witnesses,police_report_available,injury_claim,vehicle_claim,total_claim_amount,incident_month,incident_day,incident_dow,incident_hour,fraud
0,1,Spouse,Collision,Front,Minor,,2,0,0,No,71600,8913.668763,80513.668763,3,17,6,8,0
1,2,Self,Collision,Rear,Totaled,Police,3,4,0,Yes,6400,19746.724395,26146.724395,12,11,2,11,0
2,3,Self,Collision,Front,Minor,Police,2,0,1,Yes,10400,11652.969918,22052.969918,12,24,1,14,0
3,4,Child,Collision,Side,Minor,,2,0,0,No,104700,11260.930936,115960.930936,12,23,0,19,0
4,5,Self,Collision,Side,Major,Police,2,1,0,No,3400,27987.704652,31387.704652,5,8,2,8,0


## 3.2 전처리 로직 로컬에서 실행

In [28]:
preprocessing_code = 'src/preprocessing.py'
%store preprocessing_code

Stored 'preprocessing_code' (str)


### 로컬 환경 셋업 

- 로컬에서 테스트 하기 위해 세이지메이커의 다커 컨테이너와 같은 환경을 생성합니다.
- split_rate = 0.2 로 해서 훈련 및 테스트 데이터 세트의 비율을 8:2로 정합니다.

In [29]:
import os

# 도커 컨테이너의 출력 폴더와 비슷한 환경 기술
# 아래 경로 : opt/ml/processing/output
# 도커 경로 : /opt/ml/processing/output
base_output_dir = 'opt/ml/processing/output' 

# 도커 컨테이너의 입력 폴더와 비슷한 환경 기술
base_preproc_input_dir = 'opt/ml/processing/input'
os.makedirs(base_preproc_input_dir, exist_ok=True)

# 출력 훈련 폴더를 기술 합니다.
base_preproc_output_train_dir = 'opt/ml/processing/output/train/'
os.makedirs(base_preproc_output_train_dir, exist_ok=True)

# 출력 테스트 폴더를 기술 합니다.
base_preproc_output_test_dir = 'opt/ml/processing/output/test/'
os.makedirs(base_preproc_output_test_dir, exist_ok=True)


split_rate = 0.2

#### claims.csv, customers.csv 를 다커환경과 비슷한 경로로 복사

In [30]:
! cp {local_claim_data_path} {base_preproc_input_dir}
! cp {local_customers_data_path} {base_preproc_input_dir}

### 로컬에서 스크립트 실행
전처리 코드에서 제공하는 로그를 통해서, 전처리 수행 내역을 확인 합니다.

In [31]:
! python {preprocessing_code} --base_preproc_input_dir {base_preproc_input_dir} \
                              --base_output_dir {base_output_dir} \
                              --split_rate {split_rate}

######### Argument Info ####################################
args.base_output_dir: opt/ml/processing/output
args.base_preproc_input_dir: opt/ml/processing/input
args.label_column: fraud
args.split_rate: 0.2

### Loading Claim Dataset
input_files: 
 ['opt/ml/processing/input/claims.csv']
dataframe shape 
 (5000, 17)
dataset sample 
           driver_relationship incident_type  ... incident_hour fraud
policy_id                                    ...                    
1                      Spouse     Collision  ...             8     0
2                        Self     Collision  ...            11     0

[2 rows x 17 columns]

### Loading Customer Dataset
input_files: 
 ['opt/ml/processing/input/customers.csv']
dataframe shape 
 (5000, 12)
dataset sample 
            customer_age  months_as_customer  ...  customer_education  auto_year
policy_id                                    ...                               
1                    54                  94  ...           Associate      

### 전처리된 데이터 확인
실제로 전처리 된 파일의 내역을 확인 합니다.
훈련 및 테스트 세트의 fraud 의 분포를 확인 합니다. (0: non-fruad, 1: fraud)

In [32]:
preprocessed_train_path = os.path.join(base_output_dir + '/train/train.csv')
preprocessed_test_path = os.path.join(base_output_dir + '/test/test.csv')

preprocessed_train_df = pd.read_csv(preprocessed_train_path)
preprocessed_test_df = pd.read_csv(preprocessed_test_path)

preprocessed_train_df.head()

Unnamed: 0,fraud,vehicle_claim,total_claim_amount,customer_age,months_as_customer,num_claims_past_year,num_insurers_past_5_years,policy_deductable,policy_annual_premium,customer_zip,...,collision_type_missing,incident_severity_Major,incident_severity_Minor,incident_severity_Totaled,authorities_contacted_Ambulance,authorities_contacted_Fire,authorities_contacted_None,authorities_contacted_Police,police_report_available_No,police_report_available_Yes
0,0,8913.668763,80513.668763,54,94,0,1,750,3000,99207,...,0,0,1,0,0,0,1,0,1,0
1,0,19746.724395,26146.724395,41,165,0,1,750,2950,95632,...,0,0,0,1,0,0,0,1,0,1
2,0,11652.969918,22052.969918,57,155,0,1,750,3000,93203,...,0,0,1,0,0,0,0,1,0,1
3,0,11260.930936,115960.930936,39,80,0,1,750,3000,85208,...,0,0,1,0,0,0,1,0,1,0
4,0,27987.704652,31387.704652,39,60,0,1,750,3000,91792,...,0,1,0,0,0,0,0,1,1,0


첫번때 데이터 세트는 훈련 데이터 세트, 두번째는 테스트 데이터 세트 입니다. 각각의 fraud의 비율을 확인하세요.

In [33]:
print("Train Data Set: ")
dp(preprocessed_train_df[['fraud']].value_counts())

print("\nTest Data Set:")
dp(preprocessed_test_df[['fraud']].value_counts())

Train Data Set: 


fraud
0        3869
1         131
dtype: int64


Test Data Set:


fraud
0        967
1         33
dtype: int64

# 4. 모델 빌딩 파이프라인 의 스텝(Step) 생성


## 4.1 모델 빌딩 파이프라인 변수 생성

파이프라인에서 사용할 파이프라인 파라미터를 정의합니다. 파이프라인을 스케줄하고 실행할 때 파라미터를 이용하여 실행조건을 커스마이징할 수 있습니다. 파라미터를 이용하면 파이프라인 실행시마다 매번 파이프라인 정의를 수정하지 않아도 됩니다.

지원되는 파라미터 타입은 다음과 같습니다:

* `ParameterString` - 파이썬 타입에서 `str` 
* `ParameterInteger` - 파이썬 타입에서 `int` 
* `ParameterFloat` - 파이썬 타입에서 `float` 

이들 파라미터를 정의할 때 디폴트 값을 지정할 수 있으며 파이프라인 실행시 재지정할 수도 있습니다. 지정하는 디폴트 값은 파라미터 타입과 일치하여야 합니다.


파이프라인의 각 스텝에서 사용할 변수를 파라미터 변수로서 정의 합니다.


In [34]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)


## 4.2. 전처리 스텝 프로세서 정의
- 전처리의 내장 SKLearnProcessor 를 통해서 sklearn_processor 오브젝트를 생성 합니다.

In [51]:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "1.0-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-fraud-process",
    role=role,
)


The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


## 4.3. 전처리 스텝 단계 정의
- 처리 단계에서는 아래와 같은 주요 인자가 있습니다.
    - 단계 이름
    - processor 기술: 위에서 생성한 processor 오브젝트를 제공
    - inputs: S3의 경로를 기술하고, 다커안에서의 다운로드 폴더(destination)을 기술 합니다.
    - outputs: 처리 결과가 저장될 다커안에서의 폴더 경로를 기술합니다.
        - 다커안의 결과 파일이 저장 후에 자동으로 S3로 업로딩을 합니다.
    - job_arguments: 사용자 정의의 인자를 기술 합니다.
    - code: 전처리 코드의 경로를 기술 합니다.
- 처리 단계의 상세한 사항은 여기를 보세요. --> [처리 단계, Processing Step](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing)

In [52]:
preprocessing_code

'src/preprocessing.py'

In [53]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

step_process = ProcessingStep(
    name="Fraud-Basic-Process",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data_uri,destination='/opt/ml/processing/input'),
         ],
    outputs=[ProcessingOutput(output_name="train",
                              source='/opt/ml/processing/output/train'),
             ProcessingOutput(output_name="test",
                              source='/opt/ml/processing/output/test')],
    job_arguments=["--split_rate", f"{split_rate}"],    
    code= preprocessing_code
)


# 5. 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의 및 실행


이제 지금까지 생성한 단계들을 하나의 파이프라인으로 조합하고 실행하도록 하겠습니다.

파이프라인은 name, parameters, steps 속성이 필수적으로 필요합니다. 
여기서 파이프라인의 이름은 (account, region) 조합에 대하여 유일(unique))해야 합니다.


주의:

- 정의에 사용한 모든 파라미터가 존재해야 합니다.
- 파이프라인으로 전달된 단계(step)들은 실행순서와는 무관합니다. SageMaker Pipeline은 단계가 실행되고 완료될 수 있도록 의존관계를를 해석합니다.
- [알림] 정의한 stpes 이 복수개이면 복수개를 기술합니다. 만약에 step 간에 의존성이 있으면, 명시적으로 기술하지 않아도 같이 실행 됩니다.



## 5.1 파이프라인 정의


In [54]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = project_prefix
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        input_data,
    ],
    steps=[step_process],
)

## 5.2 파이프라인 정의 확인
위에서 정의한 파이프라인 정의는 Json 형식으로 정의 되어 있습니다.

In [55]:
import json

definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-585843180719/sagemaker-webinar-pipeline-base/input'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Fraud-Basic-Process',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3',
     'ContainerArguments': ['--split_rate', '0.2'],
     'ContainerEntrypoi

## 5.3 파이프라인 정의를 제출하고 실행하기 

파이프라인 정의를 파이프라인 서비스에 제출합니다. 함께 전달되는 역할(role)을 이용하여 AWS에서 파이프라인을 생성하고 작업의 각 단계를 실행할 것입니다.   

In [56]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

워크플로우의 실행상황을 살펴봅니다. 

In [57]:
execution.describe()


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:585843180719:pipeline/sagemaker-webinar-pipeline-base',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:585843180719:pipeline/sagemaker-webinar-pipeline-base/execution/ejfhjx0f5its',
 'PipelineExecutionDisplayName': 'execution-1678692546980',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2023, 3, 13, 7, 29, 6, 892000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 3, 13, 7, 29, 6, 892000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': 'aa7e0a4f-1d59-4732-a737-47f9b26e8472',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'aa7e0a4f-1d59-4732-a737-47f9b26e8472',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '427',
   'date': 'Mon, 13 Mar 2023 07:29:06 GMT'},
  'RetryAttempts': 0}}

## 5.4 파이프라인 실행 기다리기

In [58]:
execution.wait()

실행이 완료될 때까지 기다립니다.

실행된 단계들을 리스트업합니다. 파이프라인의 단계실행 서비스에 의해 시작되거나 완료된 단계를 보여줍니다.

## 5.5 파이프라인 실행 단계 기록 보기

In [59]:
execution.list_steps()

[{'StepName': 'Fraud-Basic-Process',
  'StartTime': datetime.datetime(2023, 3, 13, 7, 29, 7, 821000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 3, 13, 7, 35, 38, 103000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:585843180719:processing-job/pipelines-ejfhjx0f5its-fraud-basic-process-ggf58zvugz'}}}]

# 6. 세이지 메이커 스튜디오에서 확인하기
- 아래의 그림 처럼 SageMaker Studio에 로긴후에 따라하시면, SageMaker Studio 에서도 실행 내역을 확인할 수 있습니다.
    - 그림 처럼 (1), (2), (3) 을 순서대로 클릭사시면 (4) 의 전처리 스텝의 실형 결과를 확인 할 수 있습니다.
- SageMaker Studio 개발자 가이드 --> [SageMaker Studio](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/studio.html)




![process_basic.png](img/process_basic.png)


# 7. 전처리 파일 경로 추출
- 다음 노트북에서 사용할 훈련 및 테스트의 전처리 S3 경로를 저장 합니다.

In [60]:
from src.p_utils import get_proc_artifact

import boto3
client = boto3.client("sagemaker")

train_preproc_dir_artifact = get_proc_artifact(execution, client, kind=0 )
test_preproc_dir_artifact = get_proc_artifact(execution, client, kind=1 )

print("train_preproc_dir_artifact: \n", train_preproc_dir_artifact)
print("test_preproc__dir_artifact: \n", test_preproc_dir_artifact)



train_preproc_dir_artifact: 
 s3://sagemaker-us-east-1-585843180719/sagemaker-webinar-pipeline-base/ejfhjx0f5its/Fraud-Basic-Process/output/train
test_preproc__dir_artifact: 
 s3://sagemaker-us-east-1-585843180719/sagemaker-webinar-pipeline-base/ejfhjx0f5its/Fraud-Basic-Process/output/test


다음 노트북에서 아래 변수를 사용하기 위해서 저장 합니다.

In [61]:
%store train_preproc_dir_artifact
%store test_preproc_dir_artifact

Stored 'train_preproc_dir_artifact' (str)
Stored 'test_preproc_dir_artifact' (str)
