# Building MLOps with Step Functions

## 1. Prerequisites
### 1.1 Retrieve Account Information for the MLOps Implementation

In [None]:
import boto3
import json
from sagemaker import get_execution_role
from time import strftime
import calendar
import time

In [None]:
iam_client = boto3.client('iam')
role=get_execution_role()
base_role_name=role.split('/')[-1]

In [None]:
sts_client = boto3.client("sts")
account_id = sts_client.get_caller_identity()['Account']
sess = boto3.Session()
region = sess.region_name

In [None]:
role

### 1.2 Attach the Policies Used by MLOps

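Define the managed policies required by the architecture built in this HOL as follows. You can create a separate role, but for convenience this HOL attaches the policies to the same role used by the SageMaker Notebook/Studio and keeps using that role.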

In [None]:
iam_client.attach_role_policy(
 RoleName=base_role_name,
 PolicyArn='arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryFullAccess'
)
iam_client.attach_role_policy(
 RoleName=base_role_name,
 PolicyArn='arn:aws:iam::aws:policy/AmazonEventBridgeFullAccess'
)
iam_client.attach_role_policy(
 RoleName=base_role_name,
 PolicyArn='arn:aws:iam::aws:policy/AWSLambda_FullAccess'
)
iam_client.attach_role_policy(
 RoleName=base_role_name,
 PolicyArn='arn:aws:iam::aws:policy/AWSCodeCommitFullAccess'
)
iam_client.attach_role_policy(
 RoleName=base_role_name,
 PolicyArn='arn:aws:iam::aws:policy/SecretsManagerReadWrite'
)

### 1.3 Create a CodeCommit Repository
Open the CodeCommit console and create a repository. You can also reuse the CodeCommit repository from the training step.


![codecommit-intro.png](../figures/codecommit-intro.png)

### 1.4 Create CodeCommit Credentials and Store Them in Secrets Manager

#### - CodeCommit Credentials

In [None]:
user_name = 'XXXXXX' ## ==> Check your user name in the IAM console
codecommit_cred = 'codecommit-cred-'+user_name
codecommit_cred

In [None]:
try:
    response = iam_client.list_service_specific_credentials(
        UserName=user_name,
        ServiceName='codecommit.amazonaws.com'
    )
    # IAM allows at most two CodeCommit credentials per user, so delete an existing one to make room
    if len(response['ServiceSpecificCredentials']) > 0:
        response = iam_client.delete_service_specific_credential(
            UserName=user_name,
            ServiceSpecificCredentialId=response['ServiceSpecificCredentials'][-1]['ServiceSpecificCredentialId']
        )
except Exception as e:
    print(f"Could not clean up existing credentials ({e}); creating new CodeCommit credentials")
finally:
    # Create a new set of HTTPS Git credentials for CodeCommit
    response = iam_client.create_service_specific_credential(
        UserName=user_name,
        ServiceName='codecommit.amazonaws.com'
    )
    ServiceUserName = response['ServiceSpecificCredential']['ServiceUserName']
    ServicePassword = response['ServiceSpecificCredential']['ServicePassword']
print(f"ServiceUserName : {ServiceUserName} \nServicePassword : {ServicePassword}")

#### - Secrets Manager (Optional)
Store the CodeCommit credentials in Secrets Manager as key/value pairs so they can be used securely.

In [None]:
sec_client = boto3.client('secretsmanager')

In [None]:
secret_string = json.dumps({
 "username": ServiceUserName,
 "password": ServicePassword
 })

In [None]:
# Look for an existing secret with the same name
sec_list = [[sec_name['Name'], sec_name['ARN']] for sec_name in sec_client.list_secrets()['SecretList'] if sec_name['Name'] == codecommit_cred]

if len(sec_list) == 0:
    # Create a new secret holding the CodeCommit credentials
    sec_response = sec_client.create_secret(
        Name=codecommit_cred,
        Description='This credential uses git_config for SageMaker in Lambda',
        SecretString=secret_string,
        Tags=[
            {
                'Key': 'codecommit-name',
                'Value': 'codecommit_credentials'
            },
        ]
    )
    sec_arn = sec_response['ARN']
    print(f'sec_arn : {sec_arn}')
else:
    # Overwrite the existing secret with the newly created credentials
    print(f'sec_arn : {sec_list[0][1]}')
    sec_response = sec_client.update_secret(
        SecretId=sec_list[0][1],
        SecretString=secret_string
    )
    sec_arn = sec_list[0][1]

In [None]:
%store sec_arn

## 2. Configure MLOps

### 2.1 Create the Step Functions State Machine

- Import `mlops-yolov5.json` from the `step_functions` folder to reuse the pre-built base definition.

![step-functions.png](../figures/step-functions.png)
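The imported `mlops-yolov5.json` is not reproduced here. For orientation, a minimal sketch of a state machine with the same overall flow (start training, poll the status, check accuracy, register the model) can be written as an Amazon States Language definition built from a Python dict. The state names, the Lambda ARNs, and the 60-second wait are hypothetical; the `status` and `train_result` fields match what the Lambda functions in section 3 write into the event.

```python
import json

# Hypothetical Lambda ARN template; replace with the functions created in section 3
LAMBDA_ARN = "arn:aws:lambda:ap-northeast-2:123456789012:function:{}"


def task(function_name: str, next_state: str) -> dict:
    """A Task state that invokes a Lambda function with the current state input."""
    return {"Type": "Task", "Resource": LAMBDA_ARN.format(function_name), "Next": next_state}


def build_definition(wait_seconds: int = 60) -> dict:
    states = {
        "StartTrainingJob": task("start-training-job", "CheckStatusTraining"),
        "CheckStatusTraining": task("check-status-training", "TrainingStatusChoice"),
        # Branch on event['status'] set by the Check-Status-Training Lambda
        "TrainingStatusChoice": {
            "Type": "Choice",
            "Choices": [
                {"Variable": "$.status", "StringEquals": "Completed", "Next": "CheckAccuracy"},
                {"Variable": "$.status", "StringEquals": "Failed", "Next": "TrainingFailed"},
                {"Variable": "$.status", "StringEquals": "Stopped", "Next": "TrainingFailed"},
            ],
            "Default": "WaitForTraining",  # InProgress / Stopping: wait and poll again
        },
        "WaitForTraining": {"Type": "Wait", "Seconds": wait_seconds, "Next": "CheckStatusTraining"},
        "CheckAccuracy": task("check-accuracy", "AccuracyChoice"),
        # Branch on event['train_result'] set by the Check-Accuracy Lambda
        "AccuracyChoice": {
            "Type": "Choice",
            "Choices": [{"Variable": "$.train_result", "StringEquals": "PASS", "Next": "RegisterModel"}],
            "Default": "AccuracyBelowThreshold",
        },
        "RegisterModel": {"Type": "Task", "Resource": LAMBDA_ARN.format("register-model"), "End": True},
        "TrainingFailed": {"Type": "Fail", "Error": "TrainingFailed"},
        "AccuracyBelowThreshold": {"Type": "Succeed"},
    }
    return {"Comment": "Sketch of a YOLOv5 MLOps flow", "StartAt": "StartTrainingJob", "States": states}


print(json.dumps(build_definition(), indent=2))
```

Ending the low-accuracy branch in a `Succeed` state is a design choice: the pipeline ran correctly even though no model was registered. The JSON string could be passed as `definition` to the Step Functions `create_state_machine` API instead of importing a file in the console.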

### 2.2 Add a Policy to the Step Functions Role
- The state machine uses Lambda functions to call AWS services such as SageMaker, so the role that executes Step Functions must have a Lambda full-access policy (for example `AWSLambda_FullAccess`) attached.

![step-functions-role.png](../figures/step-functions-role.png)

## 3. Create the Lambda Functions Used by Step Functions
### 3.1 Create the Start-Training-Job Lambda

In [None]:
!mkdir ./1-Start-Training-Job

In [None]:
%%writefile 1-Start-Training-Job/sm_training_job.py

import json
import os
from time import strftime

import boto3
import sagemaker
from sagemaker.pytorch import PyTorch
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial


# Training instance type, read from the Lambda environment variable
instance_type = os.environ["INSTANCE_TYPE"]


def lambda_handler(event, context):

    role = 'arn:aws:iam::${account_id}:role/service-role/AmazonSageMaker-ExecutionRole-{}'  ### <== 1. Set the SageMaker execution role ARN

    sagemaker_session = sagemaker.Session()

    experiment_name = 'yolov5-poc-exp1'  ### <== 2. Experiment name

    instance_count = 1
    do_spot_training = False
    max_wait = None
    max_run = 2*60*60

    ## SageMaker Experiments setting: load the experiment, or create it if it does not exist yet
    try:
        sm_experiment = Experiment.load(experiment_name)
    except Exception:
        sm_experiment = Experiment.create(
            experiment_name=experiment_name,
            tags=[{'Key': 'model-name', 'Value': 'yolov5'}]
        )

    ## Trial setting: one trial (and one training job) per run
    create_date = strftime("%m%d-%H%M%S")

    sm_trial = Trial.create(trial_name=f'{experiment_name}-{create_date}',
                            experiment_name=experiment_name)

    job_name = f'{sm_trial.trial_name}'

    bucket = ''  ### <== 3. Bucket name to use
    code_location = f's3://{bucket}/yolov5/sm_codes'
    output_path = f's3://{bucket}/yolov5/output'

    # Extract P, R, mAP@.5 and mAP@.5:.95 from the "all" row of the YOLOv5 validation output
    metric_definitions = [
        {'Name': 'Precision', 'Regex': r'all\s+[0-9.]+\s+[0-9.]+\s+([0-9.]+)'},
        {'Name': 'Recall', 'Regex': r'all\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+([0-9.]+)'},
        {'Name': 'mAP@.5', 'Regex': r'all\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+([0-9.]+)'},
        {'Name': 'mAP@.5:.95', 'Regex': r'all\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+([0-9.]+)'}
    ]

    hyperparameters = {
        'data': 'data_sm.yaml',
        'cfg': 'yolov5s.yaml',
        'weights': 'weights/yolov5s.pt',  # Transfer learning
        'batch-size': 64,
        'epochs': 1,
        'project': '/opt/ml/model',
        'workers': 0,  # To avoid shm OOM issue
        'freeze': 10,  # For transfer learning, freeze the first 10 layers (the YOLOv5 backbone)
    }

    s3_data_path = f's3://{bucket}/dataset/BCCD'
    checkpoint_s3_uri = f's3://{bucket}/poc_yolov5/checkpoints'

    if do_spot_training:
        max_wait = max_run

    # CodeCommit credentials stored in Secrets Manager
    secret = get_secret()

    codecommit_repo = 'https://git-codecommit.${region}.amazonaws.com/v1/repos/${git_repo_name}'  ### <== 4. Source CodeCommit repository (replace the ${...} placeholders)

    git_config = {'repo': codecommit_repo,
                  'branch': 'main',
                  'username': secret['username'],
                  'password': secret['password']}

    source_dir = 'yolov5'

    estimator = PyTorch(
        entry_point='train.py',
        source_dir=source_dir,
        git_config=git_config,
        role=role,
        sagemaker_session=sagemaker_session,
        framework_version='1.10',
        py_version='py38',
        instance_count=instance_count,
        instance_type=instance_type,
        # volume_size=1024,
        code_location=code_location,
        output_path=output_path,
        metric_definitions=metric_definitions,
        hyperparameters=hyperparameters,
        # distribution=distribution,
        # disable_profiler=True,
        # debugger_hook_config=False,
        max_run=max_run,
        use_spot_instances=do_spot_training,
        max_wait=max_wait,
        checkpoint_s3_uri=checkpoint_s3_uri,
    )

    # wait=False: start the training job and return immediately; Step Functions polls its status
    estimator.fit(
        inputs={'inputdata': s3_data_path},
        job_name=job_name,
        wait=False,
    )

    event['training_job_name'] = job_name
    event['stage'] = 'Training'

    return event


def get_secret():
    secret_name = "arn:aws:secretsmanager:${region}:${account-id}:secret:${secret-manager-name}"  ### <== 5. Secrets Manager ARN (replace the ${...} placeholders)
    region_name = "ap-northeast-2"  ### <== 6. Region name

    secret = {}
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    # Exceptions from GetSecretValue are not caught here, so they propagate to the caller.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    get_secret_value_response = client.get_secret_value(
        SecretId=secret_name
    )

    if 'SecretString' in get_secret_value_response:
        secret = json.loads(get_secret_value_response['SecretString'])
    else:
        print("secret is not defined. Checking the Secrets Manager")

    return secret


In [None]:
%%writefile 1-Start-Training-Job/Dockerfile

# Define function directory
ARG FUNCTION_DIR="/function"

FROM python:buster as build-image

# Install aws-lambda-cpp build dependencies
RUN apt-get update && \
 apt-get install -y \
 g++ \
 make \
 cmake \
 unzip \
 git \
 libcurl4-openssl-dev

# Include global arg in this stage of the build
ARG FUNCTION_DIR
# Create function directory
RUN mkdir -p ${FUNCTION_DIR}

# Copy function code
COPY sm_training_job.py ${FUNCTION_DIR}
# COPY git_lambda ${FUNCTION_DIR}/git_lambda
# COPY yolov5 ${FUNCTION_DIR}/yolov5

# Install the runtime interface client
RUN pip install \
 --target ${FUNCTION_DIR} \
 awslambdaric sagemaker smdebug sagemaker-experiments

# Multi-stage build: grab a fresh copy of the base image
FROM python:buster

# Include global arg in this stage of the build
ARG FUNCTION_DIR
# Set working directory to function root directory
WORKDIR ${FUNCTION_DIR}

# Copy in the build image dependencies
COPY --from=build-image ${FUNCTION_DIR} ${FUNCTION_DIR}

ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
CMD [ "sm_training_job.lambda_handler" ]

#### - Role Information

In [None]:
import sagemaker
role = sagemaker.get_execution_role()
role

#### - Secrets Manager ARN

In [None]:
sec_arn = sec_response['ARN']
%store sec_arn
sec_arn

In [None]:
%store -r

In [None]:
print(f"bucket : {bucket}\ncodecommit_repo : {codecommit_repo}")

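#### - About the Training Job's lambda_function
- You can reuse, as is, the training cluster definition and launch call written in the notebook that ran the original YOLOv5 training job.
- The training code is pulled from what has been pushed to CodeCommit, and training runs from that code.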
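The `metric_definitions` in the training script tell SageMaker how to pull metrics out of the training logs. The sketch below roughly emulates that by applying each regex to a single log line and keeping the first capture group. The sample line is made up, in the shape of the YOLOv5 validation summary row (after `all` come images, labels/instances, P, R, mAP@.5, mAP@.5:.95); `extract_metrics` is a hypothetical helper for checking the patterns locally.

```python
import re

# Same patterns as metric_definitions in sm_training_job.py
METRIC_DEFINITIONS = [
    {"Name": "Precision", "Regex": r"all\s+[0-9.]+\s+[0-9.]+\s+([0-9.]+)"},
    {"Name": "Recall", "Regex": r"all\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+([0-9.]+)"},
    {"Name": "mAP@.5", "Regex": r"all\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+([0-9.]+)"},
    {"Name": "mAP@.5:.95", "Regex": r"all\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+[0-9.]+\s+([0-9.]+)"},
]


def extract_metrics(log_line: str) -> dict:
    """Apply each metric regex to one log line and return the first capture group as a float."""
    metrics = {}
    for definition in METRIC_DEFINITIONS:
        match = re.search(definition["Regex"], log_line)
        if match:
            metrics[definition["Name"]] = float(match.group(1))
    return metrics


# Made-up example of a YOLOv5 validation summary row
sample = "                 all         87        318      0.712      0.664      0.698      0.451"
print(extract_metrics(sample))
```

Each pattern skips a fixed number of columns after `all`, so if a YOLOv5 version changes the column order, the captured metrics shift silently; checking one real log line this way is a cheap safeguard.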

#### - Build the Lambda Container Image and Push It to ECR

In [None]:
%%bash
cd ./1-Start-Training-Job
echo $(pwd)
container_name=lambda-yolo5-training
account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration
# (uncomment the next line to default to us-west-2 if none is defined)
region=$(aws configure get region)
# region=${region:-us-west-2}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${container_name}:1.0"

# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${container_name}" > /dev/null 2>&1
if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${container_name}" > /dev/null
fi

# Build the docker image locally, tagged with the full ECR image name
docker build -f Dockerfile -t ${fullname} .

# Log in to this account's ECR registry (get-login-password works with AWS CLI v1 and v2) and push the image
aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin "${account}.dkr.ecr.${region}.amazonaws.com"
docker push ${fullname}

#### - Create the Lambda Function
![lambda-container.png](../figures/lambda-container.png)


#### - Configure the Lambda Role
Create a basic role first, then attach the required policies:
- SecretsManagerReadWrite
- AmazonSageMakerFullAccess
- AWSLambdaBasicExecutionRole

![lambda-role-setting.png](../figures/lambda-role-setting.png)
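The role in the screenshot is created in the console. As a sketch, the same setup could be scripted with the IAM API: a trust policy that lets Lambda assume the role, followed by the three managed policies listed above. `create_lambda_role` and the role name used with it are hypothetical; `iam_client` is expected to be a boto3 IAM client such as `boto3.client('iam')`.

```python
import json

# Trust policy allowing the Lambda service to assume the role
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Managed policies listed above
POLICY_ARNS = [
    "arn:aws:iam::aws:policy/SecretsManagerReadWrite",
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
]


def create_lambda_role(iam_client, role_name: str) -> str:
    """Create the role, attach the managed policies, and return the role ARN (hypothetical helper)."""
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(TRUST_POLICY),
    )
    for policy_arn in POLICY_ARNS:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    return response["Role"]["Arn"]
```

Passing the client in as a parameter keeps the helper easy to test with a stub and lets the caller choose the session and region.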

#### - Update the start-training-job Lambda Configuration
![lambda-start-training-job-config.png](../figures/lambda-start-training-job-config.png)

### 3.2 Check Status Training
Package the Lambda function code as a zip file and upload it to S3.

In [None]:
!mkdir ./2-Check-Status-Training

In [None]:
%%writefile 2-Check-Status-Training/lambda_function.py

import json
import os

import boto3

sagemaker = boto3.client('sagemaker')


def lambda_handler(event, context):
    stage = event['stage']

    if stage == 'Training':
        training_job_name = event['training_job_name']
        training_details = describe_training_job(training_job_name)
        print(training_details)

        status = training_details['TrainingJobStatus']
        if status == 'Completed':
            # Model artifacts are written to <S3OutputPath>/<job name>/output/model.tar.gz
            s3_output_path = training_details['OutputDataConfig']['S3OutputPath']
            model_data_url = os.path.join(s3_output_path, training_details['TrainingJobName'], 'output/model.tar.gz')

            event['message'] = 'Training job "{}" complete. Model data uploaded to "{}"'.format(training_job_name, model_data_url)
            event['model_data_url'] = model_data_url
            event['training_job'] = training_details['TrainingJobName']
        elif status == 'Failed':
            failure_reason = training_details['FailureReason']
            event['message'] = 'Training job failed. {}'.format(failure_reason)

        # The state machine branches on this value (e.g. keeps waiting while InProgress)
        event['status'] = status

    print(event)

    return event


def describe_training_job(name):
    """Describe the SageMaker training job identified by name.

    Args:
        name (string): Name of the SageMaker training job to describe.
    Returns:
        (dict) Metadata and details about the status of the training job.
    """
    try:
        response = sagemaker.describe_training_job(
            TrainingJobName=name
        )
    except Exception as e:
        print(e)
        print('Unable to describe training job.')
        raise

    return response


In [None]:
%%bash
cd ./2-Check-Status-Training
rm -rf check-status-training.zip
zip -r check-status-training.zip lambda_function.py

In [None]:
%store -r
print(f"bucket : {bucket}")

In [None]:
!aws s3 cp 2-Check-Status-Training/check-status-training.zip s3://$bucket/lambda_function/

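To check the Check-Status-Training branching without calling SageMaker, the sketch below mirrors its logic as a pure function over a `describe_training_job`-style response dict. `summarize_training_status` is a hypothetical helper and the response dicts in the test are fabricated. It uses `posixpath.join` because S3 keys always use `/`. For a completed job, `describe_training_job` also returns the artifact location directly in `ModelArtifacts.S3ModelArtifacts`, which could be used instead of building the path by hand.

```python
import posixpath


def summarize_training_status(event: dict, details: dict) -> dict:
    """Mirror the Check-Status-Training Lambda on a describe_training_job-style dict."""
    status = details["TrainingJobStatus"]
    if status == "Completed":
        # <S3OutputPath>/<job name>/output/model.tar.gz
        model_data_url = posixpath.join(
            details["OutputDataConfig"]["S3OutputPath"],
            details["TrainingJobName"],
            "output/model.tar.gz",
        )
        event["message"] = 'Training job "{}" complete. Model data uploaded to "{}"'.format(
            details["TrainingJobName"], model_data_url
        )
        event["model_data_url"] = model_data_url
        event["training_job"] = details["TrainingJobName"]
    elif status == "Failed":
        event["message"] = "Training job failed. {}".format(details["FailureReason"])
    # Any other status (InProgress, Stopping, Stopped) is passed through unchanged
    event["status"] = status
    return event
```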
### 3.3 Check Accuracy
Package the Lambda function code as a zip file and upload it to S3.

In [None]:
!mkdir ./3-Check-Accuracy

In [None]:
%%writefile 3-Check-Accuracy/lambda_function.py


import csv
import os
import tarfile
from io import BytesIO, StringIO

import boto3


s3 = boto3.client('s3')


# Column index in results.csv used as accuracy, and the bucket holding the model artifacts (Lambda env vars)
acc_col_num = int(os.environ['ACC_COL_NUM'])
bucket = os.environ['BUCKET']


def lambda_handler(event, context):
    model_data_url = event['model_data_url']
    # Object key is everything after "s3://<bucket>/"
    key_value = model_data_url.split(f's3://{bucket}/', 1)[1]
    print(key_value)
    tar_file_obj = s3.get_object(Bucket=bucket, Key=key_value)
    tar_content = tar_file_obj['Body'].read()

    accuracy = None

    # Read the last row of results.csv inside model.tar.gz
    with tarfile.open(fileobj=BytesIO(tar_content)) as tar:
        for tar_resource in tar:
            if tar_resource.isfile() and "results.csv" in tar_resource.name:
                file_data = tar.extractfile(tar_resource).read().decode('utf-8')
                rows = [row for row in csv.reader(StringIO(file_data), delimiter=",") if row]
                accuracy = float(rows[-1][acc_col_num].strip())

    if accuracy is None:
        raise ValueError(f'results.csv not found in {model_data_url}')

    print(f"accuracy is {accuracy}")

    # desired_accuracy arrives as a string (Lambda env var), so compare numerically, not as strings
    desired_accuracy = float(event['desired_accuracy'])

    if accuracy > desired_accuracy:
        event['train_result'] = "PASS"
        print("PASS")
    else:
        event['train_result'] = "FAIL"
        print("FAIL")

    return event


In [None]:
%%bash
cd ./3-Check-Accuracy
rm -rf check-accuracy.zip
zip -r check-accuracy.zip lambda_function.py

In [None]:
!aws s3 cp 3-Check-Accuracy/check-accuracy.zip s3://$bucket/lambda_function/

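The accuracy check boils down to reading one column of the last row of `results.csv` inside `model.tar.gz`. The sketch below does this on an in-memory archive so it can run without S3. `read_last_metric` and `build_archive` are hypothetical helpers and the CSV contents are made up. Values read from the CSV and `desired_accuracy` (which comes from a Lambda environment variable) are both strings, and `>` on strings compares lexicographically (for example `"1e-05" > "0.5"` is true), so the sketch converts to `float` before comparing.

```python
import csv
import io
import tarfile


def read_last_metric(tar_bytes: bytes, col_num: int) -> float:
    """Return column `col_num` of the last row of results.csv inside a (gzipped) tar archive."""
    with tarfile.open(fileobj=io.BytesIO(tar_bytes)) as tar:  # mode "r" auto-detects gzip
        for member in tar:
            if member.isfile() and member.name.endswith("results.csv"):
                text = tar.extractfile(member).read().decode("utf-8")
                rows = [row for row in csv.reader(io.StringIO(text)) if row]
                return float(rows[-1][col_num].strip())
    raise FileNotFoundError("results.csv not found in the archive")


def build_archive(files: dict) -> bytes:
    """Build an in-memory .tar.gz from {name: text}, for local testing only."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in files.items():
            data = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


if __name__ == "__main__":
    csv_text = "epoch, metrics/precision, metrics/mAP_0.5\n0, 0.41, 0.30\n1, 0.52, 0.47\n"
    archive = build_archive({"exp/results.csv": csv_text})
    accuracy = read_last_metric(archive, col_num=2)
    desired_accuracy = "0.45"  # arrives as a string, like the Lambda env var
    print(accuracy, "PASS" if accuracy > float(desired_accuracy) else "FAIL")
```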
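#### - Configure lambda-check-accuracy

- The check takes longer than the default 3-second Lambda timeout, so increase the timeout to about 20 seconds.
- Also add the bucket name as the `BUCKET` environment variable. The function also reads `ACC_COL_NUM`, the column index in results.csv that holds the accuracy value.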

![lambda-check-accuracy-setting.png](../figures/lambda-check-accuracy-setting.png)
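The settings in the screenshot could also be applied through the Lambda API. A minimal sketch, assuming `lambda_client` is a boto3 Lambda client (`boto3.client('lambda')`); the helper and function names are hypothetical. Note that `Environment` replaces the function's entire set of environment variables, so every variable the function reads (here `BUCKET` and `ACC_COL_NUM`) must be included in each call.

```python
def configure_check_accuracy(lambda_client, function_name: str, bucket: str,
                             acc_col_num: int, timeout: int = 20) -> dict:
    """Set the timeout and the environment variables read by the Check-Accuracy Lambda."""
    return lambda_client.update_function_configuration(
        FunctionName=function_name,
        Timeout=timeout,  # seconds; the default 3 s is too short for downloading model.tar.gz
        Environment={
            "Variables": {
                "BUCKET": bucket,
                # Lambda environment variable values must be strings
                "ACC_COL_NUM": str(acc_col_num),
            }
        },
    )
```

For example, `configure_check_accuracy(boto3.client('lambda'), 'lambda-check-accuracy', bucket, 6)`. The index 6 assumes the standard YOLOv5 `results.csv` layout, where that column is usually `metrics/mAP_0.5`; check the header of your own file before relying on it.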

### 3.4 Register Model
- After training completes, model artifacts whose accuracy exceeds the configured desired accuracy are registered in the Model Registry.

In [None]:
!mkdir 4-Register-Model

In [None]:
import sagemaker
image_uri = sagemaker.image_uris.retrieve(framework='pytorch', 
 image_scope='training',
 version='1.10',
 instance_type='ml.c5.2xlarge', 
 region=region)
image_uri

In [None]:
%%writefile 4-Register-Model/lambda_function.py

import os

import boto3
import botocore


sm_client = boto3.client("sagemaker")

# Model package group name and description (Lambda env vars)
model_package_group_name = os.environ['MODEL_PACKAGE_GROUP_NAME']
model_package_group_desc = os.environ['MODEL_PACKAGE_GROUP_DESC']


def lambda_handler(event, context):

    modelpackage_inference_specification = {
        "InferenceSpecification": {
            "Containers": [
                {
                    # Container image for the model package; replace it with the image_uri retrieved above if needed
                    "Image": '763104351884.dkr.ecr.ap-northeast-2.amazonaws.com/pytorch-training:1.10-cpu-py38',
                }
            ],
            "SupportedContentTypes": ["application/x-image"],
            "SupportedResponseMIMETypes": ["application/x-image"],
        }
    }

    model_data_url = event['model_data_url']

    # Specify the model data
    modelpackage_inference_specification["InferenceSpecification"]["Containers"][0]["ModelDataUrl"] = model_data_url

    create_model_package_input_dict = {
        "ModelPackageGroupName": model_package_group_name,
        "ModelPackageDescription": model_package_group_desc,
        "ModelApprovalStatus": "PendingManualApproval"
    }
    create_model_package_input_dict.update(modelpackage_inference_specification)

    try:
        create_model_package_response = sm_client.create_model_package(**create_model_package_input_dict)
    except botocore.exceptions.ClientError as ce:
        # Create the model package group only if it does not exist yet; re-raise any other error
        if ce.response["Error"]["Message"] != "Model Package Group does not exist.":
            raise
        print('Model package group does not exist. Creating a new one')
        sm_client.create_model_package_group(
            ModelPackageGroupName=model_package_group_name,
            ModelPackageGroupDescription=model_package_group_desc,
        )
        create_model_package_response = sm_client.create_model_package(**create_model_package_input_dict)

    return event


In [None]:
%%bash
cd ./4-Register-Model
rm -rf register-model.zip
zip -r register-model.zip lambda_function.py

In [None]:
!aws s3 cp ./4-Register-Model/register-model.zip s3://$bucket/lambda_function/

#### - Configure lambda-register-model

- Add the URI of the ECR container used for inference, and the name and description of the Model Package Group to create, as environment variables.

![lambda-register-model-setting.png](../figures/lambda-register-model-setting.png)

## 4. Lambda-Step-Functions-Trigger
- Create a Lambda function that starts the Step Functions execution.
- With this Lambda function, Step Functions can run automatically when an object is added to S3 or when new training code is pushed to CodeCommit.

In [None]:
!mkdir 5-Step-Functions-Trigger

In [None]:
%%writefile 5-Step-Functions-Trigger/lambda_function.py

import json
import os

import boto3


sf = boto3.client('stepfunctions')


# ARN of the state machine to start and the accuracy threshold passed to it (Lambda env vars)
state_machine_arn = os.environ['STATE_MACHINE_ARN']
desired_accuracy = os.environ['DESIRED_ACCURACY']


def lambda_handler(event, context):

    print(event)
    # Execution input; each Lambda in the state machine passes this dict along and adds its own fields
    json_string = {
        "desired_accuracy": desired_accuracy
    }
    print(json_string)

    sf.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(json_string))

    return event


In [None]:
%%bash
cd ./5-Step-Functions-Trigger
rm -rf step-functions-trigger.zip
zip -r step-functions-trigger.zip lambda_function.py

In [None]:
!aws s3 cp ./5-Step-Functions-Trigger/step-functions-trigger.zip s3://$bucket/lambda_function/

### - Update the Lambda Step Functions Trigger Configuration

![lambda-step-functions-trigger-config.png](../figures/lambda-step-functions-trigger-config.png)
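The trigger Lambda sends only `desired_accuracy` to the state machine. If later states need to know which S3 object fired the trigger, the execution input could carry it too. A sketch, assuming a standard S3 event notification (`Records[].s3.bucket.name` and `Records[].s3.object.key`); `build_execution_input`, `trigger_bucket`, and `trigger_key` are hypothetical names. Object keys in S3 event notifications are URL-encoded, hence `unquote_plus`.

```python
import json
from urllib.parse import unquote_plus


def build_execution_input(event: dict, desired_accuracy: str) -> str:
    """Build the Step Functions execution input, adding the S3 object if the event came from S3."""
    payload = {"desired_accuracy": desired_accuracy}
    records = event.get("Records", [])
    if records and "s3" in records[0]:
        s3_info = records[0]["s3"]
        payload["trigger_bucket"] = s3_info["bucket"]["name"]
        payload["trigger_key"] = unquote_plus(s3_info["object"]["key"])  # keys arrive URL-encoded
    return json.dumps(payload)
```

The returned string can be passed as `input` to `start_execution`, in place of `json.dumps(json_string)` in the handler.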

### - Add a Trigger
Triggers can be added as shown below.


![step-functions-trigger.png](../figures/step-functions-trigger.png)
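For the CodeCommit case, an EventBridge rule that targets the trigger Lambda could use an event pattern like the sketch below, which matches pushes to the `main` branch. The repository ARN is hypothetical, and `matches` is a deliberately simplified local matcher (exact values listed in arrays, nested objects) rather than EventBridge's full matching semantics. To check a pattern against the real service, EventBridge's `TestEventPattern` API (`test_event_pattern` in boto3) can be used.

```python
import json

REPO_ARN = "arn:aws:codecommit:ap-northeast-2:123456789012:yolov5-repo"  # hypothetical repository ARN

# Match branch creations/updates (pushes) on main in one repository
EVENT_PATTERN = {
    "source": ["aws.codecommit"],
    "detail-type": ["CodeCommit Repository State Change"],
    "resources": [REPO_ARN],
    "detail": {
        "event": ["referenceCreated", "referenceUpdated"],
        "referenceType": ["branch"],
        "referenceName": ["main"],
    },
}


def matches(pattern: dict, event: dict) -> bool:
    """Simplified matcher: every pattern key must exist and one of its listed values must match."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        else:
            # Event fields that are arrays (e.g. resources) match if any element matches
            candidates = actual if isinstance(actual, list) else [actual]
            if not any(value in expected for value in candidates):
                return False
    return True


print(json.dumps(EVENT_PATTERN))  # value for the rule's event pattern
```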