{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Scientist - Feature Engineering\n", "\n", "This notebook demonstrates a sample of the activities and artifacts prepared by a Data Scientist to establish the Feature Engineering pipelines.\n", "\n", "***\n", "*This notebook should work well with the Python 3 (Data Science) kernel in SageMaker Studio*\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Environment setup\n", "Import libraries, setup logging, and define few variables. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import logging\n", "import json\n", "import sagemaker\n", "import string\n", "\n", "from pathlib import Path\n", "from sagemaker.utils import name_from_base\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "import shutil\n", "\n", "from utils.feature_store_utils import format_feature_defs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load_ext autoreload\n", "%autoreload 2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set up a logger" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger(\"__name__\")\n", "logger.setLevel(logging.INFO)\n", "logger.addHandler(logging.StreamHandler())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define SageMaker and Boto3 sessions and few additional parameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session = sagemaker.Session()\n", "boto_session = sagemaker_session.boto_session\n", "sagemaker_client = sagemaker_session.sagemaker_client\n", "region = sagemaker_session.boto_region_name\n", "role = sagemaker.get_execution_role()\n", "\n", "bucket = sagemaker_session.default_bucket()\n", "prefix = \"mlops-demo\"\n", "feature_eng_base_path = Path(\"feature_engineering\")\n", "feature_eng_base_path.mkdir(exist_ok=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feat_eng_pipelines_path = feature_eng_base_path / \"pipelines\"\n", "feat_eng_pipelines_path.mkdir(exist_ok=True)\n", "\n", "feat_eng_conf_path = feature_eng_base_path / \"configurations\"\n", "feat_eng_conf_path.mkdir(exist_ok=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You'll store the offline FeatureStore in a prefix in the default S3 bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_store_offline_s3_uri = f\"s3://{bucket}/{prefix}/fs/\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Retrieve the URI of the raw data files stored by [DataScientist-00-DataDownload.ipynb](DataScientist-00-DataDownload.ipynb)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store -r claims_uri\n", "%store -r customers_uri" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Wrangler" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Editing the template `flow` files to point at the correct dataset in S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with (feature_eng_base_path / \"claims_flow_template\").open(\"r\") as f, (\n", " feat_eng_pipelines_path / \"claims.flow\"\n", ").open(\"w\") as g:\n", " variables = {\"data_uri\": claims_uri}\n", " template = string.Template(f.read())\n", " claims_flow = template.substitute(variables)\n", " claims_flow = json.loads(claims_flow)\n", " json.dump(claims_flow, g, indent=2)\n", " logger.info(\"Created claims.flow \")\n", "\n", "with (feature_eng_base_path / \"customers_flow_template\").open(\"r\") as f, (\n", " feat_eng_pipelines_path / \"customers.flow\"\n", ").open(\"w\") as g:\n", " variables = {\"data_uri\": customers_uri}\n", " template = string.Template(f.read())\n", " claims_flow = template.substitute(variables)\n", " claims_flow = json.loads(claims_flow)\n", " json.dump(claims_flow, g, indent=2)\n", " logger.info(\"Created customers.flow \")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can review the feature engineering:\n", "- Let's look at the feature engineering for the [Claims Dataset](feature_engineering/claims.flow)\n", "\n", "- Let's look at the feature engineering for the [Customers Dataset](feature_engineering/customers.flow)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Feature Store" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For development purposes, you could create the *Feature Groups* using the Data Wrangle export option to generate a Jupyter Notebook for each flow file.\n", "In this case instead, you'll still generate the Notebooks, but you'll use to extract the `column_schemas` that we need for the *Feature Groups*.\n", "\n", "\n", "You'll encode the relevant feature groups configurations, including the `column_schemas`, in `*.fg.json` files in `feature_engineering folder`. \n", "These configurations can be parsed by `get_fg_conf()` (in [feature_store_utils.py](utils/feature_store_utils.py)) and can be included in the CI/CD. \n", "Here's a template of a `*.fg.json` file\n", "\n", "```\n", "{\n", " \"feature_group_name\": \"customers\",\n", " \"event_time_feature_name\": \"event_time\",\n", " \"feature_group_name\": \"customers\",\n", " \"record_identifier_feature_name\": \"policy_id\",\n", " \"disable_glue_table_creation\": false,\n", " \"enable_online_store\": false,\n", " \"column_schemas\":\n", "}\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "featurestore_runtime = boto_session.client(\n", " service_name=\"sagemaker-featurestore-runtime\", region_name=region\n", ")\n", "\n", "feature_store_session = sagemaker.Session(\n", " boto_session=boto_session,\n", " sagemaker_client=sagemaker_client,\n", " sagemaker_featurestore_runtime_client=featurestore_runtime,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Claims Feature Group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# claims_column_schemas = <> # <--- Copy here the column_schemas from the Jupyter Notebook generated with DataWrangler\n", "claims_column_schemas = [\n", " {\"name\": \"policy_id\", \"type\": \"long\"},\n", " {\"name\": \"incident_severity\", \"type\": \"long\"},\n", " {\"name\": \"num_vehicles_involved\", \"type\": \"long\"},\n", " {\"name\": \"num_injuries\", \"type\": \"long\"},\n", " {\"name\": \"num_witnesses\", \"type\": \"long\"},\n", " {\"name\": \"police_report_available\", \"type\": \"long\"},\n", " {\"name\": \"injury_claim\", \"type\": \"float\"},\n", " {\"name\": \"vehicle_claim\", \"type\": \"float\"},\n", " {\"name\": \"total_claim_amount\", \"type\": \"float\"},\n", " {\"name\": \"incident_month\", \"type\": \"long\"},\n", " {\"name\": \"incident_day\", \"type\": \"long\"},\n", " {\"name\": \"incident_dow\", \"type\": \"long\"},\n", " {\"name\": \"incident_hour\", \"type\": \"long\"},\n", " {\"name\": \"fraud\", \"type\": \"long\"},\n", " {\"name\": \"driver_relationship_self\", \"type\": \"long\"},\n", " {\"name\": \"driver_relationship_na\", \"type\": \"long\"},\n", " {\"name\": \"driver_relationship_spouse\", \"type\": \"long\"},\n", " {\"name\": \"driver_relationship_child\", \"type\": \"long\"},\n", " {\"name\": \"driver_relationship_other\", \"type\": \"long\"},\n", " {\"name\": \"incident_type_collision\", \"type\": \"long\"},\n", " {\"name\": \"incident_type_breakin\", \"type\": \"long\"},\n", " {\"name\": \"incident_type_theft\", \"type\": \"long\"},\n", " {\"name\": \"collision_type_front\", \"type\": \"long\"},\n", " {\"name\": \"collision_type_rear\", \"type\": \"long\"},\n", " {\"name\": \"collision_type_side\", \"type\": \"long\"},\n", " {\"name\": \"collision_type_na\", \"type\": \"long\"},\n", " {\"name\": \"authorities_contacted_police\", \"type\": \"long\"},\n", " {\"name\": \"authorities_contacted_none\", \"type\": \"long\"},\n", " {\"name\": \"authorities_contacted_ambulance\", \"type\": \"long\"},\n", " {\"name\": \"authorities_contacted_fire\", \"type\": \"long\"},\n", " {\"name\": \"event_time\", \"type\": \"float\"},\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now build the Feature Group configuration dictionary" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "claim_fg_props = dict(\n", " FeatureGroupName=\"dev-claims\",\n", " FeatureDefinitions=format_feature_defs(claims_column_schemas),\n", " RecordIdentifierFeatureName=\"policy_id\",\n", " EventTimeFeatureName=\"event_time\",\n", " OnlineStoreConfig={\n", " \"EnableOnlineStore\": False,\n", " },\n", " OfflineStoreConfig={\n", " \"S3StorageConfig\": {\n", " \"S3Uri\": feature_store_offline_s3_uri,\n", " },\n", " \"DisableGlueTableCreation\": False,\n", " },\n", " Description=\"Claim feature group\",\n", " Tags=[\n", " {\"Key\": \"stage\", \"Value\": \"dev\"},\n", " ],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " response = sagemaker_client.create_feature_group(**claim_fg_props, RoleArn=role)\n", "except sagemaker_client.exceptions.ResourceInUse:\n", " logger.exception(\"The FeatureGroup exist already\", exc_info=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For ease of use, you can also create a FeatureGroup object using SageMaker SDK" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "claims_feature_group = FeatureGroup(\n", " name=claim_fg_props[\"FeatureGroupName\"],\n", " sagemaker_session=feature_store_session,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "claims_feature_group.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Customers Feature Group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# customers_column_schemas = <> # <--- Copy here the column_schemas from the Jupyter Notebook generated with DataWrangler\n", "customers_column_schemas = [\n", " {\"name\": \"policy_id\", \"type\": \"long\"},\n", " {\"name\": \"customer_age\", \"type\": \"long\"},\n", " {\"name\": \"customer_education\", \"type\": \"long\"},\n", " {\"name\": \"months_as_customer\", \"type\": \"long\"},\n", " {\"name\": \"policy_deductable\", \"type\": \"long\"},\n", " {\"name\": \"policy_annual_premium\", \"type\": \"long\"},\n", " {\"name\": \"policy_liability\", \"type\": \"long\"},\n", " {\"name\": \"auto_year\", \"type\": \"long\"},\n", " {\"name\": \"num_claims_past_year\", \"type\": \"long\"},\n", " {\"name\": \"num_insurers_past_5_years\", \"type\": \"long\"},\n", " {\"name\": \"customer_gender_male\", \"type\": \"long\"},\n", " {\"name\": \"customer_gender_female\", \"type\": \"long\"},\n", " {\"name\": \"policy_state_ca\", \"type\": \"long\"},\n", " {\"name\": \"policy_state_wa\", \"type\": \"long\"},\n", " {\"name\": \"policy_state_az\", \"type\": \"long\"},\n", " {\"name\": \"policy_state_or\", \"type\": \"long\"},\n", " {\"name\": \"policy_state_nv\", \"type\": \"long\"},\n", " {\"name\": \"policy_state_id\", \"type\": \"long\"},\n", " {\"name\": \"event_time\", \"type\": \"float\"},\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now build the Feature Group configuration dictionary" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers_fg_props = dict(\n", " FeatureGroupName=\"dev-customers\",\n", " FeatureDefinitions=format_feature_defs(customers_column_schemas),\n", " RecordIdentifierFeatureName=\"policy_id\",\n", " EventTimeFeatureName=\"event_time\",\n", " OnlineStoreConfig={\n", " \"EnableOnlineStore\": False,\n", " },\n", " OfflineStoreConfig={\n", " \"S3StorageConfig\": {\n", " \"S3Uri\": feature_store_offline_s3_uri,\n", " },\n", " \"DisableGlueTableCreation\": False,\n", " },\n", " Description=\"Customers feature group\",\n", " Tags=[\n", " {\"Key\": \"stage\", \"Value\": \"dev\"},\n", " ],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " response = sagemaker_client.create_feature_group(**customers_fg_props, RoleArn=role)\n", " logger.info(\"FeatureGroup created\")\n", "except sagemaker_client.exceptions.ResourceInUse:\n", " logger.exception(\"The FeatureGroup exist already\", exc_info=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For ease of use, you can also create a FeatureGroup object using SageMaker SDK" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers_feature_group = FeatureGroup(\n", " name=customers_fg_props[\"FeatureGroupName\"],\n", " sagemaker_session=feature_store_session,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers_feature_group.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Processing Pipelines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Prepare a subfolder in the `feature_engineering` folder to store the script with the pipeline definition and any additional library we need." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "shutil.copy(\"utils/parse_flow.py\", feat_eng_pipelines_path)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile {feat_eng_pipelines_path}/feature_ingestion_pipeline.py\n", "\n", "import json\n", "import sagemaker\n", "\n", "from sagemaker.processing import (\n", " FeatureStoreOutput,\n", " ProcessingInput,\n", " ProcessingJob,\n", " ProcessingOutput,\n", " Processor,\n", ")\n", "from sagemaker.utils import name_from_base\n", "from sagemaker.workflow.parameters import ParameterInteger, ParameterString\n", "from sagemaker.workflow.pipeline import Pipeline\n", "from sagemaker.workflow.steps import ProcessingStep\n", "from sagemaker.wrangler.processing import DataWranglerProcessor\n", "\n", "from .parse_flow import FlowFile\n", "\n", "def get_pipeline(\n", " role: str,\n", " pipeline_name: str,\n", " prefix: str,\n", " sagemaker_session: sagemaker.Session=None,\n", " **kwarg,\n", ")-> Pipeline:\n", " \"\"\"[summary]\n", "\n", "\n", " Args:\n", " role ([type]): [description]\n", " pipeline_name ([type]): [description]\n", " sagemaker_session ([type], optional): [description]. Defaults to None.\n", "\n", " Returns:\n", " Pipeline: [description]\n", " \"\"\"\n", " flow_file_path = kwarg[\"flow_file_path\"]\n", " feature_group_name = kwarg[\"feature_group_name\"]\n", "\n", " bucket = sagemaker_session.default_bucket()\n", " flow_file = FlowFile(flow_file_path)\n", "\n", " instance_count = ParameterInteger(name=\"InstanceCount\", default_value=1)\n", " instance_type = ParameterString(name=\"InstanceType\", default_value=\"ml.m5.4xlarge\")\n", " input_data_uri = ParameterString(name=\"InputDataURI\")\n", " \n", " \n", "\n", " flow_file_uri = sagemaker.s3.S3Uploader.upload(\n", " local_path=flow_file_path,\n", " desired_s3_uri=f\"s3://{bucket}/{prefix}/feature_ingestion/{name_from_base(pipeline_name)}\",\n", " sagemaker_session=sagemaker_session,\n", " )\n", "\n", " output_content_type = \"CSV\"\n", " output_config = {flow_file.output_name: {\"content_type\": output_content_type}}\n", " job_argument = [f\"--output-config '{json.dumps(output_config)}'\"]\n", "\n", " data_sources = [\n", " ProcessingInput(\n", " input_name=\"InputData\",\n", " source=input_data_uri,\n", " destination=f\"/opt/ml/processing/{flow_file.input_name}\",\n", " )\n", " ]\n", "\n", " outputs = [\n", " ProcessingOutput(\n", " output_name=flow_file.output_name,\n", " app_managed=True,\n", " feature_store_output=FeatureStoreOutput(\n", " feature_group_name=feature_group_name\n", " ),\n", " )\n", " ]\n", "\n", " data_wrangler_processor = DataWranglerProcessor(\n", " role=role,\n", " data_wrangler_flow_source=flow_file_uri,\n", " instance_count=instance_count,\n", " instance_type=instance_type,\n", " sagemaker_session=sagemaker_session,\n", " )\n", "\n", " data_wrangler_step = ProcessingStep(\n", " name=\"data-wrangler-step\",\n", " processor=data_wrangler_processor,\n", " inputs=data_sources,\n", " outputs=outputs,\n", " job_arguments=job_argument,\n", " )\n", "\n", " pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " instance_count,\n", " instance_type,\n", " input_data_uri,\n", " ],\n", " steps=[data_wrangler_step],\n", " sagemaker_session=sagemaker_session,\n", " )\n", "\n", " return pipeline\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can now import the function that create the pipeline object. Thanks to the `autoreload` extension, we can update the script and rerun the call above, and the function will be automatically reloaded." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from feature_engineering.pipelines.feature_ingestion_pipeline import get_pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Claims feature ingestion pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "claims_pipeline_args = {\n", " \"flow_file_path\": (feat_eng_pipelines_path / \"claims.flow\").as_posix(),\n", " \"feature_group_name\": claims_feature_group.name,\n", "}\n", "claims_pipeline = get_pipeline(\n", " role=role,\n", " pipeline_name=\"dev-claims-pipeline\",\n", " sagemaker_session=sagemaker_session,\n", " prefix=prefix,\n", " **claims_pipeline_args\n", ")\n", "json.loads(claims_pipeline.definition())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Customers feature ingestion pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers_pipeline_conf = {\n", " \"flow_file_path\": (feat_eng_pipelines_path / \"customers.flow\").as_posix(),\n", " \"feature_group_name\": customers_feature_group.name,\n", "}\n", "customers_pipeline = get_pipeline(\n", " role=role,\n", " pipeline_name=\"dev-customers-pipeline\",\n", " prefix=prefix,\n", " sagemaker_session=sagemaker_session,\n", " **customers_pipeline_conf\n", ")\n", "json.loads(customers_pipeline.definition())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the pipelines" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " claims_pipeline.update(\n", " role_arn=role,\n", " description=\"Claims feature ingestion pipeline\",\n", " )\n", " logging.info(\"Pipeline updated\")\n", "except:\n", " claims_pipeline.create(\n", " role_arn=role,\n", " description=\"Claims feature ingestion pipeline\",\n", " )\n", " logging.info(\"Pipeline created\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " customers_pipeline.update(\n", " role_arn=role,\n", " description=\"Claims feature ingestion pipeline\",\n", " )\n", " logging.info(\"Pipeline updated\")\n", "except:\n", " customers_pipeline.create(\n", " role_arn=role,\n", " description=\"Claims feature ingestion pipeline\",\n", " )\n", " logging.info(\"Pipeline created\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run the pipelines" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "claims_pipeline_execution = claims_pipeline.start(\n", " parameters={\"InputDataURI\": claims_uri},\n", " execution_display_name=\"dev-run\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "claims_pipeline_execution.describe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers_pipeline_execution = customers_pipeline.start(\n", " parameters={\"InputDataURI\": customers_uri},\n", " execution_display_name=\"dev-run\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers_pipeline_execution.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Write configuration files for operationalization" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Feature Groups configurations" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "claims_fg_props_prod = dict(\n", " FeatureGroupName=\"mlops-demo-claims\",\n", " FeatureDefinitions=format_feature_defs(claims_column_schemas),\n", " RecordIdentifierFeatureName=\"policy_id\",\n", " EventTimeFeatureName=\"event_time\",\n", " OnlineStoreConfig={\n", " \"EnableOnlineStore\": True, # <-- In production we want the online store turned on\n", " },\n", " OfflineStoreConfig={\n", " \"S3StorageConfig\": {\n", " \"S3Uri\": feature_store_offline_s3_uri,\n", " },\n", " \"DisableGlueTableCreation\": False,\n", " },\n", " Description=\"Claim feature group\",\n", ")\n", "\n", "with (feat_eng_conf_path / \"claims.fg.json\").open(\"w\") as f:\n", " json.dump(claims_fg_props_prod, f, indent=2)\n", "\n", "\n", "customers_fg_props_prod = dict(\n", " FeatureGroupName=\"mlops-demo-customers\",\n", " FeatureDefinitions=format_feature_defs(customers_column_schemas),\n", " RecordIdentifierFeatureName=\"policy_id\",\n", " EventTimeFeatureName=\"event_time\",\n", " OnlineStoreConfig={\n", " \"EnableOnlineStore\": True, # <-- In production we want the online store turned on\n", " },\n", " OfflineStoreConfig={\n", " \"S3StorageConfig\": {\n", " \"S3Uri\": feature_store_offline_s3_uri,\n", " },\n", " \"DisableGlueTableCreation\": False,\n", " },\n", " Description=\"Customers feature group\",\n", ")\n", "\n", "\n", "with (feat_eng_conf_path / \"customers.fg.json\").open(\"w\") as f:\n", " json.dump(customers_fg_props_prod, f, indent=2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pipelines configurations" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "claims_config = dict(\n", " pipeline_name=\"claims-preprocessing\",\n", " code_file_path=\"pipelines/feature_ingestion_pipeline.py\",\n", " pipeline_configuration=claims_pipeline_args,\n", ")\n", "with (feat_eng_conf_path / \"claims.pipeline.json\").open(\"w\") as f:\n", " json.dump(claims_config, f, indent=2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers_config = dict(\n", " pipeline_name=\"customers-preprocessing\",\n", " code_file_path=\"pipelines/feature_ingestion_pipeline.py\",\n", " pipeline_configuration=customers_pipeline_conf,\n", ")\n", "with (feat_eng_conf_path / \"customers.pipeline.json\").open(\"w\") as f:\n", " json.dump(customers_config, f, indent=2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean-up" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# customers_pipeline.delete()\n", "# claims_pipeline.delete()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# claims_feature_group.delete()\n", "# customers_feature_group.delete()" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-1:492261229750:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }