# [모듈 2-1] SageMaker Pipelines 사용하기
이 노트북에서는 아래와 같은 작업을 수행합니다.
- 데이터 준비
- Pipeline 정의
- 데이터 전처리: Processing Step 이용


In [None]:
import sagemaker

sagemaker_session = sagemaker.session.Session()
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

In [None]:
pipeline_prefix = 'autoglueon'
s3_data_path = f's3://{bucket}/{pipeline_prefix}/raw'
s3_config_path = f's3://{bucket}/{pipeline_prefix}/config'

sm_dataset_path = '../data/raw'
sm_claims_data_path = f'{sm_dataset_path}/claims.csv'
sm_customers_data_path = f'{sm_dataset_path}/customers.csv'
sm_config_path = f'../config'

In [None]:
print("claim_data_path: ", sm_claims_data_path)
print("customer_data_path: ", sm_customers_data_path)
print("s3_data_path: ", s3_data_path)

In [None]:
s3_claims_data_path = sagemaker.s3.S3Uploader.upload(
 local_path = sm_claims_data_path, 
 desired_s3_uri = s3_data_path
)
print("claims data path in S3: ", s3_claims_data_path)

s3_customers_data_path = sagemaker.s3.S3Uploader.upload(
 local_path = sm_customers_data_path, 
 desired_s3_uri = s3_data_path
)
print("customers data path in S3: ", s3_customers_data_path)


s3_config_path = sagemaker.s3.S3Uploader.upload(
 local_path = sm_config_path, 
 desired_s3_uri = s3_config_path
)
print("config path in S3: ", s3_config_path)


# 2.2 전처리 스텝 개발

In [None]:
import os
import boto3
import pandas as pd
from IPython.display import display as dp

In [None]:
df_customers = pd.read_csv(sm_customers_data_path)
df_customers.head()

In [None]:
df_claims = pd.read_csv(sm_claims_data_path)
df_claims.head()

In [None]:
%%writefile src/preprocess.py

import argparse
import os
import requests
import tempfile
import subprocess, sys

import pandas as pd
import numpy as np
from glob import glob

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

import logging
import logging.handlers

def _get_logger():
 '''
 로깅을 위해 파이썬 로거를 사용
 # https://stackoverflow.com/questions/17745914/python-logging-module-is-printing-lines-multiple-times
 '''
 loglevel = logging.DEBUG
 l = logging.getLogger(__name__)
 if not l.hasHandlers():
 l.setLevel(loglevel)
 logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) 
 l.handler_set = True
 return l 

logger = _get_logger()


def split_train_test(df, test_ratio=0.1):
 '''
 두 개의 데이터 세트로 분리
 '''
 total_rows = df.shape[0]
 train_end = int(total_rows * (1 - test_ratio))
 
 train_df = df[0:train_end]
 test_df = df[train_end:]
 
 return train_df, test_df


def get_dataframe(base_preproc_input_dir, file_name_prefix ): 
 '''
 파일 이름이 들어가 있는 csv 파일을 모두 저장하여 데이터 프레임을 리턴
 '''
 
 input_files = glob('{}/{}*.csv'.format(base_preproc_input_dir, file_name_prefix))
 #claim_input_files = glob('{}/dataset*.csv'.format(base_preproc_input_dir)) 
 logger.info(f"input_files: \n {input_files}") 
 
 if len(input_files) == 0:
 raise ValueError(('There are no files in {}.\n' +
 'This usually indicates that the channel ({}) was incorrectly specified,\n' +
 'the data specification in S3 was incorrectly specified or the role specified\n' +
 'does not have permission to access the data.').format(base_preproc_input_dir, "train"))
 
 raw_data = [ pd.read_csv(file, index_col=0) for file in input_files ]
 df = pd.concat(raw_data)
 
 logger.info(f"dataframe shape \n {df.shape}") 
 logger.info(f"dataset sample \n {df.head(2)}") 
 #logger.info(f"df columns \n {df.columns}") 
 
 return df


def convert_type(raw, cols, type_target):
 '''
 해당 데이터 타입으로 변경
 '''
 df = raw.copy()
 
 for col in cols:
 df[col] = df[col].astype(type_target)
 
 return df
 

if __name__ =='__main__':
 
 ################################
 #### 커맨드 인자 파싱 
 ################################# 
 
 parser = argparse.ArgumentParser()
 parser.add_argument('--base_output_dir', type=str, default="/opt/ml/processing/output")
 parser.add_argument('--base_preproc_input_dir', type=str, default="/opt/ml/processing/input") 
 parser.add_argument('--split_rate', type=float, default=0.1) 
 parser.add_argument('--label_column', type=str, default="fraud") 
 # parse arguments
 args = parser.parse_args() 
 
 logger.info("######### Argument Info ####################################")
 logger.info(f"args.base_output_dir: {args.base_output_dir}")
 logger.info(f"args.base_preproc_input_dir: {args.base_preproc_input_dir}") 
 logger.info(f"args.label_column: {args.label_column}") 
 logger.info(f"args.split_rate: {args.split_rate}") 

 base_output_dir = args.base_output_dir
 base_preproc_input_dir = args.base_preproc_input_dir
 label_column = args.label_column 
 split_rate = args.split_rate

 ################################# 
 #### 두개의 파일(claim, customer) 을 로딩하여 policy_id 로 조인함 ########
 ################################# 
 
 logger.info(f"\n### Loading Claim Dataset")
 claim_df = get_dataframe(base_preproc_input_dir,file_name_prefix='claim' ) 
 
 logger.info(f"\n### Loading Customer Dataset") 
 customer_df = get_dataframe(base_preproc_input_dir,file_name_prefix='customer' ) 
 
 df = customer_df.join(claim_df, how='left')
 logger.info(f"### dataframe merged with customer and claim: {df.shape}")


 ################################# 
 #### 카테고리 피쳐를 원핫인코딩 
 ################################# 
 
 logger.info(f"\n ### Encoding: Category Features") 
 categorical_features = df.select_dtypes(include=['object']).columns.values.tolist() 
 #categorical_features = ['driver_relationship'] 
 logger.info(f"categorical_features: {categorical_features}") 

 categorical_transformer = Pipeline(
 steps=[
 ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
 ("onehot", OneHotEncoder(handle_unknown="ignore"))
 ]
 )
 
 preprocess = ColumnTransformer(
 transformers=[
 ("cat", categorical_transformer, categorical_features)
 ],
 sparse_threshold = 0, # dense format 으로 제공
 )

 X_pre_category = preprocess.fit_transform(df)
 

 # 원핫인코딩한 컬럼의 이름 로딩
 # Ref: Sklearn Pipeline: Get feature names after OneHotEncode In ColumnTransformer, https://stackoverflow.com/questions/54646709/sklearn-pipeline-get-feature-names-after-onehotencode-in-columntransformer
 
 processed_category_features = preprocess.transformers_[0][1].named_steps['onehot'].get_feature_names(categorical_features)
 #logger.info(f"processed_category_features: {processed_category_features}")
# print(X_pre)
 
 ###############################
 ### 숫자형 변수 전처리 
 ###############################
 
 logger.info(f"\n ### Encoding: Numeric Features") 
 
 float_cols = df.select_dtypes(include=['float64']).columns.values
 int_cols = df.select_dtypes(include=['int64']).columns.values
 numeric_features = np.concatenate((float_cols, int_cols), axis=0).tolist()
 
 logger.info(f"int_cols: \n{int_cols}") 
 logger.info(f"float_cols: \n{float_cols}") 
 #logger.info(f"numeric_features: \n{numeric_features}")

 # 따로 스케일링은 하지 않고, 미싱 값만 중간값을 취함
 numeric_transformer = Pipeline(
 steps=[
 ("imputer", SimpleImputer(strategy="median")),
 # ("scaler", StandardScaler())
 ]
 )

 numeric_preprocessor = ColumnTransformer(
 transformers=[
 ("cat", numeric_transformer, numeric_features)
 ],
 sparse_threshold = 0,
 )

 X_pre_numeric = numeric_preprocessor.fit_transform(df) 

 
 ###############################
 ### 전처리 결과 결합 ####
 ###############################
 
 logger.info(f"\n ### Handle preprocess results") 
 
 # 전처리 결과를 데이터 프레임으로 생성
 category_df = pd.DataFrame(data=X_pre_category, columns=processed_category_features)
 numeric_df = pd.DataFrame(data=X_pre_numeric, columns=numeric_features) 

 full_df = pd.concat([numeric_df, category_df ], axis=1)
 
 # float 타입을 int 로 변경
 full_df = convert_type(full_df, cols=int_cols, type_target='int')
 full_df = convert_type(full_df, cols=processed_category_features, type_target='int') 
 
 # label_column을 맨 앞으로 이동 시킴
 full_df = pd.concat([full_df[label_column], full_df.drop(columns=[label_column])], axis=1)
 
 ############################### 
 # 훈련, 테스트 데이터 세트로 분리 및 저장
 ###############################
 
 train_df, test_df = split_train_test(full_df, test_ratio=split_rate) 
 train_df.to_csv(f"{base_output_dir}/dataset/train.csv", index=False)
 test_df.to_csv(f"{base_output_dir}/dataset/test.csv", index=False) 

 logger.info(f"preprocessed train shape \n {train_df.shape}") 
 logger.info(f"preprocessed test shape \n {test_df.shape}") 

 # logger.info(f"preprocessed train path \n {base_output_dir}/train/train.csv")
 logger.info(f"\n ### Final result for train dataset ") 
 logger.info(f"preprocessed train sample \n {train_df.head(2)}")

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

framework_version = "0.23-1"
processing_instance_type = "ml.m5.xlarge"
processing_instance_count = 1
split_rate = 0.1

sklearn_processor = SKLearnProcessor(
 framework_version = framework_version,
 instance_type = processing_instance_type,
 instance_count = processing_instance_count,
 base_job_name = "sklearn-fraud-process",
 role = role,
)

In [None]:
sklearn_processor.run(
 code = "src/preprocess.py",
 inputs = [ProcessingInput(source = s3_data_path, destination = "/opt/ml/processing/input")],
 outputs = [ProcessingOutput(output_name = "dataset",
 source = "/opt/ml/processing/output/dataset")],
 arguments = ['--split_rate', f"{split_rate}"]
)


In [None]:
sklearn_processor.latest_job.outputs[0].destination

In [None]:
sagemaker.s3.S3Downloader.download(s3_uri = sklearn_processor.latest_job.outputs[0].destination,
 local_path = '../data/preprocessed')

In [None]:
preprocessed_train_path = '../data/preprocessed/train.csv'
preprocessed_train_df = pd.read_csv(preprocessed_train_path)
preprocessed_train_df.head()

# 모델 빌딩 파이프라인의 스텝 생성
## 모델 빌딩 파이프라인의 파라미터 정의
파이프라인에서 사용 할 파라미터를 정의합니다. 파이프라인을 실행할 때 파라미터를 이용하여 실행 조건을 커스터마이징 할 수 있습니다.
지원하는 파라미터 타입은 다음과 같습니다.

* `ParameterString` - 파이썬 타입에서 `str` 
* `ParameterInteger` - 파이썬 타입에서 `int` 
* `ParameterFloat` - 파이썬 타입에서 `float` 

디폴트 값을 지정할 수 있으며, 파이프라인 실행 시 재지정 할 수도 있습니다.

In [None]:
from sagemaker.workflow.parameters import (
 ParameterInteger,
 ParameterString,
)

processing_instance_count = ParameterInteger(
 name="ProcessingInstanceCount",
 default_value=1
)
processing_instance_type = ParameterString(
 name="ProcessingInstanceType",
 default_value="ml.m5.xlarge"
)
input_data = ParameterString(
 name="InputData",
 default_value=s3_data_path,
)

## 전처리 스텝 프로세서 정의

In [None]:
# !pip install --upgrade sagemaker

In [None]:
import sagemaker
sagemaker.__version__

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "0.23-1"
processing_instance_type = "ml.m5.xlarge"

sklearn_processor = SKLearnProcessor(
 framework_version=framework_version,
 instance_type=processing_instance_type,
 instance_count=processing_instance_count,
 base_job_name="sklearn-fraud-process",
 role=role,
)

# 전처리 스텝 정의
Processing Step에서는 다음과 같은 인자들을 지정합니다.
* name: 스텝 명
* processor: 전처리 프로세서
* inputs: 입력 데이터 S3 경로
* outputs: 처리결과가 저장 될 Docker 안의 경로
* job arguments: 사용자 정의 인자
* code: 전처리 코드 경로

보다 자세한 내용은 링크를 확인하세요
[처리 단계, Processing Step](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
 

step_process = ProcessingStep(
 name="Fraud-Preprocessing",
 processor=sklearn_processor,
 code= "src/preprocess.py",
 inputs = [ProcessingInput(source = s3_data_path, destination = "/opt/ml/processing/input")],
 outputs = [ProcessingOutput(output_name = "dataset",
 source = "/opt/ml/processing/output/dataset")],
 job_arguments = ['--split_rate', f"{split_rate}"]
)

# 파이프라인 정의 및 실행
생성한 단계들을 하나의 파이프라인으로 조합하고 실행할 수 있습니다.
파이프라인은 name, parapeters, steps 인자가 필수로 필요합니다. 
여기서 파이프라인의 name은 Account와 Region에 대해 유일해야 합니다.

주의사항:
- 정의에 사용한 모든 파라미터가 존재해야 합니다.
- 파이프라인으로 전달된 단계(step)들은 실행순서와는 무관합니다. SageMaker Pipeline은 단계가 실행되고 완료될 수 있도록 의존관계를를 해석합니다.
- [알림] 정의한 stpes 이 복수개이면 복수개를 기술합니다. 만약에 step 간에 의존성이 있으면, 명시적으로 기술하지 않아도 같이 실행 됩니다.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = pipeline_prefix
pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 processing_instance_type, 
 processing_instance_count,
 input_data,
 ],
 steps=[step_process],
)

In [None]:
import json

definition = json.loads(pipeline.definition())
definition

## 파이프라인 정의를 등록하고 실행하기

In [None]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

In [None]:
execution.describe()

In [None]:
execution.wait()

In [None]:
execution.list_steps()