# [모듈 5.1] 고급 모델 빌딩 파이프라인 개발 

이 노트북은 아래와 같은 목차로 진행 됩니다. 전체를 모두 실행시에 완료 시간은 **약 30분** 소요 됩니다.

- 1. SageMaker 모델 빌드 파이프라인을 이용한 모델 빌드 오케스트레이션
- 2. 파이프라인 개발자 가이드
- 3. 기본 라이브러리 로딩
- 4. 모델 빌딩 파이프라인 의 스텝(Step) 생성
    - 4.1 모델 빌딩 파이프라인 변수 생성
    - 4.2 캐싱 정의
    - 4.3 전처리 스텝 단계 정의
    - 4.4 모델 학습을 위한 학습단계 정의
    - 4.5 모델 평가 단계
    - 4.6 모델 등록 스텝
    - 4.7 세이지 메이커 모델 스텝 생성
    - 4.8 HPO 스텝
    - 4.9 조건 스텝
- 5. 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의 및 실행
- 6. 세이지 메이커 스튜디오에서 실행 확인 하기
- 7. Pipeline 캐싱 및 파라미터 이용한 실행
- 8. 계보(Lineage)
    
---

# 1. SageMaker 모델 빌드 파이프라인을 이용한 모델 빌드 오케스트레이션

Amazon SageMaker Model building pipeline은 머신러닝 워크플로우를 개발하는 데이터 과학자, 엔지니어들에게 SageMaker작업과 재생산가능한 머신러닝 파이프라인을 오케스트레이션하는 기능을 제공합니다. 또한 커스텀빌드된 모델을 실시간 추론환경이나 배치변환을 통한 추론 실행환경으로 배포하거나, 생성된 아티팩트의 계보(lineage)를 추적하는 기능을 제공합니다. 이 기능들을 통해 모델 아티팩트를 배포하고, 업무환경에서의 워크플로우를 배포/모니터링하고, 간단한 인터페이스를 통해 아티팩트의 계보 추적하고, 머신러닝 애플리케이션 개발의 베스트 프렉티스를 도입하여, 보다 안정적인 머신러닝 애플리케이션 운영환경을 구현할 수 있습니다. 

SageMaker pipeline 서비스는 JSON 선언으로 구현된 SageMaker Pipeline DSL(Domain Specific Language, 도메인종속언어)를 지원합니다. 이 DSL은 파이프라인 파라마터와 SageMaker 작업단계의 DAG(Directed Acyclic Graph)를 정의합니다. SageMaker Python SDK를 이용하면 이 파이프라인 DSL의 생성을 보다 간편하게 할 수 있습니다. 






# 2. 파이프라인 개발자 가이드
## SageMaker 파이프라인 소개

![mdp_how_it_works.png](../img/mdp_how_it_works.png)



SageMaker 파이프라인은 다음 기능을 지원하며 본 lab_03_pipelinie 에서 일부를 다루게 됩니다. 

* Processing job steps - 데이터처러 워크로드를 실행하기 위한 SageMaker의 관리형 기능. Feature engineering, 데이터 검증, 모델 평가, 모델 해석 등에 주로 사용됨 
* Training job steps - 학습작업. 모델에게 학습데이터셋을 이용하여 모델에게 예측을 하도록 학습시키는 작업 
* Conditional execution steps - 조건별 실행분기. 파이프라인을 분기시키는 역할.
* Register model steps - 학습이 완료된 모델패키지 리소스를 이후 배포를 위한 모델 레지스트리에 등록하기 
* Create model steps - 추론 엔드포인트 또는 배치 추론을 위한 모델의 생성 
* Transform job steps - 배치추론 작업. 배치작업을 이용하여 노이즈, bias의 제거 등 데이터셋을 전처리하고 대량데이터에 대해 추론을 실행하는 단계
* Pipelines - Workflow DAG. SageMaker 작업과 리소스 생성을 조율하는 단계와 조건을 가짐
* Parametrized Pipeline executions - 특정 파라미터에 따라 파이프라인 실행방식을 변화시키기 


- 상세한 개발자 가이드는 아래 참조 하세요.
    - [세이지 메이커 모델 빌딩 파이프라인의 개발자 가이드](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html)




# 3. 기본 라이브러리 로딩

세이지 메이커 관련 라이브러리를 로딩 합니다.

In [1]:
import boto3
import sagemaker
import pandas as pd
import os

sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
sm_client = boto3.client("sagemaker")


## 3.1 노트북 변수 로딩


저장된 변수를 확인 합니다.

In [2]:
%store

Stored variables and their in-db values:
bucket                             -> 'sagemaker-us-east-1-057716757052'
claims_data_uri                    -> 's3://sagemaker-us-east-1-057716757052/sagemaker-w
customers_data_uri                 -> 's3://sagemaker-us-east-1-057716757052/sagemaker-w
input_data_uri                     -> 's3://sagemaker-us-east-1-057716757052/sagemaker-w
input_preproc_data_uri             -> 's3://sagemaker-us-east-1-057716757052/sagemaker-w
project_prefix                     -> 'sagemaker-webinar-pipeline-advanced'
test_preproc_data_uri              -> 's3://sagemaker-us-east-1-057716757052/sagemaker-w
train_preproc_data_uri             -> 's3://sagemaker-us-east-1-057716757052/sagemaker-w


기존 노트북에서 저장한 변수를 로딩 합니다.

In [3]:
%store -r

# 4. 모델 빌딩 파이프라인 의 스텝(Step) 생성


## 4.1 모델 빌딩 파이프라인 변수 생성


본 노트북에서 사용하는 파라미터는 다음과 같습니다.

* `processing_instance_type` - 프로세싱 작업에서 사용할 `ml.*` 인스턴스 타입 
* `processing_instance_count` - 프로세싱 작업에서 사용할 인스턴스 개수 
* `training_instance_type` - 학습작업에서 사용할 `ml.*` 인스턴스 타입
* `model_approval_status` - 학습된 모델을 CI/CD를 목적으로 등록할 때의 승인 상태 (디폴트는 "PendingManualApproval")
* `input_data` - 입력데이터에 대한 S3 버킷 URI



파이프라인의 각 스텝에서 사용할 변수를 파라미터 변수로서 정의 합니다.


In [4]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)

model_eval_threshold = ParameterFloat(
    name="model2eval2threshold",
    default_value=0.85
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)


## 4.2 캐싱 정의

- 참고: 캐싱 파이프라인 단계:  [Caching Pipeline Steps](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines-caching.html)

In [5]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, 
                           expire_after="7d")


## 4.3 전처리 스텝 단계 정의


In [7]:
from sagemaker.sklearn.processing import SKLearnProcessor

split_rate = 0.2
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-fraud-process",
    role=role,
)
# print("input_data: \n", input_data)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [8]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    
step_process = ProcessingStep(
    name="Fraud-Advance-Preprocess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination='/opt/ml/processing/input'),        
         ],
    outputs=[ProcessingOutput(output_name="train",
                              source='/opt/ml/processing/output/train'),
             ProcessingOutput(output_name="test",
                              source='/opt/ml/processing/output/test')],
    job_arguments=["--split_rate", f"{split_rate}"],        
    code= 'src/preprocessing.py',
    cache_config = cache_config, # 캐시 정의
)


## 4.4 모델 학습을 위한 학습단계 정의 



###  하이퍼파라미터 세팅

In [9]:
base_hyperparameters = {
       "scale_pos_weight" : "29",        
        "max_depth": "6",
        "alpha" : "0", 
        "eta": "0.3",
        "min_child_weight": "1",
        "objective": "binary:logistic",
        "num_round": "100",
}


###  Estimator 생성

Estimator 생성시에 인자가 필요 합니다. 주요한 인자만 보겠습니다.
- 사용자 훈련 코드 ""xgboost_script.py"
- 훈련이 끝난 후에 결과인 모델 아티펙트의 경로 "estimator_output_path" 지정 합니다. 지정 안할 시에는 디폴트 경로로 저장 됩니다.


In [10]:
from sagemaker.xgboost.estimator import XGBoost

estimator_output_path = f's3://{bucket}/{project_prefix}/training_jobs'
print("estimator_output_path: \n", estimator_output_path)


xgb_train = XGBoost(
    entry_point = "xgboost_script.py",
    source_dir = "src",
    output_path = estimator_output_path,
    hyperparameters = base_hyperparameters,
    role = role,
    instance_count = training_instance_count,
    instance_type = training_instance_type,
    framework_version = "1.0-1")

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


estimator_output_path: 
 s3://sagemaker-us-east-1-057716757052/sagemaker-webinar-pipeline-advanced/training_jobs


### 모델 훈련 스탭 생성
스텝 생성시에 위에서 생성한 Estimator 입력 및 입력 데이타로서 전처리 데이터가 존재하는 S3 경로를 제공합니다.

훈련의 입력이 이전 전처리의 결과가 제공됩니다.
- `step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri`

In [11]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name= "Fraud-Advance-Train",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            # s3_data= train_preproc_dir_artifact,            
            content_type="text/csv"
        ),
    },
    cache_config = cache_config, # 캐시 정의    
)



## 4.5 모델 평가 단계

### SKLearn Processor 생성

SKLearn Processor 생성시에 인자가 필요 합니다.



In [12]:
from sagemaker.processing import ScriptProcessor


script_eval = SKLearnProcessor(
                             framework_version= "0.23-1",
                             role=role,
                             instance_type=processing_instance_type,
                             instance_count=1,
                             base_job_name="script-fraud-scratch-eval",
                                    )



The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


### Property 파일 정의

- PropertyFile 은 step_eval 단계가 실행 한 후에 모델 평가 지표의 결과 파일 내용을 정의하는데 사용 됩니다.

```
형식:
<property_file_instance> = PropertyFile(
    name="<property_file_name>",
    output_name="<processingoutput_output_name>",
    path="<path_to_json_file>"
)
예시:
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)
```


- 위의 PropertyFile 의  output_name="evaluation" 이고, 파일 이름은 evaluation.json" 라는 것을 의미 합니다. "evaluation.json" 파일 안에는 아래의 값이 저장이 됩니다.

```
    report_dict = {
        "binary_classification_metrics": {
            "auc": {
                "value": <roc_score>,
                "standard_deviation" : "NaN",
            },
        },
    }
```


- 최종적으로 evaluation.json 안의 \<roc_score\> 값이 추후 (조건 스텝) 에 사용이 됩니다.
- step_eval 이 실행이 되면 `evaluation.json` 이 S3에 저장이 됩니다.



#### 참고
- 참고 자료: [Property Files and JsonGet](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/build-and-manage-propertyfile.html)

In [13]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep

from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)



### 모델 평가 스텝 정의

In [14]:


step_eval = ProcessingStep(
    name= "Fraud-Advance-Evaluation",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source= step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
        destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="src/evaluation.py",
    cache_config = cache_config, # 캐시 정의    
  property_files=[evaluation_report], # 현재 이 라인을 넣으면 에러 발생
)

## 4.6 모델 등록 스텝

### 모델 그룹 생성

- 참고
    - 모델 그룹 릭스팅 API:  [ListModelPackageGroups](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListModelPackageGroups.html)
    - 모델 지표 등록: [Model Quality Metrics](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/model-monitor-model-quality-metrics.html)

In [15]:
model_package_group_name = f"{project_prefix}"
model_package_group_input_dict = {
 "ModelPackageGroupName" : model_package_group_name,
 "ModelPackageGroupDescription" : "Sample model package group"
}
response = sm_client.list_model_package_groups(NameContains=model_package_group_name)
if len(response['ModelPackageGroupSummaryList']) == 0:
    print("No model group exists")
    print("Create model group")    
    
    create_model_pacakge_group_response = sm_client.create_model_package_group(**model_package_group_input_dict)
    print('ModelPackageGroup Arn : {}'.format(create_model_pacakge_group_response['ModelPackageGroupArn']))    
else:
    print(f"{model_package_group_name} exitss")

No model group exists
Create model group
ModelPackageGroup Arn : arn:aws:sagemaker:us-east-1:057716757052:model-package-group/sagemaker-webinar-pipeline-advanced


### 모델 평가 메트릭 정의

In [16]:
from sagemaker.workflow.step_collections import RegisterModel

from sagemaker.model_metrics import MetricsSource, ModelMetrics 


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)




### 모델 등록 스텝 정의

In [17]:
step_register = RegisterModel(
    name= "Fraud-Advance-Model_Register",
    estimator=xgb_train,
    image_uri= step_train.properties.AlgorithmSpecification.TrainingImage,
    model_data= step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

## 4.7 세이지 메이커 모델 스텝 생성
- 아래 두 파리미터의 입력이 이전 스텝의 결과가 제공됩니다.
    - image_uri= step_train.properties.AlgorithmSpecification.TrainingImage,
    - model_data= step_train.properties.ModelArtifacts.S3ModelArtifacts,



### 세이지 메이커 모델 생성

In [18]:
from sagemaker.model import Model
    
model = Model(
    image_uri= step_train.properties.AlgorithmSpecification.TrainingImage,
    model_data= step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

### 세이지 메이커 모델 스탭 정의

In [19]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    # accelerator_type="ml.eia1.medium",
)
step_create_model = CreateModelStep(
    name= "Fraud-Advance-Create-SageMaker-Model",
    model=model,
    inputs=inputs,
)

## 4.8 HPO 스텝

### 튜닝할 하이퍼파라미터 범위 설정
여기서는 `eta, min_child_weight, alpha, max_depth` 를 튜닝 합니다.

In [20]:
from sagemaker.tuner import (
    IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
)

hyperparameter_ranges = {
    "eta": ContinuousParameter(0, 1),
    "min_child_weight": ContinuousParameter(1, 10),
    "alpha": ContinuousParameter(0, 2),
    "max_depth": IntegerParameter(1, 10),
}


### 튜너 설정 및 생성
- xbg_estimator 정의된  estimator 기술
- `objective_metric_name = "validation:auc"` 튜닝을 하고자 하는 지표 기술
    - 이 지표의 경우는 훈련 코드에서 정의 및 기록을 해야만 합니다.
- `hyperparameter_ranges` 튜닝하고자 하는 파라미터의 범위 설정
- `max_jobs` 기술
    - 총 훈련잡의 갯수 입니다.
- `max_parallel_jobs` 기술
    - 병렬로 실행할 훈련잡의 개수 (리소스 제한에 따라서 에러가 발생할 수 있습니다. 이 경우에 줄여 주세요.)


In [21]:
objective_metric_name = "validation:auc"

tuner = HyperparameterTuner(
    xgb_train, objective_metric_name, hyperparameter_ranges, 
    max_jobs=5,
    max_parallel_jobs=5,
)



### 튜닝 단계 정의 



In [22]:
from sagemaker.workflow.steps import TuningStep
    
step_tuning = TuningStep(
    name = "Fraud-Advance-HPO",
    tuner = tuner,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            # s3_data= train_preproc_dir_artifact,            
            content_type="text/csv"
        ),
    },    
    cache_config = cache_config, # 캐시 정의        
)

## 4.9 조건 스텝

### 조건  단계 정의
- 조건 단계에서 사용하는 ConditionLessThanOrEqualTo 에서 evaluation.json 을 로딩하여 내용을 확인

```

형식:
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=<property_file_instance>,
        json_path="test_metrics.roc.value",
    ),
    right=6.0
)

에시:
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="binary_classification_metrics.auc.value",
    ),
    right=6.0
)


```

- property_file=evaluation_report 는 위의 모델 평가 스텝에서 정의한  PropertyFile-evaluation_report 를 사용합니다. evaluation_report 에서 정의한 evaluation.json 파일 안의 "binary_classification_metrics.auc.value" 의 값을 사용한다는 것을 의미 합니다.


In [23]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="binary_classification_metrics.auc.value",
    ),
    # right=8.0
    right = model_eval_threshold
)

step_cond = ConditionStep(
    name= "Fraud-Advance-Condition",
    conditions=[cond_lte],
    if_steps=[step_tuning],        
    else_steps=[step_register, step_create_model], 
)

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


# 5. 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의 및 실행


이제 지금까지 생성한 단계들을 하나의 파이프라인으로 조합하고 실행하도록 하겠습니다.

파이프라인은 name, parameters, steps 속성이 필수적으로 필요합니다. 
여기서 파이프라인의 이름은 (account, region) 조합에 대하여 유일(unique))해야 합니다.
우리는 또한 여기서 Experiment 설정을 추가 하여, 실험에 등록 합니다.

주의:

- 정의에 사용한 모든 파라미터가 존재해야 합니다.
- 파이프라인으로 전달된 단계(step)들은 실행순서와는 무관합니다. SageMaker Pipeline은 단계가 실행되고 완료될 수 있도록 의존관계를를 해석합니다.

## 5.1 파이프라인 정의


위에서 정의한 아래의 4개의 스텝으로 파이프라인 정의를 합니다.
-     steps=[step_process, step_train, step_create_model, step_deploy],
- 아래는 약 20분 정도 소요 됩니다.

In [24]:
from sagemaker.workflow.pipeline import Pipeline

project_prefix = 'sagemaker-pipeline-phase2-step-by-step'

pipeline_name = project_prefix
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,        
        training_instance_count,                
        input_data,
        model_eval_threshold,
        model_approval_status,        
    ],
#   steps=[step_process, step_train, step_register, step_eval, step_cond],
  steps=[step_process, step_train, step_eval, step_cond],
)



## 5.2 파이프라인 정의 확인
위에서 정의한 파이프라인 정의는 Json 형식으로 정의 되어 있습니다.

In [25]:
import json

definition = json.loads(pipeline.definition())
definition

Using provided s3_resource


No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


Using provided s3_resource


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-057716757052/sagemaker-webinar-pipeline-advanced/input'},
  {'Name': 'model2eval2threshold', 'Type': 'Float', 'DefaultValue': 0.85},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Fraud-Advance-Preprocess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'Instanc

## 5.3 파이프라인 정의를 제출하고 실행하기 

파이프라인 정의를 파이프라인 서비스에 제출합니다. 함께 전달되는 역할(role)을 이용하여 AWS에서 파이프라인을 생성하고 작업의 각 단계를 실행할 것입니다.   

In [26]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


Using provided s3_resource
Using provided s3_resource


No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


Using provided s3_resource
Using provided s3_resource


In [27]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:057716757052:pipeline/sagemaker-pipeline-phase2-step-by-step',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:057716757052:pipeline/sagemaker-pipeline-phase2-step-by-step/execution/ptk818ucwlgf',
 'PipelineExecutionDisplayName': 'execution-1682945391089',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2023, 5, 1, 12, 49, 50, 974000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 5, 1, 12, 49, 50, 974000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': 'db3ce70f-0eea-47bb-bb74-4e01b7620688',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'db3ce70f-0eea-47bb-bb74-4e01b7620688',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '441',
   'date': 'Mon, 01 May 2023 12:49:51 GMT'},
  'RetryAttempts': 0}}

## 5.4 파이프라인 실행 기다리기

In [28]:
execution.wait()

실행이 완료될 때까지 기다립니다.

실행된 단계들을 리스트업합니다. 파이프라인의 단계실행 서비스에 의해 시작되거나 완료된 단계를 보여줍니다.

## 5.5 파이프라인 실행 단계 기록 보기

In [29]:
execution.list_steps()

[{'StepName': 'Fraud-Advance-HPO',
  'StartTime': datetime.datetime(2023, 5, 1, 13, 1, 5, 173000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 1, 13, 4, 34, 432000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TuningJob': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:hyper-parameter-tuning-job/ptk818ucwlgf-fraud-a-hseaxwn5ad'}}},
 {'StepName': 'Fraud-Advance-Condition',
  'StartTime': datetime.datetime(2023, 5, 1, 13, 1, 3, 998000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 1, 13, 1, 4, 430000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Condition': {'Outcome': 'True'}}},
 {'StepName': 'Fraud-Advance-Evaluation',
  'StartTime': datetime.datetime(2023, 5, 1, 12, 56, 27, 189000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 1, 13, 1, 3, 314000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:s

# 6. 세이지 메이커 스튜디오에서 실행 확인 하기


![hpo-pipeline.png](img/hpo-pipeline.png)

# 7. Pipeline 캐싱 및 파라미터 이용한 실행
- 캐싱은 2021년 7월 현재 Training, Processing, Transform 의 Step에 적용이 되어 있습니다.
- 상세 사항은 여기를 확인하세요. -->  [캐싱 파이프라인 단계](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines-caching.html)


## 7.1 캐싱을 이용한 파이프라인 실행
생성한 파이프라인을 다른 파라미터값을 이용하여 다시 실행할 수 있습니다. 파라미터정보는 딕셔너리 형태로 파라미터이름과 값을 지정하여 전달하면 디폴트값을 오버라이드하게 됩니다. 

모델의 성능에 따라 이번에는 컴퓨팅최적화된 인스턴스 타입을 이용하여 파이프라인을 실행하고 승인 상태를 자동으로 "Approved"로 설정하고 싶다면 다음 셀의 코드를 실행할 수 있습니다. 모델의 승인상태가 "Approved"라는 의미는 `RegisterModel` 단계에서 패키지버전이 등록될 때 자동으로 CI/CD 파이프라인에 의해 배포가능한 상태가 된다는 것을 의미합니다. 이후 배포파이프라인 프로세스는 SageMaker project를 통하여 자동화할 수 있습니다. 



In [30]:
is_cache = True

In [31]:
%%time 

from IPython.display import display as dp
import time

if is_cache:
    execution = pipeline.start(
        parameters=dict(
            model2eval2threshold=0.8,
        )
    )    
    
    # execution = pipeline.start()
    time.sleep(10)
    dp(execution.list_steps())    
    execution.wait()


[{'StepName': 'Fraud-Advance-Model_Register-RegisterModel',
  'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 4, 603000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 5, 748000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:model-package/sagemaker-webinar-pipeline-advanced/1'}}},
 {'StepName': 'Fraud-Advance-Create-SageMaker-Model',
  'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 4, 603000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 6, 73000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:model/pipelines-a2prwtopeob6-fraud-advance-create-cscljotkob'}}},
 {'StepName': 'Fraud-Advance-Condition',
  'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 2, 652000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 3, 292000, 

CPU times: user 9.87 ms, sys: 3.88 ms, total: 13.8 ms
Wall time: 10.5 s


In [32]:
if is_cache:
    dp(execution.list_steps())

[{'StepName': 'Fraud-Advance-Model_Register-RegisterModel',
  'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 4, 603000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 5, 748000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:model-package/sagemaker-webinar-pipeline-advanced/1'}}},
 {'StepName': 'Fraud-Advance-Create-SageMaker-Model',
  'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 4, 603000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 6, 73000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:model/pipelines-a2prwtopeob6-fraud-advance-create-cscljotkob'}}},
 {'StepName': 'Fraud-Advance-Condition',
  'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 2, 652000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 3, 292000, 

## 7.1 캐싱을 이용한 파이프라인 실행 결과 보기

![cache-pipeline-result.png](img/cache-pipeline-result.png)

# 8. 계보(Lineage)

파이프라인에 의해 생성된 아티팩트의 계보를 살펴봅니다.

In [33]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(1)

{'StepName': 'Fraud-Advance-Preprocess', 'StartTime': datetime.datetime(2023, 5, 1, 13, 4, 57, 841000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 5, 1, 13, 4, 58, 558000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:057716757052:pipeline/sagemaker-pipeline-phase2-step-by-step/execution/ptk818ucwlgf'}, 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:processing-job/pipelines-ptk818ucwlgf-Fraud-Advance-Prepro-VgEV6kr5Gt'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...bb21017985f2/input/code/preprocessing.py,Input,DataSet,ContributedTo,artifact
1,s3://...agemaker-webinar-pipeline-advanced/input,Input,DataSet,ContributedTo,artifact
2,68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...lgf/Fraud-Advance-Preprocess/output/test,Output,DataSet,Produced,artifact
4,s3://...gf/Fraud-Advance-Preprocess/output/train,Output,DataSet,Produced,artifact


{'StepName': 'Fraud-Advance-Train', 'StartTime': datetime.datetime(2023, 5, 1, 13, 4, 59, 634000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 0, 284000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:057716757052:pipeline/sagemaker-pipeline-phase2-step-by-step/execution/ptk818ucwlgf'}, 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:training-job/pipelines-ptk818ucwlgf-Fraud-Advance-Train-GozJPnx0HC'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...gf/Fraud-Advance-Preprocess/output/train,Input,DataSet,ContributedTo,artifact
1,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
2,s3://...nce-Train-GozJPnx0HC/output/model.tar.gz,Output,Model,Produced,artifact


{'StepName': 'Fraud-Advance-Evaluation', 'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 1, 98000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 1, 839000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:057716757052:pipeline/sagemaker-pipeline-phase2-step-by-step/execution/ptk818ucwlgf'}, 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:processing-job/pipelines-ptk818ucwlgf-Fraud-Advance-Evalua-XPz31rfC8Q'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...5b2dc96b96cd633/input/code/evaluation.py,Input,DataSet,ContributedTo,artifact
1,s3://...lgf/Fraud-Advance-Preprocess/output/test,Input,DataSet,ContributedTo,artifact
2,s3://...nce-Train-GozJPnx0HC/output/model.tar.gz,Input,Model,ContributedTo,artifact
3,68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...36ab10f5b2dc96b96cd633/output/evaluation,Output,DataSet,Produced,artifact


{'StepName': 'Fraud-Advance-Condition', 'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 2, 652000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 3, 292000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Condition': {'Outcome': 'False'}}}


None

{'StepName': 'Fraud-Advance-Create-SageMaker-Model', 'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 4, 603000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 6, 73000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:model/pipelines-a2prwtopeob6-fraud-advance-create-cscljotkob'}}}


None

{'StepName': 'Fraud-Advance-Model_Register-RegisterModel', 'StartTime': datetime.datetime(2023, 5, 1, 13, 5, 4, 603000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 5, 1, 13, 5, 5, 748000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:model-package/sagemaker-webinar-pipeline-advanced/1'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...nce-Train-GozJPnx0HC/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
2,sagemaker-webinar-pipeline-advanced-1-PendingM...,Input,Approval,ContributedTo,action
3,sagemaker-webinar-pipeline-advanced-1682945375...,Output,ModelGroup,AssociatedWith,context
