{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Bike-Share Demand Forecasting 2b: SageMaker DeepAR Algorithm\n", "\n", "이전 [1_Data_Preparation](1_Data_Preparation.ipynb) 노트북에서 수행한 bike-share 수요 예측 문제를 해결하기 위해 3가지 방법을 살펴봅니다.\n", "\n", "1. AWS \"Managed AI\"서비스 ([Amazon Forecast] (https://aws.amazon.com/forecast/))으로 일반적/규격화된 비즈니스 문제를 다룹니다.\n", "2. SageMaker의 built-in된 알고리즘 ([DeepAR] (https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html))을 사용하여 1번과 동일한 비즈니스 문제를 다룹니다.\n", "3. custom SageMaker 알고리즘을 사용하여 부가적인 차별적 SageMaker의 기능을 활용하면서 핵심 모델링을 수행합니다.\n", "\n", "\n", "**이 노트북은 SageMaker의 built-in 알고리즘인 DeepAR을 적용하는 방법을 보여줍니다.**\n", "\n", "**This notebook shows how to apply the SageMaker DeepAR built-in algorithm.**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dependencies and configuration\n", "\n", "라이브러리를 로딩한 다음, 설정값을 정의하고, AWS SDKs에 연결합니다." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

첫번째 설치 이후 두번째 실행 시에는 False로 수정 후 진행하세요

" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "install_needed = True # should only be True once\n", "# install_needed = False" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import IPython\n", "\n", "if install_needed:\n", " print(\"installing deps and restarting kernel\")\n", " !{sys.executable} -m pip install -U pip\n", " !{sys.executable} -m pip install -U sagemaker\n", " IPython.Application.instance().kernel.do_shutdown(True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Basic data configuration is initialised and stored in the Data Preparation notebook\n", "# ...We just retrieve it here:\n", "%store -r\n", "assert bucket, \"Variable `bucket` missing from IPython store\"\n", "\n", "assert data_prefix, \"Variable `data_prefix` missing from IPython store\"\n", "assert target_train_filename, \"Variable `target_train_filename` missing from IPython store\"\n", "assert target_test_filename, \"Variable `target_test_filename` missing from IPython store\"\n", "assert related_filename, \"Variable `related_filename` missing from IPython store\"\n", "\n", "sm_train_filename = \"train.json\"\n", "sm_test_filename = \"test.json\"\n", "sm_inference_filename = \"predict_input.json\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load_ext autoreload\n", "%autoreload 1\n", "\n", "# Built-Ins:\n", "from datetime import datetime, timedelta\n", "import json\n", "import time\n", "\n", "# External Dependencies:\n", "import boto3\n", "import matplotlib as mpl\n", "import matplotlib.pyplot as plt\n", "import pandas as pd\n", "import sagemaker\n", "\n", "# Local Dependencies:\n", "%aimport util" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker.__version__" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "session = boto3.Session()\n", "region = session.region_name\n", "smsession = sagemaker.Session()\n", "s3 = session.client(service_name=\"s3\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm_role_arn = sagemaker.get_execution_role()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Overview" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Determine your algorithm details\n", "\n", "이 문제에 대해 SageMaker DeepAR 알고리즘을 사용하려면, 알고리즘을 구성하고 원하는 형식의 데이터를 제공해야 합니다.\n", "\n", "특히, 일부의 built-in 알고리즘은 [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/index.html)에서 지원합니다.\n", "\n", "```python\n", "# 아래와 같은 방식을 지원합니다.\n", "estimator = sagemaker.KMeans(...)\n", "# 아직 아래 방식은 지원하지 않습니다.\n", "estimator = sagemaker.DeepAR(...)\n", "```\n", "\n", "이런 경우에는 [SageMaker built-in algorithms common parameters doc](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html)에서 리스트된 것과 같이 container image에 대한 URL을 제공합니다.\n", "\n", "다행히 이 문제의 경우에는 copy/paste 대신에 이미지 경로에서 바로 가져올 수 있는 프로그래밍 방식의 방법이 있습니다. 하지만, 이 알고리즘이 GPU 가속을 지원하는지, 분산 학습이 가능한지, 다양한 입력과 출력 형식 등을 지원하는지에 대한 상세한 내용을 common parameters 문서에서 확인해야 할 필요는 있습니다.\n", "더욱 상세한 내용에 대해서는 [DeepAR algorithm documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html)에서 확인하시기 바랍니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_image = sagemaker.image_uris.retrieve(\n", " \"forecasting-deepar\",\n", " region,\n", ")\n", "print(training_image)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Prepare training data\n", "\n", "common docs나 [DeepAR algorithm docs](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html)에서 언급한 것처럼, DeepAR 알고리즘은 JSONLines 또는 Parquet 형식처럼 아래 형식의 학습 데이터가 필요합니다.\n", "\n", "```json\n", "{\"start\": \"2009-11-01 00:00:00\", \"target\": [target timeseries...], \"cat\": [categorical features...], \"dynamic_feat\": [[related TS 1...], [related TS 2...]}\n", "{\"start\": \"2009-11-02 00:00:00\", \"target\": [target timeseries...], \"cat\": [categorical features...], \"dynamic_feat\": [[related TS 1...], [related TS 2...]} ...\n", "```\n", "\n", "우리는 **1_Data_Preparation**에서 생성한 데이터 파일을 다시 가져와서 수행할 예정입니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "target_train_df = pd.read_csv(f\"./data/{target_train_filename}\")\n", "target_test_df = pd.read_csv(f\"./data/{target_test_filename}\")\n", "related_df = pd.read_csv(f\"./data/{related_filename}\")\n", "\n", "related_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "먼저, DeepAR에서는 숫자 형태의 `dynamic_feat`만 허용하므로 기존의 이진 형태의 related 데이터셋 내 필드 값들을 변경해야 합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "related_df[\"holiday\"] = related_df[\"holiday\"].astype(int)\n", "related_df[\"workingday\"] = related_df[\"workingday\"].astype(int)\n", "\n", "related_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "이전 과정에서 **target** timeseries 를 training과 test 데이터셋으로 분리하였고, 하나의 큰 리스트로 **related** 데이터셋으로 만들었습니다.\n", "\n", "[best practices](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html#deepar_best_practices) 문서를 보시면, SageMaker DeepAR은 조금 다른 형태를 원하는 것을 알 수 있습니다.\n", "\n", "* 학습 시 `train` 데이터 셋에는 학습 기간에 대한 *target* + *related* 데이터 \n", "* 학습 시 `test` 데이터 셋에는 학습 및 테스트 기간 포함 전체 *target* + *related* 데이터\n", "* **inference** 데이터셋에는 학습 기간 *target* series와 전체 기간의 *related* series 데이터 필요" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "first_test_ts = target_test_df[\"timestamp\"][0]\n", "related_train_df = related_df[related_df[\"timestamp\"] < first_test_ts]\n", "related_test_df = related_df[related_df[\"timestamp\"] >= first_test_ts]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "training과 test 데이터셋을 target 포맷으로 변경합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We'll loop through our customer types creating a record for each:\n", "customer_types = target_train_df[\"customer_type\"].unique()\n", "\n", "# Related timeseries are general, not per-custtype, so we can format out here:\n", "dynamic_feats_train = related_train_df.drop(columns=\"timestamp\")\n", "dynamic_feats_train = [dynamic_feats_train[col].to_list() for col in dynamic_feats_train.columns]\n", "dynamic_feats_test = related_test_df.drop(columns=\"timestamp\")\n", "dynamic_feats_test = [dynamic_feats_test[col].to_list() for col in dynamic_feats_test.columns]\n", "\n", "# Training data set (training timestamps only):\n", "train_lines = []\n", "# Test data set (training + test timestamps):\n", "test_lines = []\n", "# Inference data set (training target + full related series):\n", "inference_lines = []\n", "\n", "for customer_type in customer_types:\n", " ctmr_target_train_df = target_train_df[target_train_df[\"customer_type\"] == customer_type]\n", " target_train = ctmr_target_train_df[\"demand\"].to_list()\n", " target_test = target_test_df[target_test_df[\"customer_type\"] == customer_type][\"demand\"].to_list()\n", " \n", " train_lines.append({\n", " \"start\": ctmr_target_train_df[\"timestamp\"].iloc[0],\n", " \"target\": target_train,\n", " \"dynamic_feat\": dynamic_feats_train\n", " })\n", " test_lines.append({\n", " \"start\": ctmr_target_train_df[\"timestamp\"].iloc[0],\n", " \"target\": target_train + target_test,\n", " \"dynamic_feat\": [\n", " dynamic_feats_train[ixf] + dynamic_feats_test[ixf] for ixf in range(len(dynamic_feats_train))\n", " ]\n", " })\n", " inference_lines.append({\n", " \"start\": ctmr_target_train_df[\"timestamp\"].iloc[0],\n", " \"target\": target_train,\n", " \"dynamic_feat\": [\n", " dynamic_feats_train[ixf] + dynamic_feats_test[ixf] for ixf in range(len(dynamic_feats_train))\n", " ]\n", " })" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*JSON Lines* 형식은 JSON이지만, `[... , ...]` 배열 대신 줄바꿈 형식으로 각 데이터 분리되어 저장됩니다. 따라서, 전체 JSON Lines 파일은 *유효한 JSON 타입이 아니며*, 파일의 각각 라인만이 *유효한 JSON 타입* 입니다.\n", "\n", "JSON Line 형식으로 학습과 테스트 파일을 작성한 다음, S3로 업로드를 합니다.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pandas import json_normalize\n", "import numpy as np\n", "json_normalize(train_lines)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "np.array(train_lines[0]['target']).shape, np.array(train_lines[0]['dynamic_feat']).shape" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Writing data sets to file...\")\n", "!mkdir -p ./data/smdeepar\n", "\n", "with open(f\"./data/smdeepar/{sm_train_filename}\", \"w\") as f:\n", " for ix in range(len(train_lines)):\n", " if (ix > 0):\n", " f.write(\"\\n\")\n", " f.write(json.dumps(train_lines[ix]))\n", "\n", "with open(f\"./data/smdeepar/{sm_test_filename}\", \"w\") as f:\n", " for ix in range(len(test_lines)):\n", " if (ix > 0):\n", " f.write(\"\\n\")\n", " f.write(json.dumps(test_lines[ix]))\n", "\n", "with open(f\"./data/smdeepar/{sm_inference_filename}\", \"w\") as f:\n", " for ix in range(len(inference_lines)):\n", " if (ix > 0):\n", " f.write(\"\\n\")\n", " f.write(json.dumps(inference_lines[ix]))\n", "\n", "print(\"Uploading dataframes to S3...\")\n", "s3.upload_file(\n", " Filename=f\"./data/smdeepar/{sm_train_filename}\",\n", " Bucket=bucket,\n", " Key=f\"{data_prefix}smdeepar/{sm_train_filename}\"\n", ")\n", "print(f\"s3://{bucket}/{data_prefix}smdeepar/{sm_train_filename}\")\n", "s3.upload_file(\n", " Filename=f\"./data/smdeepar/{sm_test_filename}\",\n", " Bucket=bucket,\n", " Key=f\"{data_prefix}smdeepar/{sm_test_filename}\"\n", ")\n", "print(f\"s3://{bucket}/{data_prefix}smdeepar/{sm_test_filename}\")\n", "s3.upload_file(\n", " Filename=f\"./data/smdeepar/{sm_inference_filename}\",\n", " Bucket=bucket,\n", " Key=f\"{data_prefix}smdeepar/{sm_inference_filename}\"\n", ")\n", "print(f\"s3://{bucket}/{data_prefix}smdeepar/{sm_inference_filename}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Set up the SageMaker estimator and train the model\n", "\n", "유용한 부분으로는 [Python SageMaker SDK](https://sagemaker.readthedocs.io/en/stable/index.html)을 아래와 같이 사용할 수 있습니다.\n", "\n", "1. 알고리즘과 모델의 fitting (학습) 및 하이퍼파라미터를 정의하는 [Estimator](https://sagemaker.readthedocs.io/en/stable/estimators.html)를 생성합니다.\n", "2. fit 과 검증을 수행할 [data channels](https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo-running-container.html#your-algorithms-training-algo-running-container-inputdataconfig)를 정의합니다.\n", "3. 모델을 데이터에 [Fit](https://sagemaker.readthedocs.io/en/stable/estimators.html#sagemaker.estimator.Estimator.fit)합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "estimator = sagemaker.estimator.Estimator(\n", " image_uri=training_image,\n", " image_name=training_image,\n", " role=sm_role_arn,\n", " instance_count=1,\n", " instance_type=\"ml.m5.4xlarge\",\n", " output_path=f\"s3://{bucket}/output/smdeepar/\",\n", " base_job_name=\"bike-demo-deepar\",\n", " hyperparameters={\n", " \"context_length\": 24*14, # 2 weeks, same as our target forecast window\n", " \"epochs\": 5,\n", " \"prediction_length\": 24*14,\n", " \"time_freq\": \"1H\",\n", " \"early_stopping_patience\": 20,\n", " \"num_eval_samples\": 24*14,\n", " },\n", " max_run=1*60*60,\n", " use_spot_instances=True,\n", " max_wait=1*60*60,\n", ")\n", "\n", "# Training channels can be specified simply as an S3 path string, or using the s3_input API like \n", "# this for more control over distribution and format parameters:\n", "train_channel = sagemaker.session.TrainingInput(\n", " f\"s3://{bucket}/{data_prefix}smdeepar/{sm_train_filename}\",\n", " content_type=\"json\", # (The correct MIME type for JSON lines is still in community debate...)\n", " s3_data_type=\"S3Prefix\"\n", ")\n", "test_channel = sagemaker.inputs.TrainingInput(\n", " f\"s3://{bucket}/{data_prefix}smdeepar/{sm_test_filename}\",\n", " content_type=\"json\", \n", " s3_data_type=\"S3Prefix\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# This will block until training is complete, showing console output below:\n", "estimator.fit(inputs={\"train\": train_channel, \"test\": test_channel },\n", " wait=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4: While the model trains...\n", "\n", "DeepAR은 딥러닝 모델입니다. 그래서 학습시간이 조금 걸리게 됩니다. \n", "학습 시간 동안 이전 Amazon Forecast의 predictor 또는 forecast의 수행 상태를 다시 확인하실 수 있습니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.logs()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 5: Generating predictions with SageMaker Batch Transform\n", "\n", "학습된 SageMaker 모델을 사용하기 위해서는 2가지 우선적인 방법이 있습니다. 실시간 inference 를 위해 endpoints로 모델을 [`deploy()`]를 하거나, batch transform으로 저장된 데이터셋에 적용할 수도 있습니다.\n", "\n", "S3에 이미 저장된 Test 데이터셋이 있기 때문에 이 예제에서는 batch 방법으로 수행합니다. 하지만, 예를 들어 다른 날씨 데이터로 \"what-if\" forecasts를 수행하는 방식으로 interactive 서비스가 필요하다면 endpoint를 생성하는 방법으로 선택할 수도 있습니다.\n", "\n", "batch transform job에서 SageMaker는 아래와 같은 workflow로 동작합니다.\n", "\n", "* 임시 인스턴스에 모델을 배포 (이 방법은 실시간 endpoint를 배포하는 것도 유사함)\n", "* S3에서 입력 데이터를 읽음\n", "* 입력 데이터를 모델 인스턴스로 전달\n", "* 결과 데이터를 S3로 저장\n", "* 임시 인스턴스를 정리\n", "\n", "따라서, 우리는 인스턴스를 배포하거나 삭제하는 것에 대해 고민할 필요가 없습니다.\n", "하지만, 입력 데이터를 모델 인스턴스들로 분리하는 방법과 결과를 함께 수집하는 방법을 SageMaker가 알 수 있도록 해야 합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_name = 'bike-demo-deepar-' + time.strftime(\"%m%d-%H%M%s\")\n", "\n", "transformer = estimator.transformer(\n", " instance_count=1, # We only have 2 \"records\" (customer types) - multi-instance would be overkill!\n", " instance_type=\"ml.c4.2xlarge\", # Per the docs, DeepAR only uses CPU at inference time\n", " strategy=\"SingleRecord\", # Send records to the model one at a time\n", " assemble_with=\"Line\", # Join results with a newline in the output file (JSONLines)\n", " output_path=f\"s3://{bucket}/results/smdeepar/untuned\",\n", " env={\n", " # We only want the p10, p50, p90 configs to compare with Amazon Forecast, so will override default output:\n", " \"DEEPAR_INFERENCE_CONFIG\": json.dumps({\n", " \"num_samples\": 100,\n", " \"output_types\": [\"mean\", \"quantiles\"],\n", " \"quantiles\": [\"0.1\", \"0.5\", \"0.9\"]\n", " })\n", " }\n", ")\n", "\n", "trans_batch = transformer.transform(\n", " job_name=job_name,\n", " data=f\"s3://{bucket}/{data_prefix}smdeepar/{sm_inference_filename}\",\n", " split_type=\"Line\", # Records are separated by a newline in the input file (JSONLines)\n", " logs=True,\n", " wait=False\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "transformer.latest_transform_job.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 6: Download and reformat the results\n", "\n", "batch transform의 결과는 S3에 저장되며, 이 노트북으로 결과 파일을 다운로드 받도록 합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batch_results = transformer.output_path\n", "\n", "!mkdir -p results/smdeepar/untuned\n", "!aws s3 cp --recursive $batch_results results/smdeepar/untuned\n", "print(\"SMDeepAR results folder contents:\")\n", "!ls results/smdeepar/untuned" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "... 이후 메모리에 JSON lines 파일을 로드합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results_local_filename = f\"results/smdeepar/untuned/{sm_inference_filename}.out\"\n", "results_raw = []\n", "with open(results_local_filename) as f:\n", " for line in f:\n", " results_raw.append(json.loads(line))\n", "\n", "# Note the order of records (hence the correspondence to customer_types) is preserved in SM batch:\n", "assert (\n", " len(results_raw) == len(customer_types)\n", "), \"Mismatch: Batch transform should return one prediction per customer type!\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "... 예측 결과를 우리가 Amazon Forecast에서 했던 동일한 표준 형태로 전환합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_id = \"sagemaker-deepar\"\n", "\n", "first_test_ts = target_test_df[\"timestamp\"].iloc[0]\n", "test_start_dt = datetime(\n", " int(first_test_ts[0:4]),\n", " int(first_test_ts[5:7]),\n", " int(first_test_ts[8:10]),\n", " int(first_test_ts[11:13]),\n", " int(first_test_ts[14:16]),\n", " int(first_test_ts[17:])\n", ")\n", "test_end_dt = test_start_dt + timedelta(days=14)\n", "\n", "# Timestamps aren't listed in the DeepAR output, so we synthesize them from the test data:\n", "test_df = target_test_df.copy()\n", "test_df[\"timestamp\"] = pd.to_datetime(test_df[\"timestamp\"])\n", "test_df = test_df[test_df[\"timestamp\"] < test_end_dt]\n", "ctype_test_dfs = {\n", " ctype: test_df[test_df[\"customer_type\"] == ctype]\n", "for ctype in customer_types}\n", "\n", "clean_results_df = pd.DataFrame()\n", "for ix in range(len(customer_types)):\n", " prediction = results_raw[ix]\n", "\n", " df = pd.DataFrame()\n", " df[\"timestamp\"] = ctype_test_dfs[customer_types[ix]][\"timestamp\"]\n", " df[\"model\"] = model_id\n", " df[\"customer_type\"] = customer_types[ix]\n", " df[\"mean\"] = prediction[\"mean\"]\n", " df[\"p10\"] = prediction[\"quantiles\"][\"0.1\"]\n", " df[\"p50\"] = prediction[\"quantiles\"][\"0.5\"]\n", " df[\"p90\"] = prediction[\"quantiles\"][\"0.9\"]\n", "\n", " clean_results_df = clean_results_df.append(df)\n", "\n", "clean_results_df.to_csv(\n", " f\"./results/smdeepar/untuned/results_clean_untuned.csv\",\n", " index=False\n", ")\n", "print(\"Clean results saved to ./results/smdeepar/untuned/results_clean_untuned.csv\")\n", "clean_results_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 7: Plot the performance\n", "\n", "표준 형식으로 변경한 결과 값들을 Amazon Forecast 노트북에서와 동일한 util 폴더 내 plot 함수를 사용하여 시각화를 합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "first_plot_dt = test_end_dt - timedelta(days=21)\n", "actuals_plot_df = target_train_df.append(target_test_df)\n", "actuals_plot_df[\"timestamp\"] = pd.to_datetime(actuals_plot_df[\"timestamp\"])\n", "actuals_plot_df = actuals_plot_df[\n", " (actuals_plot_df[\"timestamp\"] >= first_plot_dt)\n", " & (actuals_plot_df[\"timestamp\"] < test_end_dt)\n", "]\n", "util.plot_fcst_results(actuals_plot_df, clean_results_df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "untuned_results_filename = results_local_filename\n", "untuned_clean_results_df = clean_results_df\n", "# !mv results/smdeepar/results_clean.csv results/smdeepar/results_clean_untuned.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 8: Kick off a Hyperparameter Optimization to improve performance\n", "\n", "위 하이퍼파라미터는 [DeepAR tuning documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar-tuning.html)와 데이터셋의 특성에 따라 어느 정도의 생각이 필요합니다.\n", "\n", "이 파라미터를 튜닝하는 것은 다양한 순차적인 조합의 테스트가 오래 걸리기 때문에 많이 어렵습니다. grid search나 수동 탐색 방식은 계산 비용이나 노동력에 관계 없이 비용이 많이 듭니다.\n", "\n", "SageMaker의 [Automatic Model Tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html)을 활용할 수 있습니다. 이 방식은 높은 평가 비용의 최적화 문제를 위해 특별히 설계된 베이지안 최적화 방법을 사용합니다.\n", "\n", "[HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/tuner.html)의 `fit()` 방법은 `Estimator`와 같이 기본적으로 제공되지 않습니다. (HPO 작업은 일반적으로는 오래 걸리기 때문입니다.)\n", "\n", "SageMaker console에 있는 \"Hyperparameter Tuning Jobs\"은 진행되는 작업의 상세 상태와 metrics를 확인하기에 좋은 UI를 제공합니다. \n", "\n", "**이 작업은 다음 섹션이 끝날 때까지 기다릴 필요가 없습니다.**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

아래 2개 값을 설정하시기 바랍니다.

" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "max_jobs=4 # TODO: Ideally 12 or more\n", "max_parallel_jobs=2 # TODO: Maybe only 1 for Event Engine, 2-3 if possible" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuner = sagemaker.tuner.HyperparameterTuner(\n", " estimator,\n", " #Alternative e.g. objective_metric_name=\"test:RMSE\",\n", " objective_metric_name=\"test:mean_wQuantileLoss\",\n", " hyperparameter_ranges={\n", " \"mini_batch_size\": sagemaker.tuner.IntegerParameter(1, 500),\n", " \"context_length\": sagemaker.tuner.IntegerParameter(10, round(24*14*1.1)),\n", " \"num_cells\": sagemaker.tuner.IntegerParameter(30, 200),\n", " \"num_layers\": sagemaker.tuner.IntegerParameter(1, 8),\n", " },\n", " objective_type=\"Minimize\",\n", " # Defining the maximum number and parallelism of HPO training jobs:\n", " # Note that accounts have protective limits on number of GPU instances by default.\n", " # For Event Engine accounts, default max ml.p3.2xlarge = 2\n", " # Set max_parallel_jobs = (limit / train_instance_count) - 1\n", " # (minus one lets you run HPO and non-HPO in parallel)\n", " max_jobs=max_jobs, # TODO: Ideally 12 or more\n", " max_parallel_jobs=max_parallel_jobs, # TODO: Maybe only 1 for Event Engine, 2-3 if possible\n", " base_tuning_job_name=\"bike-demo-deepar-tuning\"\n", ")\n", "\n", "tuner.fit(\n", " inputs={ \"train\": train_channel, \"test\": test_channel },\n", " wait=False\n", ")\n", "\n", "# Uncomment if you like locking up your notebook for hours:\n", "# tuner.wait()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuner.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "... 예측 결과를 우리가 Amazon Forecast에서 했던 동일한 표준 형태로 전환합니다." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 9: Using our hyperparameter tuning job\n", "\n", "SageMaker Console의 \"Hyperparameter Tuning Jobs\" 에서 하이퍼파라미터 튜닝 진행 상태를 확인해 보고, 몇 시간이 걸릴 수 있지만 최종적으로 화면에서 \"Completed\"을 확인할 수 있습니다.\n", "\n", "console에서 직접 SageMaker에 모델 산출물을 등록해서 \"create models\"가 가능하며, 실시간 endpoints 배포 또는 batch transform jobs로 시작할 수도 있습니다.\n", "\n", "Console의 UI에서의 설정 파라미터는 노트북에서 앞서 사용한 소스코드와 직접적으로 일치하지만, SDK의 `fit()`과 `deploy()` 와 관련해서는 Model과 Endpoint 구성 정보를 단순화하거나 줄여서 제공하고 있습니다.\n", "\n", "**연습 : 가이드에 따라 이미 있는 `Inference > Models`와 `Inference > Batch transform jobs`을 사용하여, steps 3과 6에 코드를 재실행하면, console에서 동일 inference 데이터 셋에 대해 가장 좋은 HPO-tuned 모델을 실행할 수 있나요? **\n", "\n", "Note:\n", "\n", "* endpoint 배포가 아닌 batch transform job을 생성한 다음, \n", "* 이전 모델의 결과가 덮어쓰지 않게 하기 위해 새로운 transform job에 대한 다른 output 경로를 선택합니다. 이 경우 이전 경로의 **하위 폴더**로 output 경로를 정한 다음 Step 9 를 동작합니다.\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 10: Comparing HPO-tuned and best-guess performance\n", "\n", "마지막 단계를 성공적으로 수행하고 학습 작업이 완료되며, S3의 bucket 내 새로운 `.out`를 확인할 수 있습니다.\n", "\n", "우선적으로 이전 결과를 복사하시기 바랍니다.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model import Model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client = boto3.client('sagemaker')\n", "response = client.describe_training_job(\n", " TrainingJobName=tuner.best_training_job()\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuner_model = Model(training_image, \n", " model_data=response['ModelArtifacts']['S3ModelArtifacts'],\n", " role=sm_role_arn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuner_transformer = tuner_model.transformer(\n", " instance_count=1, # We only have 2 \"records\" (customer types) - multi-instance would be overkill!\n", " instance_type=\"ml.c4.2xlarge\", # Per the docs, DeepAR only uses CPU at inference time\n", " strategy=\"SingleRecord\", # Send records to the model one at a time\n", " assemble_with=\"Line\", # Join results with a newline in the output file (JSONLines)\n", " output_path=f\"s3://{bucket}/results/smdeepar/hpo/\",\n", " env={\n", " # We only want the p10, p50, p90 configs to compare with Amazon Forecast, so will override default output:\n", " \"DEEPAR_INFERENCE_CONFIG\": json.dumps({\n", " \"num_samples\": 100,\n", " \"output_types\": [\"mean\", \"quantiles\"],\n", " \"quantiles\": [\"0.1\", \"0.5\", \"0.9\"]\n", " })\n", " }\n", ")\n", "tuner_transformer.transform(\n", " f\"s3://{bucket}/{data_prefix}smdeepar/{sm_inference_filename}\",\n", " split_type=\"Line\", # Records are separated by a newline in the input file (JSONLines)\n", " logs=True,\n", " wait=False\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuner_transformer.latest_transform_job.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "마지막으로 튜닝 전과 HPO 튜닝 후 결과를 통합하고 Amazon Forecasts에서 수행한 결과와 비교하는 그래프를 그립니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batch_hpo_results = tuner_transformer.output_path\n", "\n", "!mkdir -p results/smdeepar/hpo\n", "!aws s3 cp --recursive $batch_hpo_results results/smdeepar/hpo\n", "print(\"SMDeepAR results folder contents:\")\n", "!ls results/smdeepar/hpo" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results_local_filename = f\"results/smdeepar/hpo/{sm_inference_filename}.out\"\n", "results_raw = []\n", "with open(results_local_filename) as f:\n", " for line in f:\n", " results_raw.append(json.loads(line))\n", "\n", "# Note the order of records (hence the correspondence to customer_types) is preserved in SM batch:\n", "assert (\n", " len(results_raw) == len(customer_types)\n", "), \"Mismatch: Batch transform should return one prediction per customer type!\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_id = \"sagemaker-deepar\"\n", "\n", "first_test_ts = target_test_df[\"timestamp\"].iloc[0]\n", "test_start_dt = datetime(\n", " int(first_test_ts[0:4]),\n", " int(first_test_ts[5:7]),\n", " int(first_test_ts[8:10]),\n", " int(first_test_ts[11:13]),\n", " int(first_test_ts[14:16]),\n", " int(first_test_ts[17:])\n", ")\n", "test_end_dt = test_start_dt + timedelta(days=14)\n", "\n", "# Timestamps aren't listed in the DeepAR output, so we synthesize them from the test data:\n", "test_df = target_test_df.copy()\n", "test_df[\"timestamp\"] = pd.to_datetime(test_df[\"timestamp\"])\n", "test_df = test_df[test_df[\"timestamp\"] < test_end_dt]\n", "ctype_test_dfs = {\n", " ctype: test_df[test_df[\"customer_type\"] == ctype]\n", "for ctype in customer_types}\n", "\n", "clean_results_df = pd.DataFrame()\n", "for ix in range(len(customer_types)):\n", " prediction = results_raw[ix]\n", "\n", " df = pd.DataFrame()\n", " df[\"timestamp\"] = ctype_test_dfs[customer_types[ix]][\"timestamp\"]\n", " df[\"model\"] = model_id+'-hpo'\n", " df[\"customer_type\"] = customer_types[ix]\n", " df[\"mean\"] = prediction[\"mean\"]\n", " df[\"p10\"] = prediction[\"quantiles\"][\"0.1\"]\n", " df[\"p50\"] = prediction[\"quantiles\"][\"0.5\"]\n", " df[\"p90\"] = prediction[\"quantiles\"][\"0.9\"]\n", "\n", " clean_results_df = clean_results_df.append(df)\n", "\n", "clean_results_df.to_csv(\n", " f\"./results/smdeepar/hpo/results_clean_hpo.csv\",\n", " index=False\n", ")\n", "print(\"Clean results saved to ./results/smdeepar/hpo/results_clean_hpo.csv\")\n", "clean_results_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# !cp -r results/smdeepar/untuned/results_clean.csv results/smdeepar/results_clean_untuned.csv\n", "# !cp -r results/smdeepar/hpo/results_clean.csv results/smdeepar/results_clean_hpo.csv\n", "\n", "comparison_results_df = untuned_clean_results_df.append(clean_results_df)\n", "\n", "comparison_results_df.to_csv(\n", " f\"./results/smdeepar/results_clean.csv\",\n", " index=False\n", ")\n", "print(\"Full results saved to ./results/smdeepar/results_clean.csv\")\n", "comparison_results_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "util.plot_fcst_merge_results(actuals_plot_df, comparison_results_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "비교시 업데이트된 그래프는 어떠한가요? 모델이 개선되었나요?\n", "\n", "console에서 hyperparameter tuning job의 metrics 결과와 일반적인 학습 작업의 결과는 어떠한가요? 양적인 변화가 있나요?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Extension exercises and exploring further\n", "\n", "Amazon Forecast와 같이 튜닝 전 또는 HPO로 튜닝한 결과 모두 표준 형식으로 저장되어 있습니다.\n", "\n", "Test 데이터셋 성능을 수치화하기 위해 RMSE와 가중 quantile 손실 metrics를 계산할 수 있나요?\n", "\n", "Amazon Forecast과 SageMaker DeepAR 접근의 상대적인 강점과 약점은 무엇일까요?\n", "\n", "하루까지 timestamps를 이동할 경우나 `holiday`와 `workingday` 값을 삭제했을 경우에 SageMaker DeepAR가 Amazon Forecast와 동일한 민감도를 갖나요?\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Thanks for joining in! (Clean-up time)\n", "\n", "Amazon Forecast와 같이 이 워크삽에서는 실시간 predictor endpoint로 배포하지 않습니다. \n", "하지만 몇개의 산출물을 생성했기 때문에 데이터 스토리지 비용이 중요한 경우에는 이를 정리할 필요는 있습니다.Amazon SageMaker 의 왼쪽 메뉴의 탭들과, S3 bucket을 반드시 확인하고 원치 않을 경우 정리하는 것이 필요합니다.\n", "\n", "항상 그렇듯이 SageMaker 노트북을 더이상 사용하지 않으면 중지해야 합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.12" } }, "nbformat": 4, "nbformat_minor": 4 }