{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# [모듈 01] Amazon S3 버킷에서 SageMaker Feature Store로 자동화된 데이터 변환 및 수집\n", "\n", "이 노트북은 다음과 같은 작업을 합니다.\n", "\n", "1. 아키텍처 개요\n", "2. 전제 조건\n", "3. 기본 변수 로딩\n", " - S3 data prefix to monitor : CSV 가 업로드가 되는 S3 Prefix\n", " - Data Wrangler flow URL\n", " - Data Wrangler output name\n", " - Feature group name\n", "4. 세이지 메이커 사용자 템플릿을 통한 프로젝트 생성\n", "5. 세이지 메이커 사용자 템플릿을 통한 프로젝트 분석 (데이타 수집 프로젝트)\n", "6. 자동화 파이프라인 테스트\n", " - 지정된 S3 Prefix 에 CSV 를 업로딩하여 파이브라인을 자동으로 실행\n", "7. 피쳐 스토어에서 데이터 확인\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 1. 아키텍처 개요\n", "이 노트북은 [AWS 서비스 카탈로그](https://aws.amazon.com/servicecatalog), [SageMaker 프로젝트](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker) 사용 방법을 보여줍니다. -projects-whatis.html) 및 [Pipelines](https://aws.amazon.com/sagemaker/pipelines/)를 사용하여 SageMaker Studio에서 재사용 가능하고 이식 가능한 구성 요소를 생성합니다.\n", "\n", "이 프로젝트는 S3 버킷에 업로드된 새 데이터 파일에서 트리거되는 [SageMaker Feature Store](https://aws.amazon.com/sagemaker/feature-store/)로의 기능 변환 및 수집을 자동화합니다. SageMaker 프로젝트는 필요한 모든 구성 요소를 생성하고 리소스 간의 모든 권한 및 링크를 설정합니다.\n", "\n", "\"solution" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 2. 전제 조건\n", "\n", "## 2.1. 기본 리소스 생성 \n", "- SageMaker 프로젝트 배포를 진행하려면 먼저 다음 리소스를 생성해야 합니다.\n", "- 이 모든 작업은 [`00-setup` 노트북](00-setup.ipynb)에서 수행됩니다. 이 노트북을 실행하기 전에 설정 노트북을 실행했는지 확인하십시오.\n", " - 출력 노드를 포함하는 Data Wrangler `.flow` 파일. `.flow` 파일은 지정된 S3 접두사에 업로드해야 합니다.\n", " - 데이터에서 추출한 피쳐를 저장할 피쳐 그룹\n", " - SageMaker 프로젝트 포트폴리오 -> [초기 설정]으로 완료(../README.md#deploy-sagemaker-project-portfolio)\n", " - 새 데이터 파일이 업로드될 S3 버킷\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2.2. AmazonSageMakerServiceCatalogProductsLaunchRole 에 권한 추가\n", "
💡 AmazonSageMakerServiceCatalogProductsLaunchRole 에 AWSLambda_FullAccess, IAMFullAccess 추가 함
" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "%%sh\n", "\n", "aws iam attach-role-policy \\\n", " --role-name AmazonSageMakerServiceCatalogProductsLaunchRole \\\n", " --policy-arn arn:aws:iam::aws:policy/AWSLambda_FullAccess\n", "\n", "aws iam attach-role-policy \\\n", " --role-name AmazonSageMakerServiceCatalogProductsLaunchRole \\\n", " --policy-arn arn:aws:iam::aws:policy/IAMFullAccess\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 3. 기본 변수 로딩" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2.70.0\n" ] } ], "source": [ "import sagemaker\n", "import boto3\n", "import time\n", "import json\n", "import os\n", "from time import gmtime, strftime\n", "from sagemaker.workflow.pipeline import Pipeline\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "from sagemaker.session import Session\n", "\n", "print(sagemaker.__version__)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# load environment variables from %store\n", "%store -r " ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Stored variables and their in-db values:\n", "abalone_dataset_file_name -> 'abalone.csv'\n", "abalone_dataset_local_url -> '../dataset/abalone.csv'\n", "data_bucket -> 'sagemaker-ap-northeast-2-018763497627'\n", "domain_id -> 'd-lnzveulicvk5'\n", "dw_flow_file_url -> 's3://sagemaker-ap-northeast-2-018763497627/featur\n", "dw_output_name -> 'c8880ed5-b8a0-4375-899b-1c4d86828152.default'\n", "execution_role -> 'arn:aws:iam::018763497627:role/mod-6297809195fe48\n", "feature_group_name -> 'FG-abalone-07-11-12-33-f12df440'\n", "s3_data_prefix -> 'sagemaker-ap-northeast-2-018763497627/feature-sto\n", "s3_flow_prefix -> 'sagemaker-ap-northeast-2-018763497627/feature-sto\n", "s3_fs_query_output_prefix -> 'sagemaker-ap-northeast-2-018763497627/feature-sto\n", "s3_input_data_prefix -> 'sagemaker-ap-northeast-2-018763497627/feature-sto\n", "s3_to_fs_pipeline_name -> 's3-fs-ingest-pipeline-p-ygzud1btjy9u'\n" ] } ], "source": [ "%store" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "try:\n", " data_bucket\n", " dw_flow_file_url\n", " dw_output_name\n", " feature_group_name\n", " s3_fs_query_output_prefix\n", " s3_data_prefix\n", " s3_flow_prefix\n", " abalone_dataset_local_url\n", "except NameError:\n", " print(\"+++++++++++++++++++++++++++++++++++++++++++++++\")\n", " print(\"[ERROR] YOU HAVE TO RUN 00-setup.ipynb notebook\")\n", " print(\"+++++++++++++++++++++++++++++++++++++++++++++++\")" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Stored 's3_input_data_prefix' (str)\n" ] } ], "source": [ "# Set the string literals\n", "s3_input_data_prefix = f\"{data_bucket}/feature-store-ingestion-pipeline/landing-zone/\"\n", "pipeline_name_prefix = \"s3-fs-ingest-pipeline\"\n", "\n", "%store s3_input_data_prefix" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "프로젝트 생성을 위한 아래 주요한 변수를 확인 하세요." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Project parameters:\n", "S3 data prefix to monitor: sagemaker-ap-northeast-2-018763497627/feature-store-ingestion-pipeline/landing-zone/\n", "Data Wrangler flow URL: s3://sagemaker-ap-northeast-2-018763497627/feature-store-ingestion-pipeline/dw-flow/dw2-flow-07-11-12-33-f12df440.flow\n", "Data Wrangler output name: c8880ed5-b8a0-4375-899b-1c4d86828152.default\n", "Feature group name: FG-abalone-07-11-12-33-f12df440\n" ] } ], "source": [ "print(\"Project parameters:\")\n", "print(f\"S3 data prefix to monitor: {s3_input_data_prefix}\")\n", "print(f\"Data Wrangler flow URL: {dw_flow_file_url}\")\n", "print(f\"Data Wrangler output name: {dw_output_name}\")\n", "print(f\"Feature group name: {feature_group_name}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 4. 세이지 메이커 사용자 템플릿을 통한 프로젝트 생성\n", "- ⭐ Studio IDE(옵션 1)에서 프로젝트를 생성하거나 이 노트북에서 직접 프로그래밍 방식으로(옵션 2) 프로젝트를 생성할 수 있습니다. \n", "- 옵션 2는 수동 입력이 필요하지 않으므로 권장됩니다. \n", " - 옵션 1은 프로젝트 매개변수에 대한 UX를 시연하기 위해 제공됩니다." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4.1. 옵션 1: Studio에서 프로젝트 만들기\n", "\n", "1. **SageMaker 리소스** 위젯에서 **프로젝트**를 선택합니다.\n", "\n", "\"studio-create-project\"\n", "\n", "2. **조직 템플릿**으로 이동하여 자동화된 변환 및 수집 파이프라인을 위한 프로젝트 템플릿을 선택합니다. **프로젝트 템플릿 선택**을 클릭합니다.\n", "\n", "\n", "\n", "3. 프로젝트 파라미터를 입력 하세요.\n", "\n", "\n", "매개변수는 다음과 같습니다.\n", "- **프로젝트 이름 및 설명**: 프로젝트 이름 및 설명 제공\n", "- **파이프라인 이름 접두사**: 파이프라인 이름에 대한 접두사를 제공하거나 기본값을 그대로 둡니다.\n", "- **파이프라인 설명**: 파이프라인에 대한 설명을 제공하거나 기본값을 그대로 둡니다.\n", "- **S3 접두사**: `s3_input_data_prefix` 변수의 값으로 설정\n", "- **Data Wrangler flow S3 url**: `dw_flow_file_url` 변수 값으로 설정\n", "- **Data Wrangler 출력 이름**: `dw_output_name` 변수의 값으로 설정\n", "- **기능 그룹 이름**: `feature_group_name` 변수의 값으로 설정\n", "- **Lambda 실행 역할**: 람다 함수에 대한 고유한 IAM 역할을 제공하거나 '자동'으로 두어 새 역할을 자동으로 생성합니다.\n", "\n", "**프로젝트 만들기**를 클릭합니다.\n", "\n", "
💡 프로젝트 생성이 완료될 때까지 기다리기 \n", "
\n", "배너 \"프로젝트 생성 중...\":\n", "\n", "\"studio-creating-project-banner\"\n", "\n", "프로젝트 세부 정보 페이지로 변경됩니다.\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 생성된 프로젝트의 이름과 ID를 가져옵니다.\n", "\n", "
💡 옵션 1 - Studio IDE에서 프로젝트 생성을 사용하는 경우에만 다음 셀을 실행하십시오. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get the latest created project\n", "sm = boto3.client(\"sagemaker\")\n", "r = sm.list_projects(SortBy=\"CreationTime\", SortOrder=\"Descending\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if r.get(\"ProjectSummaryList\") is None or len(r.get(\"ProjectSummaryList\")) == 0:\n", " raise Exception(\"[ERROR]: cannot retrieve the project list!\")\n", " \n", "if r[\"ProjectSummaryList\"][0][\"ProjectStatus\"] not in (\"CreateCompleted\"):\n", " raise Exception(\"[ERROR]: wait until project creation is completed!\")\n", "else:\n", " project_name = r[\"ProjectSummaryList\"][0][\"ProjectName\"]\n", " project_id = r[\"ProjectSummaryList\"][0][\"ProjectId\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### End of Option 1 section\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4.2 옵션 2: 코드로 프로젝트 생성 - 권장\n", "
💡 Studio IDE를 통해 프로젝트를 만든 경우 이 섹션을 건너뛰세요. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- [boto3 Python SDK](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_project)를 사용하여 노트북에서 새 프로젝트를 생성하거나 파이썬 코드를 사용하세요..\n", "- 먼저 서비스 카탈로그 CloudFormation 템플릿에서 `ProvisioningArtifactIds` 및 `ProductId`를 가져옵니다." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'Stacks': [{'StackId': 'arn:aws:cloudformation:ap-northeast-2:018763497627:stack/sm-project-sc-portfolio/a54a3d10-8806-11ec-88e0-0278d35f1cce',\n", " 'StackName': 'sm-project-sc-portfolio',\n", " 'Description': 'Create Service Catalog products as SageMaker project templates for various re-usable components\\n',\n", " 'Parameters': [{'ParameterKey': 'SCPortfolioPrincipalRoleArn',\n", " 'ParameterValue': 'arn:aws:iam::018763497627:role/mod-6297809195fe4845-SageMakerExecutionRole-1A1EDDFAJER7W'},\n", " {'ParameterKey': 'SCProductLaunchRoleArn',\n", " 'ParameterValue': 'AmazonSageMakerServiceCatalogProductsLaunchRole'}],\n", " 'CreationTime': datetime.datetime(2022, 2, 7, 11, 11, 6, 712000, tzinfo=tzlocal()),\n", " 'RollbackConfiguration': {},\n", " 'StackStatus': 'CREATE_COMPLETE',\n", " 'DisableRollback': True,\n", " 'NotificationARNs': [],\n", " 'Capabilities': ['CAPABILITY_NAMED_IAM'],\n", " 'Outputs': [{'OutputKey': 'ProductName',\n", " 'OutputValue': 'Automated Feature Transformation and Ingestion Pipeline v1.0',\n", " 'Description': 'Service Catalog data science product name'},\n", " {'OutputKey': 'ProvisioningArtifactIds',\n", " 'OutputValue': 'pa-nhut374mh6ie4',\n", " 'Description': 'Service Catalog data science provisioning artifact Ids'},\n", " {'OutputKey': 'AmazonSageMakerExecutionRolePolicyArn',\n", " 'OutputValue': 'arn:aws:iam::018763497627:policy/sm-project-sc-portfolio-AmazonSageMakerExecutionRolePolicy-1XRAPW3P21850',\n", " 'Description': 'Managed policy for Amazon SageMaker execution role with permissions to run the notebooks with Feature Store ingestion experiments'},\n", " {'OutputKey': 'PortfolioId',\n", " 'OutputValue': 'port-vm7gex7cjqwlw',\n", " 'Description': 'Service Catalog data science portfolio Id'},\n", " {'OutputKey': 'AmazonSageMakerExecutionRoleName',\n", " 'OutputValue': 'mod-6297809195fe4845-SageMakerExecutionRole-1A1EDDFAJER7W',\n", " 'Description': 'Name of the Amazon SageMaker execution role'},\n", " {'OutputKey': 'ProductId',\n", " 'OutputValue': 'prod-5polw6pb5aft2',\n", " 'Description': 'Service Catalog data science product Id'},\n", " {'OutputKey': 'ProvisioningArtifactNames',\n", " 'OutputValue': 'Automated Feature Transformation and Ingestion Pipeline v1.0',\n", " 'Description': 'Service Catalog data science provisioning artifact names'},\n", " {'OutputKey': 'FSIngestionProductPolicyArn',\n", " 'OutputValue': 'arn:aws:iam::018763497627:policy/sm-project-sc-portfolio-AmazonSageMakerServiceCatalogFSIngestionProductPolicy-1D50I8CJWCHGM',\n", " 'Description': 'Managed policy for AmazonSageMakerServiceCatalogProductsLaunchRole to launch an Feature Store ingestion product'}],\n", " 'Tags': [],\n", " 'EnableTerminationProtection': False,\n", " 'DriftInformation': {'StackDriftStatus': 'NOT_CHECKED'}}],\n", " 'ResponseMetadata': {'RequestId': 'e8bdf448-c392-4283-bd74-ec794148de4c',\n", " 'HTTPStatusCode': 200,\n", " 'HTTPHeaders': {'x-amzn-requestid': 'e8bdf448-c392-4283-bd74-ec794148de4c',\n", " 'content-type': 'text/xml',\n", " 'content-length': '4084',\n", " 'date': 'Mon, 07 Feb 2022 11:15:25 GMT'},\n", " 'RetryAttempts': 0}}" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cf = boto3.client(\"cloudformation\")\n", "\n", "r = cf.describe_stacks(StackName=\"sm-project-sc-portfolio\")\n", "r" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "SageMaker 프로젝트에 대한 매개변수 설정:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "sm = boto3.client(\"sagemaker\")\n", "\n", "provisioning_artifact_ids = [v for v in r[\"Stacks\"][0][\"Outputs\"] if v[\"OutputKey\"] == \"ProvisioningArtifactIds\"][0][\"OutputValue\"]\n", "product_id = [v for v in r[\"Stacks\"][0][\"Outputs\"] if v[\"OutputKey\"] == \"ProductId\"][0][\"OutputValue\"]\n", "project_name = f\"s3-fs-ingest-{strftime('%d-%H-%M-%S', gmtime())}\"\n", "project_parameters = [\n", " {\n", " 'Key': 'PipelineDescription',\n", " 'Value': 'Feature Store ingestion pipeline'\n", " },\n", " {\n", " 'Key': 'DataWranglerFlowUrl',\n", " 'Value': dw_flow_file_url\n", " },\n", " {\n", " 'Key': 'DataWranglerOutputName',\n", " 'Value': dw_output_name\n", " },\n", " {\n", " 'Key': 'S3DataPrefix',\n", " 'Value': s3_input_data_prefix\n", " },\n", " {\n", " 'Key': 'FeatureGroupName',\n", " 'Value': feature_group_name\n", " },\n", " {\n", " 'Key': 'PipelineNamePrefix',\n", " 'Value': pipeline_name_prefix\n", " },\n", " ]" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[{'Key': 'PipelineDescription', 'Value': 'Feature Store ingestion pipeline'},\n", " {'Key': 'DataWranglerFlowUrl',\n", " 'Value': 's3://sagemaker-ap-northeast-2-018763497627/feature-store-ingestion-pipeline/dw-flow/dw2-flow-07-11-12-33-f12df440.flow'},\n", " {'Key': 'DataWranglerOutputName',\n", " 'Value': 'c8880ed5-b8a0-4375-899b-1c4d86828152.default'},\n", " {'Key': 'S3DataPrefix',\n", " 'Value': 'sagemaker-ap-northeast-2-018763497627/feature-store-ingestion-pipeline/landing-zone/'},\n", " {'Key': 'FeatureGroupName', 'Value': 'FG-abalone-07-11-12-33-f12df440'},\n", " {'Key': 'PipelineNamePrefix', 'Value': 's3-fs-ingest-pipeline'}]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "project_parameters" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "마지막으로 서비스 카탈로그 제품 템플릿에서 SageMaker 프로젝트를 생성합니다." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'ProjectArn': 'arn:aws:sagemaker:ap-northeast-2:018763497627:project/s3-fs-ingest-07-11-15-27', 'ProjectId': 'p-opu00heyaxaa', 'ResponseMetadata': {'RequestId': '5680400e-1d3a-4a04-af13-ef428380f0c5', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '5680400e-1d3a-4a04-af13-ef428380f0c5', 'content-type': 'application/x-amz-json-1.1', 'content-length': '124', 'date': 'Mon, 07 Feb 2022 11:15:29 GMT'}, 'RetryAttempts': 0}}\n" ] } ], "source": [ "# create SageMaker project\n", "r = sm.create_project(\n", " ProjectName=project_name,\n", " ProjectDescription=\"Feature Store ingestion from S3\",\n", " ServiceCatalogProvisioningDetails={\n", " 'ProductId': product_id,\n", " 'ProvisioningArtifactId': provisioning_artifact_ids,\n", " 'ProvisioningParameters': project_parameters\n", " },\n", ")\n", "\n", "print(r)\n", "project_id = r[\"ProjectId\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
💡 프로젝트 생성이 완료될 때까지 기다리기 \n", "
\n", "클라우드 포메이션 콘솔로 이동해서 확인하셔도 되고, 세이지 메이커 프로젝트 화면에서 확인도 가능합니다.\n", "\n", "![cf-project.png](img/cf-project.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 옵션 2 섹션 끝\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 5. 세이지 메이커 사용자 템플릿을 통한 프로젝트 분석 (데이타 수집 프로젝트)\n", "- 아래의 스튜디오 화면은 위의 UI 에서 프로젝트 생성으로 방식으로 들어가시면 됩니다. \n", "- 이미 옵션2를 실행 하셨으면 Project 한 개가 보입니다. 클릭하시고 보세요." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 5.1. 프로젝트 템플릿은 자동화된 데이터 변환 및 수집에 필요한 모든 리소스를 생성\n", "- 지정된 S3 접두사에 새 데이터가 업로드될 때마다 AWS Lambda 함수를 시작하기 위한 [EventBridge 규칙](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)\n", "- SageMaker 파이프라인을 시작하는 AWS Lambda 함수\n", "- DataWrangler 프로세서를 사용하여 처리 작업을 실행하는 SageMaker 파이프라인\n", "- 업로드된 '.flow' 파일을 데이터 변환 워크플로와 함께 사용하는 DataWrangler 프로세서\n", "\n", "## 5.2 시드 코드가 있는 CodeCommit 리포지토리\n", "- 파이프라인 생성 및 파이프라인 파라미터 구성을 위한 모든 소스 코드는 [AWS CodeCommit](https://aws.amazon.com/codecommit/) 리포지토리로 제공됩니다. \n", " - 코드는 완벽하게 작동하며 즉시 사용할 수 있습니다. 이 코드를 소유하고 요구 사항에 따라 파이프라인의 구성 또는 매개변수를 변경할 수 있습니다.\n", "- 코드 작업을 시작하려면 저장소를 Studio 사용자의 홈 디렉토리에 복제해야 합니다.\n", "\n", "\"studo-project-clone-repo\"\n", "\n", "- 소스 코드를 변경하고 CodeCommit 리포지토리로 푸시할 수 있습니다. 프로젝트는 또한 [AWS CodeBuild](https://aws.amazon.com/codebuild/) 단계를 시작하는 [AWS CodePipeline](https://aws.amazon.com/codepipeline/) CI/CD 파이프라인을 제공합니다. \n", "- 저장소에 새 커밋이 있을때 마다 빌드는 저장소에서 코드를 가져오고 `create_pipeline` 함수(파일 `build.py`)를 호출합니다. \n", "- 기존 코드를 변경하거나 `pipeline.py` 파일의 `pipeline.create_pipeline`에 고유한 코드를 제공할 수 있습니다. \n", " - 경로: amazon-sagemaker-reusable-components-kr/project-seed-code/s3-fs-ingestion/pipeline.py\n", "```\n", " # create DW processor\n", " processor = Processor(\n", " role=execution_role,\n", " image_uri=container_uri,\n", " instance_count=p_processing_instance_count,\n", " instance_type=p_processing_instance_type,\n", " volume_size_in_gb=p_processing_volume_size,\n", " sagemaker_session=sagemaker_session,\n", " )\n", "\n", " step_process = ProcessingStep(\n", " name=\"datawrangler-processing-to-feature-store\",\n", " processor=processor,\n", " inputs=[flow_input] + [data_input],\n", " outputs=[processing_job_output],\n", " job_arguments=[f\"--output-config '{json.dumps(output_config)}'\"],\n", " )\n", "\n", " pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " p_processing_instance_type, \n", " p_processing_instance_count,\n", " p_processing_volume_size,\n", " p_flow_output_name,\n", " p_input_flow,\n", " p_input_data,\n", " p_feature_group_name\n", " ],\n", " steps=[step_process],\n", " sagemaker_session=sagemaker_session\n", " )\n", "\n", " response = pipeline.upsert(\n", " role_arn=execution_role,\n", " description=pipeline_description,\n", " tags=[\n", " {'Key': 'sagemaker:project-name', 'Value': project_name },\n", " {'Key': 'sagemaker:project-id', 'Value': project_id }\n", " ],\n", " )\n", "```\n", "\n", "- 기본 코드는 Data Wrangler 프로세서를 사용하여 SageMaker 파이프라인을 구성하고 파이프라인을 upserts합니다." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 5.3. SageMaker 파이프라인\n", "- 이 프로젝트는 Data Wrangler 프로세서를 사용한 하나의 처리 단계로 구성된 SageMaker 파이프라인을 제공합니다. \n", "- 파이프라인은 지정된 Data Wrangler `.flow` 파일에 포함된 변환을 수행하고 피쳐 저장소의 지정된 피쳐 그룹에서 변환된 피쳐를 수집합니다.\n", "- 이 파이프라인은 지정된 S3 위치에 새 파일이 업로드될 때마다 Lambda 함수에 의해 시작됩니다. \n", "- 파이프라인은 프로젝트에 연결되어 있으며 프로젝트 세부정보 페이지의 **파이프라인** 탭에서 사용할 수 있습니다.\n", "\n", "\"studio-project-details-pipelines\"\n", "\n", "여기에서 파이프라인 그래프, 매개변수, 설정 및 실행 기록을 볼 수 있습니다.\n", "\n", "\"studio-pipeline-execution-history\"\n", "\n", "\n", "**실행 시작**을 클릭하고 파이프라인 매개변수를 제공하여 Studio에서 수동으로 새 실행을 시작할 수도 있습니다.\n", "\n", "\"studio-pipeline-parameter-input\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 6. 자동화 파이프라인 테스트\n", "\n", "배포된 데이터 변환 및 피쳐 저장소 수집 파이프라인을 테스트하려면 다음 단계를 수행하십시오.\n", "1. 모니터링되는 S3 접두사 위치에 데이터 파일을 업로드합니다. 그러면 데이터 파이프라인을 통해 데이터 변환 및 수집이 시작됩니다.\n", "1. 파이프라인 실행 모니터링\n", "1. 피쳐 그룹에 로드된 데이터 확인" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "⭐ EventBridge 규칙은 'PutObject' 및 'CompleteMultipartUpload'라는 두 가지 S3 이벤트를 모니터링합니다. 두 S3 버킷 간에 객체를 복사하면 EventBrige 규칙이 시작되지 않습니다." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "다음 S3 `PUT` 이벤트는 새 파이프라인 실행을 시작하는 Lambda 함수를 시작합니다.\n", "![S3-eventbridge-rule.png](img/S3-eventbridge-rule.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "람다 함수는 파이프라인을 실행 함\n", "![run-pipeline-lambda.png](img/run-pipeline-lambda.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 6.1. 데이터를 S3 버킷에 업로드" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "file_name = f\"abalone-{strftime('%d-%H-%M-%S', gmtime())}.csv\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp {abalone_dataset_local_url} s3://{s3_input_data_prefix}{file_name}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 6.2. 파이프라인 실행을 모니터링" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " project_id\n", " project_name\n", "except NameError:\n", " raise Exception(\"[ERROR]: project_id or project_name variables are not set\")\n", " \n", "if project_id is None or project_name is None:\n", " raise Exception(\"[ERROR]: project_id or project_name variables are not set\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get the the project data\n", "r = sm.describe_project(ProjectName=project_name)\n", "\n", "# Get the pipeline prefix from the project parameters\n", "pipeline_name_prefix = [p for p in r[\"ServiceCatalogProvisioningDetails\"][\"ProvisioningParameters\"] if p[\"Key\"] == \"PipelineNamePrefix\"][0][\"Value\"]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_name_prefix" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# set the pipeline name\n", "s3_to_fs_pipeline_name = f\"{pipeline_name_prefix}-{project_id}\"\n", "\n", "%store s3_to_fs_pipeline_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# check pipeline execution \n", "summaries = sm.list_pipeline_executions(PipelineName=s3_to_fs_pipeline_name).get('PipelineExecutionSummaries')\n", "summaries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "latest_execution = sm.list_pipeline_executions(PipelineName=s3_to_fs_pipeline_name).get('PipelineExecutionSummaries')[0].get('PipelineExecutionArn')\n", "print (latest_execution)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait for pipeline execution to complete 'Executing' status\n", "while sm.describe_pipeline_execution(PipelineExecutionArn=latest_execution)[\"PipelineExecutionStatus\"] == \"Executing\":\n", " print('Pipeline is in Executing status...')\n", " time.sleep(30)\n", " \n", "print('Pipeline is done Executing')\n", "print(sm.describe_pipeline_execution(PipelineExecutionArn=latest_execution))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "또는 Studio의 파이프라인 위젯 내에서 파이프라인 실행을 모니터링할 수 있습니다.\n", "\n", "![](../img/studio-pipeline-executing.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 7. 피쳐 스토어에서 데이터 확인\n", "실행이 완료되면 데이터가 피쳐 그룹에 로드되었는지 확인할 수 있습니다." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "피쳐 그룹 오브젝트를 생성" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_store_session = Session()\n", "\n", "feature_group = FeatureGroup(\n", " name=feature_group_name, \n", " sagemaker_session=feature_store_session\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Build SQL query to features group\n", "fs_query = feature_group.athena_query()\n", "\n", "query_string = f'SELECT * FROM \"{fs_query.table_name}\"'\n", "print(f'Prepared query {query_string}')\n", "print(fs_query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Run Athena query. The output is loaded to a Pandas dataframe.\n", "fs_query.run(\n", " query_string=query_string, \n", " output_location=f\"s3://{s3_fs_query_output_prefix}\"\n", ")\n", "\n", "fs_query.wait()\n", "data_df = fs_query.as_dataframe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `DataFrame` contains now all features from the feature group:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 8. Next\n", " [`99-clean-up` notebook](99-clean-up.ipynb) 로 이동하셔서 리소스를 제거 하세요." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# A. 에러 케이스 및 트러블 슈팅" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A.1. Project 실행시 에러" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![lambda_error.png](../img2/lambda_error.png)\n", "\n", "\n", "## 해결\n", "- AmazonSageMakerServiceCatalogProductsLaunchRole 에 LambdaFull 추가 함\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A.2 랭글러 에러\n", "- 원본의 에러 발생은 데이터 소스가 달라서 밠생 함.\n", "- [해결] 직접 데이터 랭글러 데이터 플로우를 생성하여 해결 함.\n", "![wrangler_erro.png](../img2/wrangler_error.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-northeast-2:806072073708:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }