{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 離反予測を用いた SageMaker Pipelines の ML パイプライン構築\n", "\n", "※ SageMaker **Studio** Notebook を前提としており、カーネルは Python3(Data Science) をご利用ください。 \n", "※ confirms this notebook works under data science kernel on SageMaker **Studio** notebook\n", "\n", "## シナリオ\n", "\n", "電話回線の離反データセット(回線ごとのデータと離反した/しなかったの結果のデータセット)を使って、\n", "SageMaker Pipelines を用いたML パイプラインを構築します。\n", "データの詳細については[こちら](https://github.com/aws-samples/amazon-sagemaker-examples-jp/blob/master/xgboost_customer_churn/xgboost_customer_churn.ipynb)に詳細があります。 \n", "\n", "5000 行の 元データを 1666 行ずつ 3 分割し、それぞれ 1 日目に入手するデータ、 2 日目に入手するデータ、 3 日目に入手するデータと仮定し、\n", "* 1 日目は今あるデータを SageMaker Processing, Training をそれぞれ手動で動かす。\n", "* 2 日目は 1 日目のデータに加えて、 2 日目に手に入ったデータも利用して学習し、 1 日目と 2 日目のモデルを比較して、2 日目のほうが精度がよければ 2 日目のモデルを登録(create_model)するのを、パイプラインを構築して実行する。\n", "* 3 日目は 1 日目と 2 日目のデータに加えて、3 日目に手に入ったデータも利用して学習し、2 日目と 3 日目のモデルを比較して、3 日目のほうが精度がよければ 3 日目のモデルを登録(create_model)するのを、2 日目に作成したパイプラインのパラメータだけを変更して実行する。\n", "* 最後には精度が変わらなかった時のテストとして、同じデータで学習し、パイプラインで精度が変わらなかった場合は新たにモデルが作成(create_model)されていないことをGUIから実行して確認する" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os, json, sagemaker, pandas as pd, numpy as np\n", "from sklearn.model_selection import train_test_split\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor\n", "from sagemaker.inputs import TrainingInput\n", "from sagemaker import get_execution_role\n", "from sagemaker.workflow.parameters import ParameterInteger, ParameterString\n", "from sagemaker.workflow.properties import PropertyFile\n", "from sagemaker.workflow.steps import ProcessingStep,TrainingStep\n", "from sagemaker.estimator import Estimator\n", "from sagemaker.model import Model\n", "from sagemaker.inputs import CreateModelInput\n", "from sagemaker.workflow.steps import CreateModelStep\n", "from sagemaker.transformer import Transformer\n", "from sagemaker.workflow.conditions import ConditionEquals\n", "from sagemaker.workflow.condition_step import ConditionStep, JsonGet\n", "from sagemaker.workflow.pipeline import Pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## データ準備 \n", "T/F(離反した、してない) の割合が変わらないように、5000 行のデータを 3 分割する。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# データをダウンロード\n", "![ -e churn.txt ] && rm churn.txt\n", "!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/synthetic/churn.txt ./" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 使用するデータを確認\n", "df = pd.read_csv('./churn.txt')\n", "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# データを分割する際、離反データが偏らないように、離反したデータと離反しなかったデータを分けて分割する\n", "df_true = df[df['Churn?']=='True.'].reset_index()\n", "df_false = df[df['Churn?']=='False.'].reset_index()\n", "df_true = df_true.drop(['index'],axis=1)\n", "df_false = df_false.drop(['index'],axis=1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 分割前にシャッフルする\n", "df_true_shuffle = df_true.sample(frac=1, random_state=42)\n", "df_false_shuffle = df_false.sample(frac=1, random_state=42)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 3分割する\n", "split_num = 3\n", "split_df_list = []\n", "for i in range(split_num):\n", " idx_min_true,idx_max_true = i*len(df_true)//split_num,(i+1)*len(df_true)//split_num\n", " idx_min_false,idx_max_false = i*len(df_false)//split_num,(i+1)*len(df_false)//split_num\n", " tmp_df = pd.concat([df_true[idx_min_true:idx_max_true],df_false[idx_min_false:idx_max_false]],axis=0)\n", " split_df_list.append(tmp_df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 分割ファイルをローカルに出力する\n", "RAWDATA_DIR = './raw_data/'\n", "os.makedirs(f'{RAWDATA_DIR}/', exist_ok=True)\n", "local_csvfile_list = []\n", "for i,split_df in enumerate(split_df_list):\n", " file_name = f'{RAWDATA_DIR}day_{str(i+1)}.csv'\n", " split_df.to_csv(file_name,index=False)\n", " local_csvfile_list.append(file_name)\n", "print(*local_csvfile_list)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1 日目 : 前処理、学習を手作業で行う\n", "### 前処理\n", "前処理は[こちら](https://github.com/aws-samples/amazon-sagemaker-examples-jp/blob/master/xgboost_customer_churn/xgboost_customer_churn.ipynb)と同じことを SageMaker Processing で行う。コンテナは scikit-learn のビルトインコンテナを利用する" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Processor 定義\n", "ROLE = get_execution_role()\n", "# この PIPELINE_NAME 変数は後ほどパイプラインを作成するときに使うが、前処理の名前などにも流用するため、ここで宣言する\n", "PIPELINE_NAME = 'my-pipeline'\n", "PRE_PROCESS_JOBNAME = f'{PIPELINE_NAME}-pre-process'\n", "sklearn_processor = SKLearnProcessor(\n", " base_job_name = PRE_PROCESS_JOBNAME,\n", " framework_version='0.23-1',\n", " role=ROLE,\n", " instance_type='ml.m5.xlarge',instance_count=1\n", ")\n", "\n", "BUCKET = sagemaker.session.Session().default_bucket()\n", "RAWDATA_SUB_PREFIX = RAWDATA_DIR.replace('./','').replace('/','')\n", "RAWDATA_S3_URI = f's3://{BUCKET}/{PIPELINE_NAME}-{RAWDATA_SUB_PREFIX}'\n", "\n", "# input 定義\n", "# 3 分割したファイルのうちの一つを\n", "rawcsv_s3_uri = sagemaker.s3.S3Uploader.upload(local_csvfile_list[0],RAWDATA_S3_URI)\n", "PRE_PROCESS_RAW_DATA_INPUT_DIR = '/opt/ml/processing/input/raw_data'\n", "\n", "# output 定義\n", "PRE_PROCESS_TRAIN_OUTPUT_DIR = '/opt/ml/processing/output/train'\n", "PRE_PROCESS_VALID_OUTPUT_DIR = '/opt/ml/processing/output/valid'\n", "PRE_PROCESS_TEST_OUTPUT_DIR = '/opt/ml/processing/output/test'\n", "\n", "sklearn_processor.run(\n", " code='./preprocess/preprocess.py',\n", " # ProcessingInput は指定したものを全て S3 から processing インスタンスにコピーされる。 Destination でコピー先を指定できる。\n", " inputs=[\n", " ProcessingInput( \n", " source=rawcsv_s3_uri,\n", " destination=PRE_PROCESS_RAW_DATA_INPUT_DIR\n", " ),\n", " ],\n", " # processing インスタンスの source にあるものを全て S3 に格納する。(processing インスタンス側でこのディレクトリは自動で作成される)\n", " outputs=[\n", " ProcessingOutput(\n", " output_name = 'train',\n", " source=PRE_PROCESS_TRAIN_OUTPUT_DIR,\n", " ),\n", " ProcessingOutput(\n", " output_name = 'valid',\n", " source=PRE_PROCESS_VALID_OUTPUT_DIR,\n", " ),\n", " ProcessingOutput(\n", " output_name = 'test',\n", " source=PRE_PROCESS_TEST_OUTPUT_DIR,\n", " )\n", " ],\n", " # processing インスタンスのどこに csv ファイルが配置されたか、どこにファイルを出力すればよいのか、を\n", " # コードに渡すための引数\n", " arguments=[\n", " '--raw-data-input-dir',PRE_PROCESS_RAW_DATA_INPUT_DIR,\n", " '--train-output-dir',PRE_PROCESS_TRAIN_OUTPUT_DIR,\n", " '--valid-output-dir',PRE_PROCESS_VALID_OUTPUT_DIR,\n", " '--test-output-dir',PRE_PROCESS_TEST_OUTPUT_DIR,\n", " ]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 学習\n", "xgboost を利用する。ハイパーパラメータは[こちら](https://github.com/aws-samples/amazon-sagemaker-examples-jp/blob/master/xgboost_customer_churn/xgboost_customer_churn.ipynb)と同じにして SageMaker Training で行う。 \n", "コンテナは xgboost のビルトインコンテナを利用する" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 前処理結果の S3 URI を取得する\n", "train_csv_s3_uri = sklearn_processor.latest_job.describe()['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'] + '/train.csv'\n", "valid_csv_s3_uri = sklearn_processor.latest_job.describe()['ProcessingOutputConfig']['Outputs'][1]['S3Output']['S3Uri'] + '/valid.csv'\n", "test_csv_s3_uri = sklearn_processor.latest_job.describe()['ProcessingOutputConfig']['Outputs'][2]['S3Output']['S3Uri'] + '/test.csv'\n", "print(train_csv_s3_uri)\n", "print(valid_csv_s3_uri)\n", "print(test_csv_s3_uri)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "CONTENT_TYPE='text/csv'\n", "train_s3_input = TrainingInput(train_csv_s3_uri, content_type=CONTENT_TYPE)\n", "valid_s3_input = TrainingInput(valid_csv_s3_uri, content_type=CONTENT_TYPE)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "XGB_CONTAINER_URI = sagemaker.image_uris.retrieve(\"xgboost\", sagemaker.session.Session().boto_region_name, \"1.2-1\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "TRAIN_JOBNAME = f'{PIPELINE_NAME}-train'\n", "MODEL_S3_URI = f's3://{BUCKET}/{TRAIN_JOBNAME}'\n", "HYPERPARAMETERS = {\n", " \"max_depth\":\"5\",\n", " \"eta\":\"0.2\",\n", " \"gamma\":\"4\",\n", " \"min_child_weight\":\"6\",\n", " \"subsample\":\"0.8\",\n", " \"objective\":\"binary:logistic\",\n", " \"num_round\":\"100\"\n", "}\n", "xgb = Estimator(\n", " XGB_CONTAINER_URI,\n", " ROLE,\n", " base_job_name = TRAIN_JOBNAME,\n", " hyperparameters=HYPERPARAMETERS,\n", " instance_count=1, \n", " instance_type='ml.m5.xlarge',\n", " output_path = MODEL_S3_URI\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "xgb.fit({'train': train_s3_input, 'validation': valid_s3_input})" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "INITIAL_MODEL_S3_URI = xgb.model_data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2 日目 : パイプラインを作成する\n", "新しくデータが入ってくるので、追加データも併せてモデルを学習しなおして精度を確認し、精度が上がっていたらモデルを作成しておきます。 \n", "1 日目とほぼ同じことをやるので、パイプラインを作成して省力化します。 \n", "前日に追加する処理として、1 日目のデータで学習したモデルと 2 日目のデータを追加して学習したモデルで精度を比較し、精度が上がっていたらモデル作成、のオペレーションを追加します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# helper 関数\n", "# pipeline で利用する名前は camel case を使うのが一般的なので、区切り文字を削除し、頭を大文字にする関数を準備\n", "def to_camel(s_v:str,s_s:str)->str:\n", " '''\n", " s_v: camel_case に変えたい文字\n", " s_s: 区切り文字\n", " '''\n", " return ''.join(word.title() for word in s_v.split(s_s))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "二日目のデータを S3 にアップロードして、その S3 URI 取得ておきます。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rawcsv_s3_uri = sagemaker.s3.S3Uploader.upload(local_csvfile_list[1],RAWDATA_S3_URI)\n", "print(rawcsv_s3_uri)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 定義\n", "パイプラインで実行するには、実行させたい処理を全て [Step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.Step) で定義する必要があります。 \n", "前処理、学習、後処理、精度評価結果分岐、モデル作成について Step を定義していきます。 \n", "まずは前処理の Step を [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) で定義します。 \n", "また、3 日目以降の実行も見据えて、 3 日目の実行で変わるところは [Parameter](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) を定義しておき、パイプライン実行時に設定できるようにしておきます。 \n", "\n", "#### 前処理ステップ\n", "Parameters は文字列なのか数値なのかで使用するものが変わります。 \n", "今回は文字列で定義する箇所が 3 日以降実行時に変更されるので、[ParameterString](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.parameters.ParameterString) を使用しますが、整数なら [ParameterInteger](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.parameters.ParameterInteger), 浮動小数ならば [ParameterFloat](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.parameters.ParameterFloat) を利用します。 \n", "それぞれデフォルト値が必須ですので、 1 日目の前処理結果の S3 URI を `xxx_csv_s3_uri` で受けていたので、それらを利用します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 前処理ステップ定義\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " base_job_name = PRE_PROCESS_JOBNAME,\n", " framework_version='0.23-1',\n", " role=ROLE,\n", " instance_type='ml.m5.xlarge',instance_count=1\n", ")\n", "\n", "rawcsv_s3_uri_param = ParameterString(name='RawCsvS3UriParam',default_value=rawcsv_s3_uri)\n", "\n", "PRE_PROCESSED_TRAIN_DATA_INPUT_DIR = '/opt/ml/processing/input/train'\n", "PRE_PROCESSED_VALID_DATA_INPUT_DIR = '/opt/ml/processing/input/valid'\n", "PRE_PROCESSED_TEST_DATA_INPUT_DIR = '/opt/ml/processing/input/test'\n", "\n", "# Pipeline 実行時に渡すパラメータ設定\n", "# 名前はキャメルケース\n", "pre_processed_train_data_s3_uri_param = ParameterString(name='PreProcessedTrainDataS3UriParam',default_value=train_csv_s3_uri)\n", "pre_processed_valid_data_s3_uri_param = ParameterString(name='PreProcessedValidDataS3UriParam',default_value=valid_csv_s3_uri)\n", "pre_processed_test_data_s3_uri_param = ParameterString(name='PreProcessedTestDataS3UriParam',default_value=test_csv_s3_uri)\n", "\n", "# 前処理を実行する ProcessingStep 定義\n", "pre_process_step = ProcessingStep(\n", " code='./preprocess/preprocess.py',\n", " name=f'{to_camel(PRE_PROCESS_JOBNAME,\"-\")}Step',\n", " processor=sklearn_processor,\n", " inputs=[\n", " ProcessingInput(\n", " source=rawcsv_s3_uri_param, # Parameter を渡します。翌日以降はまた URI が変わるため。\n", " destination=PRE_PROCESS_RAW_DATA_INPUT_DIR\n", " ),\n", " ProcessingInput(\n", " source=pre_processed_train_data_s3_uri_param, # Parameter を渡します。翌日以降はまた URI が変わるため。\n", " destination=PRE_PROCESSED_TRAIN_DATA_INPUT_DIR\n", " ),\n", " ProcessingInput(\n", " source=pre_processed_valid_data_s3_uri_param, # Parameter を渡します。翌日以降はまた URI が変わるため。\n", " destination=PRE_PROCESSED_VALID_DATA_INPUT_DIR\n", " ),\n", " ProcessingInput(\n", " source=pre_processed_test_data_s3_uri_param, # Parameter を渡します。翌日以降はまた URI が変わるため。\n", " destination=PRE_PROCESSED_TEST_DATA_INPUT_DIR\n", " ),\n", " ],\n", " outputs=[\n", " ProcessingOutput(\n", " output_name = 'train',\n", " source=PRE_PROCESS_TRAIN_OUTPUT_DIR,\n", " ),\n", " ProcessingOutput(\n", " output_name = 'valid',\n", " source=PRE_PROCESS_VALID_OUTPUT_DIR,\n", " ),\n", " ProcessingOutput(\n", " output_name = 'test',\n", " source=PRE_PROCESS_TEST_OUTPUT_DIR,\n", " )\n", " ],\n", " job_arguments=[\n", " '--raw-data-input-dir',PRE_PROCESS_RAW_DATA_INPUT_DIR,\n", " '--pre-processed-train-data-input-dir',PRE_PROCESSED_TRAIN_DATA_INPUT_DIR,\n", " '--pre-processed-valid-data-input-dir',PRE_PROCESSED_VALID_DATA_INPUT_DIR,\n", " '--pre-processed-test-data-input-dir',PRE_PROCESSED_TEST_DATA_INPUT_DIR,\n", " '--train-output-dir',PRE_PROCESS_TRAIN_OUTPUT_DIR,\n", " '--valid-output-dir',PRE_PROCESS_VALID_OUTPUT_DIR,\n", " '--test-output-dir',PRE_PROCESS_TEST_OUTPUT_DIR,\n", " '--merge'\n", " ]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 学習用ステップ\n", "学習用の Step である [TraningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep) を定義します。 \n", "学習の入力には、先程の前処理の結果を使いますが、先程 `ProcessingStep` で定義した `pre_process_step` インスタンスにあるプロパティから、前段の処理の出力の URI を連携できるので、それを利用します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 学習ステップ定義\n", "xgb = Estimator(\n", " XGB_CONTAINER_URI,\n", " ROLE,\n", " base_job_name = TRAIN_JOBNAME,\n", " hyperparameters=HYPERPARAMETERS,\n", " instance_count=1, \n", " instance_type='ml.m5.xlarge',\n", " output_path = MODEL_S3_URI\n", ")\n", "\n", "train_step = TrainingStep(\n", " name=f'{to_camel(TRAIN_JOBNAME,\"-\")}Step',\n", " estimator=xgb,\n", " inputs={\n", " \"train\": TrainingInput(\n", " s3_data=pre_process_step.properties.ProcessingOutputConfig.Outputs[\"train\"].S3Output.S3Uri, # 前の処理の output_name が 'train' の出力 S3 URI を利用\n", " content_type=CONTENT_TYPE\n", " ),\n", " \"validation\": TrainingInput(\n", " s3_data=pre_process_step.properties.ProcessingOutputConfig.Outputs[\"valid\"].S3Output.S3Uri, # 前の処理の output_name が 'valid' の出力 S3 URI を利用\n", " content_type=CONTENT_TYPE\n", " )\n", " },\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 後処理用ステップ\n", "後処理で前回学習したモデルと今回学習したモデルの性能評価を AUC で行います。\n", "評価用スクリプトは予め `./postprocessing/postprocess.py` に格納してあります。 \n", "後処理用のステップにも ProcessingStep を利用します。2 つのモデルを評価する後処理ステップを定義しますが、 \n", "評価ステップの入力は、1. 前回の学習で出来上がったモデルと、2. 今回の学習で出来上がったモデルの S3 URI と、3. 評価用データです。 \n", "それぞれ下記の通りに入力を定義します。\n", "1. 前回学習したモデルは毎回変わるため、 `ParameterString` で Parameter として設定しておきます。デフォルト値が必須ですが、 1 日目に学習したモデルの URI を `INITIAL_MODEL_S3_URI = xgb.model` で格納していたので、それを利用します。 \n", "1. 今回の学習で出来上がったモデルの S3 URI は 学習ステップ時に前処理ステップの結果を取得したのと同様に、前段の `train_step` プロパティから取得できるのでそれを利用します。\n", "1. 評価用データは前処理の結果の `test.csv` を利用するので、前処理ステップの `pre_process_step` のプロパティから取得します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lasttime_train_model_s3_uri_param = ParameterString(name='LasttimeTrainModelS3UriParam',default_value=INITIAL_MODEL_S3_URI)\n", "\n", "POST_PROCESS_JOBNAME = f'{PIPELINE_NAME}-post-process'\n", "POST_PROCESS_INPUT_DATA_DIR = '/opt/ml/processing/input/data'\n", "POST_PROCESS_OUTPUT_DIR = '/opt/ml/processing/output'\n", "POST_PROCESS_THISTIME_TRAIN_MODEL_DIR = '/opt/ml/processing/input/thistime_train_model'\n", "POST_PROCESS_LASTTIME_TRAIN_MODEL_DIR = '/opt/ml/processing/input/lasttime_train_model'\n", "EVALUATION_FILE = 'evaluation.json'\n", "eval_processor = ScriptProcessor(\n", " base_job_name = f'{POST_PROCESS_JOBNAME}',\n", " image_uri=XGB_CONTAINER_URI,\n", " command=['python3'],\n", " instance_type='ml.m5.xlarge',\n", " instance_count=1,\n", " role=ROLE,\n", ")\n", "eval_report = PropertyFile(\n", " name='EvaluationReport',\n", " output_name='Evaluation',\n", " path=EVALUATION_FILE\n", ")\n", "\n", "eval_step = ProcessingStep(\n", " code='./postprocess/postprocess.py',\n", " name=f'{to_camel(POST_PROCESS_JOBNAME,\"-\")}EvalStep',\n", " processor=eval_processor,\n", " inputs=[\n", " ProcessingInput(\n", " source=pre_process_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,\n", " destination=POST_PROCESS_INPUT_DATA_DIR\n", " ),\n", " ProcessingInput(\n", " source=train_step.properties.ModelArtifacts.S3ModelArtifacts,\n", " destination=POST_PROCESS_THISTIME_TRAIN_MODEL_DIR\n", " ),\n", " ProcessingInput(\n", " source=lasttime_train_model_s3_uri_param,\n", " destination=POST_PROCESS_LASTTIME_TRAIN_MODEL_DIR\n", " ),\n", " ],\n", " outputs=[\n", " ProcessingOutput(\n", " output_name='Evaluation',\n", " source=POST_PROCESS_OUTPUT_DIR\n", " ),\n", " ],\n", " property_files=[eval_report],\n", " job_arguments=[\n", " '--input-data-dir',POST_PROCESS_INPUT_DATA_DIR,\n", " '--thistime-train-model-dir',POST_PROCESS_THISTIME_TRAIN_MODEL_DIR,\n", " '--lasttime-train-model-dir',POST_PROCESS_LASTTIME_TRAIN_MODEL_DIR,\n", " '--output-dir',POST_PROCESS_OUTPUT_DIR,\n", " '--output-file',EVALUATION_FILE\n", " ]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### モデル作成ステップ\n", "評価後、前回の学習した結果より AUC がよかったら、今回の学習結果を SageMaker にそのモデルを作成します。 \n", "前回より AUC がよかったら、の処理についてはこの後記述します。分岐処理もステップで表現しますが、分岐ステップに分岐後の step を登録する必要があるためです。 \n", "[CreateModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.CreateModelStep) で定義します。入力はもちろん学習ステップのプロパティから取得したモデルの S3 URI です。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model = Model(\n", " image_uri=XGB_CONTAINER_URI,\n", " model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,\n", " sagemaker_session=sagemaker.session.Session(),\n", " role=ROLE,\n", ")\n", "model_inputs = CreateModelInput(\n", " instance_type=\"ml.m5.large\",\n", ")\n", "create_model_step = CreateModelStep(\n", " name=f'{to_camel(PIPELINE_NAME,\"-\")}CreateModelStep',\n", " model=model,\n", " inputs=model_inputs,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 分岐ステップ\n", "前回の学習より今回の学習のほうが AUC がよかった場合の分岐ステップを記述します。 \n", "[後処理スクリプト](./postprocess/postprocess.py)で、 `evaluation.json` を出力するよう記述してあり、 jsonの中にそれぞれのモデルの AUC と、今回の学習のほうがよかった場合は 1 、悪かった場合は 0 を残す、 `classification_metrics.model_change` キーを残しているので、 \n", "その値が 1 だった場合は、`True` を返す `cond_e` を [ConditionEquals](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.conditions.ConditionEquals)で定義します。\n", "\n", "作成した `cond_e` を [ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.condition_step.ConditionStep) の `if_steps` に登録します。 \n", "`if_steps` はリストを受け取りますが、 and 条件なので気をつけてください。 \n", "また、条件が False だった場合の挙動を記述したい場合は `else_step` を記述してください。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cond_e = ConditionEquals(\n", " left=JsonGet(\n", " step=eval_step,\n", " property_file=eval_report,\n", " json_path=\"classification_metrics.model_change\",\n", " ),\n", " right=1,\n", ")\n", "\n", "cond_step = ConditionStep(\n", " name=f'{to_camel(PIPELINE_NAME,\"-\")}ConditionStep',\n", " conditions=[cond_e],\n", " if_steps=[create_model_step],\n", " else_steps=[], \n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### パイプライン生成\n", "最後に今まで作成した parameters と step をつなげてパイプラインを [Pipeline](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.condition_step.ConditionStep) で生成します。 \n", "pipeline インスタンスの `start ` メソッドでパイプラインを実行できます。parameter のデフォルト値で実行します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = Pipeline(\n", " name=to_camel(PIPELINE_NAME,\"-\"),\n", " parameters=[\n", " rawcsv_s3_uri_param,\n", " pre_processed_train_data_s3_uri_param,\n", " pre_processed_valid_data_s3_uri_param,\n", " pre_processed_test_data_s3_uri_param,\n", " lasttime_train_model_s3_uri_param\n", " ],\n", " steps=[\n", " pre_process_step,\n", " train_step,\n", " eval_step,\n", " cond_step,\n", " ],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# パイプライン定義を確認\n", "print(json.loads(pipeline.definition()))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# パイプラインを SageMaker Pipelines に登録\n", "pipeline.upsert(role_arn=ROLE)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# パイプライン実行。非同期処理\n", "execution = pipeline.start()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 実行しているパイプラインの中身を表示\n", "execution.describe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# パイプラインが完了するまで待つ\n", "execution.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3 日目 : 作成済のパイプラインにパラメータを与えて動かす\n", "2 日目に作成したパイプライン作成工数の投資回収をします。すでに構築済のパイプラインを呼び出して、 3 日目用のパラメータを与えて実行します。 \n", "最初に 3 日目のデータを S3 にアップロードします。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rawcsv_s3_uri = sagemaker.s3.S3Uploader.upload(local_csvfile_list[2],RAWDATA_S3_URI)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 構築済パイプラインの呼び出し\n", "構築済のパイプラインを呼び出すのは [sagemaker.workflow.pipeline.Pipeline](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline)を利用し、作成済のパイプライン名を `name` 引数に指定します。 \n", "パイプライン構築時と同じクラスですが、 `steps` や `parameters` 引数を指定せずに、名前だけ構築済のものを指定すれば、構築済のパイプラインを呼び出せます" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "built_pipeline = Pipeline(\n", " name=to_camel(PIPELINE_NAME,\"-\"), # 存在しているパイプライン名\n", ")\n", "print(built_pipeline.describe())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 入力パラメータの確認\n", "引き渡すパラメータを確認します。describe メソッドの結果から Parameters キーを参照します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for param in json.loads(built_pipeline.describe()['PipelineDefinition'])['Parameters']:\n", " print(param)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 入力パラメータの設定\n", "* `RawCsvS3Param` : 先程アップロードした 3 日目の csv の S3 URI を指定します。\n", "* `PreProcessedXXXXXDataS3UriParam` : 先程の前処理した結果の URI を boto3 を利用して取得します。\n", "* `LasttimeTrainModelS3UriParam` : 先程学習したモデルの URI を boto3 を利用して取得します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 先程のパイプライン実行でできたモデルの S3 URI を取得する\n", "import boto3\n", "client = boto3.client('sagemaker')\n", "lasttime_train_model_s3_uri=''\n", "for step in execution.list_steps():\n", " if step['StepName'] == f'{to_camel(TRAIN_JOBNAME,\"-\")}Step':\n", " last_train_arn = step['Metadata']['TrainingJob']['Arn']\n", "lasttime_train_model_s3_uri = client.describe_training_job(TrainingJobName=last_train_arn.split('/')[-1])['ModelArtifacts']['S3ModelArtifacts']\n", "print(lasttime_train_model_s3_uri)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 先程のパイプライン実行でできた整形済の train, valid, testデータの S3 URI を取得する(3 日目のデータをマージする)\n", "pre_processed_train_data_s3_uri = ''\n", "pre_processed_valid_data_s3_uri = ''\n", "pre_processed_test_data_s3_uri = ''\n", "\n", "last_processing_arn = ''\n", "for step in execution.list_steps():\n", " if step['StepName'] == f'{to_camel(PRE_PROCESS_JOBNAME,\"-\")}Step':\n", " last_processing_arn = step['Metadata']['ProcessingJob']['Arn']\n", "\n", "for output in client.describe_processing_job(ProcessingJobName=last_processing_arn.split('/')[-1])['ProcessingOutputConfig']['Outputs']:\n", " if output['OutputName'] == 'train':\n", " pre_processed_train_data_s3_uri = f'{output[\"S3Output\"][\"S3Uri\"]}/train.csv'\n", " elif output['OutputName'] == 'valid':\n", " pre_processed_valid_data_s3_uri = f'{output[\"S3Output\"][\"S3Uri\"]}/valid.csv'\n", " elif output['OutputName'] == 'test':\n", " pre_processed_test_data_s3_uri = f'{output[\"S3Output\"][\"S3Uri\"]}/test.csv'\n", "print(pre_processed_train_data_s3_uri,pre_processed_valid_data_s3_uri,pre_processed_test_data_s3_uri)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "パラメータを dict 形式に格納します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_param = {\n", " 'RawCsvS3UriParam': rawcsv_s3_uri,\n", " 'LasttimeTrainModelS3UriParam': lasttime_train_model_s3_uri,\n", " 'PreProcessedTrainDataS3UriParam': pre_processed_train_data_s3_uri,\n", " 'PreProcessedValidDataS3UriParam': pre_processed_valid_data_s3_uri,\n", " 'PreProcessedTestDataS3UriParam': pre_processed_test_data_s3_uri,\n", "}\n", "\n", "print(json.dumps(input_param,indent=4))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### パイプライン実行\n", "あとは paramters 引数に input_param を指定して start するとパイプラインが実行できます。 \n", "一度パイプラインを作成してしまえば、一連のプロセスをすぐに動かすことができます。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# パイプラインをキック\n", "execution = built_pipeline.start(parameters = input_param)\n", "# パイプラインの完了を待つ\n", "execution.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 最後に if 分岐の False ルート確認と GUI から実行\n", "3 日目と全く同じデータを学習し、3 日と同じモデルで評価します。同じ精度が出ればモデル作成ステップが動かないので、それを利用して分岐ステップが機能しているかを確認しまうs。\n", "パイプラインは GUI でも実行可能なので、 GUI で実行してみます。3 日目のパイプライン実行と入力が一箇所だけ、 `LasttimeTrainModelS3UriParam` だけ変更する必要があるので、3 日目の学習のモデル S3 URI を取得します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 先程のパイプライン実行でできたモデルの S3 URI を取得する\n", "import boto3\n", "client = boto3.client('sagemaker')\n", "lasttime_train_model_s3_uri=''\n", "for step in execution.list_steps():\n", " if step['StepName'] == f'{to_camel(TRAIN_JOBNAME,\"-\")}Step':\n", " last_train_arn = step['Metadata']['TrainingJob']['Arn']\n", "lasttime_train_model_s3_uri = client.describe_training_job(TrainingJobName=last_train_arn.split('/')[-1])['ModelArtifacts']['S3ModelArtifacts']\n", "print(lasttime_train_model_s3_uri)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "先程の `input_json` を書き換えます。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_param['LasttimeTrainModelS3UriParam']=lasttime_train_model_s3_uri" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "下記の出力をテキストエディタなどにコピーしておきます。この中身を GUI に入力します。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(json.dumps(input_param,indent=4))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "1. 左のペインから三角形の頂点に○がついているアイコンを選択し、表示されるプルダウンで `Pipelines` を選択します。 \n", " ![](media/1.png) \n", "\n", "2. 表示されるパイプライン一覧から作成したパイプライン(このノートブックで特にいじっていなければ `MyPipeline`) をダブルクリックします。 \n", " ![](media/2.png) \n", "\n", "3. パイプラインの実行履歴が表示されます。右上にある `Start an Execution` をクリックして、パイプラインの実行設定を行います。 \n", " ![](media/3.png) \n", "\n", "4. `Name` のテキストボックに任意の実行名を入力し、他のテキストボックスには先程コピーした内容をそれぞれ入力して、 `Start` をクリックします。 \n", " ![](media/4.png) \n", "\n", "5. `Executing` と表示されたパイプラインの実行が作成されます(即座に反映されないことがあるので、表示されない場合は待ってください)。`Execiting` と表示された実行をダブルクリックします。\n", " ![](media/5.png) \n", "\n", "6. 実行の様子がわかります。各ステップを選択すると、ログや artifact を確認できます。\n", " ![](media/6.png) \n", " \n", "7. 実行が完了すると Status が緑に変わります。また、 3 日目と同じモデルが出来上がり、同じ精度が出ているので、`ConditionStep` の結果が `False` になり、 `CreateModelStep` が実行されていないことが確認できます。(今までの実行を確認すると、緑になっています)\n", " ![](media/7.png) \n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## あとかたづけ\n", "パイプラインの削除は パイプラインインスタンスにある delete メソッドを利用して削除します。 \n", "使用したり作成したデータとモデルは S3 に保管されているのでマネジメントコンソールや boto3 などで別途削除してください。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "built_pipeline.delete()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-northeast-1:102112518831:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }