/*
 *
 * Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: MIT-0
 *
 */
package com.amazonaws.elasticachedemo;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;

/**
 * This class demonstrates how to use Amazon ElastiCache for Redis
 * in cluster mode using the Jedis client.
 *
 * <p>The config file is located through the {@code CONFIG_FILE_NAME} system
 * property and is read as a standard Java properties file with these keys:
 * {@code REDIS_CLUSTER_ENDPOINT_HOSTNAME}, {@code REDIS_CLUSTER_ENDPOINT_PORT},
 * {@code REDIS_CACHE_EXPIRY_IN_SECS}, {@code REDIS_CACHE_FLUSH_ON_SHUTDOWN},
 * {@code REDIS_CLIENT_TIMEOUT_IN_SECS} and {@code NUMBER_OF_AUTO_GENERATED_ENTRIES}.
 */
public class RedisElastiCacheClusterModeDemo {

    /** Name of the system property that holds the path of the config file. */
    private static final String CONFIG_FILE_SYSTEM_PROPERTY = "CONFIG_FILE_NAME";

    /** Conversion factor: the config holds seconds, the Jedis client expects milliseconds. */
    private static final int MILLIS_PER_SECOND = 1000;

    /** Conversion factor used when reporting elapsed times measured with {@link System#nanoTime()}. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    /** The properties object to hold the config. */
    private Properties properties = null;

    /** The Redis cluster host name. */
    private String redisHost = null;

    /** The Redis cluster port. */
    private int redisPort = 0;

    /** The cache-expiry value (in seconds). */
    private int cacheExpiryInSecs = 0;

    /**
     * The flag that denotes whether cache needs to be flushed or not on shutdown.
     */
    private boolean cacheFlushOnShutdown = false;

    /** The client timeout value (in seconds). */
    private int clientTimeoutInSecs = 0;

    /** The number of auto-generated key-value entries. */
    private int numberOfAutoGeneratedEntries = 0;

    /** The Jedis Cluster object. */
    private JedisCluster jedisCluster = null;

    /**
     * Constructor performing initialization.
     *
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public RedisElastiCacheClusterModeDemo() throws IOException {
        initialize();
    }

    /**
     * Loads the properties from the config file and starts the Jedis client.
     *
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void initialize() throws IOException {
        loadProperties();
        startClient();
    }

    /**
     * Loads properties from the config file named by the {@code CONFIG_FILE_NAME}
     * system property.
     *
     * @throws IOException              if the config file cannot be opened or read
     * @throws IllegalStateException    if the {@code CONFIG_FILE_NAME} system property is not set
     * @throws IllegalArgumentException if a required key is missing; for integer keys this is a
     *                                  {@link NumberFormatException} naming the offending key
     */
    private void loadProperties() throws IOException {
        System.out.println("Reading config file...");

        String configFileName = System.getProperty(CONFIG_FILE_SYSTEM_PROPERTY);
        if (configFileName == null || configFileName.trim().isEmpty()) {
            throw new IllegalStateException(
                    "System property '" + CONFIG_FILE_SYSTEM_PROPERTY + "' must point to the config file.");
        }

        properties = new Properties();
        // try-with-resources closes the stream even when load() fails.
        try (FileInputStream fis = new FileInputStream(new File(configFileName))) {
            properties.load(fis);
        }

        redisHost = getRequiredProperty("REDIS_CLUSTER_ENDPOINT_HOSTNAME");
        redisPort = getIntProperty("REDIS_CLUSTER_ENDPOINT_PORT");
        cacheExpiryInSecs = getIntProperty("REDIS_CACHE_EXPIRY_IN_SECS");
        // The flush flag stays optional: an absent key means "do not flush".
        String flushFlag = properties.getProperty("REDIS_CACHE_FLUSH_ON_SHUTDOWN");
        cacheFlushOnShutdown = flushFlag != null && Boolean.parseBoolean(flushFlag.trim());
        clientTimeoutInSecs = getIntProperty("REDIS_CLIENT_TIMEOUT_IN_SECS");
        numberOfAutoGeneratedEntries = getIntProperty("NUMBER_OF_AUTO_GENERATED_ENTRIES");

        System.out.println("Completed reading config file.");
    }

    /**
     * Returns the trimmed value of a mandatory config key.
     *
     * @param key the config key
     * @return the non-empty, trimmed value
     * @throws IllegalArgumentException if the key is absent or blank
     */
    private String getRequiredProperty(String key) {
        String rawValue = properties.getProperty(key);
        if (rawValue == null || rawValue.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required config property '" + key + "'");
        }
        return rawValue.trim();
    }

    /**
     * Returns the value of a mandatory integer config key.
     *
     * @param key the config key
     * @return the parsed integer value
     * @throws NumberFormatException if the key is absent or its value is not a valid integer
     */
    private int getIntProperty(String key) {
        String rawValue = properties.getProperty(key);
        if (rawValue == null) {
            throw new NumberFormatException("Missing required integer config property '" + key + "'");
        }
        try {
            return Integer.parseInt(rawValue.trim());
        } catch (NumberFormatException e) {
            // Keep the exception type but name the key so the config error is easy to locate.
            NumberFormatException detailed = new NumberFormatException(
                    "Config property '" + key + "' is not a valid integer: '" + rawValue + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Starts the Jedis client to connect in cluster mode.
     *
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void startClient() throws IOException {
        System.out.println("Initializing Redis client...");
        // Jedis timeouts are expressed in milliseconds while the config value is in seconds;
        // multiplyExact fails loudly instead of silently overflowing on an absurd setting.
        int timeoutInMillis = Math.multiplyExact(clientTimeoutInSecs, MILLIS_PER_SECOND);
        jedisCluster = new JedisCluster(new HostAndPort(redisHost, redisPort), timeoutInMillis);
        System.out.println("Completed initializing Redis client.");
    }

    /**
     * Stops the Jedis client. Does nothing besides logging when the client was never started.
     */
    private void stopClient() {
        System.out.println("Shutting down Redis client...");
        if (jedisCluster != null) {
            jedisCluster.close();
        }
        System.out.println("Completed shutting down Redis client.");
    }

    /**
     * Upserts cache entries - loops through and calls the upsertCacheEntry method
     * for the keys {@code keyPrefix + 1} through
     * {@code keyPrefix + numberOfAutoGeneratedEntries}.
     *
     * @param keyPrefix   the key prefix
     * @param valuePrefix the value prefix
     * @param checkExists whether each key is looked up first so that inserts and
     *                    updates can be reported separately
     * @throws InterruptedException the interrupted exception
     * @throws ExecutionException   the execution exception
     */
    private void upsertCacheEntries(String keyPrefix, String valuePrefix, boolean checkExists)
            throws InterruptedException, ExecutionException {
        for (int i = 1; i <= numberOfAutoGeneratedEntries; i++) {
            upsertCacheEntry(keyPrefix + i, valuePrefix + i, checkExists);
        }
    }

    /**
     * Upsert cache entry - adds if not present; else updates based on the key.
     * The entry is written with the configured expiry.
     *
     * @param key         the key
     * @param value       the value
     * @param checkExists whether the key is looked up first so that the outcome can be
     *                    reported as an insert or an update
     * @throws InterruptedException the interrupted exception
     * @throws ExecutionException   the execution exception
     */
    private void upsertCacheEntry(String key, String value, boolean checkExists)
            throws InterruptedException, ExecutionException {
        boolean valueExists = checkExists && (getCacheValue(key) != null);

        String result = jedisCluster.set(key, value, (new SetParams()).ex(cacheExpiryInSecs));

        // Constant-first comparison stays safe if the client ever hands back a null reply.
        if (!"OK".equalsIgnoreCase(result)) {
            System.out.println("Could not upsert key '" + key + "'");
            return;
        }

        String entry = "{key=" + key + ", value=" + value + "}";
        if (!checkExists) {
            System.out.println("Upserted = " + entry);
        } else if (valueExists) {
            System.out.println("Updated = " + entry);
        } else {
            System.out.println("Inserted = " + entry);
        }
    }

    /**
     * Deletes a random cache entry; uses the key-prefix and generates the full key
     * randomly.
     *
     * @param keyPrefix the key prefix
     * @throws InterruptedException the interrupted exception
     * @throws ExecutionException   the execution exception
     */
    private void deleteRandomCacheEntry(String keyPrefix) throws InterruptedException, ExecutionException {
        deleteCacheEntry(keyPrefix + getRandomInteger(1, numberOfAutoGeneratedEntries));
    }

    /**
     * Deletes the cache entry for the specified key and, on success, reads the key
     * back to show that it is gone.
     *
     * @param key the key
     * @throws InterruptedException the interrupted exception
     * @throws ExecutionException   the execution exception
     */
    private void deleteCacheEntry(String key) throws InterruptedException, ExecutionException {
        long deletedCount = jedisCluster.del(key);
        if (deletedCount == 1L) {
            System.out.println("Deleted key '" + key + "'");
            System.out.println("Testing delete...");
            getCacheValue(key);
            System.out.println("Completed testing delete.");
        } else {
            System.out.println("Could not delete key '" + key + "'");
        }
    }

    /**
     * Flushes the cache by issuing FLUSHALL against every node pool known to the
     * cluster client, stopping at the first node that does not answer "OK".
     *
     * @throws InterruptedException the interrupted exception
     * @throws ExecutionException   the execution exception
     */
    private void flushCache() throws InterruptedException, ExecutionException {
        boolean flushSucceeded = true;
        long startNanos = System.nanoTime();

        // NOTE(review): getClusterNodes() may also list replica nodes, which presumably reject
        // FLUSHALL with a READONLY error - confirm against the cluster topology in use.
        for (JedisPool pool : jedisCluster.getClusterNodes().values()) {
            // Closing the borrowed connection returns it to its pool, including on the early exit.
            try (Jedis jedis = pool.getResource()) {
                String result = jedis.flushAll();
                if (!"OK".equalsIgnoreCase(result)) {
                    System.out.println("Could not flush cache.");
                    flushSucceeded = false;
                    break;
                }
            }
        }

        if (flushSucceeded) {
            System.out.println("Flushed cache in " + elapsedMillis(startNanos) + " millisecond(s).");
        }
    }

    /**
     * Gets the random cache value; uses the key-prefix and generates the full key
     * randomly.
     *
     * @param keyPrefix the key prefix
     * @return the random cache value, or {@code null} when the generated key is not cached
     */
    private String getRandomCacheValue(String keyPrefix) {
        return getCacheValue(keyPrefix + getRandomInteger(1, numberOfAutoGeneratedEntries));
    }

    /**
     * Gets the specified cache value and prints the outcome of the lookup.
     *
     * @param key the key
     * @return the cache value, or {@code null} when the key is not found
     */
    private String getCacheValue(String key) {
        long startNanos = System.nanoTime();
        String value = jedisCluster.get(key);
        long elapsedMillis = elapsedMillis(startNanos);

        if (value != null) {
            System.out.println("Retrieved value='" + value + "' for key= '" + key + "' in " + elapsedMillis
                    + " millisecond(s).");
        } else {
            System.out.println("Key '" + key + "' not found.");
        }
        return value;
    }

    /**
     * Perform shutdown - flush the cache (if enabled) and stop the Jedis client.
     * The client is stopped even when the flush fails.
     *
     * @throws InterruptedException the interrupted exception
     * @throws ExecutionException   the execution exception
     */
    private void shutdown() throws InterruptedException, ExecutionException {
        try {
            if (cacheFlushOnShutdown) {
                flushCache();
            }
        } finally {
            stopClient();
        }
    }

    /**
     * Gets a random integer from a range of integers, both bounds inclusive.
     *
     * @param minimum the smallest value that may be returned
     * @param maximum the largest value that may be returned
     * @return the random integer; {@code minimum} when the range is empty
     */
    private int getRandomInteger(int minimum, int maximum) {
        // "+ 1" makes the upper bound reachable; long arithmetic keeps the span from overflowing.
        long span = (long) maximum - minimum + 1;
        return (int) ((long) (Math.random() * span) + minimum);
    }

    /**
     * Converts the time elapsed since a {@link System#nanoTime()} reading to milliseconds.
     *
     * @param startNanos the earlier {@code System.nanoTime()} reading
     * @return the elapsed time in whole milliseconds
     */
    private static long elapsedMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
    }

    /**
     * The main method performs the following,
     * 1. Reads the config file.
     * 2. Instantiates the Jedis client to connect in cluster mode.
     * 3. Upserts the specified number of key-values in the cache.
     * 4. Prints the value of a random key in the range.
     * 5. Deletes the key-value entry for a random key in the range.
     * 6. Flushes the cache (if enabled).
     * 7. Shuts down the Jedis client.
     *
     * @param args the arguments
     * @throws IOException          Signals that an I/O exception has occurred.
     * @throws InterruptedException the interrupted exception
     * @throws ExecutionException   the execution exception
     */
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        RedisElastiCacheClusterModeDemo redisElastiCacheClusterModeDemo = new RedisElastiCacheClusterModeDemo();
        try {
            redisElastiCacheClusterModeDemo.upsertCacheEntries("Name", "Value", false);
            redisElastiCacheClusterModeDemo.getRandomCacheValue("Name");
            redisElastiCacheClusterModeDemo.deleteRandomCacheEntry("Name");
        } finally {
            // Always release the cluster connections, even when one of the demo steps fails.
            redisElastiCacheClusterModeDemo.shutdown();
        }
    }
}