/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
#if AWS_ASYNC_API
using System.Threading.Tasks;
#endif
using Amazon.DynamoDBv2.Model;
using Amazon.Runtime;
namespace Amazon.DynamoDBv2.DocumentModel
{
/// <summary>
/// Class for retrieving a batch of Documents from a single DynamoDB table.
/// </summary>
public partial class DocumentBatchGet
{
    #region Internal properties

    /// <summary>
    /// Table the keys in this batch are read from.
    /// </summary>
    internal Table TargetTable { get; private set; }

    /// <summary>
    /// Keys of the items to get, in the order they were added.
    /// </summary>
    internal List<Key> Keys { get; private set; }

    #endregion

    #region Public properties

    /// <summary>
    /// List of results retrieved from DynamoDB.
    /// Populated after Execute is called.
    /// </summary>
    public List<Document> Results { get; internal set; }

    /// <summary>
    /// List of attributes to retrieve.
    /// </summary>
    public List<string> AttributesToGet { get; set; }

    /// <summary>
    /// If set to true, a consistent read is issued. Otherwise eventually-consistent is used.
    /// </summary>
    public bool ConsistentRead { get; set; }

    #endregion

    #region Constructor

    /// <summary>
    /// Constructs a DocumentBatchGet instance for a specific table.
    /// </summary>
    /// <param name="targetTable">Table to get items from.</param>
    public DocumentBatchGet(Table targetTable)
    {
        TargetTable = targetTable;
        Keys = new List<Key>();
    }

    #endregion

    #region Public methods

    /// <summary>
    /// Add a single item to get, identified by its hash primary key.
    /// </summary>
    /// <param name="hashKey">Hash key element of the item to get.</param>
    public void AddKey(Primitive hashKey)
    {
        AddKey(hashKey, null);
    }

    /// <summary>
    /// Add a single item to get, identified by its hash-and-range primary key.
    /// </summary>
    /// <param name="hashKey">Hash key element of the item to get.</param>
    /// <param name="rangeKey">Range key element of the item to get.</param>
    public void AddKey(Primitive hashKey, Primitive rangeKey)
    {
        Keys.Add(TargetTable.MakeKey(hashKey, rangeKey));
    }

    /// <summary>
    /// Add a single item to get, identified by its key.
    /// </summary>
    /// <param name="key">Key of the item to get.</param>
    public void AddKey(IDictionary<string, DynamoDBEntry> key)
    {
        Keys.Add(TargetTable.MakeKey(key));
    }

    /// <summary>
    /// Creates a MultiTableDocumentBatchGet object that is a combination
    /// of the current DocumentBatchGet and the specified DocumentBatchGet.
    /// </summary>
    /// <param name="otherBatch">Other DocumentBatchGet object.</param>
    /// <returns>
    /// MultiTableDocumentBatchGet consisting of the two DocumentBatchGet
    /// objects.
    /// </returns>
    public MultiTableDocumentBatchGet Combine(DocumentBatchGet otherBatch)
    {
        return new MultiTableDocumentBatchGet(this, otherBatch);
    }

    #endregion

    #region Internal methods

    /// <summary>
    /// Runs this batch through a single-batch MultiBatchGet and stores the
    /// documents returned for the target table in <see cref="Results"/>.
    /// </summary>
    internal void ExecuteHelper()
    {
        MultiBatchGet resultsObject = new MultiBatchGet
        {
            Batches = new List<DocumentBatchGet> { this }
        };
        var results = resultsObject.GetItemsHelper();

        // A table with no retrieved items has no entry in the map; expose an
        // empty list rather than null in that case.
        List<Document> batchResults;
        Results = results.TryGetValue(TargetTable.TableName, out batchResults)
            ? batchResults
            : new List<Document>();
    }

#if AWS_ASYNC_API
    /// <summary>
    /// Asynchronous counterpart of <see cref="ExecuteHelper"/>.
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the underlying batch get.</param>
    /// <returns>A task that completes once <see cref="Results"/> has been assigned.</returns>
    internal async Task ExecuteHelperAsync(CancellationToken cancellationToken)
    {
        MultiBatchGet resultsObject = new MultiBatchGet
        {
            Batches = new List<DocumentBatchGet> { this }
        };
        var results = await resultsObject.GetItemsHelperAsync(cancellationToken).ConfigureAwait(false);

        // Same fallback as the synchronous path: never leave Results null.
        List<Document> batchResults;
        Results = results.TryGetValue(TargetTable.TableName, out batchResults)
            ? batchResults
            : new List<Document>();
    }
#endif

    /// <summary>
    /// Adds the key that the target table derives from the given document.
    /// </summary>
    /// <param name="document">Document to build the key from.</param>
    internal void AddKey(Document document)
    {
        Keys.Add(TargetTable.MakeKey(document));
    }

    /// <summary>
    /// Adds an already constructed key to the batch.
    /// </summary>
    /// <param name="key">Key of the item to get.</param>
    internal void AddKey(Key key)
    {
        Keys.Add(key);
    }

    #endregion
}
/// <summary>
/// Class for retrieving a batch of Documents from multiple DynamoDB tables.
/// </summary>
public partial class MultiTableDocumentBatchGet
{
    #region Properties

    /// <summary>
    /// List of DocumentBatchGet objects to include in the multi-table
    /// batch request.
    /// </summary>
    public List<DocumentBatchGet> Batches { get; private set; }

    /// <summary>
    /// Total number of primary keys in the multi-table batch request.
    /// </summary>
    public int TotalKeys
    {
        get
        {
            int count = 0;
            foreach (var batch in Batches)
            {
                count += batch.Keys.Count;
            }
            return count;
        }
    }

    #endregion

    #region Constructor

    /// <summary>
    /// Constructs a MultiTableDocumentBatchGet object from a number of
    /// DocumentBatchGet objects.
    /// </summary>
    /// <param name="batches">Collection of DocumentBatchGet objects.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="batches"/> is null.</exception>
    public MultiTableDocumentBatchGet(params DocumentBatchGet[] batches)
    {
        if (batches == null)
        {
            throw new ArgumentNullException("batches");
        }

        Batches = new List<DocumentBatchGet>(batches);
    }

    #endregion

    #region Public methods

    /// <summary>
    /// Add a DocumentBatchGet object to the multi-table batch request.
    /// </summary>
    /// <param name="batch">DocumentBatchGet to add.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="batch"/> is null.</exception>
    public void AddBatch(DocumentBatchGet batch)
    {
        // A null entry would otherwise only surface later as a
        // NullReferenceException in TotalKeys or during execution.
        if (batch == null)
        {
            throw new ArgumentNullException("batch");
        }

        Batches.Add(batch);
    }

    #endregion

    #region Internal methods

    /// <summary>
    /// Executes all batches through one MultiBatchGet and assigns each
    /// batch the documents returned for its target table.
    /// </summary>
    internal void ExecuteHelper()
    {
        MultiBatchGet resultsObject = new MultiBatchGet
        {
            Batches = Batches
        };
        var results = resultsObject.GetItemsHelper();

        foreach (var batch in Batches)
        {
            // Tables with no retrieved items are absent from the map; give
            // those batches an empty list instead of leaving Results unset.
            List<Document> batchResults;
            batch.Results = results.TryGetValue(batch.TargetTable.TableName, out batchResults)
                ? batchResults
                : new List<Document>();
        }
    }

#if AWS_ASYNC_API
    /// <summary>
    /// Asynchronous counterpart of <see cref="ExecuteHelper"/>.
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the underlying batch get.</param>
    /// <returns>A task that completes once every batch has its Results assigned.</returns>
    internal async Task ExecuteHelperAsync(CancellationToken cancellationToken)
    {
        MultiBatchGet resultsObject = new MultiBatchGet
        {
            Batches = Batches
        };
        var results = await resultsObject.GetItemsHelperAsync(cancellationToken).ConfigureAwait(false);

        foreach (var batch in Batches)
        {
            // Same fallback as the synchronous path.
            List<Document> batchResults;
            batch.Results = results.TryGetValue(batch.TargetTable.TableName, out batchResults)
                ? batchResults
                : new List<Document>();
        }
    }
#endif

    #endregion
}
/// <summary>
/// Internal class for handling multi-table batch gets.
/// </summary>
internal class MultiBatchGet
{
    /// <summary>
    /// Batches that comprise the current BatchGet operation
    /// </summary>
    public List<DocumentBatchGet> Batches { get; set; }

    /// <summary>
    /// Maximum number of items that can be sent in a single BatchGet request
    /// </summary>
    public const int MaxItemsPerCall = 100;

    /// <summary>
    /// Gets items configured in Batches from the server
    /// </summary>
    /// <returns>Retrieved documents, keyed by table name.</returns>
    public Dictionary<string, List<Document>> GetItems()
    {
        return GetItemsHelper();
    }

#if AWS_ASYNC_API
    /// <summary>
    /// Gets items configured in Batches from the server asynchronously
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the service calls.</param>
    /// <returns>Task producing the retrieved documents, keyed by table name.</returns>
    public Task<Dictionary<string, List<Document>>> GetItemsAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        return GetItemsHelperAsync(cancellationToken);
    }

    /// <summary>
    /// Retrieves the raw attribute maps asynchronously and converts them to documents.
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the service calls.</param>
    /// <returns>Task producing the retrieved documents, keyed by table name.</returns>
    internal async Task<Dictionary<string, List<Document>>> GetItemsHelperAsync(CancellationToken cancellationToken)
    {
        var results = await GetAttributeItemsAsync(cancellationToken).ConfigureAwait(false);
        return ConvertToDocuments(results);
    }
#endif

    /// <summary>
    /// Retrieves the raw attribute maps and converts them to documents.
    /// </summary>
    /// <returns>Retrieved documents, keyed by table name.</returns>
    internal Dictionary<string, List<Document>> GetItemsHelper()
    {
        return ConvertToDocuments(GetAttributeItems());
    }

    /// <summary>
    /// Converts every retrieved attribute map into a Document using the
    /// table registered for its table name.
    /// </summary>
    /// <param name="results">Accumulated raw results of the batch get.</param>
    /// <returns>Documents keyed by table name (ordinal comparison).</returns>
    private static Dictionary<string, List<Document>> ConvertToDocuments(Results results)
    {
        var itemsAsDocuments = new Dictionary<string, List<Document>>(StringComparer.Ordinal);
        foreach (var kvp in results.RetrievedItems)
        {
            var table = results.TargetTables[kvp.Key];
            var documents = new List<Document>(kvp.Value.Count);
            foreach (var attributeMap in kvp.Value)
            {
                documents.Add(table.FromAttributeMap(attributeMap));
            }
            itemsAsDocuments[kvp.Key] = documents;
        }
        return itemsAsDocuments;
    }

#if AWS_ASYNC_API
    /// <summary>
    /// Asynchronously issues BatchGetItem requests, at most
    /// <see cref="MaxItemsPerCall"/> keys per request, until all keys from
    /// Batches have been sent.
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the service calls.</param>
    /// <returns>Task producing the accumulated raw results.</returns>
    private async Task<Results> GetAttributeItemsAsync(CancellationToken cancellationToken)
    {
        var results = new Results(Batches);
        if (Batches == null || Batches.Count == 0)
        {
            return results;
        }

        // use client from the table from the first batch
        var targetTable = Batches[0].TargetTable;
        var clientToUse = targetTable.DDBClient;
        var convertedBatches = ConvertBatches();
        while (true)
        {
            var nextSet = GetNextRequestItems(ref convertedBatches, MaxItemsPerCall);
            if (nextSet.Count == 0)
            {
                break;
            }

            BatchGetItemRequest request = CreateRequest(nextSet);
            targetTable.AddRequestHandler(request, isAsync: true);
            await CallUntilCompletionAsync(clientToUse, request, results, cancellationToken).ConfigureAwait(false);
        }
        return results;
    }
#endif

    /// <summary>
    /// Issues BatchGetItem requests, at most <see cref="MaxItemsPerCall"/>
    /// keys per request, until all keys from Batches have been sent.
    /// </summary>
    /// <returns>The accumulated raw results.</returns>
    private Results GetAttributeItems()
    {
        var results = new Results(Batches);
        if (Batches == null || Batches.Count == 0)
        {
            return results;
        }

        // use client from the table from the first batch
        var targetTable = Batches[0].TargetTable;
        var clientToUse = targetTable.DDBClient;
        var convertedBatches = ConvertBatches();
        while (true)
        {
            var nextSet = GetNextRequestItems(ref convertedBatches, MaxItemsPerCall);
            if (nextSet.Count == 0)
            {
                break;
            }

            BatchGetItemRequest request = CreateRequest(nextSet);
            targetTable.AddRequestHandler(request, isAsync: false);
            CallUntilCompletion(clientToUse, request, results);
        }
        return results;
    }

    /// <summary>
    /// Sends the request, then keeps re-sending whatever the service reports
    /// as unprocessed until nothing is left.
    /// </summary>
    /// <param name="client">Client used to call the service.</param>
    /// <param name="request">Request to send; its RequestItems are replaced on every round.</param>
    /// <param name="allResults">Accumulator that receives the returned items.</param>
#if NETSTANDARD
    private static void CallUntilCompletion(AmazonDynamoDBClient client, BatchGetItemRequest request, Results allResults)
#else
    private static void CallUntilCompletion(IAmazonDynamoDB client, BatchGetItemRequest request, Results allResults)
#endif
    {
        bool hasUnprocessedKeys;
        do
        {
            var serviceResponse = client.BatchGetItem(request);
            hasUnprocessedKeys = RecordResponse(serviceResponse, request, allResults);
        } while (hasUnprocessedKeys);
    }

#if AWS_ASYNC_API
    /// <summary>
    /// Asynchronous counterpart of CallUntilCompletion.
    /// </summary>
    /// <param name="client">Client used to call the service.</param>
    /// <param name="request">Request to send; its RequestItems are replaced on every round.</param>
    /// <param name="allResults">Accumulator that receives the returned items.</param>
    /// <param name="cancellationToken">Token passed to every service call.</param>
    /// <returns>A task that completes when no unprocessed keys remain.</returns>
#if NETSTANDARD
    private static async Task CallUntilCompletionAsync(AmazonDynamoDBClient client, BatchGetItemRequest request, Results allResults, CancellationToken cancellationToken)
#else
    private static async Task CallUntilCompletionAsync(IAmazonDynamoDB client, BatchGetItemRequest request, Results allResults, CancellationToken cancellationToken)
#endif
    {
        bool hasUnprocessedKeys;
        do
        {
            var serviceResponse = await client.BatchGetItemAsync(request, cancellationToken).ConfigureAwait(false);
            hasUnprocessedKeys = RecordResponse(serviceResponse, request, allResults);
        } while (hasUnprocessedKeys);
    }
#endif

    /// <summary>
    /// Folds one service response into the accumulated results and points
    /// the request at the keys the service left unprocessed.
    /// </summary>
    /// <param name="response">Response of the latest BatchGetItem call.</param>
    /// <param name="request">Request whose RequestItems are replaced with the unprocessed keys.</param>
    /// <param name="allResults">Accumulator that receives the returned items.</param>
    /// <returns>True when unprocessed keys remain and the request must be sent again.</returns>
    private static bool RecordResponse(BatchGetItemResponse response, BatchGetItemRequest request, Results allResults)
    {
        // Guard against a response that carries no Responses map at all.
        if (response.Responses != null)
        {
            foreach (var kvp in response.Responses)
            {
                allResults.Add(kvp.Key, kvp.Value);
            }
        }

        request.RequestItems = response.UnprocessedKeys;

        // A missing map is treated the same as an empty one: nothing left to fetch.
        return request.RequestItems != null && request.RequestItems.Count > 0;
    }

    /// <summary>
    /// Builds a BatchGetItemRequest from the per-table key sets, carrying
    /// over each batch's ConsistentRead and AttributesToGet settings.
    /// </summary>
    /// <param name="set">Key sets to request, keyed by table name.</param>
    /// <returns>The populated request.</returns>
    private static BatchGetItemRequest CreateRequest(Dictionary<string, RequestSet> set)
    {
        var requestItems = new Dictionary<string, KeysAndAttributes>();
        foreach (var kvp in set)
        {
            var requestSet = kvp.Value;
            requestItems.Add(kvp.Key, new KeysAndAttributes
            {
                Keys = requestSet.GetItems(),
                ConsistentRead = requestSet.Batch.ConsistentRead,
                AttributesToGet = requestSet.Batch.AttributesToGet
            });
        }

        return new BatchGetItemRequest
        {
            RequestItems = requestItems
        };
    }

    /// <summary>
    /// Converts Batches into one pending key set per table name. Batches
    /// without keys are skipped.
    /// </summary>
    /// <returns>Pending key sets keyed by table name; empty when there are no batches.</returns>
    /// <exception cref="AmazonDynamoDBException">
    /// Thrown when a batch targets a table that already has a key set.
    /// </exception>
    private Dictionary<string, RequestSet> ConvertBatches()
    {
        var allItems = new Dictionary<string, RequestSet>();
        if (Batches == null || Batches.Count == 0)
        {
            return allItems;
        }

        foreach (var batch in Batches)
        {
            var tableName = batch.TargetTable.TableName;
            if (allItems.ContainsKey(tableName))
            {
                throw new AmazonDynamoDBException("More than one batch request against a single table is not supported.");
            }

            if (batch.Keys != null && batch.Keys.Count > 0)
            {
                var keysList = batch.Keys.Select((Key k) => k as Dictionary<string, AttributeValue>);
                allItems.Add(tableName, new RequestSet(keysList, batch));
            }
        }
        return allItems;
    }

    /// <summary>
    /// Takes keys from the pending per-table sets (via RemoveFromHead) until
    /// <paramref name="maxNumberOfItems"/> have been collected or every set
    /// has been visited.
    /// </summary>
    /// <param name="getRequestsMap">Pending key sets keyed by table name; the sets are consumed.</param>
    /// <param name="maxNumberOfItems">Upper bound on the total number of keys returned.</param>
    /// <returns>Key sets for the next request, keyed by table name; empty when nothing is pending.</returns>
    private static Dictionary<string, RequestSet> GetNextRequestItems(ref Dictionary<string, RequestSet> getRequestsMap, int maxNumberOfItems)
    {
        int numberOfItems = 0;
        var nextItems = new Dictionary<string, RequestSet>();

        // Iterate over a snapshot of the table names rather than the live key collection.
        List<string> keys = new List<string>(getRequestsMap.Keys);
        foreach (string tableName in keys)
        {
            if (numberOfItems >= maxNumberOfItems)
            {
                break;
            }

            var getRequests = getRequestsMap[tableName];
            if (getRequests.Count == 0)
            {
                continue;
            }

            var partialRequests = getRequests.RemoveFromHead(maxNumberOfItems - numberOfItems);
            var partialRequestsList = new RequestSet(partialRequests, getRequests.Batch);
            nextItems[tableName] = partialRequestsList;
            numberOfItems += partialRequestsList.Count;
        }
        return nextItems;
    }

    /// <summary>
    /// Set of keys for one table together with the batch they came from.
    /// </summary>
    private class RequestSet : QuickList<Dictionary<string, AttributeValue>>
    {
        /// <summary>
        /// Batch that supplied the keys in this set.
        /// </summary>
        public DocumentBatchGet Batch { get; private set; }

        /// <summary>
        /// Creates a set over the given keys for the given batch.
        /// </summary>
        /// <param name="items">Keys to include in the set.</param>
        /// <param name="batch">Batch the keys belong to.</param>
        public RequestSet(IEnumerable<Dictionary<string, AttributeValue>> items, DocumentBatchGet batch)
            : base(items)
        {
            Batch = batch;
        }
    }

    /// <summary>
    /// Accumulates the raw items returned per table across service calls.
    /// </summary>
    private class Results
    {
        /// <summary>
        /// Attribute maps retrieved so far, keyed by table name.
        /// </summary>
        public Dictionary<string, List<Dictionary<string, AttributeValue>>> RetrievedItems { get; private set; }

        /// <summary>
        /// Target table of each batch, keyed by table name.
        /// </summary>
        public Dictionary<string, Table> TargetTables { get; private set; }

        /// <summary>
        /// Creates an empty accumulator and registers each batch's target table.
        /// </summary>
        /// <param name="batches">Batches taking part in the operation; may be null.</param>
        public Results(IEnumerable<DocumentBatchGet> batches)
        {
            RetrievedItems = new Dictionary<string, List<Dictionary<string, AttributeValue>>>(StringComparer.Ordinal);
            TargetTables = new Dictionary<string, Table>(StringComparer.Ordinal);
            if (batches != null)
            {
                foreach (var batch in batches)
                {
                    var table = batch.TargetTable;
                    TargetTables[table.TableName] = table;
                }
            }
        }

        /// <summary>
        /// Appends items to the list kept for the given table, creating the
        /// list on first use.
        /// </summary>
        /// <param name="tableName">Name of the table the items came from.</param>
        /// <param name="items">Items to append; a null list is ignored.</param>
        public void Add(string tableName, List<Dictionary<string, AttributeValue>> items)
        {
            List<Dictionary<string, AttributeValue>> fetchedItems;
            if (!RetrievedItems.TryGetValue(tableName, out fetchedItems))
            {
                fetchedItems = new List<Dictionary<string, AttributeValue>>();
                RetrievedItems[tableName] = fetchedItems;
            }

            // AddRange throws on null; an absent item list adds nothing.
            if (items != null)
            {
                fetchedItems.AddRange(items);
            }
        }
    }
}
}