{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Amazon SageMaker Data Wrangler for Churn Data Preprocessing\n", "\n", "Amazon SageMaker Data Wrangler は、単一のビジュアルインターフェイスからデータ選択、クレンジング、探索、視覚化など、データ準備ワークフローの各ステップを簡素化して実現するツールです。\n", "\n", "このサンプルでは、音楽ストリーミングサイトのユーザーアクティビティログから顧客離脱予測に使うための特徴量エンジニアリングを行います。\n", "\n", "このサンプルを実行するには、Amazon SageMaker Studio でこのノートブックを開いて実行してください。\n", "\n", "## 準備" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import json\n", "import pandas as pd" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session = sagemaker.Session()\n", "bucket = sagemaker_session.default_bucket() # replace with your own bucket name if you have one\n", "prefix = \"churn\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "まずは、今回使用するユーザーアクティビティログを読み込み、スキーマを確認します。\n", "\n", "このサンプルで使用するデータは [EventSim](https://github.com/Interana/eventsim) で生成された1年間(2019/10/28 - 2020/10/28)の\n", "音楽ストリーミングサイトのユーザーアクティビティーデータのデータセットです。\n", "\n", "ユーザーのアクティビティごとにレコードがある状態です。これを集計してユーザーごとのレコードに変換していきます。\n", "\n", "このデータを Python で分析するサンプルについては [こちらのページ](https://sagemaker-examples.readthedocs.io/en/latest/use-cases/customer_churn/1_cust_churn_dataprep.html#Feature-Engineering-with-SageMaker-Processing) をご参照ください。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/customer-churn/customer-churn-data-v2.zip ./data/raw/customer-churn-data.zip" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!unzip -p ./data/raw/customer-churn-data.zip sample.csv > ./data/sample_data.csv" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = pd.read_csv(\"data/sample_data.csv\")\n", "data.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "スキーマを確認できたら、S3 にアップロードします" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_data_path = sagemaker_session.upload_data('data/sample_data.csv', bucket=bucket, key_prefix= f\"{prefix}/input\")\n", "input_data_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "次に、アップロードした S3 の URI を Amazon SageMaker Data Wrangler で使用する `dw_example.flow` に書き込みます。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "flow_file = \"dw_example.flow\"\n", "\n", "# read flow file and change the s3 location to our `processing_output_filename`\n", "with open(flow_file, \"r\") as f:\n", " flow = f.read()\n", "\n", " flow = json.loads(flow)\n", " flow[\"nodes\"][0][\"parameters\"][\"dataset_definition\"][\"s3ExecutionContext\"][\n", " \"s3Uri\"\n", " ] = input_data_path\n", "\n", "with open(\"dw_example.flow\", \"w\") as f:\n", " json.dump(flow, f)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "これで準備が整いましたので、`dw_example.flow` をクリックして SageMaker Studio で SageMaker Data Wrangler を開きます。\n", "\n", "## 特徴量エンジニアリングワークフローの実行\n", "\n", "以降のステップは SageMaker Data Wrangler のタブで行います。\n", "\n", "SageMaker Data Wrangler を開くと最初にワークフローが表示されます。\n", "\n", "S3 からデータをインポートし、データ型を指定し、複数の変換 Step を実行しています。\n", "\n", "![](images/flow.png)\n", "\n", "Steps にマウスをホバーすると **+** マークが出現するのでクリックして **Add Transform** を選択します。\n", "\n", "![](images/add_transform.png)\n", "\n", "そうすると、データとデータに与えられた変換の一覧が表示されます。\n", "\n", "ステップを選択すると、そのステップの変換の内容とそのステップを実行したあとのデータが表示されます。\n", "\n", "どのような変換が使用されているのか確認してみましょう。\n", "\n", "複数のステップがありますが大まかに以下のようなステップになっています\n", "\n", "1. csv データを s3 から取得\n", "2. データの各列の型を指定\n", "3. 必要のない行と列を削除する\n", "4. 学習に使うデータ範囲を決め、顧客離脱(churn)を定義し列を追加。\n", " - 2020/07/01 以前のデータを学習に使い、それ以降で `Cancellation Confirmation` イベントがあれば顧客離脱とする\n", "5. sessionId や userId をもとにグループして集計して列を追加\n", " - sessionId でグルーピングし、セッションごとのアクティビティや活動時間を集計する\n", " - userId でグルーピングし、様々な特徴を作成する\n", "6. 必要のない列を削除し、重複排除を行うことでユーザーごとの行にする\n", "\n", "![](images/transform_list.png)\n", "\n", "データの前処理が完了したら、Data flow に戻り、Steps の **+** マークをクリックして Add Destination から s3 に csv としてエクスポートする設定をします。\n", "\n", "![](images/export_s3.png)\n", "\n", "画面右上の **Create job** からジョブを作成し実行すると SageMaker Processing によるジョブが実行され s3 に結果が出力されます。\n", "\n", "![](images/create_job.png)\n", "\n", "処理が終わったら Notebook と Data Wrangler で使用しているインスタンスを終了します。\n", "明示的にインスタンスを終了しないと起動されたままなので気をつけてください。\n", "\n", "![](images/terminate.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Next Step\n", "\n", "このサンプルでは Amazon SageMaker Data Wrangler を使用してデータの特徴量エンジニアリングを行いました。\n", "\n", "出力されたデータからそのまま SageMaker Autopilot でモデルを学習させることも可能ですし、SageMaker Canvas を使用してインタラクティブにモデルを作成することも可能です。\n", "\n", "今回は試行錯誤のためにユーザー数が 100 人ほどの小さなデータセットの `sample_data.csv` を使用しましたが、モデルを学習させる際は最低 500 人ほどのデータが必要になります。\n", "\n", "次のステップとして自社のデータか、[こちら](https://sagemaker-examples.readthedocs.io/en/latest/use-cases/customer_churn/1_cust_churn_dataprep.html#Feature-Engineering-with-SageMaker-Processing)を参考により大きなデータセットを作成してみましょう。\n", "\n", "また、今回はジョブを作成して s3 に出力しましたが、SageMaker Pipeline や Python スクリプトにエクスポートして定期実行することも可能です。\n", "\n", "今回は s3 から csv ファイルをダウンロードしましたが、他にも Redshift や Athena などからクエリしてデータをインポートすることも可能です。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.m5.large", "kernelspec": { "display_name": "Python 3.8.9 64-bit", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.9" }, "vscode": { "interpreter": { "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" } } }, "nbformat": 4, "nbformat_minor": 4 }