// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.aws.cqlreplicator.task.replication;
import com.amazon.aws.cqlreplicator.storage.AdvancedCacheV2;
import com.amazon.aws.cqlreplicator.storage.SourceStorageOnCassandra;
import com.amazon.aws.cqlreplicator.storage.StorageServiceImpl;
import com.amazon.aws.cqlreplicator.task.AbstractTaskV2;
import com.amazon.aws.cqlreplicator.util.Utils;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.Row;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import static com.amazon.aws.cqlreplicator.util.Utils.*;
/**
* The {@code PartitionDiscoveryTask} class provides partition key synchronization between Cassandra
* cluster and Amazon Keyspaces by using token range split. This implementation makes ~
* &scanAndCompare; scan Cassandra cluster and compare any token range of n.
*
*
The {@code PartitionDiscoveryTask} splits Cassandra token range in m tiles and each
* instance of PartitionDiscoveryTask handles only one tile
*
*
*/
public class PartitionDiscoveryTaskV2 extends AbstractTaskV2 {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionDiscoveryTaskV2.class);
private static final Pattern REGEX_PIPE = Pattern.compile("\\|");
private static int ADVANCED_CACHE_SIZE;
private static SourceStorageOnCassandra sourceStorageOnCassandra;
private static Map> metaData;
private final Properties config;
/**
* Constructor for PartitionDiscoveryTask.
*
* @param config the array to be sorted
*/
public PartitionDiscoveryTaskV2(Properties config) {
this.config = config;
ADVANCED_CACHE_SIZE =
Integer.parseInt(config.getProperty("LOCAL_STORAGE_PARTITIONS_PER_PAGE"));
sourceStorageOnCassandra = new SourceStorageOnCassandra(config);
metaData = sourceStorageOnCassandra.getMetaData();
}
/**
* Scan and compare partition keys.
*/
private void scanAndCompare(StorageServiceImpl storageService,
List> rangeList, String[] pks)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
AdvancedCacheV2 advancedCache;
List resultSetRange;
advancedCache =
new AdvancedCacheV2<>(ADVANCED_CACHE_SIZE, storageService) {
@Override
protected void flush(List payload, StorageServiceImpl storage)
throws IOException {
var totalChunks = String.format("%s", "totalChunks");
var currentChunk = bytesToInt(storage.readTileMetadata(totalChunks));
LOGGER.debug("{}:{}", totalChunks, currentChunk);
var cborPayload = Utils.cborEncoder(payload);
var compressedCborPayload = Utils.compress(cborPayload);
var keyOfChunk =
String.format("%s|%s", "pksChunk", currentChunk);
storage.writePartitionsByChunk(keyOfChunk, compressedCborPayload);
storage.incrByOne(totalChunks);
}
};
boolean totalChunksExist =
storageService.containsInPartitions(
String.format("%s", "totalChunks"));
if (!totalChunksExist)
storageService.writeTileMetadata(
String.format("%s", "totalChunks"),
intToBytes(0));
var pksStr = String.join(",", pks);
var partitioner = sourceStorageOnCassandra.getPartitioner();
LOGGER.debug("Partitioner: {}", partitioner);
for (ImmutablePair range : rangeList) {
Object rangeStart, rangeEnd;
if (!partitioner.equals("org.apache.cassandra.dht.RandomPartitioner")) {
rangeStart = Long.parseLong(range.left);
rangeEnd = Long.parseLong(range.right);
resultSetRange =
sourceStorageOnCassandra.findPartitionsByTokenRange(
pksStr, (Long) rangeStart, (Long) rangeEnd);
} else {
rangeStart = new BigInteger(range.left);
rangeEnd = new BigInteger(range.right);
resultSetRange =
sourceStorageOnCassandra.findPartitionsByTokenRange(
pksStr, (BigInteger) rangeStart, (BigInteger) rangeEnd);
}
LOGGER.trace("Processing a range: {} - {}", rangeStart, rangeEnd);
for (Row eachResult : resultSetRange) {
var i = 0;
List tmp = new ArrayList<>();
for (String cl : pks) {
var type = metaData.get("partition_key").get(cl);
tmp.add(String.valueOf(eachResult.get(pks[i], Utils.getClassType(type.toUpperCase()))));
i++;
}
//TODO: Implement PartitionKeyV2
var joinedPartitionKeys = String.join("|", tmp);
var flag = storageService.containsInPartitions(joinedPartitionKeys);
if (!flag) {
//TODO: Implement PartitionKeyV2
storageService.writePartition(
joinedPartitionKeys
);
//TODO: Implement PartitionKeyV2
advancedCache.put(joinedPartitionKeys);
LOGGER.debug("Syncing a new partition key: {}", joinedPartitionKeys);
}
}
}
if (advancedCache.getSize() > 0) {
LOGGER.debug("Flushing remainders: {}", advancedCache.getSize());
advancedCache.doFlush();
}
LOGGER.debug("Comparing stage is running");
}
//TODO: Implement PartitionKeyV2
private void deletePartitions(String[] pks, StorageServiceImpl storageService, int chunk)
throws IOException {
var keyOfChunkFirst = String.format("%s|%s", "pksChunk", chunk);
var compressedPayloadFirst = storageService.readPartitionsByChunk(keyOfChunkFirst);
var cborPayloadFirst = Utils.decompress(compressedPayloadFirst);
var collection = Utils.cborDecoder(cborPayloadFirst);
//TODO: Implement PartitionKeyV2
var finalClonedCollection = new CopyOnWriteArrayList<>(collection);
collection.parallelStream().forEach(
key -> {
BoundStatementBuilder boundStatementCassandraBuilder =
sourceStorageOnCassandra.getCassandraPreparedStatement().boundStatementBuilder();
//TODO: Implement PartitionKeyV2
LOGGER.debug("Processing partition key: {}", key);
//TODO: Implement PartitionKeyV2
var pk = REGEX_PIPE.split((String) key);
var i = 0;
//TODO: Implement PartitionKeyV2
for (String cl : pks) {
var type = metaData.get("partition_key").get(cl);
try {
boundStatementCassandraBuilder =
Utils.aggregateBuilder(type, cl, pk[i], boundStatementCassandraBuilder);
} catch (Exception e) {
throw new RuntimeException(e);
}
i++;
}
List cassandraResult =
sourceStorageOnCassandra.extract(boundStatementCassandraBuilder);
// Found deleted partition key
if (cassandraResult.size() == 0) {
LOGGER.debug("Found deleted partition key {}", key);
// Remove partition key from the cache
//TODO: Implement PartitionKeyV2
storageService.deletePartition(key.toString());
finalClonedCollection.remove(key);
}
});
if (finalClonedCollection.size() < collection.size()) {
var cborPayload = Utils.cborEncoder(finalClonedCollection);
var compressedPayload = Utils.compress(cborPayload);
var keyOfChunk = String.format("%s|%s", "pksChunk", chunk);
storageService.writePartitionsByChunk(keyOfChunk, compressedPayload);
}
if (finalClonedCollection.size() == 0) {
var keyOfChunk = String.format("%s", "totalChunks");
storageService.decrByOne(keyOfChunk);
}
}
/**
* Scan and remove deleted partition keys.
*
* @params rangeList, pkCache, pks the array to be sorted
*/
//TODO: Implement PartitionKeyV2
private void scanAndRemove(StorageServiceImpl storageService, Utils.CassandraTaskTypes taskName, String[] pks) throws IOException {
if (taskName.equals(Utils.CassandraTaskTypes.SYNC_DELETED_PARTITION_KEYS)) {
LOGGER.debug("Syncing deleted partition keys between C* and Amazon Keyspaces");
var totalChunks = String.format("%s", "totalChunks");
var chunks = bytesToInt(storageService.readTileMetadata(totalChunks));
// remove each chunk of partition keys
IntStream.range(0, chunks).parallel().forEach(
chunk -> {
try {
deletePartitions(pks, storageService, chunk);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
);
//for (int chunk = 0; chunk < chunks; chunk++) {
// deletePartitions(pks, storageService, chunk);
//}
}
}
@Override
protected void doPerformTask(StorageServiceImpl storageService, Utils.CassandraTaskTypes taskName)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
var pks = metaData.get("partition_key").keySet().toArray(new String[0]);
LOGGER.debug("SOURCE_QUERY: {}", config.getProperty("SOURCE_CQL_QUERY"));
List> ranges = sourceStorageOnCassandra.getTokenRanges();
var totalRanges = ranges.size();
List>> tiles =
getDistributedRangesByTiles(ranges, Integer.parseInt(config.getProperty("TILES")));
var currentTile = Integer.parseInt(config.getProperty("TILE"));
List> rangeList = tiles.get(currentTile);
// if tiles = 0 we need to scan one range from one pkScanner, if tiles>0 we need to scan all
// ranges from the pkScanner
LOGGER.debug("The number of ranges in the cassandra: {}", totalRanges);
LOGGER.debug("The number of ranges for the tile: {}", rangeList.size());
LOGGER.debug("The number of tiles: {}", tiles.size());
LOGGER.debug("The current tile: {}", currentTile);
scanAndCompare(storageService, rangeList, pks);
if (config.getProperty("REPLICATE_DELETES").equals("true")) {
scanAndRemove(storageService, taskName, pks);
}
LOGGER.debug("Caching and comparing stage is completed");
}
}