# Working with Amazon SageMaker Offline Feature Store 

## SageMaker Feature Store Offline SDK enables you to easily build ML-ready datasets from Feature Groups

**How to use Amazon SageMaker Feature Store Offline SDK to build ML-ready datasets by loading Pandas dataframes and joining one or more Feature Groups.**

**Note:** Set the kernel to `Python 3 (Data Science)` and the instance type to `ml.m5.large`, one of the fast-launch types.

---

# Contents

1. [Overview](#Overview)
1. [Describe Data Sets](#Datasets)
1. [Generate Timestamps](#Generate-Timestamps)
1. [Enhance Dataframe with Time-Series Data](#Enhance-Dataframe-with-Time-Series-Data)
1. [Create Dataset from single Feature Group](#Create-Dataset-from-single-Feature-Group)
1. [Demonstrate use of Point-in-Time Join](#Demonstrate-use-of-Point-in-Time-Join)
1. [Create Dataset from Joining two Feature Groups](#Create-Dataset-from-Joining-two-Feature-Groups)
1. [Create Dataset using Event Time window](#Create-Dataset-using-Event-Time-window)
1. [Create Dataset using as-of Timestamp](#Create-Dataset-using-as-of-Timestamp)
1. [Clean Up](#Clean-Up)

--- 

### LICENSE

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. <br>
// SPDX-License-Identifier: MIT-0 <br>


---

## Overview

SageMaker Feature Store is a purpose-built service to store and retrieve feature data for use by Machine Learning models. The Feature Store provides an Online store capable of low-latency, high-throughput reads and writes, an Offline store that provides bulk access to all historical record data, and automated synchronization of data between the Online and Offline stores.

The Feature Store Offline SDK provides the ability to quickly and easily build ML-ready datasets for use by ML model training or pre-processing. The SDK makes it easy to build datasets from SQL join, point-in-time accurate join, and event range time frames, all without the need to write any SQL code. This functionality is accessed via the DatasetBuilder class which is the primary entry point for the SDK functionality. This notebook will instruct the user on how to operate the SDK to retrieve feature data in a number of scenarios, including:


1. Perform basic feature-level join between one or more Feature Groups and Pandas dataframes. The Offline Store SDK allows you to create datasets using table join logic without the need to write any SQL code. This capability can speed development and time-to-market for your ML use cases.

2. Retrieval of point-in-time accurate feature data based on an entity or event-level dataframe. This key capability allows you to retrieve data using “row-level time travel” according to the event times provided to the Dataset Builder (builder).

3. Retrieval of feature data with event times within a specified time frame. If you want to retrieve only a subset of your data containing records whose event_time falls within a certain timeframe, you can simply provide the start and end times to the builder.

4. Retrieval of feature data with as-of-timestamp (referred to as “time travel”). This capability is useful when you want to access data that represents the state of the datastore at some time in the past. Use cases involve rollback, datastore audits, and avoidance of feature leakage for ML training jobs.


In the past, these sorts of data retrieval scenarios required the user to write complex SQL query syntax which would then be submitted to the Amazon Athena service. Athena provides a flexible SQL query engine on top of S3 object storage. While this is still possible, the Offline SDK makes it much easier to build these datasets that incorporate one or more Pandas Dataframes and one or more existing Feature Groups.  For users that prefer to accomplish this type of feature engineering using Apache Spark, SageMaker Feature Store also provides a Spark connector.

To demonstrate these capabilities, this notebook sample will create several Feature Groups, and several Pandas Dataframes which are related by a PrimaryKey-to-ForeignKey relationship. We will ingest sample data from a public dataset into the Feature Groups prior to calling methods of the Offline SDK. Then, we will illustrate how to retrieve the ingested features from the multiple Feature Groups and combine them to build feature sets that can be used to train an ML model. 

This notebook will walk through the following steps:

* Create base Dataframe from public Leads dataset using simulated timestamp data, and replicate a handful of records with simulated time-series data (used for point-in-time join)
* Create Dataset Builder object with point-in-time join (row-level time travel), and show records that meet that criteria
* Compare Athena SQL query results to Offline SDK results, verifying the point-in-time accuracy
* Create target Dataframe and ingest public Web Marketing dataset into target Feature Group 
* Create Dataset Builder object by joining the base and target Feature Groups
* Retrieve combined feature data using event time range window, and show records that meet that criteria
* Retrieve feature data using as-of timestamp which references the `write_time` datastore attribute
* Examine CSV data files, written to S3 by the Offline SDK, which are useful for further processing or feeding directly to training or batch scoring jobs

---

## Datasets

### Overview of Leads and Marketing Datasets

The use case concerns different marketing activities/metrics captured for the lead by the marketing campaigns. There are two datasets, LeadData and WebMarketingData.

The LeadData CSV file describes each lead; every lead has a unique Lead_ProspectID and an associated Lead_EventTime. Fields include job role (JobRole), lead profile (LeadProfile), whether the lead used a marketing promotion (UsedPromo), region (Region), and whether the lead converted into a sale (Converted). The Converted field is our target feature for model prediction.

The WebMarketingData CSV file records the marketing activities and metrics captured for each lead across the campaigns run by the marketing team. Each activity has a unique Web_ProspectID and an associated Web_EventTime. Fields include the last campaign activity performed by the lead (LastCampaignActivity), number of page views per visit (PageViewsPerVisit), total time spent on the website (TotalTimeOnWebsite), whether the lead attended the marketing event (AttendedMarketingEvent), and whether the lead viewed the advertisement (ViewedAdvertisement).

These datasets can also be downloaded from AWS hosted buckets here:

Leads Data:
https://static.us-east-1.prod.workshops.aws/public/1874ad19-b8dc-4295-923f-a738875ed5c3/static/lab8/LeadData.csv

WebMarketing Data: 
https://static.us-east-1.prod.workshops.aws/public/1874ad19-b8dc-4295-923f-a738875ed5c3/static/lab8/WebMarketingData.csv

Note: You do *not* need to download these data files to run this notebook since the CSV files are already stored in the ./data sub-directory.

### Create Feature Groups using SageMaker SDK

Create Feature Groups from Pandas dataframes using SageMaker Feature Store Offline SDK

In [None]:
# Initial imports

import subprocess
import importlib
import random
import string
import datetime
import time
import sys
import os
import re
import sagemaker


#### Enforce a minimum required version of SageMaker SDK library

In [None]:
sm_version = sagemaker.__version__

# Compare (major, minor) as a tuple so that, for example, 3.0 is not treated as older than 2.132
major, minor = (int(part) for part in sm_version.split('.')[:2])

if (major, minor) < (2, 132):
    print('Upgrading sagemaker version from: ' + sm_version)
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker==2.132.0'])
    importlib.reload(sagemaker)


In [None]:
import sagemaker
print('SageMaker version: ' + sagemaker.__version__)

In [None]:
from time import gmtime, strftime, sleep
from datetime import date, datetime, timezone
from random import randint

from sagemaker.feature_store.feature_store import FeatureStore
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session
from sagemaker import get_execution_role

import sagemaker
import boto3
import logging
import pandas as pd
import numpy as np
import re


In [None]:
# Import local helper code with convenience functions 

sys.path.append('./code')
import sample_helper

In [None]:
# Configure logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

In [None]:
# Configure Session objects 

region = boto3.Session().region_name
print(region)

boto_session = boto3.Session(region_name=region)
sagemaker_session = sagemaker.Session()

role = sagemaker.get_execution_role()
print(role) 

# Allocate SageMaker, Feature Store, and S3 clients

s3_client = boto3.client('s3', region_name=region)
sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
featurestore_runtime = boto_session.client(service_name="sagemaker-featurestore-runtime",region_name=region)

### Create required Feature Store Session object 

In [None]:
# Create FeatureStore session object

feature_store_session = sagemaker.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

feature_store = FeatureStore(sagemaker_session=feature_store_session)

In [None]:
# Reference to default S3 bucket
s3_bucket_name = sagemaker_session.default_bucket()
print(f'Using S3 default bucket: {s3_bucket_name}')

# Note: Artifacts created by this notebook will be located under this S3 prefix
s3_prefix = "offline-store-sdk-artifacts-" + "".join(random.choice(string.ascii_lowercase) for i in range(10))
print(f'S3 prefix for offline store data: {s3_prefix}')

## Generate Timestamps

### Now we will generate random timestamps and add them to a Dataframe

Feature Groups require an EventTime field, which represents the date and time at which the event occurred. This field must be either of type Fractional (Unix epoch time in seconds) or a String in ISO 8601 format.

Note: The timestamps are generated purposely to span within a single calendar year (Jan 2022 ~ Dec 2022).
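As a minimal sketch of the relationship between the two accepted representations, the snippet below converts Unix epoch seconds into an ISO 8601 string in UTC. The helper name `epoch_to_iso8601` and the variable names are illustrative only and are not part of the notebook's `sample_helper` module.

```python
from datetime import datetime, timezone


def epoch_to_iso8601(epoch_seconds: int) -> str:
    """Format Unix epoch seconds as an ISO 8601 string in UTC."""
    dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


start = 1640995200            # start of the simulated range
one_year = 365 * 24 * 3600    # 31536000 seconds

print(epoch_to_iso8601(start))             # 2022-01-01T00:00:00Z
print(epoch_to_iso8601(start + one_year))  # 2023-01-01T00:00:00Z
```

Passing `tz=timezone.utc` keeps the result independent of the time zone configured on the notebook instance.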


In [None]:
# Note: initial unix time 1640995200 is 01-01-2022
initial_unix_timestamp = 1640995200

# Note: delta of 31536000 is one year of unix time
one_year_delta = 31536000

# Function gen_timestamps generates randomized datetimes using start and delta arguments
df_timestamps = sample_helper.gen_timestamps(initial_unix_timestamp, one_year_delta, num=10000)

print(df_timestamps.shape)
print(df_timestamps.columns)

df_timestamps.head()

### Read Lead data from CSV file and load into Pandas Dataframe

In [None]:
# Create base Dataframe by using Pandas to read CSV file 
base_data_df = pd.read_csv("./data/LeadData.csv")

In [None]:
# Drop column to avoid ValueError: Failed to infer Feature type based on dtype bool for column DoNotReachOut.
base_data_df = base_data_df.drop('DoNotReachOut', axis=1)  

# Set 'Lead_EventTime' column using the generated timestamps
base_data_df['Lead_EventTime'] = df_timestamps['Timestamp'].astype('string')

# Verify column names for Leads dataset, with Primary Key 'Lead_ProspectID'
print(f'Column names: {base_data_df.columns}')

In [None]:
print(base_data_df.dtypes)
base_data_df.head()

### Now generate NEWER timestamps for creating time-series data

Note: The point-in-time accurate join is designed for time-series data. Therefore, we clone existing records from the dataset, assign new (fake) `LeadSource` values so the clones are easy to identify, and append them to the original dataframe. We also generate newer timestamps (from Jan 2023) for the new sample records.
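The cloning idea can be sketched in plain Python. This is not the notebook's `sample_helper.gen_new_records` implementation, which is not shown here; the function `clone_with_new_timestamps`, the record ID, and the sample values are hypothetical and only illustrate how one record becomes a small time series.

```python
import copy


def clone_with_new_timestamps(record, timestamps, num):
    """Return `num` copies of `record`, each with a new event time and a marker LeadSource."""
    clones = []
    for i in range(num):
        clone = copy.deepcopy(record)
        clone["Lead_EventTime"] = timestamps[i]
        clone["LeadSource"] = f"NewLeadSource_{i}"  # marker value makes clones easy to spot
        clones.append(clone)
    return clones


# Hypothetical sample record and timestamps
original = {
    "Lead_ProspectID": "id-001",
    "LeadSource": "Web",
    "Lead_EventTime": "2022-03-05T10:00:00Z",
}
jan_2023 = ["2023-01-05T00:00:00Z", "2023-01-12T00:00:00Z", "2023-01-25T00:00:00Z"]

series = clone_with_new_timestamps(original, jan_2023, num=3)
for rec in series:
    print(rec["Lead_ProspectID"], rec["LeadSource"], rec["Lead_EventTime"])
```

All clones share the same record identifier, so together with the original they form several versions of one entity over time.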

In [None]:
# Note: unix time 1672531200 is 01-01-2023
jan2023_unix_timestamp = 1672531200

# Note: delta of 2592000 is one MONTH of unix time
one_month_delta = 2592000

new_timestamps_df = sample_helper.gen_timestamps(jan2023_unix_timestamp, one_month_delta, num=100)

new_timestamps_df.head()

## Enhance Dataframe with Time-Series Data

We select existing records and create copies with different timestamps for time-series data

In [None]:
# Locate existing records to replicate 

sample_df1 = base_data_df.iloc[[0]]
sample_rec1 = sample_df1.iloc[0]
record_id1 = sample_rec1['Lead_ProspectID']
print(f'Sample record ID[0]: {record_id1}')

sample_df2 = base_data_df.iloc[[1]]
sample_rec2 = sample_df2.iloc[0]
record_id2 = sample_rec2['Lead_ProspectID']
print(f'Sample record ID[1]: {record_id2}')


#### Generate new dataframes for time-series data using the new timestamps

Note: The code below calls methods in supplemental Python file under the ./code directory.

In [None]:
# Generate new records for time-series data

new_records_df1 = sample_helper.gen_new_records(sample_df1, new_timestamps_df, num=10)
new_records_df2 = sample_helper.gen_new_records(sample_df2, new_timestamps_df, num=10)

print(new_records_df1.shape)
print(new_records_df2.shape)

In [None]:
# View new records for time-series #1

new_records_df1.head()

In [None]:
# View new records for time-series #2

new_records_df2.head()

In [None]:
# Append new records to copy of base Dataframe

updated_base_df = base_data_df.copy(deep=True)

# Note: DataFrame.append was removed in pandas 2.0, so use pd.concat instead
updated_base_df = pd.concat([updated_base_df, new_records_df1, new_records_df2], ignore_index=True)

print(updated_base_df.shape)

### Create FeatureGroup for base dataframe

We create the initial (base) Feature Group by choosing a name, loading feature definitions from the Pandas dataframe, and then calling `FeatureGroup.create`, providing the names for the two required attributes (`record_identifier_name` and `event_time_feature_name`).


In [None]:
print('Objects created under bucket: ' + s3_bucket_name)

# Create Feature Group:
# Note: the base_name becomes the name of the FeatureGroup, and can be changed by user
base_name = "off-sdk-fg-lead"   # No underscores

base_fg = FeatureGroup(name=base_name, sagemaker_session=feature_store_session)

base_fg.load_feature_definitions(data_frame=updated_base_df) 

base_fg.create(
    s3_uri=f"s3://{s3_bucket_name}/{s3_prefix}",
    record_identifier_name="Lead_ProspectID",
    event_time_feature_name="Lead_EventTime",
    role_arn=role,
    enable_online_store=True,  # False to disable Online store
)


In [None]:
sample_helper.wait_for_feature_group_creation_complete(feature_group=base_fg)

### Look up Offline Store S3 URI

We call the `FeatureGroup.describe` method to look up configuration items for the Feature Group. In this case, we look up the full S3 URI where Offline Store files are written and strip the bucket name, leaving the S3 prefix used further down.
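As an alternative to string replacement, an S3 URI can be split into bucket and key prefix with the standard library. The helper `split_s3_uri` and the example URI below are hypothetical and shown only as a sketch.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical URI for illustration
bucket, prefix = split_s3_uri("s3://example-bucket/example-prefix/offline-store/data")
print(bucket)  # example-bucket
print(prefix)  # example-prefix/offline-store/data
```

This approach does not depend on knowing the bucket name in advance and rejects values that are not S3 URIs.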

In [None]:
# Use describe to lookup Offline Store S3 URI

base_fg_resolved_output_s3_uri = base_fg.describe()["OfflineStoreConfig"]["S3StorageConfig"]["ResolvedOutputS3Uri"]

base_fg_s3_prefix = base_fg_resolved_output_s3_uri.replace(f"s3://{s3_bucket_name}/", "")
print(base_fg_s3_prefix)

### Ingest data from Dataframe into Feature Group

#### Wait for data ingested into Feature Group to replicate to Offline Store

When data is written to a Feature Group (e.g. using `put_record` or `ingest`), it lands in the Online Store initially. Next, this data is replicated to the Offline Store hosted on S3, where it can be queried and loaded for pre-processing or model training. This replication can take anywhere from 5 minutes up to 15 minutes. The wait function below will test and wait for data to appear in the S3 Offline Store.
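A wait function of this kind is typically a polling loop with a timeout. The sketch below is a generic version, not the actual `sample_helper.wait_for_feature_group_data_ingest` code; the name `wait_until`, its parameters, and the fake check are assumptions made for illustration. The default timeout of 900 seconds mirrors the 15-minute upper bound mentioned above.

```python
import time


def wait_until(predicate, timeout_seconds=900, interval_seconds=30,
               sleep=time.sleep, clock=time.monotonic):
    """Call `predicate` repeatedly until it returns True or the timeout elapses."""
    deadline = clock() + timeout_seconds
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_seconds)


# Stand-in check that succeeds on the third attempt (a real check would look for objects in S3)
attempts = {"count": 0}


def data_has_arrived():
    attempts["count"] += 1
    return attempts["count"] >= 3


ok = wait_until(data_has_arrived, timeout_seconds=5, interval_seconds=0, sleep=lambda s: None)
print(ok, attempts["count"])
```

In practice the predicate could list objects under the Offline Store prefix and return `True` once at least one data file exists.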

In [None]:
base_fg.ingest(data_frame=updated_base_df, max_workers=3, wait=True)

sample_helper.wait_for_feature_group_data_ingest(s3_bucket_name, base_fg_s3_prefix)

## Create Dataset from single Feature Group

We can create a Feature Store Dataset from one or multiple existing Feature Groups. Here we call the first new SDK method `FeatureStore.create_dataset` which wraps an underlying call to the [DatasetBuilder](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#dataset-builder) class to generate our first dataset. 

Note: The `to_csv_file` method returns a tuple consisting of the S3 location of the CSV file and the underlying query executed to produce the dataset. The query can be inspected for debugging purposes, or submitted directly to Athena if desired.

In [None]:
# Create dataset from single (base) FeatureGroup 

ds1_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results"
)

# Returns: tuple (CSV file, SQL query)
csv_file, query = ds1_builder.to_csv_file()

# Show S3 location of CSV file
print(f'CSV file: {csv_file}')

#### Create Dataset from Feature Group and Pandas Dataframe

Another way to create a Feature Store Dataset is to reference a Pandas dataframe and an existing Feature Group. In this case, a SQL Join operation is performed between the existing feature group and the dataframe, using the `record_identifier_feature_name` attribute as join key.

In [None]:
# Create dataset from Pandas dataframe

ds2_builder = feature_store.create_dataset(
    base=new_records_df2,  # Pandas dataframe
    event_time_identifier_feature_name="Lead_EventTime", 
    record_identifier_feature_name="Lead_ProspectID",
    output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results"
).with_feature_group(base_fg, "Lead_ProspectID", ["LeadSource"])


In [None]:
# Generate dataframe from dataset
ds2_df, ds2_query = ds2_builder.to_dataframe()

print(ds2_df.shape)

In [None]:
# Filter the dataframe with a Pandas query on the LeadSource value
query = "LeadSource == 'NewLeadSource_0'"

leadsource_match = ds2_df.query(query, inplace=False)
print(leadsource_match.shape)

leadsource_match.head(5)

## Demonstrate use of Point-in-Time Join

We reference the two record IDs from the extra sample records generated above to perform the point-in-time join. These new sample records create a mini-set of time-series data where the `Lead_EventTime` spans a single month (Jan 2023). Below, we create an entity dataframe with two event times that indicate the "cut-off" time. When we enable `point_in_time_accurate_join` in the `create_dataset` call below, the internal query will exclude all records with timestamps LATER than the cut-off times supplied. The entity dataframe with the cut-off event times is submitted as the `base` dataframe in the DatasetBuilder constructor.
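The selection logic can be illustrated with a small plain-Python sketch. It is not the SQL generated by the SDK; the function `point_in_time_latest` and the sample rows are hypothetical, and treating a record whose event time equals the cut-off as eligible is an assumption of this sketch. ISO 8601 strings in the same UTC format compare correctly as plain strings.

```python
def point_in_time_latest(entities, feature_rows, id_key, time_key, n_recent=1):
    """For each (cutoff, record_id) pair, keep the n most recent rows at or before the cutoff."""
    results = []
    for cutoff, record_id in entities:
        eligible = [
            row for row in feature_rows
            if row[id_key] == record_id and row[time_key] <= cutoff
        ]
        eligible.sort(key=lambda row: row[time_key], reverse=True)
        results.extend(eligible[:n_recent])
    return results


# Hypothetical feature rows: several versions per record ID
feature_rows = [
    {"Lead_ProspectID": "A", "Lead_EventTime": "2022-05-01T00:00:00Z", "LeadSource": "Web"},
    {"Lead_ProspectID": "A", "Lead_EventTime": "2023-01-10T00:00:00Z", "LeadSource": "NewLeadSource_0"},
    {"Lead_ProspectID": "A", "Lead_EventTime": "2023-01-18T00:00:00Z", "LeadSource": "NewLeadSource_1"},
    {"Lead_ProspectID": "A", "Lead_EventTime": "2023-01-27T00:00:00Z", "LeadSource": "NewLeadSource_2"},
    {"Lead_ProspectID": "B", "Lead_EventTime": "2023-01-12T00:00:00Z", "LeadSource": "NewLeadSource_0"},
    {"Lead_ProspectID": "B", "Lead_EventTime": "2023-01-16T00:00:00Z", "LeadSource": "NewLeadSource_1"},
]

# Entity rows: one cut-off time per record ID
entities = [("2023-01-20T00:00:00Z", "A"), ("2023-01-15T00:00:00Z", "B")]

picked = point_in_time_latest(entities, feature_rows, "Lead_ProspectID", "Lead_EventTime")
for row in picked:
    print(row["Lead_ProspectID"], row["Lead_EventTime"], row["LeadSource"])
```

Each entity row yields exactly one result here: the newest version that does not postdate that entity's own cut-off, which is why this is described as row-level time travel.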

In [None]:
# Create Events (entity table) dataframe to pass Timestamp for Point-in-Time Join

events = [['2023-01-20T00:00:00Z', record_id1],
          ['2023-01-15T00:00:00Z', record_id2]]

print(events)

df_events = pd.DataFrame(events, columns=['Event_Time', 'Lead_ProspectID']) 


In [None]:
# Create Dataset Builder using point-in-time-accurate-join function

pit_builder = feature_store.create_dataset(
    base=df_events, 
    event_time_identifier_feature_name='Event_Time', 
    record_identifier_feature_name='Lead_ProspectID',
    output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results"
).with_feature_group(base_fg, "Lead_ProspectID"
).point_in_time_accurate_join(
).with_number_of_recent_records_by_record_identifier(1)


### Review the results returned by the point-in-time join

Notice that there are only two records in the dataframe returned by the point-in-time join. This is because we only submitted two records in the entity dataframe, one for each record_id we want to retrieve. The entity dataframe includes the record_id (primary key) to match and the event time for each key, which is used to exclude newer records. This means that a record's event time (stored in the `Lead_EventTime` field) must be earlier than the cut-off time.

Additionally, we only retrieve the latest record that meets these criteria since we have applied the `with_number_of_recent_records_by_record_identifier` method. When used in conjunction with the `point_in_time_accurate_join` method, this allows the caller to specify how many records to return from those that meet the point-in-time join criteria.

Notice also that each of the records returned from the point-in-time join contains the new (fake) LeadSource values, which shows they were part of the mini-set of time-series data added to the original dataframe.


In [None]:
# Export result set to Dataframe and Query string
pit_df, pit_query = pit_builder.to_dataframe()

print(pit_df.shape)
pit_df.head()

#### We can retrieve CSV file from S3 to validate record data

In [None]:
# Write point-in-time-accurate-join result set to S3 file

# Returns: tuple (CSV file, SQL query)
pit_csv, pit_query = pit_builder.to_csv_file()
pit_csv


### Now let's build an Athena Query to compare the results of the Offline SDK

First, we build a standard Athena query using a SELECT statement that returns ALL records for the given record_id: the WHERE clause filters on the record ID only and applies no time constraint.

In [None]:
# Create Athena Query and retrieve table name
lead_query = base_fg.athena_query()
print(lead_query)

lead_table = lead_query.table_name
print(lead_table)

In [None]:
# Define Athena query string and output location
lead_query_recid2 = f'SELECT * FROM "sagemaker_featurestore"."{lead_table}" ' \
               f'WHERE "{lead_table}"."Lead_ProspectID" = \'{record_id2}\' '

print(lead_query_recid2)

output_location = f's3://{s3_bucket_name}/{s3_prefix}/athena_query_results/'
print(output_location)

In [None]:
lead_query.run(query_string=lead_query_recid2, output_location=output_location)
lead_query.wait()

athena_results_df = lead_query.as_dataframe()
print(athena_results_df.shape)
athena_results_df.head()

### Comparing Athena Query Results

Notice that the Athena query results also contain fields attached by the Feature Store, such as `write_time` and `is_deleted`. These fields are added to the record as it is written to the datastore, and provide additional ways to query or sort the record data. 

#### Compare records from Athena Query to Point-in-Time results above

Notice that the Athena query from the SELECT statement above does not contain any point-in-time join semantics, so it returns all records that match the specified record_id (`Lead_ProspectID`). Next, we use Pandas to sort the Athena results by event times for easy comparison. Notice that the records with timestamps LATER than the events specified in the entity dataframe (e.g. '2023-01-15T00:00:00Z') submitted to the `point_in_time_accurate_join` do NOT show up in the point-in-time results. Since we additionally specified that we only want a single record from the `create_dataset` code above, we only get the latest record prior to the "cut off" time.

In [None]:
# Sort records from Athena Query by lead_eventtime column
sort_athena_df = athena_results_df.sort_values(by='lead_eventtime', axis=0, ascending=True, inplace=False)

for idx, row in sort_athena_df.iterrows():
    lead_rec_id = row['lead_prospectid']
    lead_source = row['leadsource']
    datetime_str = row['lead_eventtime']
    print(f'Lead_ProspectID: {lead_rec_id} Lead_Source: {lead_source} DateTime: {datetime_str}')

#### Create Target Feature Group from Dataframe

In [None]:
# Create Feature Group:
# Note: the target_name becomes the name of the FeatureGroup, and can be changed by user

target_name = "off-sdk-fg-webmarketing"

target_fg = FeatureGroup(name=target_name, sagemaker_session=feature_store_session)

In [None]:
# Create target Dataframe by using Pandas to read CSV file 
target_data_df = pd.read_csv("./data/WebMarketingData.csv")

# Event time type should be either Fractional(Unix timestamp in seconds) or String (ISO-8601 format) type
target_data_df['Web_EventTime'] = df_timestamps['Timestamp']

# Verify column names for WebMarketing dataset, with Primary Key 'Web_ProspectID'
print(f'Column names: {target_data_df.columns}')

In [None]:
target_fg.load_feature_definitions(data_frame=target_data_df)

target_data_df.head()

In [None]:
# Note: we enable the Online Store below because get_record and put_record are used later in this notebook

target_fg.create(
    s3_uri=f"s3://{s3_bucket_name}/{s3_prefix}",
    record_identifier_name="Web_ProspectID",
    event_time_feature_name="Web_EventTime",
    role_arn=role,
    enable_online_store=True,  # Enable Online store for get_record and put_record
)

#### Ingest data into target Feature Group

In [None]:
sample_helper.wait_for_feature_group_creation_complete(feature_group=target_fg)

In [None]:
target_fg_resolved_output_s3_uri = target_fg.describe()["OfflineStoreConfig"]["S3StorageConfig"]["ResolvedOutputS3Uri"]

target_fg_s3_prefix = target_fg_resolved_output_s3_uri.replace(f"s3://{s3_bucket_name}/", "")
print(target_fg_s3_prefix)

#### Wait for data ingested into Feature Group to replicate to Offline Store

When data is written to a Feature Group (e.g. using `put_record` or `ingest`), it lands in the Online Store initially. Next, this data is replicated to the Offline Store hosted on S3, where it can be queried and loaded for pre-processing or model training. This replication can take anywhere from 5 minutes up to 15 minutes. The wait function below will test and wait for data to appear in the S3 Offline Store.


In [None]:
target_fg.ingest(data_frame=target_data_df, max_workers=3, wait=True)

sample_helper.wait_for_feature_group_data_ingest(s3_bucket_name, target_fg_s3_prefix)

#### We read a sample record (index 9999) and write a new version

This updated record will have a later `write_time` than the original record from the table. In the create_dataset `as_of` section below, we will use a carefully constructed datetime (cut-off time) to exclude this later version of the record from our dataframe.

In [None]:
sample_rec9999 = base_data_df.iloc[9999]

record_id9999 = sample_rec9999['Lead_ProspectID']
print(f'Sample record ID[9999]: {record_id9999}')

#### Use `get_record` to retrieve record and `put_record` to write new version

In [None]:
print(f'Using record ID: {record_id9999}')
colnames = list(target_data_df.columns.values)

rec = target_fg.get_record(record_id9999, colnames)
print(rec)

now_ts = int(round(time.time()))
now_iso8601 = sample_helper.convert_timestamp_to_iso8601(now_ts)
print(now_iso8601)

Notice we also update the `TotalWebVisits` field to `9999` to make it easy to identify this additional record in the results below.

In [None]:
updated_rec = []
for kv_field in rec:
    if kv_field['FeatureName'] == 'Web_EventTime':
        kv_field['ValueAsString'] = now_iso8601
    if kv_field['FeatureName'] == 'TotalWebVisits':
        kv_field['ValueAsString'] = '9999'
    updated_rec.append(kv_field)

print(updated_rec)

In [None]:
# Write updated record
featurestore_runtime.put_record(FeatureGroupName=target_fg.name, Record=updated_rec)

## Create Dataset from Joining two Feature Groups

Below, we use the `FeatureStore.create_dataset` and `with_feature_group` methods to build a dataset by joining two existing feature groups. To create the dataset, we provide the feature group references, the join key (referred to in the `target_feature_name_in_base` argument), and the set of features to include in the results.

In [None]:
# Create dataset by joining two Feature Groups 

join_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results"
).with_feature_group(
    feature_group=target_fg,
    target_feature_name_in_base="Lead_ProspectID",
    included_feature_names=[
        "Web_ProspectID", "LastCampaignActivity", "PageViewsPerVisit",
        "TotalTimeOnWebsite", "TotalWebVisits", "AttendedMarketingEvent",
        "OrganicSearch", "ViewedAdvertisement",
    ],
)

In [None]:
# Use the builder to generate a Pandas Dataframe
join_df, join_query_str = join_builder.to_dataframe()

#### Notice that the joined result set includes columns from both the Leads and WebMarketing datasets

The DatasetBuilder functions will modify feature names as needed to avoid name collisions. For example, notice that features from the WebMarketing feature group below carry a numeric suffix, such as `Web_ProspectID.1`.

In [None]:
# Confirm list of columns and dimensions in the resultset
print(join_df.columns)
print(join_df.shape)

# Display few records from join dataframe
join_df.head(5)

### Write results to S3 as CSV file

To validate the join operation from the `with_feature_group` call above, we can use the builder object method `to_csv_file` to write the results to S3. We can download this CSV file to verify that the join was performed correctly.

In [None]:
# Returns: tuple (CSV file, SQL query)
s3_file, s3_query_str = join_builder.to_csv_file()
print(s3_file)

## Create Dataset using Event Time window

Set up the Event Time range window by converting Unix epoch times to Python datetimes. The `with_event_time_range` method allows the user to specify a time range (start and end) to apply to the WHERE clause of the query, thus providing a time constraint on the records returned in the result set.
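The effect of such a window can be sketched without the SDK. The helper `in_window` is hypothetical, and treating both bounds as inclusive is an assumption of this sketch rather than a statement about the generated SQL.

```python
from datetime import datetime, timedelta, timezone

# Window start as a timezone-aware UTC datetime, plus one day
window_start = datetime.fromtimestamp(1656633600, tz=timezone.utc)
window_end = window_start + timedelta(days=1)


def in_window(iso_event_time, start, end):
    """Return True if an ISO 8601 UTC event time falls inside [start, end]."""
    event_time = datetime.strptime(iso_event_time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return start <= event_time <= end


print(window_start.isoformat())  # 2022-07-01T00:00:00+00:00
for ts in ["2022-06-30T23:59:59Z", "2022-07-01T12:00:00Z", "2022-07-02T00:00:01Z"]:
    print(ts, in_window(ts, window_start, window_end))
```

Only the middle timestamp falls inside the one-day window; the other two lie just outside its edges.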

In [None]:
# Setup Event Time window: seconds of unix epoch time

# Start at 07/01/2022 and set time window to one day
start_ts = 1656633600
time_window = 86400

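# Using hard-coded timestamps from dataset, then adding time window
# Note: convert in UTC so the window does not depend on the kernel's local time zone
datetime_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
datetime_end = datetime.fromtimestamp(start_ts + time_window, tz=timezone.utc)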
print(f'Setting retrieval time window: {datetime_start} until {datetime_end}')


In [None]:
# Create dataset with specified event_time window

time_window_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results"
).with_feature_group(
    feature_group=target_fg,
    target_feature_name_in_base="Lead_ProspectID",
    included_feature_names=[
        "Web_ProspectID", "LastCampaignActivity", "PageViewsPerVisit",
        "TotalTimeOnWebsite", "TotalWebVisits", "AttendedMarketingEvent",
        "OrganicSearch", "ViewedAdvertisement",
    ],
).with_event_time_range(
    starting_timestamp=datetime_start,
    ending_timestamp=datetime_end
)


In [None]:
# Export to dataframe
time_window_df, time_window_query_str = time_window_builder.to_dataframe()

# Confirm list of columns in the result-set
print(time_window_df.columns)

# Confirm that only subset of records in result-set due to event_time window constraint
print(time_window_df.shape)

In [None]:
# Dump resulting dataframe
time_window_df.head(5)

#### We can directly view the SQL code produced by the Offline Store SDK

Whenever the `to_csv_file` or `to_dataframe` methods are called, a tuple is returned where the second member is the actual SQL query produced by the Offline SDK engine. This SQL code can be reviewed to better understand how the various methods impact the executed query. 

In [None]:
# Show SQL used to create dataset, using the event_time window constraint 
print(time_window_query_str)

## Create Dataset using as-of Timestamp

Next we demonstrate how to build a dataset and perform feature retrieval with the `as_of` method, which takes a single timestamp argument. 

Please note: The `as_of` method applies the time constraint to the internal `write_time` field, which is automatically generated by the Feature Store service. The `write_time` field represents the actual time the record is written to the datastore (as opposed to the client-provided `event_time`). The `write_time` field, along with other datastore-driven fields, is only retrievable using Athena, so we will construct Athena queries below to validate the results returned by the `as_of` method of the DatasetBuilder.
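The distinction between filtering on write time and filtering on event time can be sketched as follows. The function `visible_as_of`, the record ID, and the sample values are hypothetical, and including rows written exactly at the cut-off is an assumption of this sketch.

```python
from datetime import datetime


def visible_as_of(rows, as_of):
    """Keep only the rows that had already been written to the store at `as_of`."""
    return [row for row in rows if row["write_time"] <= as_of]


# Two versions of the same record; the second was written 20 minutes later
rows = [
    {"Web_ProspectID": "id-9999", "TotalWebVisits": 4,
     "write_time": datetime(2023, 2, 1, 10, 0, 0)},
    {"Web_ProspectID": "id-9999", "TotalWebVisits": 9999,
     "write_time": datetime(2023, 2, 1, 10, 20, 0)},
]

cutoff = datetime(2023, 2, 1, 10, 10, 0)  # falls between the two writes
snapshot = visible_as_of(rows, cutoff)
print([row["TotalWebVisits"] for row in snapshot])  # [4]
```

A cut-off placed between the two writes hides the later version, regardless of what event time that version carries.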

In [None]:
# Create Athena Query and retrieve table name
webmark_query = target_fg.athena_query()
print(webmark_query)

webmark_table = webmark_query.table_name
print(webmark_table)

### We choose a cut-off time from one of our two sample records for index 9999

Using an Athena query, we pull all records from the WebMarketing table with a matching record ID (using index 9999). We then sort them according to `write_time` and choose the latest value as the cut-off time for the `as_of` dataset. The latest value will correspond with the extra record written with `put_record` method above. We purposely wrote this record later so its `write_time` value would be different.

In [None]:
# Define Athena query string and output location
webmark_query_recids = f'SELECT * FROM "sagemaker_featurestore"."{webmark_table}" ' \
               f'WHERE "{webmark_table}"."Web_ProspectID" = \'{record_id9999}\' ' 
print(webmark_query_recids)

output_location = f's3://{s3_bucket_name}/{s3_prefix}/athena_query_results/'
print(output_location)

### We need to wait for last record to replicate to Offline Store

Whenever a record is written to the Online Store (using `ingest` or `put_record`), it is replicated to the Offline Store on S3 after a short delay. Below, we run an Athena query in a loop that waits for the most recent record to arrive in the Offline Store.

In [None]:
# Poll Athena (up to 10 times, 60 seconds apart) until more than one row
# is returned for the record ID, meaning the latest record has arrived

num_waits = 0
while num_waits < 10:
    webmark_query.run(query_string=webmark_query_recids, output_location=output_location)
    webmark_query.wait()
    athena_web_results_df = webmark_query.as_dataframe()
    num_rows, num_cols = athena_web_results_df.shape
    print(f'num_rows found: {num_rows}')
    if num_rows > 1:
        break   # latest record has been replicated
    print("Waiting for latest record to arrive in offline store...")
    time.sleep(60)
    num_waits += 1
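
The polling pattern used above can also be expressed as a small reusable helper. The sketch below is one possible way to do it, not part of the SDK: `wait_until` and its parameters are hypothetical names, and the row counts are stand-in values rather than real Athena results.

```python
import time


def wait_until(check, max_attempts=10, delay_seconds=60, sleep=time.sleep):
    """Call check() up to max_attempts times; return True as soon as it succeeds."""
    for attempt in range(1, max_attempts + 1):
        if check():
            return True
        if attempt < max_attempts:
            sleep(delay_seconds)  # pause before the next poll
    return False


# Stand-in for the Athena row count: the second record "arrives" on the third poll
row_counts = iter([1, 1, 2])
arrived = wait_until(lambda: next(row_counts) > 1, max_attempts=5, delay_seconds=0)
print(arrived)
```

In the notebook, `check` would run the Athena query and return whether more than one row came back. Returning `False` after the last attempt makes a timeout explicit, so later cells do not silently continue with an incomplete result.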


In [None]:
# Sort records from Athena query according to `write_time` field (descending)

sort_athena_df2 = athena_web_results_df.sort_values(by='write_time', axis=0, ascending=False, inplace=False)
sort_athena_df2.head()

In [None]:
# Use first record (with latest datetime) to create the cut-off value 
webmark_rec = sort_athena_df2.iloc[0]

webmark_write_time = webmark_rec['write_time']
print(f'write_time: {webmark_write_time}')

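# Drop the last 4 characters (assumed to be the fractional seconds, e.g. '.123')
# so that the string matches the datetime format below
webmark_write_time_truncated = webmark_write_time[:-4]
print(f'write_time truncated: {webmark_write_time_truncated}')

datetime_format = "%Y-%m-%d %H:%M:%S"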

asof_cutoff_datetime = datetime.strptime(webmark_write_time_truncated, datetime_format)
print(f'as-of cut-off datetime: {asof_cutoff_datetime}')
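
The slice `[:-4]` relies on the `write_time` string ending in a dot followed by exactly three digits. As a hedged alternative, the fractional part can be parsed with `%f` and then dropped. In this sketch the sample string is hypothetical and its format is an assumption inferred from the slicing above.

```python
from datetime import datetime

# Hypothetical write_time value with millisecond precision (assumed format)
sample_write_time = "2023-01-01 12:30:00.123"

# %f accepts one to six fractional digits, so the parse does not depend on string length
parsed = datetime.strptime(sample_write_time, "%Y-%m-%d %H:%M:%S.%f")

# Truncate to whole seconds to obtain the cut-off
cutoff = parsed.replace(microsecond=0)
print(cutoff)
```

Because the cut-off is truncated to whole seconds, it falls slightly before the record's own `write_time` whenever the fractional part is non-zero, which is what allows the later record to be excluded by `as_of`.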


In [None]:
# Create dataset using as-of timestamp
print(f'using cut-off time: {asof_cutoff_datetime}')

as_of_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results"
).with_feature_group(
    feature_group=target_fg,
    target_feature_name_in_base='Lead_ProspectID',
    included_feature_names=['Web_ProspectID', 'Web_EventTime', 'TotalWebVisits']
).as_of(asof_cutoff_datetime)


### Retrieval using the `as_of` method excludes records later than the supplied cut-off time from the result set

Our JOIN query for the entire dataset, prior to adding the new record above, resulted in 10,020 records (the initial 10,000 plus our time-series records). We then called `get_record` followed by `put_record` to create an additional, modified version of one record (using record index 9999). Applying the `as_of` criteria with a specific cut-off time eliminated the most recent record, the one written with the `put_record` method.

In [None]:
as_of_df, as_of_query = as_of_builder.to_dataframe()

print(as_of_df.shape)
print(as_of_df.columns)

### Using a Pandas query, we show that the later record is not part of the `as_of` dataframe

Here we run a Pandas query on the `as_of_df` dataframe with a condition that matches the given record ID. The query returns only one record, the one corresponding to the earlier timestamp (the record with the later `write_time` was excluded).

In [None]:
# Query condition using record index 9999

condition_id = f"`Web_ProspectID.1` == '{record_id9999}'"
print(condition_id)

subset_asof_df = as_of_df.query(condition_id, inplace=False)
print(subset_asof_df.shape)

subset_asof_df.head()

## Clean Up

The cell below will clean up resources created by this notebook, including the Feature Groups and (optionally) the S3 bucket.

In [None]:
# delete feature groups created by this notebook

# Uncomment and run these lines of code to delete the feature groups
#base_fg.delete()
#target_fg.delete()