# Set up the environment for the data transformation and ingestion workflow
This notebook sets up the resources and parameters needed by a custom SageMaker project that provisions a data transformation and ingestion workflow:

<img src="../design/solution-functional-view.drawio.svg" style="background-color:white;" alt="solution overview" width="800"/>

1. One or more data files are uploaded to an Amazon S3 bucket
2. A data processing and transformation job is launched
3. The extracted, processed, and transformed features are ingested into a designated feature group in Feature Store

The notebook takes you through the following activities to create the prerequisite resources:
- Get an Amazon S3 bucket for data upload
- Download the dataset and explore the data
- Create an Amazon SageMaker Data Wrangler flow for data transformation and feature ingestion
- Create a new feature group in Feature Store to store the features

⭐ Depending on your specific use case and requirements, in your own custom project you might consider creating all these resources as part of the project provisioning.

Load packages:

In [None]:
import os
import json
import boto3
import pandas as pd
import sagemaker
from sagemaker.session import Session
from sagemaker.feature_store.feature_definition import FeatureDefinition
from sagemaker.feature_store.feature_definition import FeatureTypeEnum
from sagemaker.feature_store.feature_group import FeatureGroup

import time
from time import gmtime, strftime
import uuid


print(sagemaker.__version__)

In [None]:
%store -r
%store

Get `domain_id` and `execution_role`:

In [None]:
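NOTEBOOK_METADATA_FILE = "/opt/ml/metadata/resource-metadata.json"
domain_id = None

# SageMaker Studio apps expose their domain metadata in this file
if os.path.exists(NOTEBOOK_METADATA_FILE):
    with open(NOTEBOOK_METADATA_FILE, "rb") as f:
        domain_id = json.loads(f.read()).get('DomainId')
        print(f"SageMaker domain id: {domain_id}")

# fail early: describe_domain in the next cell needs a valid domain id
if domain_id is None:
    raise SystemExit(f"Could not read the SageMaker domain id from {NOTEBOOK_METADATA_FILE}. Run this notebook in SageMaker Studio.")

%store domain_id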

In [None]:
r = boto3.client("sagemaker").describe_domain(DomainId=domain_id)
execution_role = r["DefaultUserSettings"]["ExecutionRole"]

%store execution_role

## Get S3 bucket for data
We use the SageMaker default bucket to store all solution artifacts and data. You can create or use your own bucket instead. Make sure that both the SageMaker execution role and the `AmazonSageMakerServiceCatalogProductsUseRole` role have permissions to list, read, and put objects in the bucket.

In [None]:
data_bucket = None  # you can use your own S3 bucket name
sagemaker_session = Session()

if data_bucket is None:
    data_bucket = sagemaker_session.default_bucket()

In [None]:
print(data_bucket)
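The bucket printed above comes from `Session.default_bucket()`, which builds the name from the current Region and account ID and creates the bucket if it does not exist yet. The sketch below only reproduces that naming scheme, `sagemaker-{region}-{account_id}`, so you can predict which bucket the solution writes to. The helper name and the example account ID are hypothetical; in a live session you could take the values from `sagemaker_session.boto_region_name` and `boto3.client("sts").get_caller_identity()["Account"]`.

```python
def default_bucket_name(region: str, account_id: str) -> str:
    """Return the SageMaker default bucket name for a Region and AWS account."""
    return f"sagemaker-{region}-{account_id}"


# hypothetical Region and account ID, for illustration only
print(default_bucket_name("us-east-1", "111122223333"))
```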

⭐ You can keep the following literals set to their default values or change them if you like.

In [None]:
# set some literals
s3_data_prefix = f"{data_bucket}/feature-store-ingestion-pipeline/dataset/"
s3_flow_prefix = f"{data_bucket}/feature-store-ingestion-pipeline/dw-flow/"
s3_fs_query_output_prefix = f"{data_bucket}/feature-store-ingestion-pipeline/fs_query_results/"

dw_flow_name = "dw-flow" # change to your custom file name if you use a different one
unique_suffix = f"{strftime('%d-%H-%M-%S', gmtime())}-{str(uuid.uuid4())[:8]}"
abalone_dataset_file_name = "abalone.csv"
abalone_dataset_local_path = "../dataset/"
abalone_dataset_local_url = f"{abalone_dataset_local_path}{abalone_dataset_file_name}"

print(f"Data Wrangler flow upload and a feature group will have this unique suffix: {unique_suffix}")

## Download the dataset
We use the well-known [Abalone dataset](https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression.html#abalone) in this solution. The dataset contains 4,177 rows with 8 features and one target column, `rings`.

Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

In [None]:
!mkdir -p ../dataset
!rm -fr ../dataset/*

Download the dataset from the [UCI website](http://archive.ics.uci.edu/ml/datasets/Abalone):

In [None]:
!cd {abalone_dataset_local_path} && wget -t inf http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data
!cd {abalone_dataset_local_path} && wget -t inf http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.names

Load the dataset and print the first five rows.

In [None]:
# dictionary of dataset columns and data types
columns = {
    "sex":"string", 
    "length":"float", 
    "diameter":"float", 
    "height":"float", 
    "whole_weight":"float", 
    "shucked_weight":"float", 
    "viscera_weight":"float", 
    "shell_weight":"float",
    "rings":"long"
}

In [None]:
data_df = pd.read_csv(f"{abalone_dataset_local_path}abalone.data", names=columns.keys())
print(f"Data shape: {data_df.shape}")
data_df.head()

In [None]:
# save the dataframe as CSV with the header and index
data_df.to_csv(abalone_dataset_local_url, index_label="record_id")

Upload the data to the data S3 bucket.

In [None]:
!aws s3 cp {abalone_dataset_local_path}. s3://{s3_data_prefix} --recursive

In [None]:
print(f"Data uploaded to s3://{s3_data_prefix}")
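If you prefer to upload from Python instead of the AWS CLI, the following is a rough boto3 equivalent of the recursive copy above. It is a sketch under two assumptions: the local directory is flat (unlike `aws s3 cp --recursive`, it skips subdirectories, which is fine for `../dataset/`), and the prefix follows this notebook's convention of `bucket/key-prefix/` without the `s3://` scheme. The helper names are hypothetical; only `upload_file` is a real boto3 S3 client method.

```python
import os
from typing import List, Tuple


def split_bucket_prefix(s3_prefix: str) -> Tuple[str, str]:
    """Split 'bucket/key/prefix/' (no s3:// scheme) into ('bucket', 'key/prefix/')."""
    bucket, _, key_prefix = s3_prefix.partition("/")
    return bucket, key_prefix


def plan_uploads(local_dir: str, s3_prefix: str) -> List[Tuple[str, str, str]]:
    """Map every regular file directly inside local_dir to (local_path, bucket, key)."""
    bucket, key_prefix = split_bucket_prefix(s3_prefix)
    plan = []
    for name in sorted(os.listdir(local_dir)):
        path = os.path.join(local_dir, name)
        if os.path.isfile(path):  # subdirectories are skipped in this sketch
            plan.append((path, bucket, f"{key_prefix}{name}"))
    return plan


def upload(plan: List[Tuple[str, str, str]], s3_client) -> None:
    """Upload each planned file; s3_client is a boto3 S3 client."""
    for path, bucket, key in plan:
        s3_client.upload_file(path, bucket, key)
```

In this notebook you could call it as `upload(plan_uploads(abalone_dataset_local_path, s3_data_prefix), boto3.client("s3"))`. Separating the plan from the upload lets you inspect the target keys before anything is written to S3.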

## Data Wrangler flow
You can use the provided [Data Wrangler flow file](dw-flow.flow), skip the **Create Data Wrangler flow** section, and move directly to the **Set output name** step. Alternatively, follow the instructions below to create a new flow with the data transformations.

### Create Data Wrangler flow (OPTIONAL)

<div class="alert alert-info"> üí° <strong> The creation of Data Wrangler flow is optional</strong>
</div>

Follow these step-by-step instructions to create a new Data Wrangler flow and add data transformation steps to the flow. Refer to [Data Wrangler documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/data-wrangler.html) for more details.

1. Select **Data Wrangler** in the **SageMaker resources** widget:

<img src="../img/studio-data-wrangler.png" width="400"/>

2. Click on **New flow**: 

<img src="../img/studio-data-wrangler-new-flow.png" width="400"/>

3. Select **Amazon S3** as your source:

<img src="../img/data-wrangler-import.png" width="600"/>

4. Navigate to the S3 bucket path to import the dataset you uploaded to the S3 prefix in the previous section:

<img src="../img/data-wrangler-navigate-to-data-source.png" width="600"/>

5. Select the `abalone.csv` file, check that **First row is header** is selected and **Delimiter** is set to `COMMA`, then click on **Import**:

<img src="../img/data-wrangler-import-details.png" width="600"/>

6. If you want to rename the flow, right-click on `untitled.flow` and rename it to `dw-flow.flow`. ⭐ You can use your own name so that you don't overwrite the provided original flow file. In that case, change the value of the `dw_flow_name` variable accordingly.

<img src="../img/data-wrangler-rename-flow.png" width="400"/>

7. We add Data Wrangler transformations for the following:
    - scale all numeric columns using the sklearn `StandardScaler` (custom Python (Pandas) transform)
    - one-hot encode the categorical column `sex` (built-in **Encode Categorical** transform)
    - add a timestamp column `record_time`, which Feature Store requires (custom Python (Pandas) transform)

To add transformations, go to the **Data Flow** tab, click on the + sign next to the **Data types** box, and select **Add transform**:

<img src="../img/data-wrangler-add-transform.png" width="400"/>

8. Click on **+ Add step** and select **Custom transform** and **Python (Pandas)** in the selection box:

<img src="../img/data-wrangler-transform-add-step.png" width="400"/>

<img src="../img/data-wrangler-transform-custom-pandas.png" width="800"/>

Enter the following code into the editor:
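```python
import pandas as pd
from sklearn.preprocessing import StandardScaler

# numeric feature columns to standardize to zero mean and unit variance
numeric_cols = ['length', 'diameter', 'height', 'whole_weight', 'shucked_weight', 'viscera_weight', 'shell_weight']

df_scaled = StandardScaler().fit_transform(df[numeric_cols].to_numpy())
# keep the original index so that the concat below aligns rows correctly
df_scaled = pd.DataFrame(df_scaled, columns=numeric_cols, index=df.index)

# put the untouched identifier, categorical, and target columns back next to the scaled features
df = pd.concat((df_scaled, df[['record_id', 'sex', 'rings']]), axis=1)
```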

We scale all numeric columns in one step by using this Custom Transform in Python (Pandas) and sklearn. 

Click on **Preview**, then **Add** to add the transform to the data flow.
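`StandardScaler` standardizes each column independently as `(x - mean) / std`, where `std` is the population standard deviation (ddof=0). The stdlib-only sketch below reproduces that arithmetic for a single column, which can help when sanity-checking values in the Data Wrangler preview. It is an illustration of the formula, not the code Data Wrangler runs, and the sample values are made up.

```python
from statistics import fmean, pstdev
from typing import List


def standardize(values: List[float]) -> List[float]:
    """Scale values to zero mean and unit population variance, like sklearn's StandardScaler."""
    mean = fmean(values)
    std = pstdev(values)
    if std == 0:
        # StandardScaler does not divide by zero: a constant column becomes all zeros
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


scaled = standardize([0.35, 0.45, 0.55])
print([round(v, 4) for v in scaled])
```

Note that the scaler is fit on whatever data the transform sees, so the mean and standard deviation are recomputed each time the flow processes a new input file.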

9. We use the built-in **Encode Categorical** transform of Data Wrangler to one-hot encode the `sex` column. Click on **+ Add step** and, under **Add Transform** on the right, select **Encode Categorical**:

<img src="../img/data-wrangler-transform-encode-categorical.png" width="800"/>

Select `One-hot encode` for Transform, `sex` for Input column, and `Columns` as Output style.
  
Click on **Preview** to see the changes and then on **Add**.
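With **Columns** as the output style, one-hot encoding replaces `sex` with one indicator column per category, set to 1 for the matching category and 0 otherwise. The minimal stdlib sketch below illustrates the idea. The `sex_<value>` naming mirrors the columns the feature group schema expects later in this notebook (`sex_M`, `sex_I`, `sex_F`); the exact names and data types Data Wrangler emits are worth confirming in the transform preview.

```python
from typing import Dict, Iterable, List


def one_hot(values: Iterable[str], categories: Iterable[str], prefix: str = "sex") -> List[Dict[str, float]]:
    """Return one dict per input value with 1.0 in the matching category column and 0.0 elsewhere."""
    categories = list(categories)
    return [
        {f"{prefix}_{c}": 1.0 if value == c else 0.0 for c in categories}
        for value in values
    ]


for row in one_hot(["M", "F", "I"], categories=("M", "I", "F")):
    print(row)
```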

10. Finally, click on **+ Add step** and select **Custom transform** and **Python (Pandas)**:

<img src="../img/data-wrangler-transform-custom-pandas-time.png" width="800"/>

Enter the following code into the editor:
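```python
import time
import pandas as pd

# Feature Store requires an event time feature; use the current Unix time in seconds
record_time_feature_name = 'record_time'
current_time_sec = int(round(time.time()))
# build the column on df's own index so that every row gets the value, whatever the index is
df[record_time_feature_name] = pd.Series(current_time_sec, index=df.index, dtype="float")
```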

Click on **Preview**, then **Add** to add the transform to the data flow.
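Feature Store accepts the event time feature either as a `Fractional` Unix timestamp in seconds or as an ISO-8601 `String`; the transform above uses the first option. The sketch below shows both representations of the same instant, using fixed timestamps so the results are reproducible. The ISO-8601 variant is only an alternative you could switch to, not what the provided flow does, and the helper name is hypothetical.

```python
from datetime import datetime, timezone
from typing import Tuple


def event_time_values(epoch_seconds: int) -> Tuple[float, str]:
    """Return one instant as a Fractional epoch value and as an ISO-8601 UTC string."""
    as_fractional = float(epoch_seconds)
    as_iso8601 = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return as_fractional, as_iso8601


print(event_time_values(1_700_000_000))
```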

11. You now have three transformation steps in the **Transforms** overview:

<img src="../img/data-wrangler-transform-all-steps.png" width="600"/>

12. Save your Data Wrangler flow. Select **File** and then select **Save Data Wrangler Flow**.

Click on **Back to data flow** and navigate to the **Export** tab.

Select the last step in your Data Wrangler flow.

<img src="../img/data-wrangler-export.png" width="400"/>

Choose **Export step** and select the export option **Feature Store**:

<img src="../img/data-wrangler-export-step.png" width="600"/>

13. A newly generated notebook opens in a new window. Navigate to the **Output: Feature Store** section in the notebook and locate the `output_name` variable:

<img src="../img/data-wrangler-output-name-value.png" width="600"/>

Copy the value of `output_name` variable and paste it in the following code cell in this notebook.

### End of Data Wrangler flow manual creation
---

### Set output name
The Data Wrangler processor needs the `node_id` of the last transformation step, after which the transformed data is exported to the output destination.  
If you created your own Data Wrangler flow or added more transformation steps to the flow, set `dw_output_name` to the correct `output_name` value as described in steps 12 and 13 of the previous section. Otherwise, run the following code cell as is.

In [None]:
# Set dw_output_name to your export output name; keep None if you use the provided flow
dw_output_name = None

if dw_output_name is None:
    with open(f"{dw_flow_name}.flow") as f:
        flow_content = json.load(f)
    # the last node in the provided flow is the final transformation step
    dw_output_name = f"{flow_content['nodes'][-1]['node_id']}.default"
    
print(f"DataWrangler flow output name: {dw_output_name}")
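The cell above assumes the last entry in `nodes` is the final transformation, which holds for a flow built linearly in the UI. A slightly more defensive sketch picks the node that no other node consumes. It assumes each node lists its upstream nodes under `inputs[*]["node_id"]`, which matches the flow files we have seen but is not a documented contract, so check it against your own `.flow` file. The helper name and the toy flow are hypothetical.

```python
from typing import Any, Dict


def terminal_output_name(flow: Dict[str, Any], output: str = "default") -> str:
    """Return '<node_id>.<output>' for the single node that no other node uses as an input."""
    consumed = {
        inp["node_id"]
        for node in flow["nodes"]
        for inp in node.get("inputs", [])
    }
    terminals = [node["node_id"] for node in flow["nodes"] if node["node_id"] not in consumed]
    if len(terminals) != 1:
        # branching flows have several end nodes; pick the export node explicitly in that case
        raise ValueError(f"Expected exactly one terminal node, found {terminals}")
    return f"{terminals[0]}.{output}"


# hypothetical, heavily simplified flow with a source and two transforms
toy_flow = {
    "nodes": [
        {"node_id": "source", "inputs": []},
        {"node_id": "scale", "inputs": [{"node_id": "source"}]},
        {"node_id": "encode", "inputs": [{"node_id": "scale"}]},
    ]
}
print(terminal_output_name(toy_flow))
```

Unlike the positional lookup, this version does not depend on the order in which nodes are stored, and it fails loudly instead of silently picking a wrong node when the flow branches.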

### Upload Data Wrangler flow to S3 bucket
Finally, we upload the Data Wrangler flow to the S3 bucket. The data processing pipeline uses this flow file to run the data transformation.

In [None]:
dw_flow_file_url = f"s3://{s3_flow_prefix}{dw_flow_name}-{unique_suffix}.flow"

In [None]:
!aws s3 cp {dw_flow_name}.flow {dw_flow_file_url}

## Create feature group
We must create a new feature group in SageMaker Feature Store to store the data features. A feature group is a predefined schema for a collection of features: each feature in the feature group has a specified data type and name.

A single record in a feature group corresponds to a row in your dataframe. A feature store is a collection of feature groups. To learn more about SageMaker Feature Store, see the [Amazon SageMaker Feature Store documentation](http://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html).
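Concretely, a record is one row whose values are keyed by feature name. When records are written through the Feature Store runtime `PutRecord` API (boto3 client `sagemaker-featurestore-runtime`, method `put_record`), each value is sent as a string in a list of `FeatureName`/`ValueAsString` pairs. The sketch below only builds that payload from a row dict, for illustration; the row values are made up, and the actual call is left as a comment because it needs an existing feature group. In this solution, ingestion is handled by the Data Wrangler pipeline rather than by code like this.

```python
from typing import Any, Dict, List


def to_feature_store_record(row: Dict[str, Any]) -> List[Dict[str, str]]:
    """Convert one dataframe row (as a dict) into the Record shape used by PutRecord."""
    return [
        {"FeatureName": name, "ValueAsString": str(value)}
        for name, value in row.items()
        if value is not None  # omit missing values instead of sending the string "None"
    ]


record = to_feature_store_record({"record_id": 0, "length": 0.12, "sex_M": 1.0, "record_time": 1700000000.0})
print(record)
# With a real feature group you could then call, for example:
# boto3.client("sagemaker-featurestore-runtime").put_record(
#     FeatureGroupName=feature_group_name, Record=record)
```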

Select the record identifier and the record time (event time) feature names. Both are required parameters for feature group creation.

In [None]:
record_identifier_feature_name = 'record_id'
if record_identifier_feature_name is None:
    raise SystemExit("Select a column name as the feature group record identifier.")

record_time_feature_name = 'record_time'
if record_time_feature_name is None:
    raise SystemExit("Select a column name as the event time feature name.")

The following is a list of the feature names and data types of the **final dataset** that will be produced when your data flow is used to process your input dataset.

In [None]:
try:
    columns
    
except NameError:
    # dictionary of dataset columns and data types
    columns = {
        "sex":"string", 
        "length":"float", 
        "diameter":"float", 
        "height":"float", 
        "whole_weight":"float", 
        "shucked_weight":"float", 
        "viscera_weight":"float", 
        "shell_weight":"float",
        "rings":"long"
    }

In [None]:
# since we added one-hot encoding for the categorical column `sex`, adjust the column list for the feature group
if columns.get("sex") is not None: 
    del columns["sex"]
    
for i in ('M', 'I', 'F'):
    columns[f"sex_{i}"] = "float"

columns

In [None]:
column_schemas = [
    *[{"name":c[0], "type":c[1]} for c in columns.items()],
    {
        "name": record_identifier_feature_name,
        "type": "long"
    },
    {
        "name": record_time_feature_name,
        "type": "float"
    },
]

column_schemas

Below we create the SDK input for these feature definitions. Some Data Wrangler schema types are not supported by Feature Store; for those types, the following code falls back to `default_feature_type`, which is set to `String`.

In [None]:
default_feature_type = FeatureTypeEnum.STRING
column_to_feature_type_mapping = {
    "float": FeatureTypeEnum.FRACTIONAL,
    "long": FeatureTypeEnum.INTEGRAL
}

feature_definitions = [
    FeatureDefinition(
        feature_name=column_schema['name'], 
        feature_type=column_to_feature_type_mapping.get(column_schema['type'], default_feature_type)
    ) for column_schema in column_schemas
]
print(f"feature definitions: {feature_definitions}")

Define some literals:

In [None]:
feature_group_name_prefix = "FG-abalone"
feature_group_name = f"{feature_group_name_prefix}-{unique_suffix}"
feature_store_offline_s3_uri = f"s3://{data_bucket}"

# controls if online store is enabled. Enabling the online store allows quick access to 
# the latest value for a Record via the GetRecord API.
enable_online_store = False
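Because `feature_group_name` combines a fixed prefix with `unique_suffix`, it can help to check the result against the naming constraint before calling the API. The sketch below assumes the constraint is at most 64 characters matching `^[a-zA-Z0-9]([_-]*[a-zA-Z0-9]){0,63}$` (alphanumerics, optionally separated by hyphens or underscores, starting and ending with an alphanumeric). That is our reading of the `FeatureGroupName` field in the CreateFeatureGroup API reference, so confirm it there. The example suffix is a made-up value in the same format as `unique_suffix`.

```python
import re

# Assumed FeatureGroupName constraint (see the CreateFeatureGroup API reference):
# 1-64 characters, alphanumerics optionally separated by '-' or '_'
FEATURE_GROUP_NAME_RE = re.compile(r"^[a-zA-Z0-9]([_-]*[a-zA-Z0-9]){0,63}$")


def is_valid_feature_group_name(name: str) -> bool:
    """Return True if name satisfies the assumed length and character rules."""
    return len(name) <= 64 and FEATURE_GROUP_NAME_RE.fullmatch(name) is not None


print(is_valid_feature_group_name("FG-abalone-14-09-30-12-1a2b3c4d"))  # hypothetical suffix
```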

Create a feature group using the SageMaker Python SDK:

In [None]:
feature_group = FeatureGroup(
    name=feature_group_name,
    sagemaker_session=sagemaker_session,
    feature_definitions=feature_definitions)

feature_group.create(
    s3_uri=feature_store_offline_s3_uri,
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=record_time_feature_name,
    role_arn=execution_role,
    enable_online_store=enable_online_store
)

Wait until the feature group is ready; this takes around one minute:

In [None]:
status = feature_group.describe().get("FeatureGroupStatus")
while status == "Creating":
    print("Waiting for Feature Group Creation")
    time.sleep(5)
    status = feature_group.describe().get("FeatureGroupStatus")

if status != "Created":
    raise SystemExit(f"Failed to create feature group {feature_group.name}: {status}")
print(f"FeatureGroup {feature_group.name} successfully created.")
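The loop above polls until the status leaves `Creating`, but it has no upper bound on how long it waits. A generic polling helper with a timeout is sketched below. `describe` is any zero-argument callable that returns the current status string; the helper name and the default timings are our own choices, not part of the SageMaker SDK.

```python
import time
from typing import Callable


def wait_for_status(
    describe: Callable[[], str],
    pending: str = "Creating",
    timeout_sec: float = 600.0,
    interval_sec: float = 5.0,
) -> str:
    """Poll describe() while it returns `pending`; return the first other status or raise on timeout."""
    deadline = time.monotonic() + timeout_sec
    status = describe()
    while status == pending:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Status still {pending!r} after {timeout_sec} seconds")
        time.sleep(interval_sec)
        status = describe()
    return status
```

In this notebook you could use it as `status = wait_for_status(lambda: feature_group.describe().get("FeatureGroupStatus"))` and then check `status == "Created"` as in the cell above.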

<div class="alert alert-info"> üí° <strong> Dealing with AccessDenied exception </strong></div>

If you get an `AccessDenied` exception while creating a feature group, it may be caused by Lake Formation permissions on the `sagemaker_featurestore` database.

In that case, grant permissions on that database to the SageMaker execution role (or the role you use to access Feature Store) in Lake Formation, as described in [this document](../grant-sm-execution-role-access-to-fs.pdf).

### Query data in feature group
Upon creation, the feature group in Feature Store is empty. You can browse the feature group metadata by selecting **Feature Store** in the **SageMaker resources** widget:

<img src="../img/studio-feature-store.png" width="400"/>

or use the SageMaker Python SDK:

In [None]:
feature_group.describe()

You can query data in the feature group with an Amazon Athena query, as demonstrated in the following code cells.

In [None]:
# Build a SQL query against the feature group's offline store table
fs_query = feature_group.athena_query()

query_string = f'SELECT * FROM "{fs_query.table_name}"'
print(f'Prepared query {query_string}')
print(fs_query)

In [None]:
# Run the Athena query and load the results into a Pandas dataframe.
fs_query.run(
    query_string=query_string, 
    output_location=f"s3://{s3_fs_query_output_prefix}"
)

fs_query.wait()
fs_df = fs_query.as_dataframe()

In [None]:
fs_df

As expected, the feature group doesn't contain any data yet.
Everything is now ready to deploy the data transformation and ingestion pipeline, which will ingest features into this feature group.
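Once the pipeline has ingested data, the offline store can hold several versions of the same record, one per write. A common way to read only the latest version is to rank rows per `record_id` by event time and drop deleted records. The sketch below only builds such a query string for `fs_query.run()`. It assumes the offline store table contains the `api_invocation_time`, `write_time`, and `is_deleted` columns that Feature Store adds to every record; `latest_records_query` is a hypothetical helper.

```python
def latest_records_query(table_name: str, record_id: str = "record_id", event_time: str = "record_time") -> str:
    """Build an Athena SQL query that returns the most recent, non-deleted version of each record."""
    return f"""
SELECT *
FROM (
    SELECT *,
           row_number() OVER (
               PARTITION BY {record_id}
               ORDER BY {event_time} DESC, api_invocation_time DESC, write_time DESC
           ) AS row_num
    FROM "{table_name}"
)
WHERE row_num = 1
  AND NOT is_deleted
""".strip()


print(latest_records_query("fg_abalone_example_table"))  # hypothetical table name
```

You could pass it as `fs_query.run(query_string=latest_records_query(fs_query.table_name), output_location=f"s3://{s3_fs_query_output_prefix}")`.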

## Store the parameters
We store the parameters for the data transformation and ingestion pipeline using the `%store` magic. The next notebook uses these parameters.

In [None]:
%store data_bucket
%store dw_flow_file_url
%store dw_output_name
%store feature_group_name
%store s3_data_prefix
%store s3_flow_prefix 
%store s3_fs_query_output_prefix
%store abalone_dataset_file_name
%store abalone_dataset_local_url

%store
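`%store` keeps these values in the IPython profile's database, so they are available to other notebooks in the same environment. If you need the parameters elsewhere, for example in a plain Python script, one option is to write them to a JSON file. The sketch below does that with hypothetical helper, file, and key names; it is an alternative, not what the next notebook reads.

```python
import json
from typing import Any, Dict


def save_params(path: str, params: Dict[str, Any]) -> None:
    """Write notebook parameters to a JSON file (an alternative to %store)."""
    with open(path, "w") as f:
        json.dump(params, f, indent=2, sort_keys=True)


def load_params(path: str) -> Dict[str, Any]:
    """Read parameters previously written by save_params."""
    with open(path) as f:
        return json.load(f)
```

For example: `save_params("pipeline-params.json", {"data_bucket": data_bucket, "feature_group_name": feature_group_name, "dw_flow_file_url": dw_flow_file_url, "dw_output_name": dw_output_name})`. Only JSON-serializable values (strings, numbers, lists, dicts) can be stored this way.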

## Release resources

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

Proceed to the [`01-feature-store-ingest-pipeline` notebook](01-feature-store-ingest-pipeline.ipynb).