# Preprocessing 
* Container: codna_pytorch_py39

## AutoReload

In [1]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import boto3

## 1. Processing-job for preprocessing

In [3]:
import os
import wget
import sagemaker
from sagemaker.pytorch.estimator import PyTorch
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.processing import ProcessingInput, ProcessingOutput, FrameworkProcessor

## 2. parameter store 설정

In [4]:
from utils.ssm import parameter_store
# Parameter-store client bound to the region of the default boto3 session.
strRegionName = boto3.Session().region_name
pm = parameter_store(strRegionName)

# Project prefix; the other parameter keys used in this notebook are built from it.
prefix = pm.get_params(key="PREFIX")

* params for processing job

In [14]:
# Switch between a local-mode run (True) and a managed SageMaker instance (False).
local_mode = True

if local_mode:
    # Imported here because it is only needed for local-mode runs.
    from sagemaker.local import LocalSession

    instance_type = 'local'
    sagemaker_session = LocalSession()
    # Local mode reads the dataset from ./data under the current working directory.
    data_path = os.path.join(os.getcwd(), "data")
else:
    instance_type = "ml.m5.xlarge"  # alternative: "ml.g4dn.xlarge"
    sagemaker_session = sagemaker.Session()
    data_path = pm.get_params(key=prefix + '-S3-DATA-PATH')

# Repository settings passed to the processing job; the username and password
# are fetched from the parameter store with enc=True.
git_config = {
    'repo': f'https://{pm.get_params(key="-".join([prefix, "CODE_REPO"]))}',
    'branch': 'main',
    'username': pm.get_params(key="-".join([prefix, "CODECOMMIT-USERNAME"]), enc=True),
    'password': pm.get_params(key="-".join([prefix, "CODECOMMIT-PWD"]), enc=True)
}

# Cell output is saved with the notebook, so never print the raw credentials:
# show a copy with the secret fields masked instead.
git_config_masked = {**git_config, 'username': '***', 'password': '***'}

print (f"instance-type: {instance_type}")
print (f"image-uri: {pm.get_params(key=''.join([prefix, '-IMAGE-URI']))}")
print (f"role: {pm.get_params(key=prefix + '-SAGEMAKER-ROLE-ARN')}")
print (f"bucket: {pm.get_params(key=prefix + '-BUCKET')}")
print (f"dataset-path: {data_path}")
print (f"sagemaker_session: {sagemaker_session}")
print (f"git_config: {git_config_masked}")

instance-type: local
image-uri: 419974056037.dkr.ecr.us-east-1.amazonaws.com/nemo-test-training
role: arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20221206T163436
bucket: sm-nemo-ramp
dataset-path: /home/ec2-user/SageMaker/nemo-on-sagemaker/1.building-component/data
sagemaker_session: 
git_config: {'repo': 'https://git-codecommit.us-east-1.amazonaws.com/v1/repos/nemo-code', 'branch': 'main', 'username': '***', 'password': '***'}


* Define processing job

In [15]:
# Processor that runs the preprocessing script inside the project's container image.
dataset_processor = FrameworkProcessor(
    estimator_cls=PyTorch,
    framework_version=None,  # no framework version pinned; a custom image_uri is passed instead
    image_uri=pm.get_params(key=''.join([prefix, "-IMAGE-URI"])),
    instance_type=instance_type,
    instance_count=1,
    role=pm.get_params(key=prefix + "-SAGEMAKER-ROLE-ARN"),
    # Name shown in the bucket (when the job is wrapped in a pipeline, the name
    # defined by the pipeline is shown in the bucket instead).
    base_job_name="preprocessing",
    sagemaker_session=sagemaker_session
)

# Root directory used for processing inputs/outputs inside the container.
proc_prefix = "/opt/ml/processing"

# S3 URIs always use "/" as the separator, so join explicitly instead of using
# os.path.join, which would insert the host OS separator.
output_path = "/".join([
    "s3://{}".format(pm.get_params(key=prefix + "-BUCKET")),
    prefix,
    "preprocessing",
    "data",
])

In [16]:
# Display the S3 destination assembled for the processing output.
output_path

's3://sm-nemo-ramp/nemo-asr/preprocessing/data'

In [19]:
# Launch the preprocessing job: mount `data_path` under <proc_prefix>/input and
# collect whatever the script writes to <proc_prefix>/output into `output_path`.
dataset_processor.run(
    # job_name="preprocessing",  # needed for caching to work; otherwise a date-time suffix is
    #                            # appended to the processor's base_job_name and caching does not work
    code='preprocessing.py',  # file path inside the source directory
    source_dir="./code",  # source directory, relative path; add processing.py and requirements.txt here
    git_config=git_config,
    inputs=[
        ProcessingInput(
            input_name="input-data",
            source=data_path,
            # Container paths are POSIX paths regardless of the host OS, so they
            # are built with "/" rather than os.path.join.
            destination=f"{proc_prefix}/input",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="output-data",
            source=f"{proc_prefix}/output",
            destination=output_path,
        ),
    ],
    # Command-line arguments forwarded to preprocessing.py.
    arguments=[
        "--proc_prefix", proc_prefix,
        "--train_mount_dir", "/opt/ml/input/data/training/",
        "--test_mount_dir", "/opt/ml/input/data/testing/",
    ],
)

Cloning into '/tmp/tmpn_1lg81x'...
remote: Counting objects: 20, done. 
Already on 'main'


Your branch is up to date with 'origin/main'.


INFO:sagemaker:Creating processing-job with name preprocessing-2023-03-22-10-16-10-483
INFO:sagemaker.local.local_session:Starting processing job
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:sagemaker.local.image:No AWS credentials found in session but credentials from EC2 Metadata Service are available.
INFO:sagemaker.local.image:docker compose file: 
networks:
 sagemaker-local:
 name: sagemaker-local
services:
 algo-1-w6h0y:
 container_name: jv6cmky3ha-algo-1-w6h0y
 entrypoint:
 - /bin/bash
 - /opt/ml/processing/input/entrypoint/runproc.sh
 - --proc_prefix
 - /opt/ml/processing
 - --train_mount_dir
 - /opt/ml/input/data/training/
 - --test_mount_dir
 - /opt/ml/input/data/testing/
 environment: []
 image: 419974056037.dkr.ecr.us-east-1.amazonaws.com/nemo-test-training
 networks:
 sagemaker-local:
 aliases:
 - algo-1-w6h0y
 stdin_open: true
 tty: true
 volumes:
 - /tmp/tmpczdb0bse/algo-1-w6h0y/config:/opt/ml/config
 - /tmp/tmpczdb0

Creating jv6cmky3ha-algo-1-w6h0y ... 
Creating jv6cmky3ha-algo-1-w6h0y ... done
Attaching to jv6cmky3ha-algo-1-w6h0y
[36mjv6cmky3ha-algo-1-w6h0y |[0m Received arguments Namespace(proc_prefix='/opt/ml/processing', train_mount_dir='/opt/ml/input/data/training/', test_mount_dir='/opt/ml/input/data/testing/')
[36mjv6cmky3ha-algo-1-w6h0y |[0m Converting .sph to .wav...
[36mjv6cmky3ha-algo-1-w6h0y |[0m Finished conversion.
[36mjv6cmky3ha-algo-1-w6h0y |[0m ******
[36mjv6cmky3ha-algo-1-w6h0y |[0m ******
[36mjv6cmky3ha-algo-1-w6h0y |[0m Training manifest created.
[36mjv6cmky3ha-algo-1-w6h0y |[0m Test manifest created.
[36mjv6cmky3ha-algo-1-w6h0y |[0m ***Done***
[36mjv6cmky3ha-algo-1-w6h0y |[0m data_dir ['entrypoint', 'code', 'an4']
[36mjv6cmky3ha-algo-1-w6h0y |[0m self.output_dir ['an4']
[36mjv6cmky3ha-algo-1-w6h0y exited with code 0
[0mAborting on container exit...




===== Job Complete =====


In [20]:
!aws s3 sync $output_path ./data/preprocessing --quiet
output_path

's3://sm-nemo-ramp/nemo-asr/preprocessing/data'

## 3. parameter store에 Processing output 추가

In [21]:
# Record the S3 location of the preprocessed data in the parameter store under
# "<prefix>-PREP-DATA-PATH", overwriting any existing value.
pm.put_params(
    key=f"{prefix}-PREP-DATA-PATH",
    value=output_path,
    overwrite=True,
)

'Store suceess'