package com.amazonaws.athena.connector.lambda.examples;
/*-
* #%L
* Amazon Athena Query Federation SDK
* %%
* Copyright (C) 2019 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
import com.amazonaws.athena.connector.lambda.data.Block;
import com.amazonaws.athena.connector.lambda.data.BlockAllocator;
import com.amazonaws.athena.connector.lambda.data.BlockWriter;
import com.amazonaws.athena.connector.lambda.data.FieldBuilder;
import com.amazonaws.athena.connector.lambda.data.SchemaBuilder;
import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.exceptions.FederationThrottleException;
import com.amazonaws.athena.connector.lambda.handlers.MetadataHandler;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetTableRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse;
import com.amazonaws.athena.connector.lambda.metadata.ListSchemasRequest;
import com.amazonaws.athena.connector.lambda.metadata.ListSchemasResponse;
import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest;
import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse;
import com.amazonaws.athena.connector.lambda.request.FederationRequest;
import com.amazonaws.athena.connector.lambda.request.PingRequest;
import com.amazonaws.athena.connector.lambda.security.EncryptionKey;
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connector.lambda.security.FederatedIdentity;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * All items in the "com.amazonaws.athena.connector.lambda.examples" package that this class belongs to are part of an
 * 'Example' connector. We do not recommend using any of the classes in this package directly. Instead you can/should
 * copy and modify them as needed.
 *
 * This class defines an example MetadataHandler that supports a single schema and a single table, and showcases most
 * of the features offered by the Amazon Athena Query Federation SDK. Some notable characteristics include:
* 1. Highly partitioned table.
* 2. Paginated split generation.
* 3. S3 Spill support.
* 4. Spill encryption using either KMS KeyFactory or LocalKeyFactory.
* 5. A wide range of field types including complex Struct and List types.
*
*
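 * <p>In a deployed connector, a MetadataHandler like this one is typically paired with a RecordHandler by way of
 * a CompositeHandler so that one Lambda function can serve both metadata and data requests. A minimal sketch
 * (ExampleRecordHandler is assumed to be the companion record handler in this package):
 * <pre>{@code
 * public class ExampleCompositeHandler extends CompositeHandler
 * {
 *     public ExampleCompositeHandler()
 *     {
 *         super(new ExampleMetadataHandler(), new ExampleRecordHandler());
 *     }
 * }
 * }</pre>
 *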
* @note All schema names, table names, and column names must be lower case at this time. Any entities that are uppercase or
* mixed case will not be accessible in queries and will be lower cased by Athena's engine to ensure consistency across
* sources. As such you may need to handle this when integrating with a source that supports mixed case. As an example,
* you can look at the CloudwatchTableResolver in the athena-cloudwatch module for one potential approach to this challenge.
*
* @see MetadataHandler
*/
public class ExampleMetadataHandler
extends MetadataHandler
{
private static final Logger logger = LoggerFactory.getLogger(ExampleMetadataHandler.class);
//Used to aid in diagnostic logging
private static final String SOURCE_TYPE = "custom";
    //The name of the Lambda environment variable that toggles generating simulated throttling events to trigger Athena's
    //congestion control logic.
private static final String SIMULATE_THROTTLES = "SIMULATE_THROTTLES";
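    //For example, SIMULATE_THROTTLES=5 makes roughly every 5th call to doGetSplits(...) throw a simulated
    //FederationThrottleException (see the counter logic in doGetSplits below).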
    //The number of splits to generate for each partition. Keep in mind that this connector generates random data;
    //a real source is unlikely to have such a setting.
protected static final int NUM_PARTS_PER_SPLIT = 10;
//This is used to illustrate how to use continuation tokens to handle partitions that generate a large number
//of splits. This helps avoid hitting the Lambda response size limit.
protected static final int MAX_SPLITS_PER_REQUEST = 300;
//Field name for storing partition location information.
protected static final String PARTITION_LOCATION = "location";
//Field name for storing an example property on our partitions and splits.
protected static final String SERDE = "serde";
//Stores how frequently to generate a simulated throttling event.
private final int simulateThrottle;
//Controls if spill encryption should be enabled or disabled.
private boolean encryptionEnabled = true;
    //Counter that is used in conjunction with simulateThrottle to generate simulated throttling events.
private int count = 0;
/**
* Default constructor used by Lambda.
*/
public ExampleMetadataHandler()
{
super(SOURCE_TYPE);
this.simulateThrottle = (System.getenv(SIMULATE_THROTTLES) == null) ? 0 : Integer.parseInt(System.getenv(SIMULATE_THROTTLES));
}
/**
* Full DI constructor used mostly for testing
*
* @param keyFactory The EncryptionKeyFactory to use for spill encryption.
* @param awsSecretsManager The AWSSecretsManager client that can be used when attempting to resolve secrets.
* @param athena The Athena client that can be used to fetch query termination status to fast-fail this handler.
* @param spillBucket The S3 Bucket to use when spilling results.
* @param spillPrefix The S3 prefix to use when spilling results.
*/
@VisibleForTesting
protected ExampleMetadataHandler(EncryptionKeyFactory keyFactory,
AWSSecretsManager awsSecretsManager,
AmazonAthena athena,
String spillBucket,
String spillPrefix)
{
super(keyFactory, awsSecretsManager, athena, SOURCE_TYPE, spillBucket, spillPrefix);
//Read the Lambda environment variable for controlling simulated throttles.
this.simulateThrottle = (System.getenv(SIMULATE_THROTTLES) == null) ? 0 : Integer.parseInt(System.getenv(SIMULATE_THROTTLES));
}
/**
* Used to toggle encryption during unit tests.
*
     * @param enableEncryption If true, spill encryption is enabled for the splits we generate.
*/
@VisibleForTesting
protected void setEncryption(boolean enableEncryption)
{
this.encryptionEnabled = enableEncryption;
}
/**
* Demonstrates how you can capture the identity of the caller that ran the Athena query which triggered the Lambda invocation.
*
     * @param request The request (of any type) whose caller identity we want to log.
*/
private void logCaller(FederationRequest request)
{
FederatedIdentity identity = request.getIdentity();
logger.info("logCaller: account[" + identity.getAccount() + "] id[" + identity.getId() + "] principal[" + identity.getPrincipal() + "]");
}
/**
* Returns a static, single schema. A connector for a real data source would likely query that source's metadata
* to create a real list of schemas.
*
* @param allocator Tool for creating and managing Apache Arrow Blocks.
* @param request Provides details on who made the request and which Athena catalog they are querying.
     * @return The ListSchemasResponse which primarily contains the list of schemas (aka databases).
*/
@Override
public ListSchemasResponse doListSchemaNames(BlockAllocator allocator, ListSchemasRequest request)
{
logCaller(request);
        List<String> schemas = new ArrayList<>();
schemas.add(ExampleTable.schemaName);
return new ListSchemasResponse(request.getCatalogName(), schemas);
}
/**
* Returns a static list of TableNames. A connector for a real data source would likely query that source's metadata
* to create a real list of TableNames for the requested schema name.
*
* @param allocator Tool for creating and managing Apache Arrow Blocks.
* @param request Provides details on who made the request and which Athena catalog and database they are querying.
* @return A ListTablesResponse containing the list of available TableNames.
*/
@Override
public ListTablesResponse doListTables(BlockAllocator allocator, ListTablesRequest request)
{
logCaller(request);
        List<TableName> tables = new ArrayList<>();
tables.add(new TableName(ExampleTable.schemaName, ExampleTable.tableName));
        //The filter below for a null schema is not typical; we do this to generate a specific semantic error
        //that is exercised in our unit test suite.
return new ListTablesResponse(request.getCatalogName(),
tables.stream()
.filter(table -> request.getSchemaName() == null || request.getSchemaName().equals(table.getSchemaName()))
.collect(Collectors.toList()));
}
/**
* Retrieves a static Table schema for the example table. A connector for a real data source would likely query that
* source's metadata to create a table definition.
*
* @param allocator Tool for creating and managing Apache Arrow Blocks.
* @param request Provides details on who made the request and which Athena catalog, database, and table they are querying.
* @return A GetTableResponse containing the definition of the table (e.g. table schema and partition columns)
*/
@Override
public GetTableResponse doGetTable(BlockAllocator allocator, GetTableRequest request)
{
logCaller(request);
if (!request.getTableName().getSchemaName().equals(ExampleTable.schemaName) ||
!request.getTableName().getTableName().equals(ExampleTable.tableName)) {
throw new IllegalArgumentException("Unknown table " + request.getTableName());
}
        Set<String> partitionCols = new HashSet<>();
partitionCols.add("month");
partitionCols.add("year");
partitionCols.add("day");
return new GetTableResponse(request.getCatalogName(), request.getTableName(), ExampleTable.schema, partitionCols);
}
/**
* Here we inject the two additional columns we define for partition metadata. These columns are ignored by
* Athena but passed along to our code when Athena calls GetSplits(...). If you do not require any additional
     * metadata on your partitions, you may choose not to override this method.
*
* @param partitionSchemaBuilder The SchemaBuilder you can use to add additional columns and metadata to the
* partitions response.
     * @param request The GetTableLayoutRequest that triggered this call.
*/
@Override
public void enhancePartitionSchema(SchemaBuilder partitionSchemaBuilder, GetTableLayoutRequest request)
{
/**
* Add any additional fields we might need to our partition response schema.
* These additional fields are ignored by Athena but will be passed to GetSplits(...)
* when Athena calls our lambda function to plan the distributed read of our partitions.
*/
partitionSchemaBuilder.addField(PARTITION_LOCATION, new ArrowType.Utf8())
.addField(SERDE, new ArrowType.Utf8());
}
/**
     * Our example table is partitioned on year, month, and day, so we loop over a range of years, months, and days
     * to generate our example partitions. A connector for a real data source would likely query that source's metadata
* to create a real list of partitions.
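     *
     * <p>For long-running partition generation, a real connector can also use the queryStatusChecker to exit early
     * when the query has already terminated. A minimal sketch:
     * <pre>{@code
     * if (!queryStatusChecker.isQueryRunning()) {
     *     return;
     * }
     * }</pre>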
* @param writer Used to write rows (partitions) into the Apache Arrow response. The writes are automatically constrained.
* @param request Provides details of the catalog, database, and table being queried as well as any filter predicate.
     * @param queryStatusChecker A utility that can be used to check whether the query is still running, allowing
     * you to stop generating partitions for queries that have already terminated (as sketched above).
*/
@Override
public void getPartitions(BlockWriter writer, GetTableLayoutRequest request, QueryStatusChecker queryStatusChecker)
{
logCaller(request);
/**
* Now use the constraint that was in the request to do some partition pruning. Here we are just
* generating some fake values for the partitions but in a real implementation you'd use your metastore
* or knowledge of the actual table's physical layout to do this.
*/
for (int year = 1990; year < 2020; year++) {
for (int month = 0; month < 12; month++) {
for (int day = 0; day < 30; day++) {
final int dayVal = day;
final int monthVal = month;
final int yearVal = year;
writer.writeRows((Block block, int rowNum) -> {
//these are our partition columns and were defined by the call to doGetTable(...)
boolean matched = true;
matched &= block.setValue("day", rowNum, dayVal);
matched &= block.setValue("month", rowNum, monthVal);
matched &= block.setValue("year", rowNum, yearVal);
                        //these are the additional fields we added by overriding enhancePartitionSchema(...)
matched &= block.setValue(PARTITION_LOCATION, rowNum, "s3://" + request.getPartitionCols());
matched &= block.setValue(SERDE, rowNum, "TextInputFormat");
//if all fields passed then we wrote 1 row
return matched ? 1 : 0;
});
}
}
}
}
/**
     * For each partition we generate a pre-determined number of splits based on the NUM_PARTS_PER_SPLIT setting. This
     * method also demonstrates how to handle calls for batches of partitions and how to leverage this API's ability
     * to paginate. A connector for a real data source would likely query that source's metadata to determine if/how
     * to split up the read operations for a particular partition.
     *
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details of the catalog, database, table, and partition(s) being queried as well as
     * any filter predicate.
     * @return A GetSplitsResponse which contains a list of splits and an optional continuation token if we were not
     * able to generate all splits for the partitions in this batch.
*/
@Override
public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest request)
{
logCaller(request);
logger.info("doGetSplits: spill location " + makeSpillLocation(request));
/**
         * It is important to throw any throttling events before writing data, since Athena may not be able to
         * continue the query, due to consistency errors, if you throttle after writing data.
*/
if (simulateThrottle > 0 && count++ % simulateThrottle == 0) {
logger.info("readWithConstraint: throwing throttle Exception!");
throw new FederationThrottleException("Please slow down for this simulated throttling event");
}
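        //Decode the continuation token, if present, so we can resume split generation where the previous
        //response left off (see the ContinuationToken helper at the bottom of this class).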
ContinuationToken requestToken = ContinuationToken.decode(request.getContinuationToken());
int partitionContd = requestToken.getPartition();
int partContd = requestToken.getPart();
        Set<Split> splits = new HashSet<>();
Block partitions = request.getPartitions();
for (int curPartition = partitionContd; curPartition < partitions.getRowCount(); curPartition++) {
//We use the makeEncryptionKey() method from our parent class to make an EncryptionKey
EncryptionKey encryptionKey = makeEncryptionKey();
//We prepare to read our custom metadata fields from the partition so that we can pass this info to the split(s)
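            //Note: SplitProperties is assumed to be a small enum, defined alongside these examples, that maps
            //well-known split property names (e.g. location, serde, split_part) to the string ids used below.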
FieldReader locationReader = partitions.getFieldReader(SplitProperties.LOCATION.getId());
locationReader.setPosition(curPartition);
FieldReader storageClassReader = partitions.getFieldReader(SplitProperties.SERDE.getId());
storageClassReader.setPosition(curPartition);
//Do something to decide if this partition needs to be subdivided into multiple, possibly concurrent,
//table scan operations (aka splits)
for (int curPart = partContd; curPart < NUM_PARTS_PER_SPLIT; curPart++) {
if (splits.size() >= MAX_SPLITS_PER_REQUEST) {
                    //We exceeded the number of splits we want to return in a single request, so return and provide
//a continuation token.
return new GetSplitsResponse(request.getCatalogName(),
splits,
ContinuationToken.encode(curPartition, curPart));
}
//We use makeSpillLocation(...) from our parent class to get a unique SpillLocation for each split
Split.Builder splitBuilder = Split.newBuilder(makeSpillLocation(request), encryptionEnabled ? encryptionKey : null)
.add(SplitProperties.LOCATION.getId(), String.valueOf(locationReader.readText()))
.add(SplitProperties.SERDE.getId(), String.valueOf(storageClassReader.readText()))
.add(SplitProperties.SPLIT_PART.getId(), String.valueOf(curPart));
//Add the partition column values to the split's properties.
//We are doing this because our example record reader depends on it, your specific needs
//will likely vary. Our example only supports a limited number of partition column types.
for (String next : request.getPartitionCols()) {
FieldReader reader = partitions.getFieldReader(next);
reader.setPosition(curPartition);
switch (reader.getMinorType()) {
case UINT2:
splitBuilder.add(next, Integer.valueOf(reader.readCharacter()).toString());
break;
case UINT4:
case INT:
splitBuilder.add(next, String.valueOf(reader.readInteger()));
break;
case UINT8:
case BIGINT:
splitBuilder.add(next, String.valueOf(reader.readLong()));
break;
default:
throw new RuntimeException("Unsupported partition column type. " + reader.getMinorType());
}
}
splits.add(splitBuilder.build());
}
            //part continuation only applies within a partition, so we complete that partial partition and move on
//to the next one.
partContd = 0;
}
return new GetSplitsResponse(request.getCatalogName(), splits, null);
}
/**
* We use the ping signal to simply log the fact that a ping request came in.
*
* @param request The PingRequest.
*/
public void onPing(PingRequest request)
{
logCaller(request);
}
/**
* We use this as our static metastore for the example implementation
*/
protected static class ExampleTable
{
public static final String schemaName = "custom_source";
public static final String tableName = "fake_table";
public static final Schema schema;
static {
            schema = SchemaBuilder.newBuilder()
.addField("col1", new ArrowType.Date(DateUnit.DAY))
.addField("day", new ArrowType.Int(32, true))
.addField("month", new ArrowType.Int(32, true))
.addField("year", new ArrowType.Int(32, true))
.addField("col3", new ArrowType.Bool())
.addField("col4", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
.addField("col5", new ArrowType.Utf8())
.addField("datemilli", Types.MinorType.DATEMILLI.getType())
.addField("int", Types.MinorType.INT.getType())
.addField("tinyint", Types.MinorType.TINYINT.getType())
.addField("smallint", Types.MinorType.SMALLINT.getType())
.addField("bigint", Types.MinorType.BIGINT.getType())
.addField("float4", Types.MinorType.FLOAT4.getType())
.addField("float8", Types.MinorType.FLOAT8.getType())
.addField("bit", Types.MinorType.BIT.getType())
.addField("varchar", Types.MinorType.VARCHAR.getType())
.addField("varbinary", Types.MinorType.VARBINARY.getType())
.addField("decimal", new ArrowType.Decimal(10, 2))
.addField("decimalLong", new ArrowType.Decimal(36, 2))
//Example of a List of Structs
.addField(
FieldBuilder.newBuilder("list", new ArrowType.List())
.addField(
FieldBuilder.newBuilder("innerStruct", Types.MinorType.STRUCT.getType())
.addStringField("varchar")
.addBigIntField("bigint")
.build())
.build())
//Example of a List Of Lists
.addField(
FieldBuilder.newBuilder("outerlist", new ArrowType.List())
.addListField("innerList", Types.MinorType.VARCHAR.getType())
.build())
.addMetadata("partitionCols", "day,month,year")
.addMetadata("randomProp1", "randomPropVal1")
.addMetadata("randomProp2", "randomPropVal2").build();
}
private ExampleTable() {}
}
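    /**
     * A small continuation token codec used by doGetSplits(...) above to track how far through split generation we
     * got before hitting MAX_SPLITS_PER_REQUEST. This is a minimal sketch consistent with how encode(...) and
     * decode(...) are called above; the "partition:part" format here is one simple choice, since the token is
     * opaque to Athena and any stable encoding works.
     */
    private static class ContinuationToken
    {
        private static final String CONTINUATION_TOKEN_DIVIDER = ":";
        private final int partition;
        private final int part;

        private ContinuationToken(int partition, int part)
        {
            this.partition = partition;
            this.part = part;
        }

        /**
         * Decodes a continuation token into the partition and part to resume from, treating a null token
         * (i.e. the first call for this query) as "start from the beginning".
         */
        public static ContinuationToken decode(String token)
        {
            if (token == null) {
                //No continuation token present, start from the first partition and part.
                return new ContinuationToken(0, 0);
            }
            String[] parts = token.split(CONTINUATION_TOKEN_DIVIDER);
            if (parts.length != 2) {
                throw new RuntimeException("Unable to decode continuation token " + token);
            }
            return new ContinuationToken(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
        }

        /**
         * Encodes our position in split generation into an opaque token we can hand back to Athena.
         */
        public static String encode(int partition, int part)
        {
            return partition + CONTINUATION_TOKEN_DIVIDER + part;
        }

        public int getPartition()
        {
            return partition;
        }

        public int getPart()
        {
            return part;
        }
    }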
}