"# Get the latest created project\n",
"sm = boto3.client(\"sagemaker\")\n",
"r = sm.list_projects(SortBy=\"CreationTime\", SortOrder=\"Descending\")"
"if r.get(\"ProjectSummaryList\") is None or len(r.get(\"ProjectSummaryList\")) == 0:\n",
" raise Exception(\"[ERROR]: cannot retrieve the project list!\")\n",
" \n",
"if r[\"ProjectSummaryList\"][0][\"ProjectStatus\"] not in (\"CreateCompleted\"):\n",
" raise Exception(\"[ERROR]: wait until project creation is completed!\")\n",
" project_name = r[\"ProjectSummaryList\"][0][\"ProjectName\"]\n",
" project_id = r[\"ProjectSummaryList\"][0][\"ProjectId\"]"
"### End of Option 1 section\n",
"## 4.2 옵션 2: 코드로 프로젝트 생성 - 권장\n",
Studio IDE를 통해 프로젝트를 만든 경우 이 섹션을 건너뛰세요. "
"- [boto3 Python SDK](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_project)를 사용하여 노트북에서 새 프로젝트를 생성하거나 파이썬 코드를 사용하세요..\n",
"- 먼저 서비스 카탈로그 CloudFormation 템플릿에서 `ProvisioningArtifactIds` 및 `ProductId`를 가져옵니다."
"{'Stacks': [{'StackId': 'arn:aws:cloudformation:ap-northeast-2:018763497627:stack/sm-project-sc-portfolio/a54a3d10-8806-11ec-88e0-0278d35f1cce',\n",
" 'StackName': 'sm-project-sc-portfolio',\n",
" 'Description': 'Create Service Catalog products as SageMaker project templates for various re-usable components\\n',\n",
" 'Parameters': [{'ParameterKey': 'SCPortfolioPrincipalRoleArn',\n",
" 'ParameterValue': 'arn:aws:iam::018763497627:role/mod-6297809195fe4845-SageMakerExecutionRole-1A1EDDFAJER7W'},\n",
" {'ParameterKey': 'SCProductLaunchRoleArn',\n",
" 'ParameterValue': 'AmazonSageMakerServiceCatalogProductsLaunchRole'}],\n",
" 'CreationTime': datetime.datetime(2022, 2, 7, 11, 11, 6, 712000, tzinfo=tzlocal()),\n",
" 'RollbackConfiguration': {},\n",
" 'StackStatus': 'CREATE_COMPLETE',\n",
" 'DisableRollback': True,\n",
" 'NotificationARNs': [],\n",
" 'Capabilities': ['CAPABILITY_NAMED_IAM'],\n",
" 'Outputs': [{'OutputKey': 'ProductName',\n",
" 'OutputValue': 'Automated Feature Transformation and Ingestion Pipeline v1.0',\n",
" 'Description': 'Service Catalog data science product name'},\n",
" {'OutputKey': 'ProvisioningArtifactIds',\n",
" 'OutputValue': 'pa-nhut374mh6ie4',\n",
" 'Description': 'Service Catalog data science provisioning artifact Ids'},\n",
" {'OutputKey': 'AmazonSageMakerExecutionRolePolicyArn',\n",
" 'OutputValue': 'arn:aws:iam::018763497627:policy/sm-project-sc-portfolio-AmazonSageMakerExecutionRolePolicy-1XRAPW3P21850',\n",
" 'Description': 'Managed policy for Amazon SageMaker execution role with permissions to run the notebooks with Feature Store ingestion experiments'},\n",
" {'OutputKey': 'PortfolioId',\n",
" 'OutputValue': 'port-vm7gex7cjqwlw',\n",
" 'Description': 'Service Catalog data science portfolio Id'},\n",
" {'OutputKey': 'AmazonSageMakerExecutionRoleName',\n",
" 'OutputValue': 'mod-6297809195fe4845-SageMakerExecutionRole-1A1EDDFAJER7W',\n",
" 'Description': 'Name of the Amazon SageMaker execution role'},\n",
" {'OutputKey': 'ProductId',\n",
" 'OutputValue': 'prod-5polw6pb5aft2',\n",
" 'Description': 'Service Catalog data science product Id'},\n",
" {'OutputKey': 'ProvisioningArtifactNames',\n",
" 'OutputValue': 'Automated Feature Transformation and Ingestion Pipeline v1.0',\n",
" 'Description': 'Service Catalog data science provisioning artifact names'},\n",
" {'OutputKey': 'FSIngestionProductPolicyArn',\n",
" 'OutputValue': 'arn:aws:iam::018763497627:policy/sm-project-sc-portfolio-AmazonSageMakerServiceCatalogFSIngestionProductPolicy-1D50I8CJWCHGM',\n",
" 'Description': 'Managed policy for AmazonSageMakerServiceCatalogProductsLaunchRole to launch an Feature Store ingestion product'}],\n",
" 'Tags': [],\n",
" 'EnableTerminationProtection': False,\n",
" 'DriftInformation': {'StackDriftStatus': 'NOT_CHECKED'}}],\n",
" 'ResponseMetadata': {'RequestId': 'e8bdf448-c392-4283-bd74-ec794148de4c',\n",
" 'HTTPStatusCode': 200,\n",
" 'HTTPHeaders': {'x-amzn-requestid': 'e8bdf448-c392-4283-bd74-ec794148de4c',\n",
" 'content-type': 'text/xml',\n",
" 'content-length': '4084',\n",
" 'date': 'Mon, 07 Feb 2022 11:15:25 GMT'},\n",
" 'RetryAttempts': 0}}"
"cf = boto3.client(\"cloudformation\")\n",
"r = cf.describe_stacks(StackName=\"sm-project-sc-portfolio\")\n",
"SageMaker 프로젝트에 대한 매개변수 설정:"
"sm = boto3.client(\"sagemaker\")\n",
"provisioning_artifact_ids = [v for v in r[\"Stacks\"][0][\"Outputs\"] if v[\"OutputKey\"] == \"ProvisioningArtifactIds\"][0][\"OutputValue\"]\n",
"product_id = [v for v in r[\"Stacks\"][0][\"Outputs\"] if v[\"OutputKey\"] == \"ProductId\"][0][\"OutputValue\"]\n",
"project_name = f\"s3-fs-ingest-{strftime('%d-%H-%M-%S', gmtime())}\"\n",
"project_parameters = [\n",
" {\n",
" 'Key': 'PipelineDescription',\n",
" 'Value': 'Feature Store ingestion pipeline'\n",
" },\n",
" {\n",
" 'Key': 'DataWranglerFlowUrl',\n",
" 'Value': dw_flow_file_url\n",
" },\n",
" {\n",
" 'Key': 'DataWranglerOutputName',\n",
" 'Value': dw_output_name\n",
" },\n",
" {\n",
" 'Key': 'S3DataPrefix',\n",
" 'Value': s3_input_data_prefix\n",
" },\n",
" {\n",
" 'Key': 'FeatureGroupName',\n",
" 'Value': feature_group_name\n",
" },\n",
" {\n",
" 'Key': 'PipelineNamePrefix',\n",
" 'Value': pipeline_name_prefix\n",
" },\n",
" ]"
"[{'Key': 'PipelineDescription', 'Value': 'Feature Store ingestion pipeline'},\n",
" {'Key': 'DataWranglerFlowUrl',\n",
" 'Value': 's3://sagemaker-ap-northeast-2-018763497627/feature-store-ingestion-pipeline/dw-flow/dw2-flow-07-11-12-33-f12df440.flow'},\n",
" {'Key': 'DataWranglerOutputName',\n",
" 'Value': 'c8880ed5-b8a0-4375-899b-1c4d86828152.default'},\n",
" {'Key': 'S3DataPrefix',\n",
" 'Value': 'sagemaker-ap-northeast-2-018763497627/feature-store-ingestion-pipeline/landing-zone/'},\n",
" {'Key': 'FeatureGroupName', 'Value': 'FG-abalone-07-11-12-33-f12df440'},\n",
" {'Key': 'PipelineNamePrefix', 'Value': 's3-fs-ingest-pipeline'}]"
"마지막으로 서비스 카탈로그 제품 템플릿에서 SageMaker 프로젝트를 생성합니다."
"{'ProjectArn': 'arn:aws:sagemaker:ap-northeast-2:018763497627:project/s3-fs-ingest-07-11-15-27', 'ProjectId': 'p-opu00heyaxaa', 'ResponseMetadata': {'RequestId': '5680400e-1d3a-4a04-af13-ef428380f0c5', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '5680400e-1d3a-4a04-af13-ef428380f0c5', 'content-type': 'application/x-amz-json-1.1', 'content-length': '124', 'date': 'Mon, 07 Feb 2022 11:15:29 GMT'}, 'RetryAttempts': 0}}\n"
"# create SageMaker project\n",
"r = sm.create_project(\n",
" ProjectName=project_name,\n",
" ProjectDescription=\"Feature Store ingestion from S3\",\n",
" ServiceCatalogProvisioningDetails={\n",
" 'ProductId': product_id,\n",
" 'ProvisioningArtifactId': provisioning_artifact_ids,\n",
" 'ProvisioningParameters': project_parameters\n",
" },\n",
"project_id = r[\"ProjectId\"]"
💡 프로젝트 생성이 완료될 때까지 기다리기 \n",
"클라우드 포메이션 콘솔로 이동해서 확인하셔도 되고, 세이지 메이커 프로젝트 화면에서 확인도 가능합니다.\n",
"### 옵션 2 섹션 끝\n",
"# 5. 세이지 메이커 사용자 템플릿을 통한 프로젝트 분석 (데이타 수집 프로젝트)\n",
"- 아래의 스튜디오 화면은 위의 UI 에서 프로젝트 생성으로 방식으로 들어가시면 됩니다. \n",
"- 이미 옵션2를 실행 하셨으면 Project 한 개가 보입니다. 클릭하시고 보세요."
"## 5.1. 프로젝트 템플릿은 자동화된 데이터 변환 및 수집에 필요한 모든 리소스를 생성\n",
"- 지정된 S3 접두사에 새 데이터가 업로드될 때마다 AWS Lambda 함수를 시작하기 위한 [EventBridge 규칙](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)\n",
"- SageMaker 파이프라인을 시작하는 AWS Lambda 함수\n",
"- DataWrangler 프로세서를 사용하여 처리 작업을 실행하는 SageMaker 파이프라인\n",
"- 업로드된 '.flow' 파일을 데이터 변환 워크플로와 함께 사용하는 DataWrangler 프로세서\n",
"## 5.2 시드 코드가 있는 CodeCommit 리포지토리\n",
"- 파이프라인 생성 및 파이프라인 파라미터 구성을 위한 모든 소스 코드는 [AWS CodeCommit](https://aws.amazon.com/codecommit/) 리포지토리로 제공됩니다. \n",
" - 코드는 완벽하게 작동하며 즉시 사용할 수 있습니다. 이 코드를 소유하고 요구 사항에 따라 파이프라인의 구성 또는 매개변수를 변경할 수 있습니다.\n",
"- 코드 작업을 시작하려면 저장소를 Studio 사용자의 홈 디렉토리에 복제해야 합니다.\n",

"- 소스 코드를 변경하고 CodeCommit 리포지토리로 푸시할 수 있습니다. 프로젝트는 또한 [AWS CodeBuild](https://aws.amazon.com/codebuild/) 단계를 시작하는 [AWS CodePipeline](https://aws.amazon.com/codepipeline/) CI/CD 파이프라인을 제공합니다. \n",
"- 저장소에 새 커밋이 있을때 마다 빌드는 저장소에서 코드를 가져오고 `create_pipeline` 함수(파일 `build.py`)를 호출합니다. \n",
"- 기존 코드를 변경하거나 `pipeline.py` 파일의 `pipeline.create_pipeline`에 고유한 코드를 제공할 수 있습니다. \n",
" - 경로: amazon-sagemaker-reusable-components-kr/project-seed-code/s3-fs-ingestion/pipeline.py\n",
" # create DW processor\n",
" processor = Processor(\n",
" role=execution_role,\n",
" image_uri=container_uri,\n",
" instance_count=p_processing_instance_count,\n",
" instance_type=p_processing_instance_type,\n",
" volume_size_in_gb=p_processing_volume_size,\n",
" sagemaker_session=sagemaker_session,\n",
" )\n",
" step_process = ProcessingStep(\n",
" name=\"datawrangler-processing-to-feature-store\",\n",
" processor=processor,\n",
" inputs=[flow_input] + [data_input],\n",
" outputs=[processing_job_output],\n",
" job_arguments=[f\"--output-config '{json.dumps(output_config)}'\"],\n",
" )\n",
" pipeline = Pipeline(\n",
" name=pipeline_name,\n",
" parameters=[\n",
" p_processing_instance_type, \n",
" p_processing_instance_count,\n",
" p_processing_volume_size,\n",
" p_flow_output_name,\n",
" p_input_flow,\n",
" p_input_data,\n",
" p_feature_group_name\n",
" ],\n",
" steps=[step_process],\n",
" sagemaker_session=sagemaker_session\n",
" )\n",
" response = pipeline.upsert(\n",
" role_arn=execution_role,\n",
" description=pipeline_description,\n",
" tags=[\n",
" {'Key': 'sagemaker:project-name', 'Value': project_name },\n",
" {'Key': 'sagemaker:project-id', 'Value': project_id }\n",
" ],\n",
" )\n",
"- 기본 코드는 Data Wrangler 프로세서를 사용하여 SageMaker 파이프라인을 구성하고 파이프라인을 upserts합니다."
"## 5.3. SageMaker 파이프라인\n",
"- 이 프로젝트는 Data Wrangler 프로세서를 사용한 하나의 처리 단계로 구성된 SageMaker 파이프라인을 제공합니다. \n",
"- 파이프라인은 지정된 Data Wrangler `.flow` 파일에 포함된 변환을 수행하고 피쳐 저장소의 지정된 피쳐 그룹에서 변환된 피쳐를 수집합니다.\n",
"- 이 파이프라인은 지정된 S3 위치에 새 파일이 업로드될 때마다 Lambda 함수에 의해 시작됩니다. \n",
"- 파이프라인은 프로젝트에 연결되어 있으며 프로젝트 세부정보 페이지의 **파이프라인** 탭에서 사용할 수 있습니다.\n",

"여기에서 파이프라인 그래프, 매개변수, 설정 및 실행 기록을 볼 수 있습니다.\n",

"**실행 시작**을 클릭하고 파이프라인 매개변수를 제공하여 Studio에서 수동으로 새 실행을 시작할 수도 있습니다.\n",

"# 6. 자동화 파이프라인 테스트\n",
"배포된 데이터 변환 및 피쳐 저장소 수집 파이프라인을 테스트하려면 다음 단계를 수행하십시오.\n",
"1. 모니터링되는 S3 접두사 위치에 데이터 파일을 업로드합니다. 그러면 데이터 파이프라인을 통해 데이터 변환 및 수집이 시작됩니다.\n",
"1. 파이프라인 실행 모니터링\n",
"1. 피쳐 그룹에 로드된 데이터 확인"
"⭐ EventBridge 규칙은 'PutObject' 및 'CompleteMultipartUpload'라는 두 가지 S3 이벤트를 모니터링합니다. 두 S3 버킷 간에 객체를 복사하면 EventBrige 규칙이 시작되지 않습니다."
"다음 S3 `PUT` 이벤트는 새 파이프라인 실행을 시작하는 Lambda 함수를 시작합니다.\n",
"람다 함수는 파이프라인을 실행 함\n",
"## 6.1. 데이터를 S3 버킷에 업로드"
"file_name = f\"abalone-{strftime('%d-%H-%M-%S', gmtime())}.csv\""
"!aws s3 cp {abalone_dataset_local_url} s3://{s3_input_data_prefix}{file_name}"
"## 6.2. 파이프라인 실행을 모니터링"
" project_id\n",
" project_name\n",
"except NameError:\n",
" raise Exception(\"[ERROR]: project_id or project_name variables are not set\")\n",
" \n",
"if project_id is None or project_name is None:\n",
" raise Exception(\"[ERROR]: project_id or project_name variables are not set\")"
"# Get the the project data\n",
"r = sm.describe_project(ProjectName=project_name)\n",
"# Get the pipeline prefix from the project parameters\n",
"pipeline_name_prefix = [p for p in r[\"ServiceCatalogProvisioningDetails\"][\"ProvisioningParameters\"] if p[\"Key\"] == \"PipelineNamePrefix\"][0][\"Value\"]"
"# set the pipeline name\n",
"s3_to_fs_pipeline_name = f\"{pipeline_name_prefix}-{project_id}\"\n",
"%store s3_to_fs_pipeline_name"
"# check pipeline execution \n",
"summaries = sm.list_pipeline_executions(PipelineName=s3_to_fs_pipeline_name).get('PipelineExecutionSummaries')\n",
"latest_execution = sm.list_pipeline_executions(PipelineName=s3_to_fs_pipeline_name).get('PipelineExecutionSummaries')[0].get('PipelineExecutionArn')\n",
"print (latest_execution)"
"# Wait for pipeline execution to complete 'Executing' status\n",
"while sm.describe_pipeline_execution(PipelineExecutionArn=latest_execution)[\"PipelineExecutionStatus\"] == \"Executing\":\n",
" print('Pipeline is in Executing status...')\n",
" time.sleep(30)\n",
" \n",
"print('Pipeline is done Executing')\n",
"또는 Studio의 파이프라인 위젯 내에서 파이프라인 실행을 모니터링할 수 있습니다.\n",
"## 7. 피쳐 스토어에서 데이터 확인\n",
"실행이 완료되면 데이터가 피쳐 그룹에 로드되었는지 확인할 수 있습니다."
"피쳐 그룹 오브젝트를 생성"
"feature_store_session = Session()\n",
"feature_group = FeatureGroup(\n",
" name=feature_group_name, \n",
" sagemaker_session=feature_store_session\n",
"# Build SQL query to features group\n",
"fs_query = feature_group.athena_query()\n",
"query_string = f'SELECT * FROM \"{fs_query.table_name}\"'\n",
"print(f'Prepared query {query_string}')\n",
"# Run Athena query. The output is loaded to a Pandas dataframe.\n",
" query_string=query_string, \n",
" output_location=f\"s3://{s3_fs_query_output_prefix}\"\n",
"data_df = fs_query.as_dataframe()"
"The `DataFrame` contains now all features from the feature group:"
"## 8. Next\n",
" [`99-clean-up` notebook](99-clean-up.ipynb) 로 이동하셔서 리소스를 제거 하세요."
"# A. 에러 케이스 및 트러블 슈팅"
"## A.1. Project 실행시 에러"
"## 해결\n",
"- AmazonSageMakerServiceCatalogProductsLaunchRole 에 LambdaFull 추가 함\n"
"## A.2 랭글러 에러\n",
"- 원본의 에러 발생은 데이터 소스가 달라서 밠생 함.\n",
"- [해결] 직접 데이터 랭글러 데이터 플로우를 생성하여 해결 함.\n",
