{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# [모듈 2-1] SageMaker Pipelines 사용하기\n", "이 노트북에서는 아래와 같은 작업을 수행합니다.\n", "- 데이터 준비\n", "- Pipeline 정의\n", "- 데이터 전처리: Processing Step 이용\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "\n", "sagemaker_session = sagemaker.session.Session()\n", "bucket = sagemaker_session.default_bucket()\n", "role = sagemaker.get_execution_role()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_prefix = 'autoglueon'\n", "s3_data_path = f's3://{bucket}/{pipeline_prefix}/raw'\n", "s3_config_path = f's3://{bucket}/{pipeline_prefix}/config'\n", "\n", "sm_dataset_path = '../data/raw'\n", "sm_claims_data_path = f'{sm_dataset_path}/claims.csv'\n", "sm_customers_data_path = f'{sm_dataset_path}/customers.csv'\n", "sm_config_path = f'../config'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"claim_data_path: \", sm_claims_data_path)\n", "print(\"customer_data_path: \", sm_customers_data_path)\n", "print(\"s3_data_path: \", s3_data_path)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_claims_data_path = sagemaker.s3.S3Uploader.upload(\n", " local_path = sm_claims_data_path, \n", " desired_s3_uri = s3_data_path\n", ")\n", "print(\"claims data path in S3: \", s3_claims_data_path)\n", "\n", "s3_customers_data_path = sagemaker.s3.S3Uploader.upload(\n", " local_path = sm_customers_data_path, \n", " desired_s3_uri = s3_data_path\n", ")\n", "print(\"customers data path in S3: \", s3_customers_data_path)\n", "\n", "\n", "s3_config_path = sagemaker.s3.S3Uploader.upload(\n", " local_path = sm_config_path, \n", " desired_s3_uri = s3_config_path\n", ")\n", "print(\"config path in S3: \", s3_config_path)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 2.2 전처리 스텝 개발" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import boto3\n", "import pandas as pd\n", "from IPython.display import display as dp" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_customers = pd.read_csv(sm_customers_data_path)\n", "df_customers.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_claims = pd.read_csv(sm_claims_data_path)\n", "df_claims.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile src/preprocess.py\n", "\n", "import argparse\n", "import os\n", "import requests\n", "import tempfile\n", "import subprocess, sys\n", "\n", "import pandas as pd\n", "import numpy as np\n", "from glob import glob\n", "\n", "from sklearn.compose import ColumnTransformer\n", "from sklearn.impute import SimpleImputer\n", "from sklearn.pipeline import Pipeline\n", "from sklearn.preprocessing import StandardScaler, OneHotEncoder\n", "\n", "import logging\n", "import logging.handlers\n", "\n", "def _get_logger():\n", " '''\n", " 로깅을 위해 파이썬 로거를 사용\n", " # https://stackoverflow.com/questions/17745914/python-logging-module-is-printing-lines-multiple-times\n", " '''\n", " loglevel = logging.DEBUG\n", " l = logging.getLogger(__name__)\n", " if not l.hasHandlers():\n", " l.setLevel(loglevel)\n", " logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) \n", " l.handler_set = True\n", " return l \n", "\n", "logger = _get_logger()\n", "\n", "\n", "def split_train_test(df, test_ratio=0.1):\n", " '''\n", " 두 개의 데이터 세트로 분리\n", " '''\n", " total_rows = df.shape[0]\n", " train_end = int(total_rows * (1 - test_ratio))\n", " \n", " train_df = df[0:train_end]\n", " test_df = df[train_end:]\n", " \n", " return train_df, test_df\n", "\n", "\n", "def get_dataframe(base_preproc_input_dir, file_name_prefix ): \n", " '''\n", " 파일 이름이 들어가 있는 csv 파일을 모두 저장하여 데이터 프레임을 리턴\n", " '''\n", " \n", " input_files = glob('{}/{}*.csv'.format(base_preproc_input_dir, file_name_prefix))\n", " #claim_input_files = glob('{}/dataset*.csv'.format(base_preproc_input_dir)) \n", " logger.info(f\"input_files: \\n {input_files}\") \n", " \n", " if len(input_files) == 0:\n", " raise ValueError(('There are no files in {}.\\n' +\n", " 'This usually indicates that the channel ({}) was incorrectly specified,\\n' +\n", " 'the data specification in S3 was incorrectly specified or the role specified\\n' +\n", " 'does not have permission to access the data.').format(base_preproc_input_dir, \"train\"))\n", " \n", " raw_data = [ pd.read_csv(file, index_col=0) for file in input_files ]\n", " df = pd.concat(raw_data)\n", " \n", " logger.info(f\"dataframe shape \\n {df.shape}\") \n", " logger.info(f\"dataset sample \\n {df.head(2)}\") \n", " #logger.info(f\"df columns \\n {df.columns}\") \n", " \n", " return df\n", "\n", "\n", "def convert_type(raw, cols, type_target):\n", " '''\n", " 해당 데이터 타입으로 변경\n", " '''\n", " df = raw.copy()\n", " \n", " for col in cols:\n", " df[col] = df[col].astype(type_target)\n", " \n", " return df\n", " \n", "\n", "if __name__ =='__main__':\n", " \n", " ################################\n", " #### 커맨드 인자 파싱 \n", " ################################# \n", " \n", " parser = argparse.ArgumentParser()\n", " parser.add_argument('--base_output_dir', type=str, default=\"/opt/ml/processing/output\")\n", " parser.add_argument('--base_preproc_input_dir', type=str, default=\"/opt/ml/processing/input\") \n", " parser.add_argument('--split_rate', type=float, default=0.1) \n", " parser.add_argument('--label_column', type=str, default=\"fraud\") \n", " # parse arguments\n", " args = parser.parse_args() \n", " \n", " logger.info(\"######### Argument Info ####################################\")\n", " logger.info(f\"args.base_output_dir: {args.base_output_dir}\")\n", " logger.info(f\"args.base_preproc_input_dir: {args.base_preproc_input_dir}\") \n", " logger.info(f\"args.label_column: {args.label_column}\") \n", " logger.info(f\"args.split_rate: {args.split_rate}\") \n", "\n", " base_output_dir = args.base_output_dir\n", " base_preproc_input_dir = args.base_preproc_input_dir\n", " label_column = args.label_column \n", " split_rate = args.split_rate\n", "\n", " ################################# \n", " #### 두개의 파일(claim, customer) 을 로딩하여 policy_id 로 조인함 ########\n", " ################################# \n", " \n", " logger.info(f\"\\n### Loading Claim Dataset\")\n", " claim_df = get_dataframe(base_preproc_input_dir,file_name_prefix='claim' ) \n", " \n", " logger.info(f\"\\n### Loading Customer Dataset\") \n", " customer_df = get_dataframe(base_preproc_input_dir,file_name_prefix='customer' ) \n", " \n", " df = customer_df.join(claim_df, how='left')\n", " logger.info(f\"### dataframe merged with customer and claim: {df.shape}\")\n", "\n", "\n", " ################################# \n", " #### 카테고리 피쳐를 원핫인코딩 \n", " ################################# \n", " \n", " logger.info(f\"\\n ### Encoding: Category Features\") \n", " categorical_features = df.select_dtypes(include=['object']).columns.values.tolist() \n", " #categorical_features = ['driver_relationship'] \n", " logger.info(f\"categorical_features: {categorical_features}\") \n", "\n", " categorical_transformer = Pipeline(\n", " steps=[\n", " (\"imputer\", SimpleImputer(strategy=\"constant\", fill_value=\"missing\")),\n", " (\"onehot\", OneHotEncoder(handle_unknown=\"ignore\"))\n", " ]\n", " )\n", " \n", " preprocess = ColumnTransformer(\n", " transformers=[\n", " (\"cat\", categorical_transformer, categorical_features)\n", " ],\n", " sparse_threshold = 0, # dense format 으로 제공\n", " )\n", "\n", " X_pre_category = preprocess.fit_transform(df)\n", " \n", "\n", " # 원핫인코딩한 컬럼의 이름 로딩\n", " # Ref: Sklearn Pipeline: Get feature names after OneHotEncode In ColumnTransformer, https://stackoverflow.com/questions/54646709/sklearn-pipeline-get-feature-names-after-onehotencode-in-columntransformer\n", " \n", " processed_category_features = preprocess.transformers_[0][1].named_steps['onehot'].get_feature_names(categorical_features)\n", " #logger.info(f\"processed_category_features: {processed_category_features}\")\n", "# print(X_pre)\n", " \n", " ###############################\n", " ### 숫자형 변수 전처리 \n", " ###############################\n", " \n", " logger.info(f\"\\n ### Encoding: Numeric Features\") \n", " \n", " float_cols = df.select_dtypes(include=['float64']).columns.values\n", " int_cols = df.select_dtypes(include=['int64']).columns.values\n", " numeric_features = np.concatenate((float_cols, int_cols), axis=0).tolist()\n", " \n", " logger.info(f\"int_cols: \\n{int_cols}\") \n", " logger.info(f\"float_cols: \\n{float_cols}\") \n", " #logger.info(f\"numeric_features: \\n{numeric_features}\")\n", "\n", " # 따로 스케일링은 하지 않고, 미싱 값만 중간값을 취함\n", " numeric_transformer = Pipeline(\n", " steps=[\n", " (\"imputer\", SimpleImputer(strategy=\"median\")),\n", " # (\"scaler\", StandardScaler())\n", " ]\n", " )\n", "\n", " numeric_preprocessor = ColumnTransformer(\n", " transformers=[\n", " (\"cat\", numeric_transformer, numeric_features)\n", " ],\n", " sparse_threshold = 0,\n", " )\n", "\n", " X_pre_numeric = numeric_preprocessor.fit_transform(df) \n", "\n", " \n", " ###############################\n", " ### 전처리 결과 결합 ####\n", " ###############################\n", " \n", " logger.info(f\"\\n ### Handle preprocess results\") \n", " \n", " # 전처리 결과를 데이터 프레임으로 생성\n", " category_df = pd.DataFrame(data=X_pre_category, columns=processed_category_features)\n", " numeric_df = pd.DataFrame(data=X_pre_numeric, columns=numeric_features) \n", "\n", " full_df = pd.concat([numeric_df, category_df ], axis=1)\n", " \n", " # float 타입을 int 로 변경\n", " full_df = convert_type(full_df, cols=int_cols, type_target='int')\n", " full_df = convert_type(full_df, cols=processed_category_features, type_target='int') \n", " \n", " # label_column을 맨 앞으로 이동 시킴\n", " full_df = pd.concat([full_df[label_column], full_df.drop(columns=[label_column])], axis=1)\n", " \n", " ############################### \n", " # 훈련, 테스트 데이터 세트로 분리 및 저장\n", " ###############################\n", " \n", " train_df, test_df = split_train_test(full_df, test_ratio=split_rate) \n", " train_df.to_csv(f\"{base_output_dir}/dataset/train.csv\", index=False)\n", " test_df.to_csv(f\"{base_output_dir}/dataset/test.csv\", index=False) \n", "\n", " logger.info(f\"preprocessed train shape \\n {train_df.shape}\") \n", " logger.info(f\"preprocessed test shape \\n {test_df.shape}\") \n", "\n", " # logger.info(f\"preprocessed train path \\n {base_output_dir}/train/train.csv\")\n", " logger.info(f\"\\n ### Final result for train dataset \") \n", " logger.info(f\"preprocessed train sample \\n {train_df.head(2)}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "\n", "framework_version = \"0.23-1\"\n", "processing_instance_type = \"ml.m5.xlarge\"\n", "processing_instance_count = 1\n", "split_rate = 0.1\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version = framework_version,\n", " instance_type = processing_instance_type,\n", " instance_count = processing_instance_count,\n", " base_job_name = \"sklearn-fraud-process\",\n", " role = role,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sklearn_processor.run(\n", " code = \"src/preprocess.py\",\n", " inputs = [ProcessingInput(source = s3_data_path, destination = \"/opt/ml/processing/input\")],\n", " outputs = [ProcessingOutput(output_name = \"dataset\",\n", " source = \"/opt/ml/processing/output/dataset\")],\n", " arguments = ['--split_rate', f\"{split_rate}\"]\n", ")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sklearn_processor.latest_job.outputs[0].destination" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker.s3.S3Downloader.download(s3_uri = sklearn_processor.latest_job.outputs[0].destination,\n", " local_path = '../data/preprocessed')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "preprocessed_train_path = '../data/preprocessed/train.csv'\n", "preprocessed_train_df = pd.read_csv(preprocessed_train_path)\n", "preprocessed_train_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 모델 빌딩 파이프라인의 스텝 생성\n", "## 모델 빌딩 파이프라인의 파라미터 정의\n", "파이프라인에서 사용 할 파라미터를 정의합니다. 파이프라인을 실행할 때 파라미터를 이용하여 실행 조건을 커스터마이징 할 수 있습니다.\n", "지원하는 파라미터 타입은 다음과 같습니다.\n", "\n", "* `ParameterString` - 파이썬 타입에서 `str` \n", "* `ParameterInteger` - 파이썬 타입에서 `int` \n", "* `ParameterFloat` - 파이썬 타입에서 `float` \n", "\n", "디폴트 값을 지정할 수 있으며, 파이프라인 실행 시 재지정 할 수도 있습니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "\n", "processing_instance_count = ParameterInteger(\n", " name=\"ProcessingInstanceCount\",\n", " default_value=1\n", ")\n", "processing_instance_type = ParameterString(\n", " name=\"ProcessingInstanceType\",\n", " default_value=\"ml.m5.xlarge\"\n", ")\n", "input_data = ParameterString(\n", " name=\"InputData\",\n", " default_value=s3_data_path,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 전처리 스텝 프로세서 정의" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# !pip install --upgrade sagemaker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "sagemaker.__version__" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.sklearn.processing import SKLearnProcessor\n", "\n", "framework_version = \"0.23-1\"\n", "processing_instance_type = \"ml.m5.xlarge\"\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=framework_version,\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count,\n", " base_job_name=\"sklearn-fraud-process\",\n", " role=role,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 전처리 스텝 정의\n", "Processing Step에서는 다음과 같은 인자들을 지정합니다.\n", "* name: 스텝 명\n", "* processor: 전처리 프로세서\n", "* inputs: 입력 데이터 S3 경로\n", "* outputs: 처리결과가 저장 될 Docker 안의 경로\n", "* job arguments: 사용자 정의 인자\n", "* code: 전처리 코드 경로\n", "\n", "보다 자세한 내용은 링크를 확인하세요\n", "[처리 단계, Processing Step](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.workflow.steps import ProcessingStep\n", " \n", "\n", "step_process = ProcessingStep(\n", " name=\"Fraud-Preprocessing\",\n", " processor=sklearn_processor,\n", " code= \"src/preprocess.py\",\n", " inputs = [ProcessingInput(source = s3_data_path, destination = \"/opt/ml/processing/input\")],\n", " outputs = [ProcessingOutput(output_name = \"dataset\",\n", " source = \"/opt/ml/processing/output/dataset\")],\n", " job_arguments = ['--split_rate', f\"{split_rate}\"]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 파이프라인 정의 및 실행\n", "생성한 단계들을 하나의 파이프라인으로 조합하고 실행할 수 있습니다.\n", "파이프라인은 name, parapeters, steps 인자가 필수로 필요합니다. \n", "여기서 파이프라인의 name은 Account와 Region에 대해 유일해야 합니다.\n", "\n", "주의사항:\n", "- 정의에 사용한 모든 파라미터가 존재해야 합니다.\n", "- 파이프라인으로 전달된 단계(step)들은 실행순서와는 무관합니다. SageMaker Pipeline은 단계가 실행되고 완료될 수 있도록 의존관계를를 해석합니다.\n", "- [알림] 정의한 stpes 이 복수개이면 복수개를 기술합니다. 만약에 step 간에 의존성이 있으면, 명시적으로 기술하지 않아도 같이 실행 됩니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.pipeline import Pipeline\n", "\n", "pipeline_name = pipeline_prefix\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " processing_instance_type, \n", " processing_instance_count,\n", " input_data,\n", " ],\n", " steps=[step_process],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "definition = json.loads(pipeline.definition())\n", "definition" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 파이프라인 정의를 등록하고 실행하기" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.upsert(role_arn=role)\n", "execution = pipeline.start()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.describe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.wait()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.list_steps()" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-northeast-2:806072073708:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }