# Flint Index Reference Manual

## Overview

### What is Flint Index?

A Flint index is ...

![Overview](https://user-images.githubusercontent.com/46505291/235786891-556cfde2-189c-4f65-b24f-c36e9c59a96a.png)

### Feature Highlights

- Skipping Index
  - Partition: skip data scan by maintaining and filtering partitioned column value per file.
  - MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file.
  - ValueSet: skip data scan by building a unique value set of the indexed column per file.

Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.

| Skipping Index | Create Index Statement | Index Building Logic | Query Rewrite Logic |
|----------------|------------------------|----------------------|---------------------|
| Partition | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;year PARTITION,<br>&nbsp;&nbsp;month PARTITION,<br>&nbsp;&nbsp;day PARTITION,<br>&nbsp;&nbsp;hour PARTITION<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;FIRST(year) AS year,<br>&nbsp;&nbsp;FIRST(month) AS month,<br>&nbsp;&nbsp;FIRST(day) AS day,<br>&nbsp;&nbsp;FIRST(hour) AS hour,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE year = 2023 AND month = 4<br>=><br>SELECT *<br>FROM alb_logs (input_files =<br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE year = 2023 AND month = 4<br>)<br>WHERE year = 2023 AND month = 4 |
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;elb_status_code VALUE_SET<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files =<br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files =<br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100 |
### Flint Index Specification

#### Metadata

Currently, Flint metadata is only static configuration, without version control or a write-ahead log.

```json
{
  "version": "0.1",
  "indexConfig": {
    "kind": "skipping",
    "properties": {
      "indexedColumns": [{
        "kind": "...",
        "columnName": "...",
        "columnType": "..."
      }]
    }
  },
  "source": "alb_logs",
  "state": "active",
  "enabled": true
}
```
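As one illustration of working with this document, the sketch below parses a filled-in instance of the metadata template with json4s (a JSON library available on the Spark classpath). The concrete field values and the `IndexedColumn` case class are assumptions for illustration only, not part of the specification.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Hypothetical binding for entries under indexConfig.properties.indexedColumns
case class IndexedColumn(kind: String, columnName: String, columnType: String)

object FlintMetadataExample extends App {
  implicit val formats: Formats = DefaultFormats

  // A filled-in instance of the metadata template above (values are illustrative)
  val metadataJson =
    """{
      |  "version": "0.1",
      |  "indexConfig": {
      |    "kind": "skipping",
      |    "properties": {
      |      "indexedColumns": [
      |        {"kind": "Partition", "columnName": "year", "columnType": "int"}
      |      ]
      |    }
      |  },
      |  "source": "alb_logs",
      |  "state": "active",
      |  "enabled": true
      |}""".stripMargin

  val metadata = parse(metadataJson)

  // Navigate the parsed tree and bind the indexed columns to the case class
  val source  = (metadata \ "source").extract[String]
  val columns = (metadata \ "indexConfig" \ "properties" \ "indexedColumns").extract[List[IndexedColumn]]

  println(s"source=$source, indexed columns=${columns.map(_.columnName).mkString(", ")}")
}
```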
#### Field Data Type

For now, Flint Index doesn't define its own data type and uses OpenSearch field type instead.

| **FlintDataType** |
|-------------------|
| boolean           |
| long              |
| integer           |
| short             |
| byte              |
| double            |
| float             |
| date              |
| keyword           |
| text              |
| object            |

#### File Format

Please see the Index Store section for more details.
## User Guide

### SDK

`FlintClient` provides low-level Flint index management and data access API.

Index management API example:

```java
// Initialize Flint client for a specific storage
FlintClient flintClient = new FlintOpenSearchClient("localhost", 9200);

FlintMetadata metadata = new FlintMetadata(...);
flintClient.createIndex("alb_logs_skipping_index", metadata);

flintClient.getIndexMetadata("alb_logs_skipping_index");
```

Index data read and write example:

```java
FlintClient flintClient = new FlintOpenSearchClient("localhost", 9200);

// Read example: iterate over the index documents
FlintReader reader = flintClient.createReader("indexName", null);
while (reader.hasNext()) {
  reader.next();
}
reader.close();

// Write example: documents are written in bulk request format
FlintWriter writer = flintClient.createWriter("indexName");
writer.write("{\"create\":{}}");
writer.write("\n");
writer.write("{\"aInt\":1}");
writer.write("\n");
writer.flush();
writer.close();
```
### API

High level API is dependent on query engine implementation. Please see the Query Engine Integration section for details.

### SQL

DDL statement:

```sql
CREATE SKIPPING INDEX ON <object>
( column <index_type> [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))

REFRESH SKIPPING INDEX ON <object>

DESCRIBE SKIPPING INDEX ON <object>

DROP SKIPPING INDEX ON <object>

<object> ::= [db_name].[schema_name].table_name
```

Skipping index type:

```sql
<index_type> ::= { PARTITION, VALUE_SET, MIN_MAX }
```

Example:

```sql
CREATE SKIPPING INDEX ON alb_logs
(
  elb_status_code VALUE_SET
)
WHERE time > '2023-04-01 00:00:00'

REFRESH SKIPPING INDEX ON alb_logs

DESCRIBE SKIPPING INDEX ON alb_logs

DROP SKIPPING INDEX ON alb_logs
```
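These statements can also be submitted programmatically. Below is a minimal sketch assuming the Flint SQL extension is registered on the Spark session; the extension class name shown is an assumption for illustration, so check your Flint distribution for the exact value.

```scala
import org.apache.spark.sql.SparkSession

object FlintSqlExample extends App {
  // Extension class name is assumed, not defined by this manual
  val spark = SparkSession.builder()
    .appName("flint-sql-example")
    .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
    .getOrCreate()

  // Define the skipping index, then populate it with a manual refresh
  spark.sql(
    """CREATE SKIPPING INDEX ON alb_logs
      |(
      |  elb_status_code VALUE_SET
      |)
      |WHERE time > '2023-04-01 00:00:00'""".stripMargin)

  spark.sql("REFRESH SKIPPING INDEX ON alb_logs")

  // Inspect the indexed columns
  spark.sql("DESCRIBE SKIPPING INDEX ON alb_logs").show()
}
```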
## Index Store

### OpenSearch

OpenSearch stores the Flint index in an OpenSearch index of the given name. In the index mapping, the `_meta` and `properties` fields store the meta and schema info of a Flint index.

```json
{
  "_meta": {
    "version": "0.1",
    "indexConfig": {
      "kind": "skipping",
      "properties": {
        "indexedColumns": [
          {
            "kind": "Partition",
            "columnName": "year",
            "columnType": "int"
          },
          {
            "kind": "ValuesSet",
            "columnName": "elb_status_code",
            "columnType": "int"
          }
        ]
      }
    },
    "source": "alb_logs"
  },
  "properties": {
    "year": {
      "type": "integer"
    },
    "elb_status_code": {
      "type": "integer"
    },
    "file_path": {
      "type": "keyword"
    }
  }
}
```
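Because the Flint index is a plain OpenSearch index, the mapping above can be inspected with a standard mapping request. A minimal sketch using the JDK HTTP client; the endpoint and index name are assumptions matching the examples in this manual.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object InspectFlintIndexMapping extends App {
  val client = HttpClient.newHttpClient()

  // GET the index mapping; the _meta section carries the Flint metadata shown above
  val request = HttpRequest.newBuilder()
    .uri(URI.create("http://localhost:9200/flint_alb_logs_skipping_index/_mapping"))
    .GET()
    .build()

  val response = client.send(request, HttpResponse.BodyHandlers.ofString())
  println(response.body())
}
```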
## Query Engine Integration

### Apache Spark

#### Configurations

- `spark.datasource.flint.host`: default is localhost.
- `spark.datasource.flint.port`: default is 9200.
- `spark.datasource.flint.scheme`: default is http. valid values [http, https]
- `spark.datasource.flint.auth`: default is false. valid values [false, sigv4]
- `spark.datasource.flint.region`: default is us-west-2. only used when auth=sigv4
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column`: default value is true.
- `spark.datasource.flint.write.batch_size`: default value is 1000.
- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
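For example, a minimal sketch of overriding a few of these options on a Spark session; the endpoint and values shown are illustrative, not recommendations.

```scala
import org.apache.spark.sql.SparkSession

object FlintConfigExample extends App {
  val spark = SparkSession.builder()
    .appName("flint-config-example")
    .getOrCreate()

  // Point the Flint data source at a non-default OpenSearch endpoint
  spark.conf.set("spark.datasource.flint.host", "opensearch.example.com")
  spark.conf.set("spark.datasource.flint.port", "9200")
  spark.conf.set("spark.datasource.flint.scheme", "https")

  // Tune write behavior: larger bulk batches, wait for refresh on write
  spark.conf.set("spark.datasource.flint.write.batch_size", "2000")
  spark.conf.set("spark.datasource.flint.write.refresh_policy", "wait_for")
}
```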
#### Data Type Mapping

The following table defines the data type mapping between Flint data types and Spark data types.

| **FlintDataType** | **SparkDataType**             |
|-------------------|-------------------------------|
| boolean           | BooleanType                   |
| long              | LongType                      |
| integer           | IntegerType                   |
| short             | ShortType                     |
| byte              | ByteType                      |
| double            | DoubleType                    |
| float             | FloatType                     |
| date(Date)        | DateType                      |
| date(Timestamp)   | TimestampType                 |
| keyword           | StringType                    |
| text              | StringType(meta(osType)=text) |
| object            | StructType                    |

* Currently, the only date type Flint supports is date. It is mapped to a Spark data type based on its format:
  * Mapped to DateType if format = strict_date (format = date is also supported, but may change in the future)
  * Mapped to TimestampType if format = strict_date_optional_time_nanos (format = strict_date_optional_time | epoch_millis is also supported, but may change in the future)
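To make the mapping concrete, here is a Spark-side schema corresponding to the example index mapping in the Index Store section; the field names are taken from that example, and the schema itself is illustrative only.

```scala
import org.apache.spark.sql.types._

// Spark schema for the flint_alb_logs_skipping_index example:
// OpenSearch integer -> IntegerType, keyword -> StringType
val skippingIndexSchema = StructType(Seq(
  StructField("year", IntegerType, nullable = true),
  StructField("elb_status_code", IntegerType, nullable = true),
  StructField("file_path", StringType, nullable = true)
))
```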
#### API

Here is an example of Flint Spark integration:

```scala
val flint = new FlintSpark(spark)

flint.skippingIndex()
    .onTable("alb_logs")
    .filterBy("time > 2023-04-01 00:00:00")
    .addPartitions("year", "month", "day")
    .addValueSet("elb_status_code")
    .addMinMax("request_processing_time")
    .create()

flint.refresh("flint_alb_logs_skipping_index", FULL)
```

#### Skipping Index Provider SPI

```scala
trait FlintSparkSkippingStrategy {
  TODO: outputSchema, getAggregators, rewritePredicate
}
```
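The trait above is still a TODO in this specification. The sketch below shows one hypothetical shape a MinMax strategy could take, based only on the member names listed in the TODO; every signature here is invented for illustration and is not the defined SPI.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, max, min}

// Hypothetical SPI shape; none of these signatures are defined by the spec yet
trait FlintSparkSkippingStrategy {
  def outputSchema: Map[String, String]            // index column name -> Flint data type
  def getAggregators: Seq[Column]                  // aggregations evaluated per source file
  def rewritePredicate(value: Any): Option[Column] // file-level filter for `column = value`
}

// Sketch of a MinMax strategy for one numeric column, mirroring the
// index building and query rewrite logic in the skipping index table above
class MinMaxSkippingStrategy(columnName: String) extends FlintSparkSkippingStrategy {
  private val minCol = s"${columnName}_min"
  private val maxCol = s"${columnName}_max"

  override def outputSchema: Map[String, String] =
    Map(minCol -> "integer", maxCol -> "integer")

  override def getAggregators: Seq[Column] =
    Seq(min(col(columnName)).as(minCol), max(col(columnName)).as(maxCol))

  // column = value  =>  column_min <= value AND value <= column_max
  override def rewritePredicate(value: Any): Option[Column] =
    Some(col(minCol) <= lit(value) && lit(value) <= col(maxCol))
}
```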
#### Flint DataSource Read/Write

Here is an example of writing and then reading index data against an AWS OpenSearch domain:

```scala
spark.conf.set("spark.datasource.flint.host", "yourdomain.us-west-2.es.amazonaws.com")
spark.conf.set("spark.datasource.flint.port", "-1")
spark.conf.set("spark.datasource.flint.scheme", "https")
spark.conf.set("spark.datasource.flint.auth", "sigv4")
spark.conf.set("spark.datasource.flint.region", "us-west-2")
spark.conf.set("spark.datasource.flint.write.refresh_policy", "wait_for")

// Write example
val df = spark.range(15).toDF("aInt")
df.coalesce(1)
  .write
  .format("flint")
  .mode("overwrite")
  .save("t001")

// Read example
val readDf = spark.read
  .format("flint")
  .load("t001")
```

## Benchmarks

TODO

## Limitations

### Query Optimization

For now, only single or conjunct conditions (conditions connected by AND) in the WHERE clause can be optimized by a skipping index.

### Index Refresh Job Management

Manually refreshing a table that already has a skipping index being auto-refreshed will be prevented. However, this check relies on the incremental refresh job actively running in the same Spark cluster, where it can be identified when the check is performed.