# Module 2: Speed ML Development with Apache Iceberg offline store compaction

Feature Store supports Apache Iceberg as a table format for storing features. This accelerates model development by enabling faster query performance when extracting ML training datasets, taking advantage of Iceberg table compaction. Depending on the design of your feature groups and their scale, you can experience training query performance improvements of 10x to 100x by using this new capability.

**Note:** Set the kernel to `Python 3 (Data Science)` and the instance type to `ml.m5.4xlarge`.

**This notebook uses the feature groups created in `module-2` and only applies to feature groups created with table format `Iceberg`**

---

## Contents

1. [Background](#Background)
1. [Setup](#Setup)
1. [Compaction](#Compaction)
   1. [Compaction using Athena](#Compaction-using-Athena)
   1. [Compaction using Spark](#Compaction-using-Spark)
1. [Scheduled Compaction](#Scheduled-Compaction)

# Background

In this notebook, you will learn how to schedule table maintenance operations (compaction and storage reclamation) using Amazon Athena and AWS Glue.



[Apache Iceberg](https://iceberg.apache.org/) is an open table format for very large analytic datasets. It manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. 

With Feature Store, you can create feature groups with the Iceberg table format as an alternative to the default Glue format. This lets you use Iceberg's file compaction and data pruning features according to your use case and optimization requirements. Iceberg also supports deletes, time travel queries, highly concurrent transactions, and faster selective queries.

By combining the Iceberg table format with table maintenance operations such as compaction, you get faster query performance when working with offline feature groups at scale and, as a result, can build training datasets faster.

The following diagram shows the structure of the offline store using Iceberg as a table format.

![Offline File Compaction](../images/smfs_apache_iceberg_compaction.png "Offline File Compaction")

In this notebook, you will learn how to execute Iceberg's table management procedures using Amazon Athena and how to use AWS services to run these tasks on demand or on a schedule.

# Setup

#### Imports

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker import get_execution_role
import sagemaker
import logging
import boto3
import pandas as pd
import time
import re
import os
import sys
import subprocess
import importlib
sys.path.append('..')
from utilities import Utils
from utilities import feature_store_helper

In [None]:
sm_version = sagemaker.__version__
# Compare (major, minor) as a tuple so that, for example, 3.0 is not treated as older than 2.125
major, minor = (int(part) for part in sm_version.split('.')[:2])
if (major, minor) < (2, 125):
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker==2.125.0'])
    importlib.reload(sagemaker)

In [None]:
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

In [None]:
logger.info(f'Using SageMaker version: {sagemaker.__version__}')
logger.info(f'Using Pandas version: {pd.__version__}')

#### Essentials

In [None]:
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name
logger.info(f'Default S3 bucket = {default_bucket}')
prefix = 'sagemaker-feature-store'

boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = sagemaker.Session(boto_session=boto_session, 
                                          sagemaker_client=sagemaker_client, 
                                          sagemaker_featurestore_runtime_client=featurestore_runtime)

#### Retrieve Feature Group

In [None]:
# Retrieve FG name
%store -r orders_feature_group_name
orders_fg = FeatureGroup(name=orders_feature_group_name, sagemaker_session=feature_store_session)
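
Because the rest of this notebook only applies to Iceberg feature groups, it can help to verify the table format before going further. The following is a minimal sketch, not part of the SageMaker SDK: `is_iceberg_feature_group` is a hypothetical helper that inspects the dictionary returned by `describe_feature_group`, and it assumes that a missing `TableFormat` field means the default Glue format.

```python
def is_iceberg_feature_group(description: dict) -> bool:
    """Return True if a describe_feature_group response reports the Iceberg table format."""
    offline_config = description.get('OfflineStoreConfig', {})
    # Assumption: an absent TableFormat means the default Glue format.
    return offline_config.get('TableFormat', 'Glue') == 'Iceberg'


# Stand-in for a real describe_feature_group response (illustrative only).
sample_description = {'OfflineStoreConfig': {'TableFormat': 'Iceberg'}}
print(is_iceberg_feature_group(sample_description))
```

In this notebook, you could call it as `is_iceberg_feature_group(orders_fg.describe())`.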

#### Query Data

In [None]:
orders_query = orders_fg.athena_query()
orders_table = orders_query.table_name
orders_database = orders_query.database

In [None]:
query_string = f'SELECT * ' \
    f'FROM' \
    f'    (SELECT *,' \
    f'         row_number()' \
    f'        OVER (PARTITION BY "order_id"' \
    f'    ORDER BY  event_time desc, Api_Invocation_Time DESC, write_time DESC) AS row_num' \
    f'    FROM "{orders_database}"."{orders_table}") ' \
    f'WHERE row_num = 1 and NOT is_deleted limit 10000;'

%store query_string
query_string

In [None]:
query_results = 'sagemaker-featurestore/athena-results'
output_location = f's3://{default_bucket}/{query_results}/query_results/'
print(f'Athena query output location: \n{output_location}')

In [None]:
%%timeit
orders_query.run(query_string=query_string, output_location=output_location)
orders_query.wait()

In [None]:
joined_df = orders_query.as_dataframe()
joined_df.head()

# Compaction

## Compaction using Athena

[Amazon Athena](https://aws.amazon.com/athena/) is a serverless SQL query engine that natively supports Iceberg management procedures. In this section, you will use Athena to manually compact the offline feature group you created.  

Note that you need to use [Athena engine version 3](https://docs.aws.amazon.com/athena/latest/ug/engine-versions-reference-0003.html). You can create a new workgroup or configure an existing one, and select the recommended Athena engine version 3. For more information and instructions for changing your Athena engine version, refer to *Changing Athena engine versions* in the Athena User Guide.

![Offline File Compaction](../images/smfs_athena_workgroup.png "Offline File Compaction")
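
You can also check the engine version of a workgroup programmatically. The sketch below is one possible approach: `uses_engine_v3` is a hypothetical helper, it relies on the boto3 Athena `get_work_group` call, and it assumes the effective engine version is reported as the string `Athena engine version 3`.

```python
def uses_engine_v3(athena_client, workgroup: str = 'primary') -> bool:
    """Return True if the workgroup's effective engine is Athena engine version 3."""
    response = athena_client.get_work_group(WorkGroup=workgroup)
    engine = response['WorkGroup']['Configuration'].get('EngineVersion', {})
    # Assumption: the service reports the version as this exact string.
    return engine.get('EffectiveEngineVersion') == 'Athena engine version 3'
```

For example, `uses_engine_v3(boto_session.client('athena'), workgroup='primary')` would check the workgroup used later in this notebook.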

#### Query pre compaction

The `$partitions` metadata table provides a detailed overview of the partitions of an Iceberg table. Let's use it to look at the number of files and the total size in bytes before compaction.

In [None]:
stats_query_string = f'SELECT sum(total_size), sum(file_count) ' \
    f'FROM "{orders_table}$partitions"'

%store stats_query_string
stats_query_string

In [None]:
%%timeit
orders_query.run(query_string=stats_query_string, output_location=output_location)
orders_query.wait()

In [None]:
stats_joined_df = orders_query.as_dataframe()
stats_joined_df.head()
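
A useful number to derive from these statistics is the average data file size, since many small files are what compaction is meant to fix. The helper below is a small sketch; `average_file_size_mib` is a hypothetical name, and the example values are illustrative, not output from this notebook.

```python
def average_file_size_mib(total_bytes: float, file_count: int) -> float:
    """Average data file size in MiB given the summed size and file count."""
    if file_count == 0:
        return 0.0
    return total_bytes / file_count / (1024 * 1024)


# Illustrative numbers only: 50 MiB spread over 100 files.
print(average_file_size_mib(52_428_800, 100))
```

With the DataFrame from the previous cell, you could pass `stats_joined_df.iloc[0, 0]` and `stats_joined_df.iloc[0, 1]` as the two arguments.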

#### Iceberg Compaction Procedures

As data accumulates in an Iceberg table, queries gradually become less efficient because of the increased processing time required to open additional files. Compaction optimizes the structural layout of the table without altering its content.

For this, you will use the `OPTIMIZE table REWRITE DATA` table maintenance command in Athena. The following syntax shows how to optimize the data layout of a Feature Group stored using the Iceberg table format. See the [documentation](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html) for more detail.

```sql
OPTIMIZE "sagemaker_featurestore"."<FEATURE_GROUP_TABLE_NAME>" REWRITE DATA USING BIN_PACK
```
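
To issue this statement from Python, one option is the boto3 Athena client. The following is a minimal sketch rather than the script used later in this notebook: `build_optimize_statement` and `submit_statement` are hypothetical helpers, and the table name in the example is a placeholder.

```python
def build_optimize_statement(database: str, table: str) -> str:
    """Build the Athena OPTIMIZE statement for an Iceberg table."""
    return f'OPTIMIZE "{database}"."{table}" REWRITE DATA USING BIN_PACK'


def submit_statement(athena_client, statement, database, output_location, workgroup='primary'):
    """Start an Athena query and return its execution ID (does not wait for completion)."""
    response = athena_client.start_query_execution(
        QueryString=statement,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_location},
        WorkGroup=workgroup,
    )
    return response['QueryExecutionId']


# Placeholder table name, for illustration only.
print(build_optimize_statement('sagemaker_featurestore', 'my_feature_group_table'))
```

Passing the client in as an argument keeps the helper easy to exercise with a stub before pointing it at a real workgroup.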

<div class="alert alert-info"> 💡 <strong> Iceberg optimize, more runs needed: </strong>
If the entire table cannot be compacted within the Athena query execution timeout, you will receive the error ICEBERG_OPTIMIZE_MORE_RUNS_NEEDED. In that case, rerun the command to resume compaction for the remaining partitions.
</div>

![Offline File Compaction](../images/smfs_optimize_query_more_runs_needed.png "Offline File Compaction")

After running the optimize command, you will use the `VACUUM` procedure which performs snapshot expiration and removal of orphan files. These actions reduce metadata size and remove files not in the current table state that are also older than the retention period specified for the table. See [documentation](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html) for more detail.

```sql
VACUUM "sagemaker_featurestore"."<FEATURE_GROUP_TABLE_NAME>"
```

We created a script to run the procedures in Python. Let's explore it.

* The script executes the `OPTIMIZE` query using boto3.
* It handles the `ICEBERG_OPTIMIZE_MORE_RUNS_NEEDED` error by rerunning the query to resume compaction for the remaining partitions.
* It executes the `VACUUM` query if the `OPTIMIZE` query is successful.
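
The control flow in these three steps can be sketched as follows. This is not the content of `smfs_offline_compaction.py`; it is an illustrative outline under stated assumptions: `compact_table` and `wait_for_query` are hypothetical names, the error code is assumed to appear in the `StateChangeReason` field returned by `get_query_execution`, and `max_runs` is an arbitrary safety limit.

```python
import time

MORE_RUNS_NEEDED = 'ICEBERG_OPTIMIZE_MORE_RUNS_NEEDED'


def wait_for_query(athena_client, execution_id, poll_seconds=5):
    """Poll Athena until the query reaches a terminal state; return (state, reason)."""
    while True:
        status = athena_client.get_query_execution(
            QueryExecutionId=execution_id)['QueryExecution']['Status']
        if status['State'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            return status['State'], status.get('StateChangeReason', '')
        time.sleep(poll_seconds)


def compact_table(athena_client, database, table, output_location,
                  workgroup='primary', max_runs=10, poll_seconds=5):
    """Rerun OPTIMIZE while more runs are needed, then run VACUUM once it succeeds."""
    def run(statement):
        execution_id = athena_client.start_query_execution(
            QueryString=statement,
            QueryExecutionContext={'Database': database},
            ResultConfiguration={'OutputLocation': output_location},
            WorkGroup=workgroup,
        )['QueryExecutionId']
        return wait_for_query(athena_client, execution_id, poll_seconds)

    target = f'"{database}"."{table}"'
    for _attempt in range(max_runs):
        state, reason = run(f'OPTIMIZE {target} REWRITE DATA USING BIN_PACK')
        if state == 'SUCCEEDED':
            # Vacuum only after the whole table has been compacted.
            vacuum_state, _vacuum_reason = run(f'VACUUM {target}')
            return vacuum_state == 'SUCCEEDED'
        if MORE_RUNS_NEEDED not in reason:
            # Any other failure is not recoverable by rerunning OPTIMIZE.
            raise RuntimeError(f'OPTIMIZE failed: {reason}')
    return False
```

The function returns `False` when the run limit is reached before compaction completes, so a scheduler can simply pick up the remaining partitions on its next invocation.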

In [None]:
!pip3 install pyspark
!pip3 install git+https://github.com/awslabs/aws-glue-libs.git

In [None]:
!pygmentize ./smfs_offline_compaction.py

#### Execute the compaction procedures

In [None]:
!python smfs_offline_compaction.py --region {region} --database {orders_database} --table {orders_table} --workgroup primary --outputlocation {output_location}

#### Query post compaction

In [None]:
%%timeit
orders_query.run(query_string=query_string, output_location=output_location)
orders_query.wait()

In [None]:
joined_df = orders_query.as_dataframe()
joined_df.head()

Depending on the design of your feature groups and their scale, you can experience training query performance improvements of 10x to 100x by using this new capability. Since the dataset used in this example is small, you might not see a noticeable improvement in query performance. Let's look at the number of files and the total size in bytes after compaction.

In [None]:
%%timeit
orders_query.run(query_string=stats_query_string, output_location=output_location)
orders_query.wait()

In [None]:
stats_joined_df = orders_query.as_dataframe()
stats_joined_df.head()
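
To compare the statistics before and after compaction, you need to keep the pre-compaction values in a separate variable, because the notebook reuses `stats_joined_df` for both queries. The helper below is a sketch with a hypothetical name (`compaction_report`) and illustrative numbers, not results measured in this notebook.

```python
def compaction_report(before, after):
    """Compare (total_bytes, file_count) tuples captured before and after compaction."""
    bytes_before, files_before = before
    bytes_after, files_after = after
    removed = files_before - files_after
    return {
        'files_removed': removed,
        'file_reduction_pct': 100.0 * removed / files_before if files_before else 0.0,
        'bytes_delta': bytes_after - bytes_before,
    }


# Illustrative numbers only.
print(compaction_report((1_000_000, 200), (900_000, 20)))
```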

## Compaction using Spark

Customers can also use Spark to manage the compaction jobs and maintenance methods. For more detail on the Spark procedures, see the [documentation](https://iceberg.apache.org/docs/latest/spark-procedures/).

You first need to configure some of the common properties.

```json
%%configure -f
{
  "conf": {
    "spark.sql.catalog.smfs": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.smfs.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.smfs.warehouse": "<YOUR_ICEBERG_DATA_S3_LOCATION>",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.smfs.glue.skip-name-validation": "true"
  }
}
```

The following code can be used to optimize the feature groups via Spark.

```python
spark.sql(f"""CALL smfs.system.rewrite_data_files(table => '{DATABASE}.`{ICEBERG_TABLE}`')""")
```

You can then execute the next two table maintenance procedures to remove older snapshots and orphan files which are no longer needed.

```python
spark.sql(f"""CALL smfs.system.expire_snapshots(table => '{DATABASE}.`{ICEBERG_TABLE}`', older_than => TIMESTAMP '{one_day_ago}', retain_last => 1)""")
spark.sql(f"""CALL smfs.system.remove_orphan_files(table => '{DATABASE}.`{ICEBERG_TABLE}`')""")
```

You can then incorporate the above Spark commands into your Spark environment. For example, you can create a job that performs the optimization above on a desired schedule or in a pipeline after ingestion. 


# Scheduled Compaction

In this section, you will learn how to automate the table management procedures to compact your offline feature store. The diagram below illustrates the architecture for creating Feature Groups in Iceberg table format and a fully automated table management solution which includes file compaction and cleanup operations. 

![Offline File Compaction](../images/smfs_scheduled_compaction.png "Offline File Compaction")

* At a high level, in the previous section you created a Feature Group using the Iceberg table format and ingested records into the online feature store.
* Feature values were automatically replicated from the online store to the historical offline store.
* [Athena](https://aws.amazon.com/athena/) is used to run the Iceberg management procedures.
* To schedule the procedures, you will set up an [AWS Glue](https://aws.amazon.com/glue/) job using a Python shell script and create an AWS Glue job schedule.

### AWS Glue Job Setup

#### Create IAM Role

First you need to create a role for AWS Glue to have permissions to access Amazon Athena, Amazon S3, and CloudWatch.
* Navigate to the IAM console.
* Create an AWS Identity and Access Management (IAM) service role for AWS Glue. 
* Attach a policy that allows access to Athena, Amazon Simple Storage Service (Amazon S3), and Amazon CloudWatch Logs. For example, you can add [AmazonAthenaFullAccess](https://docs.aws.amazon.com/athena/latest/ug/managed-policies.html#amazonathenafullaccess-managed-policy) and [CloudWatchLogsFullAccess](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/iam-identity-based-access-control-cwl.html#managed-policies-cwl) to the role. AmazonAthenaFullAccess allows full access to Athena and includes basic permissions for Amazon S3. CloudWatchLogsFullAccess allows full access to CloudWatch Logs.


![Offline File Compaction](../images/smfs_aws_glue_role.png "Offline File Compaction")
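
If you prefer to script the role creation instead of using the console, the steps above could look roughly like this with boto3. This is a sketch under assumptions: `create_glue_role` is a hypothetical helper, the role name is whatever you choose, and the two managed policies are the ones named above.

```python
import json

# Trust policy that lets the AWS Glue service assume the role.
GLUE_TRUST_POLICY = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'glue.amazonaws.com'},
        'Action': 'sts:AssumeRole',
    }],
}

MANAGED_POLICY_ARNS = [
    'arn:aws:iam::aws:policy/AmazonAthenaFullAccess',
    'arn:aws:iam::aws:policy/CloudWatchLogsFullAccess',
]


def create_glue_role(iam_client, role_name):
    """Create the service role, attach the managed policies, and return the role ARN."""
    role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(GLUE_TRUST_POLICY),
    )
    for policy_arn in MANAGED_POLICY_ARNS:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    return role['Role']['Arn']
```

You would call it with an IAM client, for example `create_glue_role(boto_session.client('iam'), 'my-glue-compaction-role')`, where the role name is a placeholder.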

#### Create AWS Glue Job

Next, you need to create the actual AWS Glue Job.

* Navigate to the AWS Glue console.
* Choose the **Jobs** tab under AWS Glue Studio.
* Select **Python Shell script editor**. 
* Choose *Upload and edit an existing script*, and use the script located in this module: [smfs_offline_compaction.py](./smfs_offline_compaction.py).
* Click **Create**.
* Edit the name of the job (as per screenshot below).

![Offline File Compaction](../images/smfs_glue_script_upload.png "Offline File Compaction")



The **Job details** tab lets you configure the AWS Glue job.

* Select the *IAM role* you created earlier.
* Select *Python 3.9* or the latest available Python version.
* In the same tab, you can also define other configuration options such as **Number of retries** or **Job timeout**. Leave these at their default values.

![Offline File Compaction](../images/smfs_glue_job_details.png "Offline File Compaction")


* In the **Advanced properties** section, you can add **Job parameters** to execute the script, as per the example screenshot below.
* Click "Save".

![Offline File Compaction](../images/smfs_glue_job_parameters.png "Offline File Compaction")

* In the **Schedules** tab, you can define the schedule to run the feature store maintenance procedures. For example, the screenshot below shows you how to run the job on a schedule of every 6 hours. For more information, check the [documentation](https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html).

![Offline File Compaction](../images/smfs_glue_schedule.png "Offline File Compaction")

* You can click "Save" and "Run" to start a compaction job.

![Offline File Compaction](../images/smfs_glue_run.png "Offline File Compaction")

* You can also monitor job runs to see runtime metrics such as completion status, duration, and start time. Check the CloudWatch logs for the AWS Glue job to confirm that the procedures ran successfully.

![Offline File Compaction](../images/smfs_glue_logs.png "Offline File Compaction")
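
The six-hour schedule configured in the **Schedules** tab can also be expressed in code as a scheduled AWS Glue trigger. The sketch below only builds the request; `six_hour_trigger_request` is a hypothetical helper, and the job and trigger names are placeholders you would replace with your own.

```python
def six_hour_trigger_request(job_name, trigger_name):
    """Build keyword arguments for the boto3 Glue create_trigger call."""
    return {
        'Name': trigger_name,
        'Type': 'SCHEDULED',
        # Minute 0 of every sixth hour, starting at 00:00 UTC.
        'Schedule': 'cron(0 0/6 * * ? *)',
        'Actions': [{'JobName': job_name}],
        'StartOnCreation': True,
    }


# Placeholder names, for illustration only.
print(six_hour_trigger_request('my-compaction-job', 'my-compaction-trigger'))
```

You could then create the trigger with `boto_session.client('glue').create_trigger(**six_hour_trigger_request(...))`.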