/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: MIT-0
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this
 * software and associated documentation files (the "Software"), to deal in the Software
 * without restriction, including without limitation the rights to use, copy, modify,
 * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
 * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
 * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

using Amazon.Athena;
using Amazon.Athena.Model;
using AthenaNetCore.BusinessLogic.Entities;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;

namespace AthenaNetCore.BusinessLogic.Extentions
{
    /// <summary>
    /// Extension methods on <see cref="IAmazonAthena"/> that start SQL queries, poll their
    /// execution state and map the rows of the result set onto C# entity objects.
    /// </summary>
    internal static class AmazonAthenaClientExtentions
    {
        /// <summary>Delay, in milliseconds, between two polls of the query execution state.</summary>
        private const int SLEEP_AMOUNT_IN_MS = 1000;

        /// <summary>
        /// Output location passed to Athena in <see cref="ResultConfiguration.OutputLocation"/>,
        /// read from the <c>S3_RESULT_BUCKET_NAME</c> environment variable (empty string when unset).
        /// </summary>
        private static readonly string S3_RESULT_BUCKET_NAME = Environment.GetEnvironmentVariable("S3_RESULT_BUCKET_NAME") ?? string.Empty;

        /// <summary>
        /// Execute an SQL query using Amazon Athena, wait for the query to complete
        /// and map the result to a collection of C# entity objects. If Amazon Athena has not
        /// finished processing the given query when the timeout is reached, this method
        /// throws an <see cref="AmazonAthenaException"/> whose <c>RequestId</c> holds the
        /// QueryExecutionId, which can be used later to get the result.
        /// </summary>
        /// <typeparam name="T">Type of the entity result.</typeparam>
        /// <param name="athenaClient">Instance of Amazon Athena Client.</param>
        /// <param name="queryString">SQL query.</param>
        /// <param name="timeoutInMinutes">Timeout in minutes (default 2) after which the application stops waiting.</param>
        /// <returns>
        /// The enumerator, which supports a simple iteration over a collection of the specified type;
        /// <c>null</c> when <paramref name="athenaClient"/> is null or <paramref name="queryString"/> is null or empty.
        /// </returns>
        /// <exception cref="AmazonAthenaException">The query failed, was cancelled, or did not complete before the timeout.</exception>
        public static async Task<IEnumerable<T>> QueryAsync<T>(this IAmazonAthena athenaClient, string queryString, int timeoutInMinutes = 2) where T : new()
        {
            if (athenaClient == null || string.IsNullOrEmpty(queryString))
            {
                return default;
            }

            var queryExecutionId = await StartQueryAsync(athenaClient, queryString);
            await WaitForQueryToComplete(athenaClient, queryExecutionId, timeoutInMinutes);
            return await ProcessQueryResultsAsync<T>(athenaClient, queryExecutionId);
        }

        /// <summary>
        /// Execute an SQL query using Amazon Athena and return its QueryExecutionId
        /// without waiting for the result; the QueryExecutionId can be used later to get the result.
        /// </summary>
        /// <param name="athenaClient">Instance of Amazon Athena Client.</param>
        /// <param name="queryString">SQL query.</param>
        /// <returns>Execution Id to track the query progress.</returns>
        public static async Task<string> QueryAndGoAsync(this IAmazonAthena athenaClient, string queryString)
        {
            return await StartQueryAsync(athenaClient, queryString);
        }

        /// <summary>
        /// Retrieve the query result (following every result page) and return it as a
        /// collection of the specified type. The first row of the first page is treated as the
        /// header row holding the column names.
        /// </summary>
        /// <typeparam name="T">Type of the entity result.</typeparam>
        /// <param name="athenaClient">Instance of Amazon Athena Client.</param>
        /// <param name="queryExecutionId">Execution Id of a query that has already completed.</param>
        /// <returns>
        /// One entity per data row (empty when the result set has no rows);
        /// <c>null</c> when <paramref name="athenaClient"/> is null or <paramref name="queryExecutionId"/> is null or empty.
        /// </returns>
        /// <exception cref="AmazonAthenaException">Reading the query results failed (logged, then rethrown).</exception>
        public static async Task<IEnumerable<T>> ProcessQueryResultsAsync<T>(this IAmazonAthena athenaClient, string queryExecutionId) where T : new()
        {
            if (athenaClient == null || string.IsNullOrEmpty(queryExecutionId))
            {
                return default;
            }

            var results = new List<T>();
            try
            {
                IReadOnlyDictionary<string, ColumnPositionInfo> columnPositionMap = null;
                string nextToken = null;
                do
                {
                    // MaxResults is left unset so Athena picks its maximum page size
                    // (1000 at the time of writing); larger results span several pages
                    // that must be followed through NextToken.
                    var getQueryResultsResults = await athenaClient.GetQueryResultsAsync(new GetQueryResultsRequest
                    {
                        QueryExecutionId = queryExecutionId,
                        NextToken = nextToken
                    });

                    var rows = getQueryResultsResults.ResultSet?.Rows;
                    var firstDataRow = 0;
                    if (columnPositionMap == null)
                    {
                        if (rows == null || rows.Count == 0)
                        {
                            // No header row (e.g. a statement that returns nothing): nothing to map.
                            break;
                        }

                        // Only the very first row of the first page carries the column names.
                        columnPositionMap = MapColumnsPositions(rows[0].Data, getQueryResultsResults.ResultSet.ResultSetMetadata?.ColumnInfo);
                        firstDataRow = 1;
                    }

                    if (rows != null)
                    {
                        for (var rowIndex = firstDataRow; rowIndex < rows.Count; rowIndex++)
                        {
                            results.Add(ProcessRow<T>(rows[rowIndex].Data, columnPositionMap));
                        }
                    }

                    nextToken = getQueryResultsResults.NextToken;
                }
                while (!string.IsNullOrEmpty(nextToken));
            }
            catch (AmazonAthenaException e)
            {
                Debug.WriteLine(e);
                throw;
            }

            return results;
        }

        /// <summary>
        /// Check whether the query is still running. Returns <c>false</c> once the query has
        /// succeeded and <c>true</c> for any other non-terminal state.
        /// </summary>
        /// <param name="athenaClient">Instance of Amazon Athena Client.</param>
        /// <param name="queryExecutionId">Execution Id to track the query progress.</param>
        /// <returns><c>true</c> while the query has not yet succeeded.</returns>
        /// <exception cref="AmazonAthenaException">The query state is FAILED or CANCELLED.</exception>
        public static async Task<bool> IsTheQueryStillRunning(this IAmazonAthena athenaClient, string queryExecutionId)
        {
            var getQueryExecutionResponse = await athenaClient.GetQueryExecutionAsync(new GetQueryExecutionRequest
            {
                QueryExecutionId = queryExecutionId
            });

            var status = getQueryExecutionResponse.QueryExecution.Status;
            var queryState = status.State;

            if (queryState == QueryExecutionState.FAILED)
            {
                throw new AmazonAthenaException("Query Failed to run with Error Message: " + status.StateChangeReason);
            }

            if (queryState == QueryExecutionState.CANCELLED)
            {
                throw new AmazonAthenaException("Query was cancelled.");
            }

            var isQueryStillRunning = true;
            if (queryState == QueryExecutionState.SUCCEEDED)
            {
                isQueryStillRunning = false;
            }

            Debug.WriteLine("Current Status is: " + queryState);
            return isQueryStillRunning;
        }

        /// <summary>
        /// Start an Athena query execution writing its output to <see cref="S3_RESULT_BUCKET_NAME"/>.
        /// Shared by <see cref="QueryAsync{T}"/> and <see cref="QueryAndGoAsync"/>.
        /// </summary>
        /// <param name="athenaClient">Instance of Amazon Athena Client.</param>
        /// <param name="queryString">SQL query.</param>
        /// <returns>The QueryExecutionId returned by Athena.</returns>
        private static async Task<string> StartQueryAsync(IAmazonAthena athenaClient, string queryString)
        {
            var qry = await athenaClient.StartQueryExecutionAsync(new StartQueryExecutionRequest
            {
                QueryString = queryString,
                ResultConfiguration = new ResultConfiguration
                {
                    OutputLocation = S3_RESULT_BUCKET_NAME
                }
            });

            return qry.QueryExecutionId;
        }

        /// <summary>
        /// Wait for Amazon Athena to complete execution of the query, polling every
        /// <see cref="SLEEP_AMOUNT_IN_MS"/> milliseconds. If it has not completed before the
        /// timeout, an exception is thrown.
        /// </summary>
        /// <param name="athenaClient">Amazon Athena Client instance.</param>
        /// <param name="queryExecutionId">Execution Id to track the query progress.</param>
        /// <param name="timeout">Maximum time to wait, in minutes, before aborting.</param>
        /// <exception cref="AmazonAthenaException">
        /// The query failed or was cancelled, or the timeout elapsed (in which case
        /// <c>RequestId</c> is set to <paramref name="queryExecutionId"/>).
        /// </exception>
        private static async Task WaitForQueryToComplete(IAmazonAthena athenaClient, string queryExecutionId, int timeout)
        {
            var timeLimit = TimeSpan.FromMinutes(timeout);

            // Stopwatch is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
            var elapsed = Stopwatch.StartNew();

            while (await athenaClient.IsTheQueryStillRunning(queryExecutionId))
            {
                if (elapsed.Elapsed > timeLimit)
                {
                    throw new AmazonAthenaException("Timeout: Amazon Athena still processing your query, use the RequestId to get the result later.")
                    {
                        RequestId = queryExecutionId
                    };
                }

                // Sleep an amount of time before retrying again.
                await Task.Delay(SLEEP_AMOUNT_IN_MS);
            }
        }

        /// <summary>
        /// Map each column name found in the header row to its position and its Athena column info.
        /// Names are matched case-insensitively (ordinal); when a name appears more than once,
        /// the first occurrence wins.
        /// </summary>
        /// <param name="columnsPositions">Header row: one datum per column holding the column name.</param>
        /// <param name="columnInfoList">Column metadata returned with the result set.</param>
        /// <returns>A case-insensitive map from column name to its position info (empty when either input is null).</returns>
        private static IReadOnlyDictionary<string, ColumnPositionInfo> MapColumnsPositions(IReadOnlyList<Datum> columnsPositions, IReadOnlyList<ColumnInfo> columnInfoList)
        {
            var result = new Dictionary<string, ColumnPositionInfo>(StringComparer.OrdinalIgnoreCase);
            if (columnsPositions == null || columnInfoList == null)
            {
                return result;
            }

            for (int i = 0; i < columnsPositions.Count; i++)
            {
                var column = columnsPositions[i]?.VarCharValue;
                if (string.IsNullOrEmpty(column) || result.ContainsKey(column))
                {
                    // Unnamed column, or a duplicate name (e.g. SELECT a.id, b.id): keep the first one.
                    continue;
                }

                // Prefer the metadata entry at the same position; fall back to a name search.
                var columnInfo = i < columnInfoList.Count && string.Equals(columnInfoList[i]?.Name, column, StringComparison.OrdinalIgnoreCase)
                    ? columnInfoList[i]
                    : columnInfoList.FirstOrDefault(f => string.Equals(f?.Name, column, StringComparison.OrdinalIgnoreCase));

                result.Add(column, new ColumnPositionInfo
                {
                    IndexPosition = i,
                    ColumnInfo = columnInfo
                });
            }

            return result;
        }

        /// <summary>
        /// Convert one row of data into an entity object. Each writable property is bound to the
        /// column named by its <see cref="AthenaColumnAttribute"/> (when present) or by the property
        /// name, compared case-insensitively.
        /// </summary>
        /// <typeparam name="T">Specified entity object type.</typeparam>
        /// <param name="columnsData">Collection of column data in the row.</param>
        /// <param name="columnsPositionMap">Map of column position and info for each column name.</param>
        /// <returns>The populated entity.</returns>
        private static T ProcessRow<T>(IReadOnlyList<Datum> columnsData, IReadOnlyDictionary<string, ColumnPositionInfo> columnsPositionMap) where T : new()
        {
            // Box once so that property writes also stick when T is a struct.
            object entityItem = new T();

            foreach (var prop in typeof(T).GetProperties())
            {
                var attribute = prop.GetCustomAttribute<AthenaColumnAttribute>(false);
                var propColumnName = string.IsNullOrEmpty(attribute?.ColumnName) ? prop.Name : attribute.ColumnName;

                if (!prop.CanWrite || !columnsPositionMap.TryGetValue(propColumnName, out var mapped))
                {
                    continue;
                }

                var i = mapped.IndexPosition;
                var rawValue = columnsData != null && i < columnsData.Count ? columnsData[i]?.VarCharValue : null;
                prop.SetValue(entityItem, ConvertAthenaValue(mapped.ColumnInfo?.Type, rawValue));
            }

            return (T)entityItem;
        }

        /// <summary>
        /// Convert the textual value Athena returns into a CLR value according to the column type.
        /// Athena renders values in an invariant format, so parsing uses the invariant culture.
        /// For more detail about Amazon Athena data types, check:
        /// https://docs.aws.amazon.com/athena/latest/ug/data-types.html
        /// </summary>
        /// <param name="athenaType">Athena column type name; <c>null</c> when unknown.</param>
        /// <param name="rawValue">Value as returned in <c>VarCharValue</c>; <c>null</c> for SQL NULL.</param>
        /// <returns>
        /// The converted value. A null <paramref name="rawValue"/> becomes 0 / <see cref="DateTime.MinValue"/>
        /// for numeric / date types; unknown or textual types are returned as the raw string.
        /// </returns>
        private static object ConvertAthenaValue(string athenaType, string rawValue)
        {
            switch (athenaType)
            {
                case "integer":
                case "tinyint":
                case "smallint":
                    return Convert.ToInt32(rawValue, CultureInfo.InvariantCulture);
                case "bigint":
                    return Convert.ToInt64(rawValue, CultureInfo.InvariantCulture);
                case "double":
                case "float":
                    return Convert.ToDouble(rawValue, CultureInfo.InvariantCulture);
                case "decimal":
                    return Convert.ToDecimal(rawValue, CultureInfo.InvariantCulture);
                case "date":
                case "timestamp":
                    return Convert.ToDateTime(rawValue, CultureInfo.InvariantCulture);
                default:
                    return rawValue;
            }
        }
    }
}