### Setup

This notebook performs the following setup actions for this example use of Amazon SageMaker Feature Store:

1. Create online-only feature groups.
2. Create source and destination Amazon Managed Streaming for Apache Kafka (MSK) topics.
3. Create an Amazon Kinesis Data Analytics (KDA) application that aggregates data from the source topic and loads the results into the destination topic.

**Recommended settings to run this notebook in SageMaker Studio:**

- Image: Data Science
- Kernel: Python3
- Instance type: ml.m5.large (2 vCPU + 8 GiB)

**Important Note:**

DO NOT "Run All Cells" on this notebook. Some steps must be performed manually for it to run successfully.

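### Get Lambda function ARNs and feature group names from CloudFormation stack outputs
1. `PredictLambdaFunctionARN` and `PredictLambdaFunctionName`: the Lambda function that invokes the fraud detection endpoint
2. `IngestLambdaFunctionARN`: the Lambda function that ingests streaming aggregate features into the feature store
3. `AggregateFeatureStoreNameOutput` and `AggregateBatchFeatureStoreNameOutput`: the names of the two feature groups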

Set the stack name in the cell below to match the name of the CloudFormation stack used to create this SageMaker domain. If you used the default stack name, no changes are needed.

![SegmentLocal](images/get-cloudformation-stack-name.gif "get_cf_name")

In [None]:
STACK_NAME = 'sagemaker-featurestore-msk-kda-stack' # if you're not using the default stack name, replace this
%store STACK_NAME

In [None]:
import sys
import boto3
from botocore.exceptions import ClientError

cf_client = boto3.client('cloudformation')

try:
    outputs = cf_client.describe_stacks(StackName=STACK_NAME)['Stacks'][0]['Outputs']
except ClientError as e:
    msg = f'CloudFormation stack {STACK_NAME} was not found. Please set STACK_NAME correctly and re-run this cell.'
    raise ValueError(msg) from e

# Pick out the stack outputs this notebook needs
for o in outputs:
    if o['OutputKey'] == 'IngestLambdaFunctionARN':
        lambda_to_fs_arn = o['OutputValue']
    elif o['OutputKey'] == 'PredictLambdaFunctionARN':
        lambda_to_model_arn = o['OutputValue']
    elif o['OutputKey'] == 'PredictLambdaFunctionName':
        predict_lambda_name = o['OutputValue']
    elif o['OutputKey'] == 'AggregateFeatureStoreNameOutput':
        aggregate_feature_group_name = o['OutputValue']
    elif o['OutputKey'] == 'AggregateBatchFeatureStoreNameOutput':
        aggregate_batch_feature_group_name = o['OutputValue']

In [None]:
print(f'lambda_to_model_arn: {lambda_to_model_arn}')
print(f'lambda_to_fs_arn: {lambda_to_fs_arn}')
print(f'predict_lambda_name: {predict_lambda_name}')
print(f'aggregate_feature_group_name: {aggregate_feature_group_name}')
print(f'aggregate_batch_feature_group_name: {aggregate_batch_feature_group_name}')
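As an alternative to the `if` chain above, the stack outputs can be collected into a dictionary and checked for missing keys up front, so a renamed or missing output fails with a clear error instead of a later `NameError`. This is a minimal sketch: `outputs_to_dict` and `REQUIRED_OUTPUT_KEYS` are hypothetical names, and the key list simply mirrors the output keys read above.

```python
# Hypothetical helper: collect CloudFormation stack outputs into a dict and
# fail fast if any output this notebook relies on is missing.
REQUIRED_OUTPUT_KEYS = (
    'IngestLambdaFunctionARN',
    'PredictLambdaFunctionARN',
    'PredictLambdaFunctionName',
    'AggregateFeatureStoreNameOutput',
    'AggregateBatchFeatureStoreNameOutput',
)


def outputs_to_dict(outputs, required_keys=REQUIRED_OUTPUT_KEYS):
    """Map each OutputKey to its OutputValue; raise KeyError listing missing keys."""
    values = {o['OutputKey']: o['OutputValue'] for o in outputs}
    missing = [key for key in required_keys if key not in values]
    if missing:
        raise KeyError(f'Missing stack outputs: {missing}')
    return values
```

For example, `values = outputs_to_dict(cf_client.describe_stacks(StackName=STACK_NAME)['Stacks'][0]['Outputs'])` followed by `lambda_to_fs_arn = values['IngestLambdaFunctionARN']`.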

In [None]:
from botocore.exceptions import ClientError

try:
    response = cf_client.describe_stack_resource(
        StackName=STACK_NAME,
        LogicalResourceId='MSKCluster'
    )
    MSKClusterArn = response['StackResourceDetail']['PhysicalResourceId']
except ClientError as e:
    msg = f'CloudFormation stack {STACK_NAME} or its MSKCluster resource was not found. Please set STACK_NAME correctly and re-run this cell.'
    raise ValueError(msg) from e

In [None]:
print(f'msk_cluster_arn: {MSKClusterArn}')

In [None]:
%store lambda_to_model_arn

In [None]:
%store predict_lambda_name

In [None]:
%store aggregate_feature_group_name

In [None]:
%store aggregate_batch_feature_group_name

In [None]:
%store MSKClusterArn

In [None]:
from IPython.display import display_html

def restartkernel():
    display_html("", raw=True)

### Imports and other setup

In [None]:
from sagemaker import get_execution_role
import sagemaker
import boto3
import json

role = get_execution_role()
sm = boto3.Session().client(service_name='sagemaker')
smfs_runtime = boto3.Session().client(service_name='sagemaker-featurestore-runtime')

## Create online-only feature groups
When using Amazon SageMaker Feature Store, a core design decision is the definition of feature groups. For our credit card fraud detection use case, we have decided to use two of them:

1. aggregate_feature_group_name - holds aggregate features that will be updated in near real-time (streaming ingestion)
2. aggregate_batch_feature_group_name - holds aggregate features that will be updated in batch

Establishing a feature group is a one-time step and is done using the `CreateFeatureGroup` API. 

Feature groups can be created as **online-only**, **offline-only**, or both **online and offline**, which replicates updates from an online store to an offline store in Amazon S3. Since our focus in this example is on demonstrating the use of the feature store for online inference and streaming aggregation of features, we make each of our feature groups online-only.

In addition to a feature group name, we provide metadata about each feature in the group. We use a JSON file to define the schema, but this is not a requirement. A schema file demonstrates how you might capture feature group definitions so you can recreate them consistently as you move from a development environment to a test or production environment. In our schema file, we also identify the record identifier and event time features. Every feature group must have these two features, but you decide how to name them.

Here is a visual summary of the feature groups we will create below.

![SegmentLocal](images/feature_groups.png "connection")

#### cc-agg-fg schema

In [None]:
!pygmentize schema/cc-agg-fg-schema.json

#### cc-agg-batch-fg schema

In [None]:
!pygmentize schema/cc-agg-batch-fg-schema.json

#### Utility functions to simplify creation of feature groups
`schema_to_defs` takes our schema file and returns the feature definitions and the names of the record identifier and event time features.

In [None]:
def schema_to_defs(filename):
    with open(filename) as f:
        schema = json.load(f)

    feature_definitions = []

    for col in schema['features']:
        feature = {'FeatureName': col['name']}
        # Map schema types to Feature Store types; anything else is stored as a String
        if col['type'] == 'double':
            feature['FeatureType'] = 'Fractional'
        elif col['type'] == 'bigint':
            feature['FeatureType'] = 'Integral'
        else:
            feature['FeatureType'] = 'String'
        feature_definitions.append(feature)

    return feature_definitions, schema['record_identifier_feature_name'], schema['event_time_feature_name']
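The `if`/`elif` type mapping can also be written as a lookup table. The sketch below is a hypothetical variant (`TYPE_MAP`, `defs_from_schema_dict`) that works on an already-parsed schema and is shown with an invented example schema; the real schema files under `schema/` contain their own features plus keys such as `description` and `tags`.

```python
# Map the schema file's type names to Feature Store feature types;
# anything not listed falls back to 'String', as in the cells above.
TYPE_MAP = {'double': 'Fractional', 'bigint': 'Integral'}


def defs_from_schema_dict(schema):
    """Return (feature_definitions, record_id_name, event_time_name) from a parsed schema."""
    feature_definitions = [
        {'FeatureName': col['name'], 'FeatureType': TYPE_MAP.get(col['type'], 'String')}
        for col in schema['features']
    ]
    return (feature_definitions,
            schema['record_identifier_feature_name'],
            schema['event_time_feature_name'])


# Hypothetical schema, for illustration only
example_schema = {
    'features': [
        {'name': 'cc_num', 'type': 'bigint'},
        {'name': 'avg_amt_last_10m', 'type': 'double'},
        {'name': 'trans_time', 'type': 'string'},
    ],
    'record_identifier_feature_name': 'cc_num',
    'event_time_feature_name': 'trans_time',
}

defs, record_id, event_time = defs_from_schema_dict(example_schema)
```

A table keeps the supported types in one place, so adding a new type is a one-line change rather than another branch.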

`create_feature_group_from_schema` creates a feature group from a schema file. If no S3 URI is passed, an online-only feature group is created.

In [None]:
def create_feature_group_from_schema(filename, fg_name, role_arn=None, s3_uri=None):
    with open(filename) as f:
        schema = json.load(f)

    feature_defs = []

    for col in schema['features']:
        feature = {'FeatureName': col['name']}
        if col['type'] == 'double':
            feature['FeatureType'] = 'Fractional'
        elif col['type'] == 'bigint':
            feature['FeatureType'] = 'Integral'
        else:
            feature['FeatureType'] = 'String'
        feature_defs.append(feature)

    record_identifier_name = schema['record_identifier_feature_name']
    event_time_name = schema['event_time_feature_name']

    if role_arn is None:
        role_arn = get_execution_role()

    # Only configure an offline store (in S3) when an S3 URI is given; otherwise the group is online-only
    if s3_uri is None:
        offline_config = {}
    else:
        offline_config = {'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': s3_uri}}}

    sm.create_feature_group(
        FeatureGroupName=fg_name,
        RecordIdentifierFeatureName=record_identifier_name,
        EventTimeFeatureName=event_time_name,
        FeatureDefinitions=feature_defs,
        Description=schema['description'],
        Tags=schema['tags'],
        OnlineStoreConfig={'EnableOnlineStore': True},
        RoleArn=role_arn,
        **offline_config)

#### Create the two feature groups

In [None]:
create_feature_group_from_schema('schema/cc-agg-fg-schema.json', aggregate_feature_group_name)

In [None]:
create_feature_group_from_schema('schema/cc-agg-batch-fg-schema.json', aggregate_batch_feature_group_name)
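`CreateFeatureGroup` returns before the feature group is ready. The group starts in the `Creating` status and moves to `Created` (or `CreateFailed`). A minimal polling sketch, assuming the boto3 SageMaker client `sm` from above; `wait_for_feature_group` and its timeout defaults are illustrative choices, not part of the original notebook.

```python
import time


def wait_for_feature_group(sm_client, fg_name, poll_seconds=5, timeout_seconds=300):
    """Poll DescribeFeatureGroup until the group is no longer 'Creating'.

    Returns 'Created' on success; raises RuntimeError for any other final
    status (for example 'CreateFailed') and TimeoutError if it takes too long.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        desc = sm_client.describe_feature_group(FeatureGroupName=fg_name)
        status = desc['FeatureGroupStatus']
        if status != 'Creating':
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(f'{fg_name} is still Creating after {timeout_seconds} seconds')
        time.sleep(poll_seconds)
    if status != 'Created':
        reason = desc.get('FailureReason', 'no failure reason reported')
        raise RuntimeError(f'{fg_name} ended in status {status}: {reason}')
    return status
```

Calling `wait_for_feature_group(sm, aggregate_feature_group_name)`, and then the same for `aggregate_batch_feature_group_name`, before the describe cells that follow ensures both groups are `Created`.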

#### Show that the feature store is aware of the new feature groups

In [None]:
sm.list_feature_groups()

#### Describe each feature group
Note that each feature group gets its own ARN, allowing you to manage IAM policies that control access to individual feature groups. The feature names and types are displayed, and the record identifier and event time features are called out specifically. Notice that there is only an `OnlineStoreConfig` and no `OfflineStoreConfig`, as we have decided not to replicate features offline for these groups.

In [None]:
sm.describe_feature_group(FeatureGroupName=aggregate_feature_group_name)

In [None]:
sm.describe_feature_group(FeatureGroupName=aggregate_batch_feature_group_name)
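The `describe_feature_group` responses are fairly long. A small, hypothetical helper such as `summarize_feature_group` can pull out the fields discussed above: the ARN, the record identifier and event time features, the feature types, and whether online and offline stores are configured. It reads only fields returned by `DescribeFeatureGroup`.

```python
def summarize_feature_group(desc):
    """Condense a DescribeFeatureGroup response into the fields discussed above."""
    return {
        'arn': desc['FeatureGroupArn'],
        'status': desc.get('FeatureGroupStatus'),
        'record_identifier': desc['RecordIdentifierFeatureName'],
        'event_time': desc['EventTimeFeatureName'],
        'features': {f['FeatureName']: f['FeatureType'] for f in desc['FeatureDefinitions']},
        # True when the online store is enabled
        'online_store': desc.get('OnlineStoreConfig', {}).get('EnableOnlineStore', False),
        # Online-only groups have no OfflineStoreConfig at all
        'offline_store': 'OfflineStoreConfig' in desc,
    }
```

For the online-only groups created here, `summarize_feature_group(sm.describe_feature_group(FeatureGroupName=aggregate_feature_group_name))` should report `online_store` as `True` and `offline_store` as `False`.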

## Create Source and Destination Kafka topics

##### Why install Apache Kafka Libraries?

Amazon Managed Streaming for Apache Kafka (MSK) lets you use Apache Kafka data-plane operations to create topics and to produce and consume data. Control-plane operations, such as creating a cluster, are performed with the AWS Management Console, the AWS Command Line Interface (AWS CLI), or the APIs in the AWS SDK. Because Kafka topic creation is a data-plane operation, we install the Apache Kafka client libraries in a separate SageMaker Studio terminal window and create the topics from there. For more details on how MSK works, refer to https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html. The required network and VPC configuration is already set up by the CloudFormation template.

Run the commands below in a separate terminal window in this SageMaker domain. Detailed instructions on creating topics are available at https://docs.aws.amazon.com/msk/latest/developerguide/create-topic.html.

##### Retrieve MSK cluster connection string

The required MSK cluster was created by the CloudFormation template. Retrieve the connection string for this pre-created MSK cluster using the steps shown in the animation below, and save it in a temporary location.

![SegmentLocal](images/get-msk-cluster-connection-string.gif "connection")
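The same connection string can also be retrieved programmatically with the MSK `GetBootstrapBrokers` API. This sketch assumes a boto3 `kafka` client and the `MSKClusterArn` value stored earlier; `get_bootstrap_strings` is a hypothetical helper. Which strings are present (plaintext, TLS, SASL/IAM, and so on) depends on how the CloudFormation template configured client authentication. The `kafka-topics.sh` commands in this section pass no client security settings, so they assume an endpoint that accepts plaintext connections.

```python
def get_bootstrap_strings(kafka_client, cluster_arn):
    """Return the non-empty bootstrap broker strings for an MSK cluster.

    kafka_client is expected to be a boto3 'kafka' client. The broker-string
    keys all start with 'BootstrapBrokerString' (plain, Tls, SaslIam, ...);
    other keys such as ResponseMetadata are dropped.
    """
    response = kafka_client.get_bootstrap_brokers(ClusterArn=cluster_arn)
    return {key: value for key, value in response.items()
            if key.startswith('BootstrapBrokerString') and value}
```

For example, `get_bootstrap_strings(boto3.client('kafka'), MSKClusterArn)` returns a dictionary keyed by endpoint type.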

##### Apache Kafka client installation from SageMaker Studio terminal window


1. Navigate to the SageMaker Studio environment.
2. In the top menu, choose "File", then "New" -> "Terminal".
3. Once the terminal window is fully available, run the commands in the cells below.
4. In the commands below, replace *cluster-connection-string* with the Kafka connection string you saved earlier.

##### Prepare the terminal environment

```
sudo yum -y update
sudo yum -y install java-1.8.0
sudo yum -y install wget
```

##### Create directory for Apache Kafka client download

```
mkdir kafka
chmod 777 kafka
cd kafka
```

##### Download and install Apache Kafka client

```
sudo wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -xzf kafka_2.12-2.6.2.tgz
cd kafka_2.12-2.6.2
```

##### Create the source topic (replace cluster-connection-string with the MSK cluster connection string you saved earlier)

```
bin/kafka-topics.sh --create --bootstrap-server cluster-connection-string --replication-factor 2 --partitions 1 --topic cctopic
```

##### Create the destination topic (replace cluster-connection-string with the connection string you saved earlier)

```
bin/kafka-topics.sh --create --bootstrap-server cluster-connection-string --replication-factor 2 --partitions 1 --topic ccdesttopic
```

##### Verify the created topics (replace cluster-connection-string with the connection string you saved earlier)

```
bin/kafka-topics.sh --bootstrap-server cluster-connection-string --list
```

##### To simplify cleanup, remove the Kafka client. If you need it again, reinstall it using the steps above.

```
cd ..
rm -rf kafka_2.12-2.6.2
rm -rf kafka_2.12-2.6.2.tgz
```

##### Exit out of the terminal window

```
exit
```

## Map the Kafka source topic as an event source for the fraud detection Lambda function

In [None]:
lambda_client = boto3.client('lambda')

source_topic_name = 'cctopic'

lambda_client.create_event_source_mapping(
    EventSourceArn=MSKClusterArn,
    FunctionName=lambda_to_model_arn,
    StartingPosition='LATEST',
    Enabled=True,
    Topics=[source_topic_name]
)  # DestinationConfig (not set here) would handle discarded records

## Map the Kafka destination topic as an event source for the streaming feature ingestion Lambda function

In [None]:
lambda_client = boto3.client('lambda')

dest_topic_name = 'ccdesttopic'

lambda_client.create_event_source_mapping(
    EventSourceArn=MSKClusterArn,
    FunctionName=lambda_to_fs_arn,
    StartingPosition='LATEST',
    Enabled=True,
    Topics=[dest_topic_name]
)  # DestinationConfig (not set here) would handle discarded records
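Creating an event source mapping is asynchronous, and an MSK mapping can take a few minutes to move from `Creating` to `Enabled`. A minimal sketch for checking this with `ListEventSourceMappings`, assuming the `lambda_client` created above; `wait_for_mappings_enabled` is a hypothetical helper, and it reads only the first page of results (it ignores `NextMarker`), which is enough for a function with a single mapping.

```python
import time


def wait_for_mappings_enabled(lambda_client, function_name, poll_seconds=15, timeout_seconds=900):
    """Poll ListEventSourceMappings until every mapping for the function is 'Enabled'.

    Returns a dict of mapping UUID -> State. Only the first page of results is read.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = lambda_client.list_event_source_mappings(FunctionName=function_name)
        states = {m['UUID']: m['State'] for m in response['EventSourceMappings']}
        if states and all(state == 'Enabled' for state in states.values()):
            return states
        if time.monotonic() >= deadline:
            raise TimeoutError(f'Event source mappings for {function_name} not enabled: {states}')
        time.sleep(poll_seconds)
```

For example, run `wait_for_mappings_enabled(lambda_client, lambda_to_model_arn)` and `wait_for_mappings_enabled(lambda_client, lambda_to_fs_arn)` before producing test traffic.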

## Create Amazon Kinesis Data Analytics for Apache Flink application

Now that we have the source and destination MSK topics and have connected them to their respective Lambda functions, let us create an Amazon Kinesis Data Analytics for Apache Flink application using a pre-created Apache Zeppelin studio environment.

You will need the MSK cluster connection string you saved earlier for the steps below.

#### Open the pre-created KDA Studio Environment

For new projects, we recommend that you use Kinesis Data Analytics Studio, which combines ease of use with advanced analytical capabilities for building sophisticated stream processing applications. Studio notebooks for Kinesis Data Analytics let you interactively query data streams in real time and build and run stream processing applications using standard SQL, Python, and Scala. With a few clicks in the AWS Management Console, you can launch a serverless notebook to query data streams and get results in seconds. In the steps below, we navigate to the Kinesis console, run the pre-created KDA Studio environment, upload the sample notebook, enter the MSK connection string, and build and deploy an executable KDA streaming application.

![SegmentLocal](images/start-open-kda-studio-environment.gif "studio")

##### Download pre-created Zeppelin Notebook

Download the pre-built Zeppelin notebook (from GitHub) to your local environment. This notebook defines the schemas for the source and destination Kafka topics. It also defines the logic, written in Apache Flink streaming SQL, that aggregates data from the source topic and writes the results to the destination topic. To learn more about how this works, refer to https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-zeppelin-interactive.html.

![SegmentLocal](images/download-zeppelin-notebook.gif "zpln_download")
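To make the aggregation step more concrete, here is a plain-Python sketch of a per-card windowed aggregate. It is only an illustration: the actual logic lives in the Flink SQL of the Zeppelin notebook, and the field names (`cc_num`, `ts`, `amount`), the 10-minute tumbling window, and the two aggregates (transaction count and average amount) are assumptions made for this example. A streaming engine such as Flink computes this kind of result incrementally as records arrive, rather than over a complete list.

```python
from collections import defaultdict


def tumbling_window_aggregates(transactions, window_seconds=600):
    """Aggregate transactions per card over fixed, non-overlapping time windows.

    Each transaction is a dict with hypothetical keys 'cc_num', 'ts' (epoch
    seconds) and 'amount'. Returns {(cc_num, window_start): {...}} with the
    transaction count and average amount in each window.
    """
    buckets = defaultdict(list)
    for tx in transactions:
        # Align the timestamp down to the start of its window
        window_start = tx['ts'] - tx['ts'] % window_seconds
        buckets[(tx['cc_num'], window_start)].append(tx['amount'])
    return {
        key: {'num_trans': len(amounts), 'avg_amt': sum(amounts) / len(amounts)}
        for key, amounts in buckets.items()
    }
```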

##### Upload pre-created Zeppelin Notebook

Upload the pre-built Zeppelin notebook (from GitHub) into this Studio environment. 

Do not run individual cells in the imported notebook. The notebook is built and deployed directly from the Zeppelin environment.

![SegmentLocal](images/upload-kda-msk-flink-note-zeppelin.gif "zpln_upload")

##### Add MSK cluster connection string to cells

Using the Apache Kafka client libraries, you created the source and destination topics in the MSK cluster earlier from the SageMaker Studio terminal. In this step, enter the MSK cluster connection string (saved earlier in a temporary location) into the Zeppelin notebook cells. These cells create the metadata for the topics, which is registered via AWS Glue Schema Registry.

![SegmentLocal](images/key-in-msk-cluster-connection-string.gif "zpln_connection_string")

##### Build KDA app from Zeppelin Notebook

Build the Kinesis Data Analytics application from the Zeppelin notebook. This step takes approximately 5 minutes.

![SegmentLocal](images/build-kda-app.gif "zpln_build_app")

##### Deploy KDA app and run

The application code is now built and uploaded to Amazon S3. The next steps are to create a KDA application from it and start it. Starting a Kinesis Data Analytics application typically takes several minutes. When deploying, make sure to select the CloudFormation-created IAM role whose name contains the text "KDAStreamingApplicationExecutionRole".

![SegmentLocal](images/deploy-run-kda-app.gif "zpln_deploy_app")
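Two lookups can help with the deploy step. The sketch below assumes boto3 `iam` and `kinesisanalyticsv2` clients; `find_roles_containing` and `wait_for_kda_running` are hypothetical helpers, and the application name is whatever name the Zeppelin deployment assigned, which this notebook does not show. The first helper lists the ARNs of roles whose names contain `KDAStreamingApplicationExecutionRole`; the second polls `DescribeApplication` until `ApplicationStatus` is `RUNNING`.

```python
import time


def find_roles_containing(pages, fragment='KDAStreamingApplicationExecutionRole'):
    """Return ARNs of IAM roles whose name contains `fragment`.

    `pages` is an iterable of ListRoles responses, for example from
    boto3.client('iam').get_paginator('list_roles').paginate().
    """
    return [role['Arn'] for page in pages for role in page['Roles']
            if fragment in role['RoleName']]


def wait_for_kda_running(kda_client, app_name, poll_seconds=15, timeout_seconds=900):
    """Poll DescribeApplication until ApplicationStatus is 'RUNNING'."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        detail = kda_client.describe_application(ApplicationName=app_name)['ApplicationDetail']
        status = detail['ApplicationStatus']
        if status == 'RUNNING':
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f'{app_name} is {status} after {timeout_seconds} seconds')
        time.sleep(poll_seconds)
```

For example, `find_roles_containing(boto3.client('iam').get_paginator('list_roles').paginate())` and `wait_for_kda_running(boto3.client('kinesisanalyticsv2'), 'your-app-name')`, where `'your-app-name'` is a placeholder for the deployed application's name.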