# Data Scientist - Feature Engineering

This notebook demonstrates a sample of the activities and artifacts prepared by a Data Scientist to establish the Feature Engineering pipelines.

***
*This notebook should work well with the Python 3 (Data Science) kernel in SageMaker Studio*
***

#### Environment setup
Import libraries, set up logging, and define a few variables.

In [None]:
import logging
import json
import sagemaker
import string

from pathlib import Path
from sagemaker.utils import name_from_base
from sagemaker.feature_store.feature_group import FeatureGroup
import shutil

from utils.feature_store_utils import format_feature_defs

In [None]:
%load_ext autoreload
%autoreload 2

Set up a logger

In [None]:
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

Define the SageMaker and Boto3 sessions and a few additional parameters.

In [None]:
sagemaker_session = sagemaker.Session()
boto_session = sagemaker_session.boto_session
sagemaker_client = sagemaker_session.sagemaker_client
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()

bucket = sagemaker_session.default_bucket()
prefix = "mlops-demo"
feature_eng_base_path = Path("feature_engineering")
feature_eng_base_path.mkdir(exist_ok=True)

In [None]:
feat_eng_pipelines_path = feature_eng_base_path / "pipelines"
feat_eng_pipelines_path.mkdir(exist_ok=True)

feat_eng_conf_path = feature_eng_base_path / "configurations"
feat_eng_conf_path.mkdir(exist_ok=True)

You'll store the offline Feature Store data under a prefix in the default S3 bucket.

In [None]:
feature_store_offline_s3_uri = f"s3://{bucket}/{prefix}/fs/"

Retrieve the URI of the raw data files stored by [DataScientist-00-DataDownload.ipynb](DataScientist-00-DataDownload.ipynb).

In [None]:
%store -r claims_uri
%store -r customers_uri

## Data Wrangler

Edit the template `flow` files so that they point at the correct datasets in S3.

In [None]:
with (feature_eng_base_path / "claims_flow_template").open("r") as f, (
 feat_eng_pipelines_path / "claims.flow"
).open("w") as g:
 variables = {"data_uri": claims_uri}
 template = string.Template(f.read())
 claims_flow = template.substitute(variables)
 claims_flow = json.loads(claims_flow)
 json.dump(claims_flow, g, indent=2)
 logger.info("Created claims.flow ")

with (feature_eng_base_path / "customers_flow_template").open("r") as f, (
 feat_eng_pipelines_path / "customers.flow"
).open("w") as g:
 variables = {"data_uri": customers_uri}
 template = string.Template(f.read())
 customers_flow = template.substitute(variables)
 # Parse the rendered text to confirm it is valid JSON before writing it out
 customers_flow = json.loads(customers_flow)
 json.dump(customers_flow, g, indent=2)
 logger.info("Created customers.flow")
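
The substitution step above can be tried in isolation. The following is a minimal sketch, assuming a tiny hand-written template with a single `$data_uri` placeholder; the template text, the `render_flow` helper, and the example S3 URI are hypothetical, and real Data Wrangler flow files are much larger.

```python
import json
import string

# Hypothetical miniature template; it only illustrates the $data_uri placeholder.
template_text = '{"nodes": [{"type": "SOURCE", "parameters": {"s3Uri": "$data_uri"}}]}'


def render_flow(template_text: str, data_uri: str) -> dict:
    """Substitute $data_uri and parse the result to confirm it is valid JSON."""
    rendered = string.Template(template_text).substitute(data_uri=data_uri)
    return json.loads(rendered)


flow = render_flow(template_text, "s3://example-bucket/raw/claims.csv")
print(flow["nodes"][0]["parameters"]["s3Uri"])
```

`substitute` raises a `KeyError` when a placeholder has no value, which is useful here: a template that still contains an unresolved variable fails immediately instead of producing a broken flow file.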

You can now review the feature engineering steps in the generated flow files:
- [Claims dataset flow](feature_engineering/pipelines/claims.flow)
- [Customers dataset flow](feature_engineering/pipelines/customers.flow)

## Feature Store

For development purposes, you could create the *Feature Groups* using the Data Wrangler export option, which generates a Jupyter notebook for each flow file.
Here you'll still generate the notebooks, but you'll use them only to extract the `column_schemas` needed for the *Feature Groups*.


You'll encode the relevant Feature Group configurations, including the `column_schemas`, in `*.fg.json` files in the `feature_engineering` folder. 
These configurations can be parsed by `get_fg_conf()` (in [feature_store_utils.py](utils/feature_store_utils.py)) and included in the CI/CD pipeline. 
Here's a template of a `*.fg.json` file:

```
{
 "feature_group_name": "customers",
 "event_time_feature_name": "event_time",
 "record_identifier_feature_name": "policy_id",
 "disable_glue_table_creation": false,
 "enable_online_store": false,
 "column_schemas":
}
```
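
The implementation of `get_fg_conf()` is not shown in this notebook. As a rough illustration of how such a file could be read and checked, here is a minimal sketch; the `load_fg_conf` function and the `REQUIRED_KEYS` set are hypothetical and are not the helper in `feature_store_utils.py`.

```python
import json
import tempfile
from pathlib import Path

# Keys assumed to be mandatory, based on the template above.
REQUIRED_KEYS = {
    "feature_group_name",
    "event_time_feature_name",
    "record_identifier_feature_name",
    "column_schemas",
}


def load_fg_conf(path: Path) -> dict:
    """Hypothetical loader: read a *.fg.json file and check the expected keys."""
    conf = json.loads(Path(path).read_text())
    missing = REQUIRED_KEYS - conf.keys()
    if missing:
        raise KeyError(f"Missing keys in {path}: {sorted(missing)}")
    return conf


# Write a small example file to a temporary folder and load it back.
example = {
    "feature_group_name": "customers",
    "event_time_feature_name": "event_time",
    "record_identifier_feature_name": "policy_id",
    "disable_glue_table_creation": False,
    "enable_online_store": False,
    "column_schemas": [{"name": "policy_id", "type": "long"}],
}
with tempfile.TemporaryDirectory() as tmp:
    conf_path = Path(tmp) / "customers.fg.json"
    conf_path.write_text(json.dumps(example, indent=2))
    conf = load_fg_conf(conf_path)

print(conf["feature_group_name"])
```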

In [None]:
featurestore_runtime = boto_session.client(
 service_name="sagemaker-featurestore-runtime", region_name=region
)

feature_store_session = sagemaker.Session(
 boto_session=boto_session,
 sagemaker_client=sagemaker_client,
 sagemaker_featurestore_runtime_client=featurestore_runtime,
)

### Claims Feature Group

In [None]:
# claims_column_schemas = <> # <--- Copy here the column_schemas from the Jupyter Notebook generated with DataWrangler
claims_column_schemas = [
 {"name": "policy_id", "type": "long"},
 {"name": "incident_severity", "type": "long"},
 {"name": "num_vehicles_involved", "type": "long"},
 {"name": "num_injuries", "type": "long"},
 {"name": "num_witnesses", "type": "long"},
 {"name": "police_report_available", "type": "long"},
 {"name": "injury_claim", "type": "float"},
 {"name": "vehicle_claim", "type": "float"},
 {"name": "total_claim_amount", "type": "float"},
 {"name": "incident_month", "type": "long"},
 {"name": "incident_day", "type": "long"},
 {"name": "incident_dow", "type": "long"},
 {"name": "incident_hour", "type": "long"},
 {"name": "fraud", "type": "long"},
 {"name": "driver_relationship_self", "type": "long"},
 {"name": "driver_relationship_na", "type": "long"},
 {"name": "driver_relationship_spouse", "type": "long"},
 {"name": "driver_relationship_child", "type": "long"},
 {"name": "driver_relationship_other", "type": "long"},
 {"name": "incident_type_collision", "type": "long"},
 {"name": "incident_type_breakin", "type": "long"},
 {"name": "incident_type_theft", "type": "long"},
 {"name": "collision_type_front", "type": "long"},
 {"name": "collision_type_rear", "type": "long"},
 {"name": "collision_type_side", "type": "long"},
 {"name": "collision_type_na", "type": "long"},
 {"name": "authorities_contacted_police", "type": "long"},
 {"name": "authorities_contacted_none", "type": "long"},
 {"name": "authorities_contacted_ambulance", "type": "long"},
 {"name": "authorities_contacted_fire", "type": "long"},
 {"name": "event_time", "type": "float"},
]

We can now build the Feature Group configuration dictionary

In [None]:
claim_fg_props = dict(
 FeatureGroupName="dev-claims",
 FeatureDefinitions=format_feature_defs(claims_column_schemas),
 RecordIdentifierFeatureName="policy_id",
 EventTimeFeatureName="event_time",
 OnlineStoreConfig={
 "EnableOnlineStore": False,
 },
 OfflineStoreConfig={
 "S3StorageConfig": {
 "S3Uri": feature_store_offline_s3_uri,
 },
 "DisableGlueTableCreation": False,
 },
 Description="Claim feature group",
 Tags=[
 {"Key": "stage", "Value": "dev"},
 ],
)
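
The `format_feature_defs` helper used above lives in `utils/feature_store_utils.py` and is not shown here. The sketch below illustrates the kind of conversion it presumably performs, from Data Wrangler `column_schemas` entries to the `FeatureName`/`FeatureType` pairs that `create_feature_group` expects. The function name `format_feature_defs_sketch` and the `TYPE_MAP` mapping are assumptions made for illustration.

```python
# Assumed mapping from Data Wrangler column types to Feature Store feature types.
TYPE_MAP = {"long": "Integral", "float": "Fractional", "string": "String"}


def format_feature_defs_sketch(column_schemas):
    """Convert [{"name": ..., "type": ...}] into Feature Store feature definitions."""
    return [
        {"FeatureName": col["name"], "FeatureType": TYPE_MAP[col["type"]]}
        for col in column_schemas
    ]


sample_schemas = [
    {"name": "policy_id", "type": "long"},
    {"name": "total_claim_amount", "type": "float"},
]
sample_defs = format_feature_defs_sketch(sample_schemas)
print(sample_defs)
```

An unknown column type raises a `KeyError` in this sketch, which surfaces schema mismatches before the Feature Group is created.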

In [None]:
try:
 response = sagemaker_client.create_feature_group(**claim_fg_props, RoleArn=role)
 logger.info("FeatureGroup created")
except sagemaker_client.exceptions.ResourceInUse:
 logger.warning("The FeatureGroup already exists")

For ease of use, you can also create a `FeatureGroup` object using the SageMaker SDK.

In [None]:
claims_feature_group = FeatureGroup(
 name=claim_fg_props["FeatureGroupName"],
 sagemaker_session=feature_store_session,
)

In [None]:
claims_feature_group.describe()
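
Feature Group creation is asynchronous, and the response of `describe()` includes a `FeatureGroupStatus` field that moves from `Creating` to `Created` (or `CreateFailed`). A minimal polling sketch follows; the `wait_for_feature_group` helper is hypothetical, and a stub stands in for `claims_feature_group.describe` so that the example runs without AWS access.

```python
import time


def wait_for_feature_group(describe_fn, poll_seconds=5.0, timeout_seconds=600.0):
    """Poll describe_fn until the Feature Group is created, fails, or times out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe_fn()["FeatureGroupStatus"]
        if status == "Created":
            return status
        if status == "CreateFailed":
            raise RuntimeError("Feature Group creation failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Feature Group still in status {status!r}")
        time.sleep(poll_seconds)


# Stub responses standing in for successive calls to claims_feature_group.describe().
responses = iter(
    [{"FeatureGroupStatus": "Creating"}, {"FeatureGroupStatus": "Created"}]
)
final_status = wait_for_feature_group(lambda: next(responses), poll_seconds=0.0)
print(final_status)
```

In the notebook, the call would look like `wait_for_feature_group(claims_feature_group.describe)`.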

### Customers Feature Group

In [None]:
# customers_column_schemas = <> # <--- Copy here the column_schemas from the Jupyter Notebook generated with DataWrangler
customers_column_schemas = [
 {"name": "policy_id", "type": "long"},
 {"name": "customer_age", "type": "long"},
 {"name": "customer_education", "type": "long"},
 {"name": "months_as_customer", "type": "long"},
 {"name": "policy_deductable", "type": "long"},
 {"name": "policy_annual_premium", "type": "long"},
 {"name": "policy_liability", "type": "long"},
 {"name": "auto_year", "type": "long"},
 {"name": "num_claims_past_year", "type": "long"},
 {"name": "num_insurers_past_5_years", "type": "long"},
 {"name": "customer_gender_male", "type": "long"},
 {"name": "customer_gender_female", "type": "long"},
 {"name": "policy_state_ca", "type": "long"},
 {"name": "policy_state_wa", "type": "long"},
 {"name": "policy_state_az", "type": "long"},
 {"name": "policy_state_or", "type": "long"},
 {"name": "policy_state_nv", "type": "long"},
 {"name": "policy_state_id", "type": "long"},
 {"name": "event_time", "type": "float"},
]

We can now build the Feature Group configuration dictionary

In [None]:
customers_fg_props = dict(
 FeatureGroupName="dev-customers",
 FeatureDefinitions=format_feature_defs(customers_column_schemas),
 RecordIdentifierFeatureName="policy_id",
 EventTimeFeatureName="event_time",
 OnlineStoreConfig={
 "EnableOnlineStore": False,
 },
 OfflineStoreConfig={
 "S3StorageConfig": {
 "S3Uri": feature_store_offline_s3_uri,
 },
 "DisableGlueTableCreation": False,
 },
 Description="Customers feature group",
 Tags=[
 {"Key": "stage", "Value": "dev"},
 ],
)

In [None]:
try:
 response = sagemaker_client.create_feature_group(**customers_fg_props, RoleArn=role)
 logger.info("FeatureGroup created")
except sagemaker_client.exceptions.ResourceInUse:
 logger.warning("The FeatureGroup already exists")

For ease of use, you can also create a `FeatureGroup` object using the SageMaker SDK.

In [None]:
customers_feature_group = FeatureGroup(
 name=customers_fg_props["FeatureGroupName"],
 sagemaker_session=feature_store_session,
)

In [None]:
customers_feature_group.describe()

## Data Processing Pipelines

Prepare a subfolder in the `feature_engineering` folder to store the script with the pipeline definition and any additional libraries it needs.

In [None]:
shutil.copy("utils/parse_flow.py", feat_eng_pipelines_path)


In [None]:
%%writefile {feat_eng_pipelines_path}/feature_ingestion_pipeline.py

import json
import sagemaker

from sagemaker.processing import (
 FeatureStoreOutput,
 ProcessingInput,
 ProcessingJob,
 ProcessingOutput,
 Processor,
)
from sagemaker.utils import name_from_base
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.wrangler.processing import DataWranglerProcessor

from .parse_flow import FlowFile

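def get_pipeline(
 role: str,
 pipeline_name: str,
 prefix: str,
 sagemaker_session: sagemaker.Session = None,
 **kwarg,
) -> Pipeline:
 """Build the feature ingestion pipeline for a Data Wrangler flow.

 Args:
  role (str): ARN of the IAM role used by the processing job.
  pipeline_name (str): Name of the SageMaker pipeline.
  prefix (str): S3 key prefix under which the flow file is uploaded.
  sagemaker_session (sagemaker.Session, optional): Session used for AWS calls.
   Defaults to None, in which case a new session is created.
  **kwarg: Must contain `flow_file_path` (local path of the flow file)
   and `feature_group_name` (name of the target Feature Group).

 Returns:
  Pipeline: Pipeline with a single Data Wrangler processing step.
 """
 flow_file_path = kwarg["flow_file_path"]
 feature_group_name = kwarg["feature_group_name"]

 # Fall back to a default session so the function also works when none is passed
 if sagemaker_session is None:
  sagemaker_session = sagemaker.Session()

 bucket = sagemaker_session.default_bucket()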
 flow_file = FlowFile(flow_file_path)

 instance_count = ParameterInteger(name="InstanceCount", default_value=1)
 instance_type = ParameterString(name="InstanceType", default_value="ml.m5.4xlarge")
 input_data_uri = ParameterString(name="InputDataURI")
 
 

 flow_file_uri = sagemaker.s3.S3Uploader.upload(
 local_path=flow_file_path,
 desired_s3_uri=f"s3://{bucket}/{prefix}/feature_ingestion/{name_from_base(pipeline_name)}",
 sagemaker_session=sagemaker_session,
 )

 output_content_type = "CSV"
 output_config = {flow_file.output_name: {"content_type": output_content_type}}
 job_argument = [f"--output-config '{json.dumps(output_config)}'"]

 data_sources = [
 ProcessingInput(
 input_name="InputData",
 source=input_data_uri,
 destination=f"/opt/ml/processing/{flow_file.input_name}",
 )
 ]

 outputs = [
 ProcessingOutput(
 output_name=flow_file.output_name,
 app_managed=True,
 feature_store_output=FeatureStoreOutput(
 feature_group_name=feature_group_name
 ),
 )
 ]

 data_wrangler_processor = DataWranglerProcessor(
 role=role,
 data_wrangler_flow_source=flow_file_uri,
 instance_count=instance_count,
 instance_type=instance_type,
 sagemaker_session=sagemaker_session,
 )

 data_wrangler_step = ProcessingStep(
 name="data-wrangler-step",
 processor=data_wrangler_processor,
 inputs=data_sources,
 outputs=outputs,
 job_arguments=job_argument,
 )

 pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 instance_count,
 instance_type,
 input_data_uri,
 ],
 steps=[data_wrangler_step],
 sagemaker_session=sagemaker_session,
 )

 return pipeline
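
To see what the `--output-config` job argument built in `get_pipeline` looks like, the snippet below reproduces that one step on its own. It is a small sketch: the output name `example-node-id.default` is a made-up placeholder, whereas in the pipeline the name comes from `flow_file.output_name`.

```python
import json
import shlex

# Hypothetical output name; the real one is read from the flow file.
output_name = "example-node-id.default"
output_config = {output_name: {"content_type": "CSV"}}
job_argument = [f"--output-config '{json.dumps(output_config)}'"]
print(job_argument[0])

# Check that the single-quoted JSON payload survives shell-style splitting.
flag, payload = shlex.split(job_argument[0])
parsed_config = json.loads(payload)
```

The single quotes keep the JSON payload together as one argument, so the double quotes inside it are preserved when the string is split the way a POSIX shell would split it.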


You can now import the function that creates the pipeline object. Thanks to the `autoreload` extension, you can update the script and rerun the cells below, and the function will be reloaded automatically.

In [None]:
from feature_engineering.pipelines.feature_ingestion_pipeline import get_pipeline

### Claims feature ingestion pipeline

In [None]:
claims_pipeline_args = {
 "flow_file_path": (feat_eng_pipelines_path / "claims.flow").as_posix(),
 "feature_group_name": claims_feature_group.name,
}
claims_pipeline = get_pipeline(
 role=role,
 pipeline_name="dev-claims-pipeline",
 sagemaker_session=sagemaker_session,
 prefix=prefix,
 **claims_pipeline_args
)
json.loads(claims_pipeline.definition())

### Customers feature ingestion pipeline

In [None]:
customers_pipeline_conf = {
 "flow_file_path": (feat_eng_pipelines_path / "customers.flow").as_posix(),
 "feature_group_name": customers_feature_group.name,
}
customers_pipeline = get_pipeline(
 role=role,
 pipeline_name="dev-customers-pipeline",
 prefix=prefix,
 sagemaker_session=sagemaker_session,
 **customers_pipeline_conf
)
json.loads(customers_pipeline.definition())
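
The full pipeline definition is verbose, so a compact summary can be easier to read. The sketch below extracts parameter names, defaults, and step names from a definition string. The `sample_definition` is hand-written to approximate the shape of the JSON returned by `definition()` and is not actual output; `summarize_definition` is a hypothetical helper.

```python
import json

# Hand-written sample approximating the shape of a pipeline definition.
sample_definition = json.dumps(
    {
        "Version": "2020-12-01",
        "Parameters": [
            {"Name": "InstanceCount", "Type": "Integer", "DefaultValue": 1},
            {"Name": "InstanceType", "Type": "String", "DefaultValue": "ml.m5.4xlarge"},
            {"Name": "InputDataURI", "Type": "String"},
        ],
        "Steps": [
            {"Name": "data-wrangler-step", "Type": "Processing", "Arguments": {}}
        ],
    }
)


def summarize_definition(definition_json: str) -> dict:
    """Return parameter defaults, parameters without a default, and step names."""
    definition = json.loads(definition_json)
    params = {
        p["Name"]: p.get("DefaultValue") for p in definition.get("Parameters", [])
    }
    required = [name for name, default in params.items() if default is None]
    steps = [step["Name"] for step in definition.get("Steps", [])]
    return {"parameters": params, "required": required, "steps": steps}


summary = summarize_definition(sample_definition)
print(summary)
```

In the notebook, `summarize_definition(claims_pipeline.definition())` would show at a glance that `InputDataURI` has no default and must be supplied when the pipeline is started.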

### Create the pipelines

In [None]:
try:
 claims_pipeline.update(
  role_arn=role,
  description="Claims feature ingestion pipeline",
 )
 logger.info("Pipeline updated")
except sagemaker_client.exceptions.ResourceNotFound:
 # The pipeline does not exist yet, so create it
 claims_pipeline.create(
  role_arn=role,
  description="Claims feature ingestion pipeline",
 )
 logger.info("Pipeline created")

In [None]:
try:
 customers_pipeline.update(
  role_arn=role,
  description="Customers feature ingestion pipeline",
 )
 logger.info("Pipeline updated")
except sagemaker_client.exceptions.ResourceNotFound:
 # The pipeline does not exist yet, so create it
 customers_pipeline.create(
  role_arn=role,
  description="Customers feature ingestion pipeline",
 )
 logger.info("Pipeline created")

## Run the pipelines

In [None]:
claims_pipeline_execution = claims_pipeline.start(
 parameters={"InputDataURI": claims_uri},
 execution_display_name="dev-run",
)

In [None]:
claims_pipeline_execution.describe()

In [None]:
customers_pipeline_execution = customers_pipeline.start(
 parameters={"InputDataURI": customers_uri},
 execution_display_name="dev-run",
)

In [None]:
customers_pipeline_execution.describe()
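
Pipeline executions run asynchronously. The execution object offers `wait()` to block until completion and `list_steps()` to inspect individual steps. The sketch below summarizes step statuses and failure reasons; `sample_steps` is a hand-written stand-in for the list returned by `list_steps()`, and `summarize_steps` is a hypothetical helper.

```python
from collections import Counter

# Hand-written sample in the shape of the entries returned by list_steps().
sample_steps = [
    {
        "StepName": "data-wrangler-step",
        "StepStatus": "Failed",
        "FailureReason": "ClientError: example failure message",
    },
]


def summarize_steps(steps):
    """Count steps per status and collect the failure reason of failed steps."""
    counts = Counter(step["StepStatus"] for step in steps)
    failures = {
        step["StepName"]: step.get("FailureReason", "")
        for step in steps
        if step["StepStatus"] == "Failed"
    }
    return dict(counts), failures


status_counts, failures = summarize_steps(sample_steps)
print(status_counts, failures)
```

In the notebook, this could be called as `summarize_steps(claims_pipeline_execution.list_steps())` once the execution has finished.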

## Write configuration files for operationalization

Feature Group configurations

In [None]:
claims_fg_props_prod = dict(
 FeatureGroupName="mlops-demo-claims",
 FeatureDefinitions=format_feature_defs(claims_column_schemas),
 RecordIdentifierFeatureName="policy_id",
 EventTimeFeatureName="event_time",
 OnlineStoreConfig={
 "EnableOnlineStore": True, # <-- In production we want the online store turned on
 },
 OfflineStoreConfig={
 "S3StorageConfig": {
 "S3Uri": feature_store_offline_s3_uri,
 },
 "DisableGlueTableCreation": False,
 },
 Description="Claim feature group",
)

with (feat_eng_conf_path / "claims.fg.json").open("w") as f:
 json.dump(claims_fg_props_prod, f, indent=2)


customers_fg_props_prod = dict(
 FeatureGroupName="mlops-demo-customers",
 FeatureDefinitions=format_feature_defs(customers_column_schemas),
 RecordIdentifierFeatureName="policy_id",
 EventTimeFeatureName="event_time",
 OnlineStoreConfig={
 "EnableOnlineStore": True, # <-- In production we want the online store turned on
 },
 OfflineStoreConfig={
 "S3StorageConfig": {
 "S3Uri": feature_store_offline_s3_uri,
 },
 "DisableGlueTableCreation": False,
 },
 Description="Customers feature group",
)


with (feat_eng_conf_path / "customers.fg.json").open("w") as f:
 json.dump(customers_fg_props_prod, f, indent=2)

Pipeline configurations

In [None]:
claims_config = dict(
 pipeline_name="claims-preprocessing",
 code_file_path="pipelines/feature_ingestion_pipeline.py",
 pipeline_configuration=claims_pipeline_args,
)
with (feat_eng_conf_path / "claims.pipeline.json").open("w") as f:
 json.dump(claims_config, f, indent=2)

In [None]:
customers_config = dict(
 pipeline_name="customers-preprocessing",
 code_file_path="pipelines/feature_ingestion_pipeline.py",
 pipeline_configuration=customers_pipeline_conf,
)
with (feat_eng_conf_path / "customers.pipeline.json").open("w") as f:
 json.dump(customers_config, f, indent=2)
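
A CI/CD job could read these `*.pipeline.json` files and turn them into the arguments for `get_pipeline`. The following is a minimal sketch of that idea, not the project's actual deployment code: the helpers `load_pipeline_conf` and `build_get_pipeline_kwargs`, the example role ARN, and the sample configuration are all hypothetical.

```python
import json
import tempfile
from pathlib import Path

EXPECTED_KEYS = ("pipeline_name", "code_file_path", "pipeline_configuration")


def load_pipeline_conf(path) -> dict:
    """Read a *.pipeline.json file and check that the expected keys are present."""
    conf = json.loads(Path(path).read_text())
    missing = [key for key in EXPECTED_KEYS if key not in conf]
    if missing:
        raise KeyError(f"Missing keys in {path}: {missing}")
    return conf


def build_get_pipeline_kwargs(conf: dict, role: str, prefix: str) -> dict:
    """Assemble keyword arguments matching the get_pipeline signature."""
    return {
        "role": role,
        "pipeline_name": conf["pipeline_name"],
        "prefix": prefix,
        **conf["pipeline_configuration"],
    }


# Round-trip a sample configuration through a temporary file.
sample_conf = {
    "pipeline_name": "claims-preprocessing",
    "code_file_path": "pipelines/feature_ingestion_pipeline.py",
    "pipeline_configuration": {
        "flow_file_path": "feature_engineering/pipelines/claims.flow",
        "feature_group_name": "dev-claims",
    },
}
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "claims.pipeline.json"
    path.write_text(json.dumps(sample_conf, indent=2))
    loaded = load_pipeline_conf(path)

pipeline_kwargs = build_get_pipeline_kwargs(
    loaded, role="arn:aws:iam::111122223333:role/example-role", prefix="mlops-demo"
)
print(sorted(pipeline_kwargs))
```

Note that the configuration written above stores the development Feature Group name; a deployment job would likely override `feature_group_name` with the production name before calling `get_pipeline`.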

## Clean-up

In [None]:
# customers_pipeline.delete()
# claims_pipeline.delete()

In [None]:
# claims_feature_group.delete()
# customers_feature_group.delete()