();
/**
 * For parallel Scan requests, TotalSegments represents the
 * total number of segments for a table that is being scanned. Segments are
 * a way to logically divide a table into equally sized portions, for the
 * duration of the Scan request. The value of TotalSegments
 * corresponds to the number of application "workers" (such as threads or
 * processes) that will perform the parallel Scan. For example, if
 * you want to scan a table using four application threads, you would
 * specify a TotalSegments value of 4. The value for
 * TotalSegments must be greater than or equal to 1, and less than or
 * equal to 4096. If you specify a TotalSegments value of 1, the
 * Scan will be sequential rather than parallel. If you specify
 * TotalSegments, you must also specify Segment.
 * <p>
 * Constraints: Range: 1 - 4096.
 * <p>
 * When this field is 0, scan requests are built without TotalSegments and
 * Segment.
 */
private int totalSegments;
/**
 * For parallel Scan requests, Segment identifies an
 * individual segment to be scanned by an application "worker" (such as a
 * thread or a process). Each worker issues a Scan request with a
 * distinct value for the segment it will scan. Segment IDs are zero-based,
 * so the first segment is always 0. For example, if you want to scan a
 * table using four application threads, the first thread would specify a
 * Segment value of 0, the second thread would specify 1, and so on.
 * The LastEvaluatedKey returned from a parallel scan request must be used
 * with the same Segment id in a subsequent operation. The value for Segment
 * must be greater than or equal to 0, and less than the value provided for
 * TotalSegments.
 * <p>
 * This value is only added to a scan request when totalSegments is non-zero.
 */
private int segment;
/**
 * Cached total number of items that match the search parameters, or -1
 * while no count has been computed yet. When the search is done and
 * results are being collected, {@link #getCount()} returns the size of
 * the collected matches instead of this field. Otherwise it makes a call
 * to DynamoDB to find out the number of matching items, without
 * retrieving the items, and caches the result here.
 */
private int count = -1;
/**
 * Name of the index to query or scan against. When blank, scan requests
 * are built without an index name and key attributes are resolved against
 * the table's own keys.
 */
private String indexName;
/**
 * Enum specifying what data to return from the query or scan. When null,
 * page requests are built without a Select value.
 */
private Select select;
/**
 * Kind of request this search issues. Fixed at construction and used to
 * choose between the query and scan code paths when fetching pages and
 * when counting.
 */
private final SearchType searchType;
/**
 * Creates a search of the given kind.
 *
 * @param searchType the kind of request (query or scan) this search issues.
 */
protected Search(final SearchType searchType) {
    this.searchType = searchType;
}
/**
 * Creates a search that issues query requests.
 */
protected Search() {
    this(SearchType.QUERY);
}
/**
 * Sets the table name placed on the requests built by this search.
 *
 * @param tableName the table name.
 * @return this search, for call chaining.
 */
protected Search withTableName(final String tableName) {
    this.tableName = tableName;
    return this;
}
/**
 * Sets whether documents returned by each page are also accumulated in
 * the list of matches kept by this search.
 *
 * @param collectResults true to accumulate every fetched document.
 * @return this search, for call chaining.
 */
protected Search withCollectResults(final boolean collectResults) {
    this.collectResults = collectResults;
    return this;
}
/**
 * Sets the limit placed on each page request.
 *
 * @param limit the limit value passed to the query or scan request.
 * @return this search, for call chaining.
 */
protected Search withLimit(final int limit) {
    this.limit = limit;
    return this;
}
/**
 * Sets the key expression applied to query requests.
 *
 * @param keyExpression the key expression.
 * @return this search, for call chaining.
 */
protected Search withKeyExpression(final Expression keyExpression) {
    this.keyExpression = keyExpression;
    return this;
}
/**
 * Sets the filter expression applied to query and scan requests.
 *
 * @param filterExpression the filter expression.
 * @return this search, for call chaining.
 */
protected Search withFilterExpression(final Expression filterExpression) {
    this.filterExpression = filterExpression;
    return this;
}
/**
 * Sets the filter whose conditions are placed on the requests. Query
 * searches cast this filter to {@code QueryFilter}.
 *
 * @param filter the filter.
 * @return this search, for call chaining.
 */
protected Search withFilter(final Filter filter) {
    this.filter = filter;
    return this;
}
/**
 * Sets the operator used to combine filter conditions. It is only placed
 * on a request that carries more than one filter condition.
 *
 * @param conditionalOperator the conditional operator.
 * @return this search, for call chaining.
 */
protected Search withConditionalOperator(final ConditionalOperator conditionalOperator) {
    this.conditionalOperator = conditionalOperator;
    return this;
}
/**
 * Sets the attributes-to-get list placed on each page request.
 *
 * @param attributesToGet the attribute names.
 * @return this search, for call chaining.
 */
protected Search withAttributesToGet(final List attributesToGet) {
    this.attributesToGet = attributesToGet;
    return this;
}
/**
 * Sets the consistent-read flag placed on the requests.
 *
 * @param isConsistentRead the consistent-read flag.
 * @return this search, for call chaining.
 */
protected Search withIsConsistentRead(final boolean isConsistentRead) {
    this.isConsistentRead = isConsistentRead;
    return this;
}
/**
 * Sets the backward-search flag on this search.
 *
 * @param isBackwardSearch the backward-search flag.
 * @return this search, for call chaining.
 */
protected Search withIsBackwardSearch(final boolean isBackwardSearch) {
    this.isBackwardSearch = isBackwardSearch;
    return this;
}
/**
 * Sets the key sent as the exclusive start key of the next request. The
 * stored key is replaced by the last evaluated key after each page.
 *
 * @param nextKey the key to start the next request from.
 * @return this search, for call chaining.
 */
protected Search withNextKey(final Map nextKey) {
    this.nextKey = nextKey;
    return this;
}
/**
 * Sets the total number of segments for a parallel scan. A non-zero value
 * makes scan requests carry both the total segments and the segment.
 *
 * @param totalSegments the total number of segments.
 * @return this search, for call chaining.
 */
protected Search withTotalSegments(final int totalSegments) {
    this.totalSegments = totalSegments;
    return this;
}
/**
 * Sets the segment scanned by this search. It is only placed on scan
 * requests when the total number of segments is non-zero.
 *
 * @param segment the zero-based segment id.
 * @return this search, for call chaining.
 */
protected Search withSegment(final int segment) {
    this.segment = segment;
    return this;
}
/**
 * Sets the name of the index to query or scan against.
 *
 * @param indexName the index name.
 * @return this search, for call chaining.
 */
protected Search withIndexName(final String indexName) {
    this.indexName = indexName;
    return this;
}
/**
 * Sets what data page requests should return. A null value leaves the
 * page requests without a Select value.
 *
 * @param select the select value.
 * @return this search, for call chaining.
 */
protected Search withSelect(final Select select) {
    this.select = select;
    return this;
}
/**
 * Sets the table whose client executes the requests and whose key and
 * index metadata is consulted when splitting filter conditions.
 *
 * @param table the table.
 * @return this search, for call chaining.
 */
protected Search withTable(final Table table) {
    this.table = table;
    return this;
}
/**
 * Fetches the next page of results using the request kind this search was
 * created with.
 *
 * @return paginated list of {@link Document}
 * @throws IllegalStateException if the search type is neither query nor
 *             scan.
 */
public List getNextResultSet() {
    final List page;
    switch (this.searchType) {
        case SCAN:
            page = getNextScanResultSet();
            break;
        case QUERY:
            page = getNextQueryResultSet();
            break;
        default:
            throw new IllegalStateException("search type other than scan or query");
    }
    return page;
}
/**
 * Issues one scan request starting at the current next key, converts the
 * returned items to documents and advances the next key.
 *
 * @return the documents of the fetched page.
 */
private List getNextScanResultSet() {
    final ScanRequest scanRequest = new ScanRequest();
    scanRequest.withExclusiveStartKey(nextKey);
    scanRequest.withAttributesToGet(attributesToGet);
    scanRequest.withLimit(limit);
    scanRequest.withTableName(tableName);
    scanRequest.withConsistentRead(isConsistentRead);

    // Optional request parts are only set when configured.
    if (select != null) {
        scanRequest.withSelect(select);
    }
    if (this.filter != null) {
        scanRequest.withScanFilter(this.filter.toConditions());
    }
    if (!StringUtils.isBlank(this.indexName)) {
        scanRequest.withIndexName(indexName);
    }
    if (this.filterExpression != null && this.filterExpression.isSet()) {
        this.filterExpression.applyExpression(scanRequest, table);
    }

    // The conditional operator is only set when several conditions are combined.
    final Map scanFilter = scanRequest.getScanFilter();
    if (scanFilter != null && scanFilter.size() > 1) {
        scanRequest.setConditionalOperator(this.conditionalOperator);
    }

    if (this.totalSegments != 0) {
        scanRequest.withTotalSegments(totalSegments);
        scanRequest.withSegment(segment);
    }

    Table.appendDynamoDBDocumentUserAgentString(scanRequest);
    final ScanResult scanResult = table.getClient().scan(scanRequest);

    final List page = new ArrayList();
    for (final Map attributeMap : scanResult.getItems()) {
        final Document document = Document.fromAttributeMap(attributeMap);
        page.add(document);
        if (this.collectResults) {
            this.matches.add(document);
        }
    }

    // A missing or empty last evaluated key marks the search as done.
    nextKey = scanResult.getLastEvaluatedKey();
    final boolean exhausted = nextKey == null || nextKey.isEmpty();
    if (exhausted) {
        isDone = true;
    }
    return page;
}
/**
 * Issues one query request starting at the current next key, converts the
 * returned items to documents and advances the next key.
 * <p>
 * Conditions from the filter are split into key conditions and query
 * filter conditions depending on whether the attribute belongs to the key
 * schema of the table or of the queried index. When a backward search was
 * requested, the request asks for results in reverse index order.
 *
 * @return the documents of the fetched page.
 */
private List getNextQueryResultSet() {
    final List returnValue = new ArrayList();
    final QueryRequest request = new QueryRequest();
    request.withExclusiveStartKey(nextKey)
            .withAttributesToGet(attributesToGet)
            .withLimit(limit)
            .withTableName(tableName)
            .withConsistentRead(isConsistentRead)
            .withIndexName(this.indexName);
    // Only override the service default (forward) when a backward search
    // was requested, so forward searches send the same request as before.
    if (this.isBackwardSearch) {
        request.withScanIndexForward(false);
    }
    if (select != null) {
        request.withSelect(select);
    }
    Expression.applyExpression(request, table, keyExpression, filterExpression);
    if (this.filter != null) {
        final QueryFilter queryFilter = (QueryFilter) this.filter;
        final Map keyConditions = getKeyConditions(queryFilter, request.getIndexName());
        final Map filterConditions = getFilterConditions(queryFilter, request.getIndexName());
        // Empty condition maps are never placed on the request.
        if (!keyConditions.isEmpty()) {
            request.withKeyConditions(keyConditions);
        }
        if (!filterConditions.isEmpty()) {
            request.withQueryFilter(filterConditions);
        }
    } else {
        request.withKeyConditions(null).withQueryFilter(null);
    }
    // The conditional operator is only set when several filter conditions
    // are combined.
    if (request.getQueryFilter() != null && request.getQueryFilter().size() > 1) {
        request.withConditionalOperator(this.conditionalOperator);
    } else {
        request.withConditionalOperator((String) null);
    }
    Table.appendDynamoDBDocumentUserAgentString(request);
    final QueryResult result = table.getClient().query(request);
    for (final Map item : result.getItems()) {
        final Document doc = Document.fromAttributeMap(item);
        returnValue.add(doc);
        if (this.collectResults) {
            this.matches.add(doc);
        }
    }
    // A missing or empty last evaluated key marks the search as done.
    nextKey = result.getLastEvaluatedKey();
    if (nextKey == null || nextKey.isEmpty()) {
        isDone = true;
    }
    return returnValue;
}
/**
 * Fetches pages until the search is done and returns every document that
 * was fetched by this call.
 *
 * @return complete list of {@link Document}
 */
public List getAllResults() {
    final List collected = new ArrayList();
    while (!this.isDone) {
        final List page = getNextResultSet();
        collected.addAll(page);
    }
    return collected;
}
/**
 * Extracts the conditions of a query filter that target key attributes of
 * the table or, when an index name is given, of that index.
 *
 * @param filter the query filter supplying the conditions.
 * @param indexName the index being queried; blank for the table itself.
 * @return a new map of attribute name to condition holding only the
 *         conditions on key attributes.
 */
private Map<String, Condition> getKeyConditions(QueryFilter filter, String indexName) {
    final Map<String, Condition> keyConditions = new HashMap<String, Condition>();
    final Map<String, Condition> conditions = filter.toConditions();
    for (final Entry<String, Condition> entry : conditions.entrySet()) {
        final String attributeName = entry.getKey();
        if (isKeyAttribute(table, indexName, attributeName)) {
            keyConditions.put(attributeName, entry.getValue());
        }
    }
    return keyConditions;
}
/**
 * Extracts the conditions of a query filter that target non-key
 * attributes, i.e. attributes that are not part of the key schema of the
 * table or, when an index name is given, of that index.
 *
 * @param filter the query filter supplying the conditions.
 * @param indexName the index being queried; blank for the table itself.
 * @return a new map of attribute name to condition holding only the
 *         conditions on non-key attributes.
 */
private Map<String, Condition> getFilterConditions(QueryFilter filter, String indexName) {
    final Map<String, Condition> filterConditions = new HashMap<String, Condition>();
    final Map<String, Condition> conditions = filter.toConditions();
    for (final Entry<String, Condition> entry : conditions.entrySet()) {
        final String attributeName = entry.getKey();
        if (!isKeyAttribute(table, indexName, attributeName)) {
            filterConditions.put(attributeName, entry.getValue());
        }
    }
    return filterConditions;
}
/**
 * Tells whether an attribute is part of the key schema that applies to a
 * request: the table's own keys when no index name is given, otherwise the
 * key schema of the named global or local secondary index.
 * <p>
 * Every element of the index key schema is checked, so both the hash key
 * and the range key of an index are recognised as key attributes.
 *
 * @param table the table holding key and index metadata.
 * @param indexName the index name; blank to check the table's own keys.
 * @param attributeName the attribute name to test.
 * @return true if the attribute is a key attribute, false otherwise.
 * @throws IllegalStateException if the index name is not blank and matches
 *             neither a global nor a local secondary index of the table.
 */
private static boolean isKeyAttribute(Table table, String indexName, String attributeName) {
    if (StringUtils.isBlank(indexName)) {
        return table.getKeys().containsKey(attributeName);
    }
    final GlobalSecondaryIndexDescription gsi = table.getGlobalSecondaryIndexes().get(indexName);
    if (gsi != null) {
        return containsKeyAttribute(gsi.getKeySchema(), attributeName);
    }
    final LocalSecondaryIndexDescription lsi = table.getLocalSecondaryIndexes().get(indexName);
    if (lsi != null) {
        return containsKeyAttribute(lsi.getKeySchema(), attributeName);
    }
    throw new IllegalStateException(String.format(
            "Unable to locate index %s on table %s", indexName, table.getTableName()));
}

/**
 * Tells whether any element of a key schema is named like the attribute.
 */
private static boolean containsKeyAttribute(Iterable<KeySchemaElement> keySchema,
        String attributeName) {
    for (final KeySchemaElement element : keySchema) {
        if (isKeyAttribute(element, attributeName)) {
            return true;
        }
    }
    return false;
}
/**
 * Tells whether a key schema element is named like the given attribute.
 *
 * @param element the key schema element.
 * @param attributeName the attribute name to compare against.
 * @return true if the element's attribute name equals the given name.
 */
private static boolean isKeyAttribute(KeySchemaElement element, String attributeName) {
    final String keyName = element.getAttributeName();
    return keyName.equals(attributeName);
}
/**
 * Gets the count of the search result.
 * <p>
 * When the search is done and results are being collected, the number of
 * collected matches is returned. Otherwise a previously cached count is
 * returned if there is one; if not, a count-only request is issued from
 * the current next key, and the returned count plus the number of matches
 * collected so far is cached and returned.
 * <p>
 * A search without a filter (for example one driven only by expressions)
 * is supported: filter conditions are simply left off the request.
 * <p>
 * NOTE(review): only a single count request is issued; confirm whether the
 * service can return a partial count that needs further requests.
 *
 * @return count.
 * @throws IllegalStateException if the search type is neither query nor
 *             scan.
 */
public int getCount() {
    if (isDone && this.collectResults) {
        return this.matches.size();
    }
    if (count != -1) {
        return count;
    }
    final int remaining;
    switch (searchType) {
        case SCAN:
            remaining = countRemainingByScan();
            break;
        case QUERY:
            remaining = countRemainingByQuery();
            break;
        default:
            throw new IllegalStateException("search type other than scan or query");
    }
    count = this.matches.size() + remaining;
    return count;
}

/**
 * Issues a count-only scan from the current next key and returns its count.
 */
private int countRemainingByScan() {
    final ScanRequest request = new ScanRequest();
    request.withExclusiveStartKey(nextKey)
            .withTableName(tableName)
            .withSelect(Select.COUNT)
            .withConsistentRead(isConsistentRead);
    // The filter is optional; without it there are no scan conditions to send.
    if (this.filter != null) {
        request.withScanFilter(this.filter.toConditions());
    }
    if (!StringUtils.isBlank(this.indexName)) {
        request.withIndexName(indexName);
    }
    if (this.filterExpression != null && this.filterExpression.isSet()) {
        this.filterExpression.applyExpression(request, table);
    }
    if (request.getScanFilter() != null && request.getScanFilter().size() > 1) {
        request.setConditionalOperator(this.conditionalOperator);
    }
    if (this.totalSegments != 0) {
        request.withTotalSegments(totalSegments)
                .withSegment(segment);
    }
    Table.appendDynamoDBDocumentUserAgentString(request);
    final ScanResult result = table.getClient().scan(request);
    return result.getCount();
}

/**
 * Issues a count-only query from the current next key and returns its count.
 */
private int countRemainingByQuery() {
    final QueryRequest request = new QueryRequest();
    request.withExclusiveStartKey(nextKey)
            .withTableName(tableName)
            .withSelect(Select.COUNT)
            .withConsistentRead(isConsistentRead)
            .withIndexName(this.indexName);
    Expression.applyExpression(request, table, keyExpression, filterExpression);
    // The filter is optional; as when fetching pages, empty condition maps
    // are never placed on the request.
    if (this.filter != null) {
        final QueryFilter queryFilter = (QueryFilter) this.filter;
        final Map<String, Condition> keyConditions =
                getKeyConditions(queryFilter, request.getIndexName());
        final Map<String, Condition> filterConditions =
                getFilterConditions(queryFilter, request.getIndexName());
        if (!keyConditions.isEmpty()) {
            request.withKeyConditions(keyConditions);
        }
        if (!filterConditions.isEmpty()) {
            request.withQueryFilter(filterConditions);
        }
    }
    if (request.getQueryFilter() != null
            && request.getQueryFilter().size() > 1) {
        request.withConditionalOperator(this.conditionalOperator);
    }
    Table.appendDynamoDBDocumentUserAgentString(request);
    final QueryResult result = table.getClient().query(request);
    return result.getCount();
}
}