/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ using System; using System.Collections.Generic; using System.IO; using System.Net; using System.Threading; #if AWS_ASYNC_API using System.Threading.Tasks; #endif using Amazon.DynamoDBv2.Model; using Amazon.Runtime; namespace Amazon.DynamoDBv2.DocumentModel { /// /// Class for putting and/or deleting a batch of items in a single DynamoDB table. /// public partial class DocumentBatchWrite { #region Internal properties internal Table TargetTable { get; private set; } internal List ToDelete { get; private set; } internal List ToPut { get; private set; } #endregion #region Constructor /// /// Constructs a DocumentBatchWrite instance for a specific table. /// /// Table to get items from. public DocumentBatchWrite(Table targetTable) { TargetTable = targetTable; ToDelete = new List(); ToPut = new List(); } #endregion #region Public Delete methods /// /// Add a single item to delete, identified by its hash primary key. /// /// Hash key element of the item to delete. public void AddKeyToDelete(Primitive hashKey) { AddKeyToDelete(hashKey, null); } /// /// Add a single item to delete, identified by its hash-and-range primary key. /// /// Hash key element of the item to delete. /// Range key element of the item to delete. public void AddKeyToDelete(Primitive hashKey, Primitive rangeKey) { AddKeyToDelete(TargetTable.MakeKey(hashKey, rangeKey)); } /// /// Add a single item to delete, identified by its key. /// /// Key of the item to delete. public void AddKeyToDelete(IDictionary key) { AddKeyToDelete(TargetTable.MakeKey(key)); } /// /// Add a single item to delete, identified by a Document object. /// /// Document representing the item to be deleted. public void AddItemToDelete(Document document) { AddKeyToDelete(TargetTable.MakeKey(document)); } #endregion #region Public Put methods /// /// Add a single Document to put. /// /// Document to put. public void AddDocumentToPut(Document document) { ToPut.Add(document); } #endregion #region Public methods /// /// Creates a MultiTableDocumentBatchWrite object that is a combination /// of the current DocumentBatchWrite and the specified DocumentBatchWrite. /// /// Other DocumentBatchWrite object. /// /// MultiTableDocumentBatchWrite consisting of the two DocumentBatchWrite /// objects. /// public MultiTableDocumentBatchWrite Combine(DocumentBatchWrite otherBatch) { return new MultiTableDocumentBatchWrite(this, otherBatch); } #endregion #region Internal methods internal void ExecuteHelper() { MultiBatchWrite multiBatchWrite = new MultiBatchWrite { Batches = new List { this } }; multiBatchWrite.WriteItems(); } #if AWS_ASYNC_API internal Task ExecuteHelperAsync(CancellationToken cancellationToken) { MultiBatchWrite multiBatchWrite = new MultiBatchWrite { Batches = new List { this } }; return multiBatchWrite.WriteItemsAsync(cancellationToken); } #endif internal void AddKeyToDelete(Key key) { ToDelete.Add(key); } #endregion } /// /// Class for putting and/or deleting a batch of items in multiple DynamoDB tables. /// public partial class MultiTableDocumentBatchWrite { #region Properties /// /// List of DocumentBatchWrite objects to include in the multi-table /// batch request. /// public List Batches { get; private set; } #endregion #region Constructor /// /// Constructs a MultiTableDocumentBatchWrite object from a number of /// DocumentBatchWrite objects. /// /// Collection of DocumentBatchWrite objects. public MultiTableDocumentBatchWrite(params DocumentBatchWrite[] batches) { if (batches == null) throw new ArgumentNullException("batches"); Batches = new List(batches); } #endregion #region Public methods /// /// Add a DocumentBatchWrite object to the multi-table batch request. /// /// DocumentBatchWrite to add. public void AddBatch(DocumentBatchWrite batch) { Batches.Add(batch); } #endregion #region Internal methods internal void ExecuteHelper() { MultiBatchWrite multiBatchWrite = new MultiBatchWrite { Batches = Batches }; multiBatchWrite.WriteItems(); } #if AWS_ASYNC_API internal Task ExecuteHelperAsync(CancellationToken cancellationToken) { MultiBatchWrite multiBatchWrite = new MultiBatchWrite { Batches = Batches }; return multiBatchWrite.WriteItemsAsync(cancellationToken); } #endif #endregion } /// /// Internal class for handling multi-table batch writes. /// internal class MultiBatchWrite { private static KeyComparer keyComparer = new KeyComparer(); /// /// Map which stores tables by its name. /// private Dictionary tableMap = new Dictionary(StringComparer.Ordinal); /// /// Maximum number of items that can be sent in a single BatchWrite request /// public const int MaxItemsPerCall = 25; /// /// Batches that comprise the current BatchWrite operation /// public List Batches { get; set; } /// /// Pushes items configured in Batches to the server /// public void WriteItems() { WriteItemsHelper(Batches); } #if AWS_ASYNC_API /// /// Pushes items configured in Batches to the server asynchronously /// public Task WriteItemsAsync(CancellationToken cancellationToken = default(CancellationToken)) { return WriteItemsHelperAsync(Batches, cancellationToken); } #endif #region Private helper methods private void WriteItemsHelper(List batches) { if (Batches == null || Batches.Count == 0) return; Dictionary> writeRequestsMap = ConvertBatches(batches); Table targetTable = this.Batches[0].TargetTable; while (true) { Dictionary> nextSet = GetNextWriteItems(ref writeRequestsMap, MaxItemsPerCall); if (nextSet.Count == 0) break; SendSet(nextSet, targetTable); } } #if AWS_ASYNC_API private async Task WriteItemsHelperAsync(List batches, CancellationToken cancellationToken) { if (Batches == null || Batches.Count == 0) return; Dictionary> writeRequestsMap = ConvertBatches(batches); Table targetTable = this.Batches[0].TargetTable; while (true) { Dictionary> nextSet = GetNextWriteItems(ref writeRequestsMap, MaxItemsPerCall); if (nextSet.Count == 0) break; await SendSetAsync(nextSet, targetTable, cancellationToken).ConfigureAwait(false); } } #endif private void SendSet(Dictionary> set, Table targetTable) { Dictionary> documentMap = null; BatchWriteItemRequest request = ConstructRequest(set, targetTable, out documentMap, false); if (request.RequestItems.Count == 0) return; bool shouldTrySmallerRequest = false; try { CallUntilCompletion(request, documentMap, targetTable.DDBClient); } catch (AmazonDynamoDBException addbex) { if (addbex.StatusCode == HttpStatusCode.RequestEntityTooLarge) shouldTrySmallerRequest = true; else throw; } if (shouldTrySmallerRequest) { int totalWrites = GetNumberOfWrites(request); if (totalWrites >= 2) { // 2 or more items in request, retry with the request split up var partialSet = GetNextWriteItems(ref set, totalWrites / 2); SendSet(partialSet, targetTable); SendSet(set, targetTable); } else { // only 1 item in request, retry as-is SendSet(set, targetTable); } } } #if AWS_ASYNC_API private async Task SendSetAsync(Dictionary> set, Table targetTable, CancellationToken cancellationToken) { Dictionary> documentMap = null; BatchWriteItemRequest request = ConstructRequest(set, targetTable, out documentMap, true); if (request.RequestItems.Count == 0) return; bool shouldTrySmallerRequest = false; try { await CallUntilCompletionAsync(request, documentMap, targetTable.DDBClient, cancellationToken).ConfigureAwait(false); } catch (AmazonDynamoDBException addbex) { if (addbex.StatusCode == HttpStatusCode.RequestEntityTooLarge) shouldTrySmallerRequest = true; else throw; } if (shouldTrySmallerRequest) { int totalWrites = GetNumberOfWrites(request); if (totalWrites >= 2) { // 2 or more items in request, retry with the request split up var partialSet = GetNextWriteItems(ref set, totalWrites / 2); await SendSetAsync(partialSet, targetTable, cancellationToken).ConfigureAwait(false); await SendSetAsync(set, targetTable, cancellationToken).ConfigureAwait(false); } else { // only 1 item in request, retry as-is await SendSetAsync(set, targetTable, cancellationToken).ConfigureAwait(false); } } } #endif private static int GetNumberOfWrites(BatchWriteItemRequest request) { int totalWrites = 0; foreach (var writeItem in request.RequestItems) { var writes = writeItem.Value; totalWrites += (writes == null ? 0 : writes.Count); } return totalWrites; } private BatchWriteItemRequest ConstructRequest( Dictionary> writeItems, Table targetTable, out Dictionary> documentMap, bool isAsync) { documentMap = new Dictionary>(StringComparer.Ordinal); BatchWriteItemRequest request = new BatchWriteItemRequest(); foreach (var writeItem in writeItems) { string tableName = writeItem.Key; List requestItems = writeItem.Value.GetItems(); if (requestItems != null && requestItems.Count > 0) { var table = tableMap[tableName]; var requestList = new List(); var tableDocumentMap = new Dictionary(keyComparer); documentMap.Add(tableName, tableDocumentMap); foreach (var item in requestItems) { requestList.Add(item.WriteRequest); // Add document corresponding to the Put request to document map if (item.WriteRequest.PutRequest != null) { var key = table.MakeKey(item.Document); tableDocumentMap.Add(key, item.Document); } } request.RequestItems[tableName] = requestList; } } targetTable.AddRequestHandler(request, isAsync); return request; } private static Dictionary> GetNextWriteItems(ref Dictionary> writeRequestsMap, int maxNumberOfItems) { int numberOfItems = 0; Dictionary> nextItems = new Dictionary>(); List keys = new List(writeRequestsMap.Keys); foreach (string tableName in keys) { if (numberOfItems >= maxNumberOfItems) break; QuickList writeRequests = writeRequestsMap[tableName]; if (writeRequests.Count == 0) continue; IEnumerable partialRequests = writeRequests.RemoveFromHead(maxNumberOfItems - numberOfItems); QuickList partialRequestsList = new QuickList(partialRequests); nextItems[tableName] = partialRequestsList; numberOfItems += partialRequestsList.Count; } return nextItems; } #if NETSTANDARD private void CallUntilCompletion(BatchWriteItemRequest request, Dictionary> documentMap, AmazonDynamoDBClient client) #else private void CallUntilCompletion(BatchWriteItemRequest request, Dictionary> documentMap, IAmazonDynamoDB client) #endif { do { var result = client.BatchWriteItem(request); request.RequestItems = result.UnprocessedItems; Dictionary unprocessedDocuments = new Dictionary(keyComparer); foreach (var unprocessedItems in result.UnprocessedItems) { string tableName = unprocessedItems.Key; Table table = tableMap[tableName]; Dictionary tableDocumentMap = documentMap[tableName]; foreach (var writeRequest in unprocessedItems.Value) { if (writeRequest.PutRequest != null) { var doc = table.FromAttributeMap(writeRequest.PutRequest.Item); var key = table.MakeKey(doc); Document document = null; if (tableDocumentMap.TryGetValue(key, out document)) { // Remove unprocessed requests from the document map // and copy them to unprocessed documents. unprocessedDocuments.Add(key, document); tableDocumentMap.Remove(key); } } } // Commit the remaining documents in the document map foreach (var document in tableDocumentMap.Values) { document.CommitChanges(); } // Replace existing documents with just the unprocessed documents documentMap[tableName] = unprocessedDocuments; } } while (request.RequestItems.Count > 0); // Commit any remaining documents in document map. // This would only happen if we are not able to match the items sent in the request // with the items returned back as unprocessed items. foreach (var tableDocumentMap in documentMap.Values) { foreach (var document in tableDocumentMap.Values) { document.CommitChanges(); } } } #if AWS_ASYNC_API #if NETSTANDARD private async Task CallUntilCompletionAsync(BatchWriteItemRequest request, Dictionary> documentMap, AmazonDynamoDBClient client, CancellationToken cancellationToken) #else private async Task CallUntilCompletionAsync(BatchWriteItemRequest request, Dictionary> documentMap, IAmazonDynamoDB client, CancellationToken cancellationToken) #endif { do { var result = await client.BatchWriteItemAsync(request, cancellationToken).ConfigureAwait(false); request.RequestItems = result.UnprocessedItems; Dictionary unprocessedDocuments = new Dictionary(keyComparer); foreach (var unprocessedItems in result.UnprocessedItems) { string tableName = unprocessedItems.Key; Table table = tableMap[tableName]; Dictionary tableDocumentMap = documentMap[tableName]; foreach (var writeRequest in unprocessedItems.Value) { if (writeRequest.PutRequest != null) { var doc = table.FromAttributeMap(writeRequest.PutRequest.Item); var key = table.MakeKey(doc); Document document = null; if (tableDocumentMap.TryGetValue(key, out document)) { // Remove unprocessed requests from the document map // and copy them to unprocessed documents. unprocessedDocuments.Add(key, document); tableDocumentMap.Remove(key); } } } // Commit the remaining documents in the document map foreach (var document in tableDocumentMap.Values) { document.CommitChanges(); } // Replace existing documents with just the unprocessed documents documentMap[tableName] = unprocessedDocuments; } } while (request.RequestItems.Count > 0); // Commit any remaining documents in document map. // This would only happen if we are not able to match the items sent in the request // with the items returned back as unprocessed items. foreach (var tableDocumentMap in documentMap.Values) { foreach (var document in tableDocumentMap.Values) { document.CommitChanges(); } } } #endif private Dictionary> ConvertBatches(List batches) { Dictionary> result = new Dictionary>(); foreach (var batch in batches) { var table = batch.TargetTable; var tableName = table.TableName; List writeRequests = new List(); if (batch.ToDelete != null) { foreach (var toDelete in batch.ToDelete) writeRequests.Add(new WriteRequestDocument { WriteRequest = new WriteRequest { DeleteRequest = new DeleteRequest { Key = toDelete } } }); } if (batch.ToPut != null) { foreach (var toPut in batch.ToPut) { var item = table.ToAttributeMap(toPut); writeRequests.Add(new WriteRequestDocument { WriteRequest = new WriteRequest { PutRequest = new PutRequest { Item = item } }, Document = toPut }); } } if (writeRequests.Count > 0) { if (result.ContainsKey(tableName)) { result[tableName].Add(writeRequests); } else { QuickList qlWriteRequests = new QuickList(writeRequests); result.Add(tableName, qlWriteRequests); tableMap.Add(tableName, batch.TargetTable); } } } return result; } #endregion } internal class QuickList { private List List; private int StartIndex = 0; public QuickList(IEnumerable items) { List = new List(items); } public void Add(IEnumerable items) { List.AddRange(items); } public int Count { get { return List.Count - StartIndex; } } public IEnumerable RemoveFromHead(int numberOfItems) { int boundedNumberOfItems = Math.Min(numberOfItems, Count); for (int i = 0; i < boundedNumberOfItems; i++) { yield return List[StartIndex]; StartIndex++; } } public List GetItems() { return List.GetRange(StartIndex, Count); } } internal class WriteRequestDocument { public WriteRequest WriteRequest { get; set; } public Document Document { get; set; } } internal class KeyComparer : IEqualityComparer { public bool Equals(Key x, Key y) { if (AreBothNull(x, y)) { return true; } if (IsEitherNull(x, y)) { return false; } // Objects are equal if both reference the same object. if (object.ReferenceEquals(x, y)) { return true; } if (x.Count != y.Count) { return false; } foreach (var item in x) { AttributeValue valueY = null; if (y.TryGetValue(item.Key, out valueY)) { if (!CompareAttributeValue(item.Value, valueY)) { return false; } } else { return false; } } return true; } public int GetHashCode(Key key) { int hashCode = 0; foreach (var item in key) { hashCode ^= GetHashCode(item.Value); } return hashCode; } private static int GetHashCode(AttributeValue attributeValue) { if (attributeValue == null) { return 0; } if (attributeValue.S != null) { return attributeValue.S.GetHashCode(); } if (attributeValue.N != null) { return attributeValue.N.GetHashCode(); } if (attributeValue.B != null) { long xPos = attributeValue.B.Position; attributeValue.B.Position = 0; int hash = 0; int valueX = 0; do { hash ^= valueX; valueX = attributeValue.B.ReadByte(); } while (valueX != -1); attributeValue.B.Position = xPos; return hash; } return 0; } private static bool CompareAttributeValue(AttributeValue x, AttributeValue y) { if (AreBothNull(x, y)) { return true; } if (IsEitherNull(x, y)) { return false; } // Compare scalar properties of primary attributes. Primary attributes can only be scalar types. // http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataModel.html if (IsEitherNull(x.S, y.S) || IsEitherNull(x.N, y.N) || IsEitherNull(x.B, y.B)) { return false; } if (!AreBothNull(x.S, y.S)) { if (!x.S.Equals(y.S, StringComparison.OrdinalIgnoreCase)) { return false; } } if (!AreBothNull(x.N, y.N)) { if (!x.N.Equals(y.N, StringComparison.OrdinalIgnoreCase)) { return false; } } if (!AreBothNull(x.B, y.B)) { if (x.B.Length != y.B.Length) { return false; } long xPos = x.B.Position, yPos = y.B.Position; x.B.Position = 0; y.B.Position = 0; int valueX = 0, valueY = 0; do { if (valueX != valueY) { return false; } valueX = x.B.ReadByte(); valueY = y.B.ReadByte(); } while (valueX != -1 && valueY != -1); x.B.Position = xPos; y.B.Position = yPos; } return true; } private static bool IsEitherNull(T x, T y) where T : class { return ((x == null && y != null) || (x != null && y == null)); } private static bool AreBothNull(T x, T y) where T : class { return (x == null && y == null); } } }