/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: MIT-0
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this
* software and associated documentation files (the "Software"), to deal in the Software
* without restriction, including without limitation the rights to use, copy, modify,
* merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
* PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
using Amazon.Athena;
using Amazon.Athena.Model;
using AthenaNetCore.BusinessLogic.Entities;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
namespace AthenaNetCore.BusinessLogic.Extentions
{
    internal static class AmazonAthenaClientExtentions
    {
        private const int SLEEP_AMOUNT_IN_MS = 1000;
        private static readonly string S3_RESULT_BUCKET_NAME = Environment.GetEnvironmentVariable("S3_RESULT_BUCKET_NAME") ?? string.Empty;

        /// <summary>
        /// Execute an SQL query using Amazon Athena, wait for the query to complete
        /// and map the result to a collection of C# entity objects. If Amazon Athena has not
        /// finished processing the given query when the timeout is reached, this method
        /// throws an <see cref="AmazonAthenaException"/> whose <c>RequestId</c> holds the
        /// QueryExecutionId, which can be used later to get the result.
        /// </summary>
        /// <typeparam name="T">Type of the entity result</typeparam>
        /// <param name="athenaClient">Instance of Amazon Athena Client</param>
        /// <param name="queryString">SQL query</param>
        /// <param name="timeoutInMinutes">Default 2 minutes: timeout in minutes for the application to abort waiting.</param>
        /// <returns>
        /// The mapped rows, or <c>null</c> when <paramref name="athenaClient"/> is null
        /// or <paramref name="queryString"/> is null/empty.
        /// </returns>
        /// <exception cref="AmazonAthenaException">The query failed, was cancelled, or did not complete before the timeout.</exception>
        public static async Task<IEnumerable<T>> QueryAsync<T>(this IAmazonAthena athenaClient, string queryString, int timeoutInMinutes = 2) where T : new()
        {
            if (athenaClient == null || string.IsNullOrEmpty(queryString))
            {
                return default;
            }

            // Reuse QueryAndGoAsync so the StartQueryExecution request is built in one place.
            var queryExecutionId = await athenaClient.QueryAndGoAsync(queryString).ConfigureAwait(false);
            await WaitForQueryToComplete(athenaClient, queryExecutionId, timeoutInMinutes).ConfigureAwait(false);
            return await athenaClient.ProcessQueryResultsAsync<T>(queryExecutionId).ConfigureAwait(false);
        }

        /// <summary>
        /// Execute an SQL query using Amazon Athena and return the QueryExecutionId
        /// without waiting for the result. The QueryExecutionId can be used later to get the result.
        /// Results are written to the location held in the <c>S3_RESULT_BUCKET_NAME</c> environment variable.
        /// </summary>
        /// <param name="athenaClient">Instance of Amazon Athena Client</param>
        /// <param name="queryString">SQL query</param>
        /// <returns>Execution Id to track the query progress</returns>
        /// <exception cref="ArgumentNullException"><paramref name="athenaClient"/> is null.</exception>
        public static async Task<string> QueryAndGoAsync(this IAmazonAthena athenaClient, string queryString)
        {
            if (athenaClient == null)
            {
                throw new ArgumentNullException(nameof(athenaClient));
            }

            var response = await athenaClient.StartQueryExecutionAsync(new StartQueryExecutionRequest
            {
                QueryString = queryString,
                ResultConfiguration = new ResultConfiguration
                {
                    OutputLocation = S3_RESULT_BUCKET_NAME
                }
            }).ConfigureAwait(false);
            return response.QueryExecutionId;
        }

        /// <summary>
        /// Retrieve the complete query result (following every result page) and return it
        /// as a collection of the specified type. The first row of the first page is treated
        /// as the header row holding the column names.
        /// </summary>
        /// <typeparam name="T">Type of the entity result</typeparam>
        /// <param name="athenaClient">Instance of Amazon Athena Client</param>
        /// <param name="queryExecutionId">Execution Id of a query that has already completed</param>
        /// <returns>
        /// The mapped rows (empty when the result set has no rows), or <c>null</c> when
        /// <paramref name="athenaClient"/> is null or <paramref name="queryExecutionId"/> is null/empty.
        /// </returns>
        /// <exception cref="AmazonAthenaException">Rethrown after being written to the debug output.</exception>
        public static async Task<IEnumerable<T>> ProcessQueryResultsAsync<T>(this IAmazonAthena athenaClient, string queryExecutionId) where T : new()
        {
            if (athenaClient == null || string.IsNullOrEmpty(queryExecutionId))
            {
                return default;
            }

            var results = new List<T>();
            try
            {
                IReadOnlyDictionary<string, ColumnPositionInfo> columnPositionMap = null;
                string nextToken = null;
                do
                {
                    // MaxResults is not set, so Athena uses its maximum page size
                    // (1000 rows at the time of writing); NextToken is followed so
                    // results larger than one page are not truncated.
                    var response = await athenaClient.GetQueryResultsAsync(new GetQueryResultsRequest
                    {
                        QueryExecutionId = queryExecutionId,
                        NextToken = nextToken
                    }).ConfigureAwait(false);

                    var rows = response.ResultSet?.Rows;
                    if (rows != null && rows.Count > 0)
                    {
                        var firstDataRow = 0;
                        if (columnPositionMap == null)
                        {
                            // Only the first page starts with the header row.
                            columnPositionMap = MapColumnsPositions(rows[0].Data, response.ResultSet.ResultSetMetadata?.ColumnInfo);
                            firstDataRow = 1;
                        }

                        for (var r = firstDataRow; r < rows.Count; r++)
                        {
                            results.Add(ProcessRow<T>(rows[r].Data, columnPositionMap));
                        }
                    }

                    nextToken = response.NextToken;
                }
                while (!string.IsNullOrEmpty(nextToken));
            }
            catch (AmazonAthenaException e)
            {
                Debug.WriteLine(e);
                throw;
            }

            return results;
        }

        /// <summary>
        /// Check whether the query is still running. Returns <c>false</c> once the query has SUCCEEDED,
        /// <c>true</c> for any other non-terminal state, and throws if the query FAILED or was CANCELLED.
        /// </summary>
        /// <param name="athenaClient">Instance of Amazon Athena Client</param>
        /// <param name="queryExecutionId">Execution Id to track the query progress</param>
        /// <returns><c>true</c> while the query has not reached SUCCEEDED.</returns>
        /// <exception cref="AmazonAthenaException">The query failed or was cancelled.</exception>
        public static async Task<bool> IsTheQueryStillRunning(this IAmazonAthena athenaClient, string queryExecutionId)
        {
            var getQueryExecutionRequest = new GetQueryExecutionRequest { QueryExecutionId = queryExecutionId };
            var getQueryExecutionResponse = await athenaClient.GetQueryExecutionAsync(getQueryExecutionRequest).ConfigureAwait(false);
            var status = getQueryExecutionResponse.QueryExecution.Status;
            var queryState = status.State;

            if (queryState == QueryExecutionState.FAILED)
            {
                throw new AmazonAthenaException("Query Failed to run with Error Message: " + status.StateChangeReason);
            }

            if (queryState == QueryExecutionState.CANCELLED)
            {
                throw new AmazonAthenaException("Query was cancelled.");
            }

            Debug.WriteLine("Current Status is: " + queryState);
            return queryState != QueryExecutionState.SUCCEEDED;
        }

        /// <summary>
        /// Wait for Amazon Athena to complete execution of the query, polling every
        /// <see cref="SLEEP_AMOUNT_IN_MS"/> milliseconds. If it is not completed before the timeout,
        /// an exception carrying the QueryExecutionId in <c>RequestId</c> is thrown.
        /// </summary>
        /// <param name="athenaClient">Amazon Athena Client instance</param>
        /// <param name="queryExecutionId">Execution Id to track the query progress</param>
        /// <param name="timeout">Maximum number of minutes to wait before aborting.</param>
        /// <exception cref="AmazonAthenaException">The query failed, was cancelled, or timed out.</exception>
        private static async Task WaitForQueryToComplete(IAmazonAthena athenaClient, string queryExecutionId, int timeout)
        {
            var deadline = DateTimeOffset.UtcNow.AddMinutes(timeout);

            // The status is always checked once more before giving up, so a query that
            // finished during the last sleep is not reported as timed out.
            while (await athenaClient.IsTheQueryStillRunning(queryExecutionId).ConfigureAwait(false))
            {
                if (DateTimeOffset.UtcNow >= deadline)
                {
                    throw new AmazonAthenaException("Timeout: Amazon Athena still processing your query, use the RequestId to get the result later.")
                    {
                        RequestId = queryExecutionId
                    };
                }

                // Sleep an amount of time before retrying again.
                await Task.Delay(SLEEP_AMOUNT_IN_MS).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Map each column name found in the header row to its position and its Athena column metadata.
        /// Keys are lower-cased and looked up case-insensitively; when a column name appears more
        /// than once, the first occurrence wins.
        /// </summary>
        /// <param name="columnsPositions">Header row (first row of the first result page)</param>
        /// <param name="columnInfoList">Column metadata from the result set</param>
        /// <returns>A map from column name to position/metadata; empty when either input is null.</returns>
        private static IReadOnlyDictionary<string, ColumnPositionInfo> MapColumnsPositions(IReadOnlyList<Datum> columnsPositions, IReadOnlyList<ColumnInfo> columnInfoList)
        {
            var result = new Dictionary<string, ColumnPositionInfo>(StringComparer.OrdinalIgnoreCase);
            if (columnsPositions == null || columnInfoList == null)
            {
                return result;
            }

            for (int i = 0; i < columnsPositions.Count; i++)
            {
                var headerName = columnsPositions[i]?.VarCharValue;
                if (string.IsNullOrEmpty(headerName))
                {
                    continue;
                }

                var column = headerName.ToLowerInvariant();
                if (result.ContainsKey(column))
                {
                    // Duplicate column names (e.g. SELECT a, a) would make Add throw.
                    continue;
                }

                result.Add(column, new ColumnPositionInfo
                {
                    IndexPosition = i,
                    ColumnInfo = columnInfoList.FirstOrDefault(f => string.Equals(f.Name, headerName, StringComparison.OrdinalIgnoreCase))
                });
            }

            return result;
        }

        /// <summary>
        /// Convert one result row into an entity object. Each writable property is matched to a
        /// column by its <see cref="AthenaColumnAttribute"/> column name, or by its own name when
        /// the attribute is absent (case-insensitive).
        /// </summary>
        /// <typeparam name="T">Specified entity object type</typeparam>
        /// <param name="columnsData">Collection of columns data in row</param>
        /// <param name="columnsPositionMap">Map of columns position and info for each data</param>
        /// <returns>The populated entity.</returns>
        private static T ProcessRow<T>(IReadOnlyList<Datum> columnsData, IReadOnlyDictionary<string, ColumnPositionInfo> columnsPositionMap) where T : new()
        {
            var entityItem = new T();
            if (columnsData == null || columnsPositionMap == null)
            {
                return entityItem;
            }

            foreach (var prop in entityItem.GetType().GetProperties())
            {
                if (!prop.CanWrite)
                {
                    continue;
                }

                var columnName = prop.Name;
                if (prop.GetCustomAttributes(typeof(AthenaColumnAttribute), false).FirstOrDefault() is AthenaColumnAttribute attribute
                    && !string.IsNullOrEmpty(attribute.ColumnName))
                {
                    columnName = attribute.ColumnName;
                }

                // Map keys are lower-cased, so the attribute name must be normalized too.
                if (!columnsPositionMap.TryGetValue(columnName.ToLowerInvariant(), out var mapped))
                {
                    continue;
                }

                var index = mapped.IndexPosition;
                var rawValue = index < columnsData.Count ? columnsData[index]?.VarCharValue : null;

                // SQL NULL: store null where the property can hold it instead of 0 / DateTime.MinValue.
                if (rawValue == null && CanHoldNull(prop.PropertyType))
                {
                    prop.SetValue(entityItem, null);
                    continue;
                }

                prop.SetValue(entityItem, ConvertAthenaValue(mapped.ColumnInfo?.Type, rawValue));
            }

            return entityItem;
        }

        /// <summary>
        /// Convert the textual value returned by Athena into a CLR value according to the column type.
        /// Unknown or missing types are returned as the original string.
        /// For more detail about Amazon Athena Data Type, check: https://docs.aws.amazon.com/athena/latest/ug/data-types.html
        /// </summary>
        /// <param name="athenaType">Athena column type name (may be null when metadata is missing)</param>
        /// <param name="rawValue">Value as returned in <c>VarCharValue</c></param>
        /// <returns>The converted value.</returns>
        private static object ConvertAthenaValue(string athenaType, string rawValue)
        {
            // Athena returns culture-neutral text ("1.5", "2020-01-31"), so parse with the invariant culture.
            switch (athenaType)
            {
                case "integer":
                case "tinyint":
                case "smallint":
                    return Convert.ToInt32(rawValue, CultureInfo.InvariantCulture);
                case "bigint":
                    return Convert.ToInt64(rawValue, CultureInfo.InvariantCulture);
                case "double":
                case "float":
                    return Convert.ToDouble(rawValue, CultureInfo.InvariantCulture);
                case "decimal":
                    return Convert.ToDecimal(rawValue, CultureInfo.InvariantCulture);
                case "date":
                case "timestamp":
                    return Convert.ToDateTime(rawValue, CultureInfo.InvariantCulture);
                default:
                    return rawValue;
            }
        }

        /// <summary>
        /// True when <paramref name="type"/> is a reference type or a <see cref="Nullable{T}"/>.
        /// </summary>
        private static bool CanHoldNull(Type type)
        {
            return !type.IsValueType || Nullable.GetUnderlyingType(type) != null;
        }
    }
}