# Apache Flink Temporal Tables on Kinesis Data Analytics for Apache Flink

Temporal tables let you join real-time events against a changing (versioned) table and pick out the value that was valid at each event's timestamp.

For example, let's say we have Stream A which contains Stock Market data such as...

```json
{
  "ticker_symbol": "ABCD",
  "price": 10.0,
  "s_timestamp": "2023-01-18 17:46:09.191Z"
}

{
  "ticker_symbol": "EFGH",
  "price": 9.0,
  "s_timestamp": "2023-01-18 17:46:10.191Z"
}
...
```

and Stream B, which contains stock transaction data such as:
```json
{
  "id": "1",
  "transaction_ticker_symbol": "ABCD",
  "shares_purchased": 2,
  "t_timestamp": "2023-01-18 22:19:11.784Z"
}

{
  "id": "2",
  "transaction_ticker_symbol": "EFGH",
  "shares_purchased": 4,
  "t_timestamp": "2023-01-18 22:19:11.784Z"
}
```

We might want to find out what the buyer in transaction `1` paid for stock `ABCD` at the time of the transaction. A simple join would return every match between the two streams, but a [temporal table join](https://flink.apache.org/2019/05/14/temporal-tables.html) returns the stock price that was in effect at each transaction's purchase time.
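
In Flink SQL, a temporal join is expressed with `FOR SYSTEM_TIME AS OF`. The snippet below is a sketch (not the repo's exact code) using the field names from the sample records above; it assumes two tables are already registered in a `TableEnvironment`: an append-only `transactions` table with a watermark on `t_timestamp`, and a versioned `stock_info` table with a primary key on `ticker_symbol` and a watermark on `s_timestamp`. A fuller Table API sketch appears later in this README.

```java
// Illustrative only: the table names `transactions` and `stock_info` are assumptions,
// not the names used by the code in this repo.
public class TemporalJoinQuery {
    // FOR SYSTEM_TIME AS OF t.t_timestamp asks Flink for the stock_info row version
    // that was valid at the transaction's event time, rather than every matching row.
    public static final String TEMPORAL_JOIN =
            "SELECT t.id, t.transaction_ticker_symbol, t.shares_purchased, "
          + "       s.price, t.t_timestamp "
          + "FROM transactions AS t "
          + "JOIN stock_info FOR SYSTEM_TIME AS OF t.t_timestamp AS s "
          + "  ON t.transaction_ticker_symbol = s.ticker_symbol";
}
```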

Let's try it out. 

## Getting Started (Flink SQL)
You can run the end-to-end Flink SQL Example by standing up an [MSK Cluster](https://aws.amazon.com/msk) as described in [this video](https://www.youtube.com/watch?v=2Qhc6ePu-0M) or [this article](https://docs.aws.amazon.com/kinesisanalytics/latest/java/gs-table-create.html), and then launching a Kinesis Data Analytics Studio application within the same VPC.

Import the Zeppelin notebook located [here](src/main/sql/) to run the end-to-end examples. Make sure you replace the bootstrap server string with your own Kafka connection string.

## Getting Started (Table API)
To start using this example, you will need to install some prerequisites locally:

### Prerequisites

- [Java](https://www.java.com/en/download/help/download_options.html)
- [Maven](https://maven.apache.org/install.html)
- [Git](https://github.com/git-guides/install-git)
- [Docker Desktop](https://www.docker.com/products/docker-desktop/)
- [Apache Kafka](https://kafka.apache.org/downloads)
- [Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/)

## How do I use this repo?

- To get started with this repo, first ensure you have the [necessary prerequisites](#prerequisites) installed.

### Terminal Commands

- Clone this git repository to your local machine.

```bash
git clone git@github.com:aws-samples/apache-flink-temporal-tables-example.git
```
- Open a terminal session in the project root and navigate to the `src/main/supplemental-resources` directory.

NOTE: For the Apache Kafka commands below, you can add the Kafka `bin` directory to your `PATH` so that you don't need to reference the full path to each Kafka script (adjust the path to match your Kafka installation):

```bash
export PATH="$PATH:$HOME/kafka_2.13-2.8.1/bin"
```

- From here you can run the following commands:

```bash
# Note: all commands are meant to be run from the src/main/supplemental-resources directory

# start up Kafka
cd kafka-docker
docker-compose up -d

# create the Kafka topics
kafka-topics.sh --create --topic stock-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01"
kafka-topics.sh --create --topic transaction-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2
kafka-topics.sh --create --topic output --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2
```

We've just started up a Docker container running Kafka and created three topics:
- stock-topic
- transaction-topic
- output

Each has 3 partitions and a replication factor of 2. Note that `stock-topic` is created with `cleanup.policy=compact`, which suits its role as the versioned (stock price) side of the temporal join.

- Next, run the `DatagenJob` class (found in `src/main/java/temporal.tables/`) to begin generating data into your `stock-topic` and `transaction-topic` topics.

Note: [Running from IntelliJ is simple to set up](https://catalog.us-east-1.prod.workshops.aws/workshops/429cec9e-3222-4943-82f7-1f45c45ed99a/en-US/1-localdev), and IntelliJ has plugins that make running Apache Flink jobs locally easier.
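
Under the hood, the data generator is essentially a Kafka producer writing JSON records shaped like the samples above. Here is a minimal sketch (not the repo's exact code) using the standard `kafka-clients` Java library:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class StockProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Key by ticker symbol so updates for the same stock land in the same
            // partition and compact cleanly on the log-compacted stock-topic.
            String value = "{\"ticker_symbol\": \"ABCD\", \"price\": 10.0, "
                    + "\"s_timestamp\": \"2023-01-18 17:46:09.191Z\"}";
            producer.send(new ProducerRecord<>("stock-topic", "ABCD", value));
            producer.flush();
        }
    }
}
```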

- Finally, run the `DataStreamJob` class (found in `src/main/java/temporal.tables`) to run the temporal table join across the two topics and write the results to the `output` topic.
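
For reference, here is a minimal, hedged sketch of what a Table API job of this shape does: register the two Kafka-backed tables, run the event-time temporal join, and write the result to the `output` topic. The table names, schemas, and connector options are illustrative and may differ from the actual `DataStreamJob`; the broker address matches the local Docker setup above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinJobSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Versioned table: the log-compacted stock-topic is read with upsert-kafka so
        // each ticker_symbol keeps a history of price versions over event time.
        tEnv.executeSql(
            "CREATE TABLE stock_info ("
          + "  ticker_symbol STRING,"
          + "  price DOUBLE,"
          + "  s_timestamp TIMESTAMP(3),"
          + "  WATERMARK FOR s_timestamp AS s_timestamp - INTERVAL '5' SECOND,"
          + "  PRIMARY KEY (ticker_symbol) NOT ENFORCED"
          + ") WITH ("
          + "  'connector' = 'upsert-kafka',"
          + "  'topic' = 'stock-topic',"
          + "  'properties.bootstrap.servers' = 'localhost:9092',"
          + "  'key.format' = 'raw',"
          + "  'value.format' = 'json'"
          + ")");

        // Append-only table of transactions, also with an event-time watermark.
        tEnv.executeSql(
            "CREATE TABLE transactions ("
          + "  id STRING,"
          + "  transaction_ticker_symbol STRING,"
          + "  shares_purchased INT,"
          + "  t_timestamp TIMESTAMP(3),"
          + "  WATERMARK FOR t_timestamp AS t_timestamp - INTERVAL '5' SECOND"
          + ") WITH ("
          + "  'connector' = 'kafka',"
          + "  'topic' = 'transaction-topic',"
          + "  'properties.bootstrap.servers' = 'localhost:9092',"
          + "  'properties.group.id' = 'temporal-join-sketch',"
          + "  'scan.startup.mode' = 'earliest-offset',"
          + "  'format' = 'json'"
          + ")");

        // Sink table backed by the `output` topic.
        tEnv.executeSql(
            "CREATE TABLE output_table ("
          + "  id STRING,"
          + "  ticker_symbol STRING,"
          + "  shares_purchased INT,"
          + "  price DOUBLE,"
          + "  t_timestamp TIMESTAMP(3)"
          + ") WITH ("
          + "  'connector' = 'kafka',"
          + "  'topic' = 'output',"
          + "  'properties.bootstrap.servers' = 'localhost:9092',"
          + "  'format' = 'json'"
          + ")");

        // The temporal join: each transaction is matched with the stock price version
        // that was valid at the transaction's event time.
        tEnv.executeSql(
            "INSERT INTO output_table "
          + "SELECT t.id, t.transaction_ticker_symbol, t.shares_purchased, s.price, t.t_timestamp "
          + "FROM transactions AS t "
          + "JOIN stock_info FOR SYSTEM_TIME AS OF t.t_timestamp AS s "
          + "  ON t.transaction_ticker_symbol = s.ticker_symbol").await();
    }
}
```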

When run locally, you can view the running job in the Flink Web UI at http://localhost:8081.


- To see output, run the following command from the terminal:



```bash
# view output
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
```


https://user-images.githubusercontent.com/74626212/219805515-0f692993-6132-4cc6-a1b4-6a50eb1c1a56.mp4



### Deploying and Running on KDA

In order to run this example on KDA, you will first need to [create an MSK Cluster](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html), and then [create the KDA application within the same VPC](https://docs.aws.amazon.com/kinesisanalytics/latest/java/example-msk.html#example-msk-create). [Ensure that you have the right networking setup so that KDA can connect to MSK](https://docs.aws.amazon.com/kinesisanalytics/latest/java/vpc.html).





### To deploy onto Kinesis Data Analytics for Apache Flink
In order to deploy your application, you can follow [these steps](https://catalog.us-east-1.prod.workshops.aws/workshops/429cec9e-3222-4943-82f7-1f45c45ed99a/en-US/4-packagingforkda) for best practices on packaging and deployment. Ensure you set the appropriate values for the Kafka brokers, topics, etc., rather than using `localhost:9092`.
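
One common pattern is to read the broker string from the application's runtime properties instead of hard-coding it. The sketch below assumes a property group named `KafkaConnection` with a `bootstrap.servers` key; both names are illustrative, so match them to whatever you configure on your KDA application.

```java
import java.io.IOException;
import java.util.Map;
import java.util.Properties;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

public class RuntimePropertiesSketch {
    public static String bootstrapServers() throws IOException {
        // Runtime properties are configured on the KDA application itself.
        Map<String, Properties> groups = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties kafkaProps = groups.getOrDefault("KafkaConnection", new Properties());
        // Fall back to localhost for local runs where no runtime properties exist.
        return kafkaProps.getProperty("bootstrap.servers", "localhost:9092");
    }
}
```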

The project also includes a [sample KDA Studio notebook](src/main/supplemental-resources/kafka-admin.zpln) for producing data to and consuming data from your MSK cluster once the application is deployed to your AWS infrastructure.