/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using Amazon.DynamoDBv2.Model;
using Amazon.Runtime;
using System.Globalization;
#if AWS_ASYNC_API
using System.Threading.Tasks;
#endif
using System.Threading;
namespace Amazon.DynamoDBv2.DocumentModel
{
///
/// Search response object
///
public partial class Search
{
#region Internal constructors
internal Search()
: this((SearchType)0)
{
}
internal Search(SearchType searchMethod)
{
SearchMethod = searchMethod;
Reset();
}
#endregion
#region Public properties
///
/// Name of the table being searched
///
public String TableName { get; internal set; }
///
/// Whether to collect GetNextSet and GetRemaining results in Matches property.
/// Default is true. If set to false, Matches will always be empty.
///
public bool CollectResults { get; internal set; }
///
/// Upper limit on the number of items returned.
/// For Queries: upper limit on the number of items returned.
/// For Scan: limit on the number of items scanned.
///
/// Maps directly to Limit property on Query and Scan operations.
///
public int Limit { get; internal set; }
///
/// The key expression that is evaluated for each item of a query.
/// This applies only to Query operations.
///
public Expression KeyExpression { get; set; }
///
/// The filter expression that is evaluated for each item.
/// This applies to Query and Scan operations.
///
public Expression FilterExpression { get; set; }
///
/// Filter for the search operation
/// This applies to Query and Scan operations.
///
public Filter Filter { get; internal set; }
///
/// Conditional operator for the search operation
///
public ConditionalOperatorValues ConditionalOperator { get; internal set; }
///
/// List of attribute names to retrieve
///
public List AttributesToGet { get; internal set; }
///
/// Flag that, if true, indicates that the search is operating in consistent-read mode
///
public bool IsConsistentRead { get; internal set; }
///
/// Flag that, if true, indicates that the search is traversing backwards
///
public bool IsBackwardSearch { get; internal set; }
///
/// Flag that, if true, indicates that the search is done
///
public bool IsDone { get; private set; }
///
/// Key denoting the starting point of the next set of results
///
public Dictionary NextKey { get; private set; }
///
/// Pagination token corresponding to the item where the search operation stopped,
/// inclusive of the previous result set. Use this value to start a new
/// operation to resume search from the next item.
///
public string PaginationToken
{
get
{
return Common.ToPaginationToken(NextKey);
}
internal set
{
NextKey = Common.FromPaginationToken(value);
}
}
///
/// List of currently found items
///
public List Matches { get; private set; }
///
/// For parallel Scan requests, TotalSegmentsrepresents the total number of segments for a table that is being scanned. Segments
/// are a way to logically divide a table into equally sized portions, for the duration of the Scan request. The value of
/// TotalSegments corresponds to the number of application "workers" (such as threads or processes) that will perform the parallel
/// Scan. For example, if you want to scan a table using four application threads, you would specify a TotalSegments value of 4.
/// The value for TotalSegments must be greater than or equal to 1, and less than or equal to 4096. If you specify a TotalSegments
/// value of 1, the Scan will be sequential rather than parallel. If you specify TotalSegments, you must also specify
/// Segment.
///
///
/// Constraints:
///
/// -
/// Range
/// 1 - 4096
///
///
///
///
public int TotalSegments { get; set; }
///
/// For parallel Scan requests, Segment identifies an individual segment to be scanned by an application "worker" (such as a
/// thread or a process). Each worker issues a Scan request with a distinct value for the segment it will scan. Segment IDs are
/// zero-based, so the first segment is always 0. For example, if you want to scan a table using four application threads, the first thread
/// would specify a Segment value of 0, the second thread would specify 1, and so on. LastEvaluatedKey returned from a parallel scan
/// request must be used with same Segment id in a subsequent operation. The value for Segment must be less than or equal to 0, and less
/// than the value provided for TotalSegments. If you specify Segment, you must also specify TotalSegments.
///
///
/// Constraints:
///
/// -
/// Range
/// 0 - 4095
///
///
///
///
public int Segment { get; set; }
///
/// Gets the total number of items that match the search parameters.
///
/// If IsDone is true and CollectResults is true, returns Matches.Count.
/// Otherwise, makes a call to DynamoDB to find out the number of
/// matching items, without retrieving the items. Count is then cached.
///
public int Count { get { return GetCount(); } }
///
/// Name of the index to query or scan against.
///
public string IndexName { get; internal set; }
///
/// Enum specifying what data to return from query.
///
public SelectValues Select { get; internal set; }
#endregion
#region Private/internal members
internal List GetNextSetHelper()
{
List ret = new List();
if (!IsDone)
{
switch (SearchMethod)
{
case SearchType.Scan:
ScanRequest scanReq = new ScanRequest
{
ExclusiveStartKey = NextKey,
Limit = Limit,
TableName = TableName,
AttributesToGet = AttributesToGet,
ScanFilter = Filter.ToConditions(SourceTable),
Select = EnumMapper.Convert(Select),
ConsistentRead = IsConsistentRead
};
if (!string.IsNullOrEmpty(this.IndexName))
scanReq.IndexName = this.IndexName;
if (this.FilterExpression != null && this.FilterExpression.IsSet)
this.FilterExpression.ApplyExpression(scanReq, SourceTable);
if (scanReq.ScanFilter != null && scanReq.ScanFilter.Count > 1)
scanReq.ConditionalOperator = EnumMapper.Convert(ConditionalOperator);
Common.ConvertAttributesToGetToProjectionExpression(scanReq);
if (this.TotalSegments != 0)
{
scanReq.TotalSegments = this.TotalSegments;
scanReq.Segment = this.Segment;
}
SourceTable.AddRequestHandler(scanReq, isAsync: false);
var scanResult = SourceTable.DDBClient.Scan(scanReq);
foreach (var item in scanResult.Items)
{
Document doc = SourceTable.FromAttributeMap(item);
ret.Add(doc);
if (CollectResults)
{
Matches.Add(doc);
}
}
NextKey = scanResult.LastEvaluatedKey;
if (NextKey == null || NextKey.Count == 0)
{
IsDone = true;
}
return ret;
case SearchType.Query:
QueryRequest queryReq = new QueryRequest
{
TableName = TableName,
ConsistentRead = IsConsistentRead,
Select = EnumMapper.Convert(Select),
ExclusiveStartKey = NextKey,
Limit = Limit,
ScanIndexForward = !IsBackwardSearch,
AttributesToGet = AttributesToGet,
IndexName = IndexName,
};
Expression.ApplyExpression(queryReq, SourceTable, KeyExpression, FilterExpression);
Dictionary keyConditions, filterConditions;
SplitQueryFilter(Filter, SourceTable, queryReq.IndexName, out keyConditions, out filterConditions);
queryReq.KeyConditions = keyConditions;
queryReq.QueryFilter = filterConditions;
Common.ConvertAttributesToGetToProjectionExpression(queryReq);
if (queryReq.QueryFilter != null && queryReq.QueryFilter.Count > 1)
queryReq.ConditionalOperator = EnumMapper.Convert(ConditionalOperator);
SourceTable.AddRequestHandler(queryReq, isAsync: false);
var queryResult = SourceTable.DDBClient.Query(queryReq);
foreach (var item in queryResult.Items)
{
Document doc = SourceTable.FromAttributeMap(item);
ret.Add(doc);
if (CollectResults)
{
Matches.Add(doc);
}
}
NextKey = queryResult.LastEvaluatedKey;
if (NextKey == null || NextKey.Count == 0)
{
IsDone = true;
}
return ret;
default:
throw new InvalidOperationException("Unknown Search Method");
}
}
return ret;
}
#if AWS_ASYNC_API
internal async Task> GetNextSetHelperAsync(CancellationToken cancellationToken)
{
List ret = new List();
if (!IsDone)
{
switch (SearchMethod)
{
case SearchType.Scan:
ScanRequest scanReq = new ScanRequest
{
ExclusiveStartKey = NextKey,
Limit = Limit,
TableName = TableName,
AttributesToGet = AttributesToGet,
ScanFilter = Filter.ToConditions(SourceTable),
Select = EnumMapper.Convert(Select),
ConsistentRead = IsConsistentRead
};
if (!string.IsNullOrEmpty(this.IndexName))
scanReq.IndexName = this.IndexName;
if (this.FilterExpression != null && this.FilterExpression.IsSet)
this.FilterExpression.ApplyExpression(scanReq, SourceTable);
if (scanReq.ScanFilter != null && scanReq.ScanFilter.Count > 1)
scanReq.ConditionalOperator = EnumMapper.Convert(ConditionalOperator);
Common.ConvertAttributesToGetToProjectionExpression(scanReq);
if (this.TotalSegments != 0)
{
scanReq.TotalSegments = this.TotalSegments;
scanReq.Segment = this.Segment;
}
SourceTable.AddRequestHandler(scanReq, isAsync: true);
var scanResult = await SourceTable.DDBClient.ScanAsync(scanReq, cancellationToken).ConfigureAwait(false);
foreach (var item in scanResult.Items)
{
Document doc = SourceTable.FromAttributeMap(item);
ret.Add(doc);
if (CollectResults)
{
Matches.Add(doc);
}
}
NextKey = scanResult.LastEvaluatedKey;
if (NextKey == null || NextKey.Count == 0)
{
IsDone = true;
}
return ret;
case SearchType.Query:
QueryRequest queryReq = new QueryRequest
{
TableName = TableName,
ConsistentRead = IsConsistentRead,
Select = EnumMapper.Convert(Select),
ExclusiveStartKey = NextKey,
Limit = Limit,
ScanIndexForward = !IsBackwardSearch,
AttributesToGet = AttributesToGet,
IndexName = IndexName,
};
Expression.ApplyExpression(queryReq, SourceTable, KeyExpression, FilterExpression);
Dictionary keyConditions, filterConditions;
SplitQueryFilter(Filter, SourceTable, queryReq.IndexName, out keyConditions, out filterConditions);
queryReq.KeyConditions = keyConditions;
queryReq.QueryFilter = filterConditions;
Common.ConvertAttributesToGetToProjectionExpression(queryReq);
if (queryReq.QueryFilter != null && queryReq.QueryFilter.Count > 1)
queryReq.ConditionalOperator = EnumMapper.Convert(ConditionalOperator);
SourceTable.AddRequestHandler(queryReq, isAsync: true);
var queryResult = await SourceTable.DDBClient.QueryAsync(queryReq, cancellationToken).ConfigureAwait(false);
foreach (var item in queryResult.Items)
{
Document doc = SourceTable.FromAttributeMap(item);
ret.Add(doc);
if (CollectResults)
{
Matches.Add(doc);
}
}
NextKey = queryResult.LastEvaluatedKey;
if (NextKey == null || NextKey.Count == 0)
{
IsDone = true;
}
return ret;
default:
throw new InvalidOperationException("Unknown Search Method");
}
}
return ret;
}
#endif
internal List GetRemainingHelper()
{
List ret = new List();
while (!IsDone)
{
foreach (Document doc in GetNextSetHelper())
{
ret.Add(doc);
}
}
return ret;
}
#if AWS_ASYNC_API
internal async Task> GetRemainingHelperAsync(CancellationToken cancellationToken)
{
List ret = new List();
while (!IsDone)
{
foreach (Document doc in await GetNextSetHelperAsync(cancellationToken).ConfigureAwait(false))
{
ret.Add(doc);
}
}
return ret;
}
#endif
private int count;
private SearchType SearchMethod { get; set; }
internal Table SourceTable { get; set; }
private static void SplitQueryFilter(Filter filter, Table targetTable, string indexName, out Dictionary keyConditions, out Dictionary filterConditions)
{
QueryFilter queryFilter = filter as QueryFilter;
if (queryFilter == null) throw new InvalidOperationException("Filter is not of type QueryFilter");
keyConditions = new Dictionary();
filterConditions = new Dictionary();
var conditions = filter.ToConditions(targetTable);
foreach (var kvp in conditions)
{
string attributeName = kvp.Key;
Condition condition = kvp.Value;
// depending on whether the attribute is key, place either in keyConditions or filterConditions
if (IsKeyAttribute(targetTable, indexName, attributeName))
keyConditions[attributeName] = condition;
else
filterConditions[attributeName] = condition;
}
}
// Test if the given attribute is a key on the table or a key on the given index
private static bool IsKeyAttribute(Table table, string indexName, string attributeName)
{
GlobalSecondaryIndexDescription gsi;
LocalSecondaryIndexDescription lsi;
// if no index, check only table keys
if (string.IsNullOrEmpty(indexName))
return table.Keys.ContainsKey(attributeName);
// for an index, check if attribute is part of KeySchema for GSI or LSI
else if (table.GlobalSecondaryIndexes.TryGetValue(indexName, out gsi) && gsi != null)
return gsi.KeySchema.Any(AttributeIsKey(attributeName));
else if (table.LocalSecondaryIndexes.TryGetValue(indexName, out lsi) && lsi != null)
return lsi.KeySchema.Any(AttributeIsKey(attributeName));
else
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture,
"Unable to locate index [{0}] on table [{1}]", indexName, table.TableName));
}
private static Func AttributeIsKey(string attributeName)
{
return (kse => string.Equals(kse.AttributeName, attributeName, StringComparison.Ordinal));
}
private int GetCount()
{
if (IsDone && CollectResults)
{
return Matches.Count;
}
else
{
if (count != -1)
{
return count;
}
else
{
switch (SearchMethod)
{
case SearchType.Scan:
ScanRequest scanReq = new ScanRequest
{
TableName = TableName,
Select = EnumMapper.Convert(SelectValues.Count),
ExclusiveStartKey = NextKey,
ScanFilter = Filter.ToConditions(SourceTable.Conversion, SourceTable.IsEmptyStringValueEnabled),
ConsistentRead = IsConsistentRead
};
if (!string.IsNullOrEmpty(this.IndexName))
scanReq.IndexName = this.IndexName;
if (this.FilterExpression != null && this.FilterExpression.IsSet)
this.FilterExpression.ApplyExpression(scanReq, SourceTable);
if (scanReq.ScanFilter != null && scanReq.ScanFilter.Count > 1)
scanReq.ConditionalOperator = EnumMapper.Convert(ConditionalOperator);
if (this.TotalSegments != 0)
{
scanReq.TotalSegments = this.TotalSegments;
scanReq.Segment = this.Segment;
}
SourceTable.AddRequestHandler(scanReq, isAsync: false);
var scanResult = SourceTable.DDBClient.Scan(scanReq);
count = Matches.Count + scanResult.Count;
return count;
case SearchType.Query:
QueryRequest queryReq = new QueryRequest
{
TableName = TableName,
ConsistentRead = IsConsistentRead,
Select = EnumMapper.Convert(SelectValues.Count),
ExclusiveStartKey = NextKey,
ScanIndexForward = !IsBackwardSearch,
IndexName = IndexName
};
Expression.ApplyExpression(queryReq, SourceTable, KeyExpression, FilterExpression);
Dictionary keyConditions, filterConditions;
SplitQueryFilter(Filter, SourceTable, queryReq.IndexName, out keyConditions, out filterConditions);
queryReq.KeyConditions = keyConditions;
queryReq.QueryFilter = filterConditions;
if (queryReq.QueryFilter != null && queryReq.QueryFilter.Count > 1)
queryReq.ConditionalOperator = EnumMapper.Convert(ConditionalOperator);
SourceTable.AddRequestHandler(queryReq, isAsync: false);
var queryResult = SourceTable.DDBClient.Query(queryReq);
count = Matches.Count + queryResult.Count;
return count;
default:
throw new InvalidOperationException("Unknown Search Method");
}
}
}
}
///
/// Resets the Search object to default state.
///
internal void Reset()
{
count = -1;
IsDone = false;
NextKey = null;
Matches = new List();
CollectResults = true;
}
#endregion
}
}