/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ using System; using System.Collections.Generic; using System.Linq; using System.Threading; #if AWS_ASYNC_API using System.Threading.Tasks; #endif using Amazon.DynamoDBv2.Model; using Amazon.Runtime; namespace Amazon.DynamoDBv2.DocumentModel { /// /// Class for retrieving a batch of Documents from a single DynamoDB table. /// public partial class DocumentBatchGet { #region Internal properties internal Table TargetTable { get; private set; } internal List Keys { get; private set; } #endregion #region Public properties /// /// List of results retrieved from DynamoDB. /// Populated after Execute is called. /// public List Results { get; internal set; } /// /// List of attributes to retrieve. /// public List AttributesToGet { get; set; } /// /// If set to true, a consistent read is issued. Otherwise eventually-consistent is used. /// public bool ConsistentRead { get; set; } #endregion #region Constructor /// /// Constructs a DocumentBatchGet instance for a specific table. /// /// Table to get items from. public DocumentBatchGet(Table targetTable) { TargetTable = targetTable; Keys = new List(); } #endregion #region Public methods /// /// Add a single item to get, identified by its hash primary key. /// /// Hash key element of the item to get. public void AddKey(Primitive hashKey) { AddKey(hashKey, null); } /// /// Add a single item to get, identified by its hash-and-range primary key. /// /// Hash key element of the item to get. /// Range key element of the item to get. public void AddKey(Primitive hashKey, Primitive rangeKey) { Keys.Add(TargetTable.MakeKey(hashKey, rangeKey)); } /// /// Add a single item to get, identified by its key. /// /// Key of the item to get. public void AddKey(IDictionary key) { Keys.Add(TargetTable.MakeKey(key)); } /// /// Creates a MultiTableDocumentBatchGet object that is a combination /// of the current DocumentBatchGet and the specified DocumentBatchGet. /// /// Other DocumentBatchGet object. /// /// MultiTableDocumentBatchGet consisting of the two DocumentBatchGet /// objects. /// public MultiTableDocumentBatchGet Combine(DocumentBatchGet otherBatch) { return new MultiTableDocumentBatchGet(this, otherBatch); } #endregion #region Internal methods internal void ExecuteHelper() { MultiBatchGet resultsObject = new MultiBatchGet { Batches = new List { this } }; var results = resultsObject.GetItemsHelper(); List batchResults; if (results.TryGetValue(TargetTable.TableName, out batchResults)) { Results = batchResults; } else { Results = new List(); } } #if AWS_ASYNC_API internal async Task ExecuteHelperAsync(CancellationToken cancellationToken) { MultiBatchGet resultsObject = new MultiBatchGet { Batches = new List { this } }; var results = await resultsObject.GetItemsHelperAsync(cancellationToken).ConfigureAwait(false); List batchResults; if (results.TryGetValue(TargetTable.TableName, out batchResults)) { Results = batchResults; } else { Results = new List(); } } #endif internal void AddKey(Document document) { Keys.Add(TargetTable.MakeKey(document)); } internal void AddKey(Key key) { Keys.Add(key); } #endregion } /// /// Class for retrieving a batch of Documents from multiple DynamoDB tables. /// public partial class MultiTableDocumentBatchGet { #region Properties /// /// List of DocumentBatchGet objects to include in the multi-table /// batch request. /// public List Batches { get; private set; } /// /// Total number of primary keys in the multi-table batch request. /// public int TotalKeys { get { int count = 0; foreach (var batch in Batches) { count += batch.Keys.Count; } return count; } } #endregion #region Constructor /// /// Constructs a MultiTableDocumentBatchGet object from a number of /// DocumentBatchGet objects. /// /// Collection of DocumentBatchGet objects. public MultiTableDocumentBatchGet(params DocumentBatchGet[] batches) { if (batches == null) throw new ArgumentNullException("batches"); Batches = new List(batches); } #endregion #region Public methods /// /// Add a DocumentBatchGet object to the multi-table batch request. /// /// DocumentBatchGet to add. public void AddBatch(DocumentBatchGet batch) { Batches.Add(batch); } #endregion #region Internal methods internal void ExecuteHelper() { MultiBatchGet resultsObject = new MultiBatchGet { Batches = Batches }; var results = resultsObject.GetItemsHelper(); foreach (var batch in Batches) { List batchResults; if (results.TryGetValue(batch.TargetTable.TableName, out batchResults)) { batch.Results = batchResults; } else { batch.Results = new List(); } } } #if AWS_ASYNC_API internal async Task ExecuteHelperAsync(CancellationToken cancellationToken) { MultiBatchGet resultsObject = new MultiBatchGet { Batches = Batches }; var results = await resultsObject.GetItemsHelperAsync(cancellationToken).ConfigureAwait(false); foreach (var batch in Batches) { List batchResults; if (results.TryGetValue(batch.TargetTable.TableName, out batchResults)) { batch.Results = batchResults; } else { batch.Results = new List(); } } } #endif #endregion } /// /// Internal class for handling multi-table batch gets. /// internal class MultiBatchGet { /// /// Batches that comprise the current BatchGet operation /// public List Batches { get; set; } /// /// Maximum number of items that can be sent in a single BatchGet request /// public const int MaxItemsPerCall = 100; /// /// Gets items configured in Batches from the server /// /// public Dictionary> GetItems() { return GetItemsHelper(); } #if AWS_ASYNC_API /// /// Gets items configured in Batches from the server asynchronously /// /// public Task>> GetItemsAsync(CancellationToken cancellationToken = default(CancellationToken)) { return GetItemsHelperAsync(cancellationToken); } internal async Task>> GetItemsHelperAsync(CancellationToken cancellationToken) { var results = await GetAttributeItemsAsync(cancellationToken).ConfigureAwait(false); var itemsAsDocuments = new Dictionary>(StringComparer.Ordinal); foreach (var kvp in results.RetrievedItems) { var tableName = kvp.Key; var table = results.TargetTables[tableName]; List documents = new List(); foreach (var dictionary in kvp.Value) { documents.Add(table.FromAttributeMap(dictionary)); } itemsAsDocuments[kvp.Key] = documents; } return itemsAsDocuments; } #endif internal Dictionary> GetItemsHelper() { var results = GetAttributeItems(); var itemsAsDocuments = new Dictionary>(StringComparer.Ordinal); foreach (var kvp in results.RetrievedItems) { var tableName = kvp.Key; var table = results.TargetTables[tableName]; List documents = new List(); foreach (var dictionary in kvp.Value) { documents.Add(table.FromAttributeMap(dictionary)); } itemsAsDocuments[kvp.Key] = documents; } return itemsAsDocuments; } #if AWS_ASYNC_API private async Task GetAttributeItemsAsync(CancellationToken cancellationToken) { var results = new Results(Batches); if (Batches == null || Batches.Count == 0) return results; // use client from the table from the first batch var firstBatch = this.Batches[0]; var targetTable = firstBatch.TargetTable; var clientToUse = targetTable.DDBClient; var convertedBatches = ConvertBatches(); while (true) { var nextSet = GetNextRequestItems(ref convertedBatches, MaxItemsPerCall); if (nextSet.Count == 0) break; BatchGetItemRequest request = CreateRequest(nextSet); targetTable.AddRequestHandler(request, isAsync: true); await CallUntilCompletionAsync(clientToUse, request, results, cancellationToken).ConfigureAwait(false); } return results; } #endif private Results GetAttributeItems() { var results = new Results(Batches); if (Batches == null || Batches.Count == 0) return results; // use client from the table from the first batch var firstBatch = this.Batches[0]; var targetTable = firstBatch.TargetTable; var clientToUse = targetTable.DDBClient; var convertedBatches = ConvertBatches(); while (true) { var nextSet = GetNextRequestItems(ref convertedBatches, MaxItemsPerCall); if (nextSet.Count == 0) break; BatchGetItemRequest request = CreateRequest(nextSet); targetTable.AddRequestHandler(request, isAsync: false); CallUntilCompletion(clientToUse, request, results); } return results; } #if NETSTANDARD private static void CallUntilCompletion(AmazonDynamoDBClient client, BatchGetItemRequest request, Results allResults) #else private static void CallUntilCompletion(IAmazonDynamoDB client, BatchGetItemRequest request, Results allResults) #endif { do { var serviceResponse = client.BatchGetItem(request); foreach (var kvp in serviceResponse.Responses) { var tableName = kvp.Key; var items = kvp.Value; allResults.Add(tableName, items); } request.RequestItems = serviceResponse.UnprocessedKeys; } while (request.RequestItems.Count > 0); } #if AWS_ASYNC_API #if NETSTANDARD private static async Task CallUntilCompletionAsync(AmazonDynamoDBClient client, BatchGetItemRequest request, Results allResults, CancellationToken cancellationToken) #else private static async Task CallUntilCompletionAsync(IAmazonDynamoDB client, BatchGetItemRequest request, Results allResults, CancellationToken cancellationToken) #endif { do { var serviceResponse = await client.BatchGetItemAsync(request, cancellationToken).ConfigureAwait(false); foreach (var kvp in serviceResponse.Responses) { var tableName = kvp.Key; var items = kvp.Value; allResults.Add(tableName, items); } request.RequestItems = serviceResponse.UnprocessedKeys; } while (request.RequestItems.Count > 0); } #endif private static BatchGetItemRequest CreateRequest(Dictionary set) { BatchGetItemRequest request = new BatchGetItemRequest(); var requestItems = new Dictionary(); foreach (var kvp in set) { var tableName = kvp.Key; var requestSet = kvp.Value; var keys = new KeysAndAttributes { Keys = requestSet.GetItems(), ConsistentRead = requestSet.Batch.ConsistentRead, AttributesToGet = requestSet.Batch.AttributesToGet }; requestItems.Add(tableName, keys); } request.RequestItems = requestItems; return request; } private Dictionary ConvertBatches() { var allItems = new Dictionary(); if (Batches == null || Batches.Count == 0) return allItems; foreach (var batch in Batches) { var table = batch.TargetTable; var tableName = table.TableName; if (allItems.ContainsKey(tableName)) throw new AmazonDynamoDBException("More than one batch request against a single table is not supported."); if (batch.Keys != null && batch.Keys.Count > 0) { var keysList = batch.Keys.Select((Key k) => k as Dictionary); var keys = new RequestSet(keysList, batch); allItems.Add(tableName, keys); } } return allItems; } private static Dictionary GetNextRequestItems(ref Dictionary getRequestsMap, int maxNumberOfItems) { int numberOfItems = 0; var nextItems = new Dictionary(); List keys = new List(getRequestsMap.Keys); foreach (string tableName in keys) { if (numberOfItems >= maxNumberOfItems) break; var getRequests = getRequestsMap[tableName]; if (getRequests.Count == 0) continue; var partialRequests = getRequests.RemoveFromHead(maxNumberOfItems - numberOfItems); var partialRequestsList = new RequestSet(partialRequests, getRequests.Batch); nextItems[tableName] = partialRequestsList; numberOfItems += partialRequestsList.Count; } return nextItems; } private class RequestSet : QuickList> { public DocumentBatchGet Batch { get; private set; } public RequestSet(IEnumerable> items, DocumentBatchGet batch) : base(items) { Batch = batch; } } private class Results { public Dictionary>> RetrievedItems { get; private set; } public Dictionary TargetTables { get; private set; } public Results(IEnumerable batches) { RetrievedItems = new Dictionary>>(StringComparer.Ordinal); TargetTables = new Dictionary(StringComparer.Ordinal); if (batches != null) { foreach (var batch in batches) { var table = batch.TargetTable; TargetTables[table.TableName] = table; } } } public void Add(string tableName, List> items) { List> fetchedItems; if (!RetrievedItems.TryGetValue(tableName, out fetchedItems)) { fetchedItems = new List>(); RetrievedItems[tableName] = fetchedItems; } fetchedItems.AddRange(items); } } } }