diff --git a/caching/src/main/java/org/eclipsefoundation/caching/service/impl/RedisCacheService.java b/caching/src/main/java/org/eclipsefoundation/caching/service/impl/RedisCacheService.java index 1c2c0eb7a6c991b54d0c5026b550ad8ebb40ff4a..35beb4c7b9d05607f1a3b71e25e0afa449417541 100644 --- a/caching/src/main/java/org/eclipsefoundation/caching/service/impl/RedisCacheService.java +++ b/caching/src/main/java/org/eclipsefoundation/caching/service/impl/RedisCacheService.java @@ -42,241 +42,259 @@ import jakarta.ws.rs.core.MultivaluedMap; /** * Custom Redis cache implementation. This interfaces with Redis through a client, as the cache layer can only be configured to run using * caffeine or Redis, and not both at the same time for different regions. - * - * As Redis doesn't support lists natively and instead uses serialized concrete types, any place where we need list entities, we need to - * pre-serialize to a JSON string and store and retrieve that. While it adds small complexity, list support is critical to caching. */ @ApplicationScoped public class RedisCacheService implements DistributedCacheService { - private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheService.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheService.class); - // used to reduce key/data pollution of Redis instance so that the instance can keep itself somewhat clean. Default 1 month. - private final Duration maxKeyAge; - private final int maxAgeInSeconds; + // used to reduce key/data pollution of Redis instance so that the instance can keep itself somewhat clean. Default 1 month. + private final Duration maxKeyAge; + private final int maxAgeInSeconds; - // retain datasource so we can lazy load cache commands as needed - private RedisDataSource ds; - // string keys used as serialized keys are harder to do glob matches on - private KeyCommands<String> keyCommands; - // used for TTL and raw JSON storage w/o auto-conversion - private ValueCommands<String, String> stringCommands; - private Map<Class<?>, ValueCommands<String, ?>> singleCommands; - private Map<Class<?>, ListCommands<String, ?>> listCommands; + // retain datasource so we can lazy load cache commands as needed + private RedisDataSource ds; + // string keys used as serialized keys are harder to do glob matches on + private KeyCommands<String> keyCommands; + // used for TTL and raw JSON storage w/o auto-conversion + private ValueCommands<String, String> stringCommands; + private Map<Class<?>, ValueCommands<String, ?>> singleCommands; + private Map<Class<?>, ListCommands<String, ?>> listCommands; - private RedisCacheConfig config; - private final CachingService cache; + private RedisCacheConfig config; + private final CachingService cache; - /** - * Load the various command objects to interact with Redis, as well as the object mapper used for list caching calls. - * - * @param ds the Redis datasource object as created by the system - * @param mapper the Jackson mapper object for the application - */ - public RedisCacheService(RedisDataSource ds, RedisCacheConfig config, CachingService cache) { - this.ds = ds; - this.singleCommands = new ConcurrentHashMap<>(); - this.listCommands = new ConcurrentHashMap<>(); - this.keyCommands = ds.key(String.class); - this.stringCommands = ds.value(String.class); - this.config = config; - this.cache = cache; + /** + * Load the various command objects to interact with Redis, as well as the object mapper used for list caching calls. + * + * @param ds the Redis datasource object as created by the system + * @param mapper the Jackson mapper object for the application + */ + public RedisCacheService(RedisDataSource ds, RedisCacheConfig config, CachingService cache) { + this.ds = ds; + this.singleCommands = new ConcurrentHashMap<>(); + this.listCommands = new ConcurrentHashMap<>(); + this.keyCommands = ds.key(String.class); + this.stringCommands = ds.value(String.class); + this.config = config; + this.cache = cache; - // based on config at start, build the max age + shortcut for number of seconds in that duration - this.maxKeyAge = ChronoUnit.DAYS - .getDuration() - .multipliedBy(config.maxAge().getDays()) - .plus(ChronoUnit.MONTHS.getDuration().multipliedBy(config.maxAge().getMonths())) - .plus(ChronoUnit.YEARS.getDuration().multipliedBy(config.maxAge().getYears())); - this.maxAgeInSeconds = (int) this.maxKeyAge.toSeconds(); - } + // based on config at start, build the max age + shortcut for number of seconds in that duration + this.maxKeyAge = ChronoUnit.DAYS + .getDuration() + .multipliedBy(config.maxAge().getDays()) + .plus(ChronoUnit.MONTHS.getDuration().multipliedBy(config.maxAge().getMonths())) + .plus(ChronoUnit.YEARS.getDuration().multipliedBy(config.maxAge().getYears())); + this.maxAgeInSeconds = (int) this.maxKeyAge.toSeconds(); + } - @Override - public <T> CacheWrapper<T> get(String id, MultivaluedMap<String, String> params, Class<T> type, Callable<? extends T> callable) { - ValueCommands<String, T> cacheRef = getCommands(type); - // build the cache key for the given params - ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build(); + @Override + public <T> CacheWrapper<T> get(String id, MultivaluedMap<String, String> params, Class<T> type, Callable<? extends T> callable) { + ValueCommands<String, T> cacheRef = getCommands(type); + // build the cache key for the given params + ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build(); - // get the cached value + TTL if it isn't expired - LOGGER.trace("Checking redis cache for non-expired value for key {}", k); - CacheWrapper<T> wrapper; - try { - T value = cacheRef.get(k.toString()); - String ttl = stringCommands.get(getTtlKey(k)); - // build the default wrapper premptively - wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build(); - // if the value is empty or the TTL has expired, recalculate the value - if (value == null || ttl == null) { - // generate the value from the callable when needed - LOGGER.trace("Attempting to fetch fresh value for key {}", k); - value = callable.call(); + // get the cached value + TTL if it isn't expired + LOGGER.trace("Checking redis cache for non-expired value for key {}", k); + CacheWrapper<T> wrapper; + try { + T value = cacheRef.get(k.toString()); + String ttl = stringCommands.get(getTtlKey(k)); + // build the default wrapper premptively + wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build(); + // if the value is empty or the TTL has expired, recalculate the value + if (value == null || ttl == null) { + // generate the value from the callable when needed + LOGGER.trace("Attempting to fetch fresh value for key {}", k); + value = callable.call(); - // set the cache and the TTL entry for the entry. max age included on key to stop stagnant keys from polluting instance - cacheRef.setex(k.toString(), maxAgeInSeconds, value); - setTTL(k); + // set the cache and the TTL entry for the entry. max age included on key to stop stagnant keys from polluting instance + cacheRef.setex(k.toString(), maxAgeInSeconds, value); + setTTL(k); - // set the value once successfully sent to Redis into the returned wrapper - wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build(); - } - } catch (CompletionException e) { - if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) { - LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k); - // call in-memory cache as fallback - wrapper = cache.get(id, params, type, callable); - } else { - wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build(); - } - } catch (Exception e) { - // TODO: We probably want a way to stop oversending for this eventually - // catch the error and set into the wrapper - LOGGER.error("Error while creating cache entry for key {} and type {}", id, type.getSimpleName(), e); - wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build(); - } - return wrapper; + // set the value once successfully sent to Redis into the returned wrapper + wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build(); + } + } catch (CompletionException e) { + if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) { + LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k); + // call in-memory cache as fallback + wrapper = cache.get(id, params, type, callable); + } else { + wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build(); + } + } catch (Exception e) { + // TODO: We probably want a way to stop oversending for this eventually + // catch the error and set into the wrapper + LOGGER.error("Error while creating cache entry for key {} and type {}", id, type.getSimpleName(), e); + wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build(); } + return wrapper; + } - @Override - public <T> CacheWrapper<List<T>> getList(String id, MultivaluedMap<String, String> params, Class<T> type, - Callable<? extends List<T>> callable) { - ListCommands<String, T> cacheRef = getListCommands(type); - // build the cache key for the given params - ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build(); + @Override + public <T> CacheWrapper<List<T>> getList(String id, MultivaluedMap<String, String> params, Class<T> type, + Callable<? extends List<T>> callable) { + ListCommands<String, T> cacheRef = getListCommands(type); + // build the cache key for the given params + ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build(); - // get the cached value + TTL if it isn't expired - LOGGER.trace("Checking redis cache for non-expired value for key {}", k); - CacheWrapper<List<T>> wrapper; - try { - List<T> value = cacheRef.lrange(k.toString(), 0, -1); - String ttl = stringCommands.get(getTtlKey(k)); + // get the cached value + TTL if it isn't expired + LOGGER.trace("Checking redis cache for non-expired value for key {}", k); + CacheWrapper<List<T>> wrapper; + try { + List<T> value = cacheRef.lrange(k.toString(), 0, -1); + String ttl = stringCommands.get(getTtlKey(k)); - // build the wrapper around the value, fetching fresh values when necessary - wrapper = buildListCacheWrapper(k, value, ttl, cacheRef, callable); - } catch (CompletionException e) { - if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) { - LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k); - // call in-memory cache as fallback - wrapper = cache.get(id, params, type, callable); - } else { - LOGGER.error("Error while loading distributed cache results", e); - wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build(); - } - } catch (Exception e) { - // TODO: We probably want a way to stop oversending for this eventually - // catch the error and set into the wrapper - LOGGER.error("Error while creating cache entry for key {}", k, e); - wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build(); - } - return wrapper; + // build the wrapper around the value, fetching fresh values when necessary + wrapper = buildListCacheWrapper(k, value, ttl, cacheRef, callable); + } catch (CompletionException e) { + if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) { + LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k); + // call in-memory cache as fallback + wrapper = cache.get(id, params, type, callable); + } else { + LOGGER.error("Error while loading distributed cache results", e); + wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build(); + } + } catch (Exception e) { + // TODO: We probably want a way to stop oversending for this eventually + // catch the error and set into the wrapper + LOGGER.error("Error while creating cache entry for key {}", k, e); + wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build(); } + return wrapper; + } - @Override - public boolean remove(String id, MultivaluedMap<String, String> params, Class<?> rawType) { - return keyCommands.del(ParameterizedCacheKeyBuilder.builder().id(id).clazz(rawType).params(params).build().toString()) == 1; + @Override + public boolean remove(String id, MultivaluedMap<String, String> params, Class<?> rawType) { + ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(rawType).params(params).build(); + try { + // in case we had to fallback, also clear the in-memory cache + cache.remove(k); + // attempt to clear redis cache, + return keyCommands.del(k.toString()) == 1; + } catch (CompletionException e) { + if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) { + LOGGER.warn("Unable to connect to Redis server, unable to guarantee cache clear for key {}", k); + } else { + LOGGER.error("Error while interacting with the distributed cache", e); + } } + return false; + } - @Override - public void removeFuzzy(String key, Class<?> rawType) { - // for each key that matches the type root, remove the data and return the deleted results - keyCommands - .del(keyCommands - .keys(ParameterizedCacheKeyBuilder.builder().id(key).clazz(rawType).params(null).build().toString() + "*") - .toArray(new String[] {})); + @Override + public void removeFuzzy(String key, Class<?> rawType) { + ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(key).clazz(rawType).params(null).build(); + try { + // in case we had to fallback, also clear the in-memory cache + cache.fuzzyRemove(key, rawType); + // for each key that matches the type root, remove the data and return the deleted results + keyCommands.del(keyCommands.keys(k.toString() + "*").toArray(new String[] {})); + } catch (CompletionException e) { + if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) { + LOGGER.warn("Unable to connect to Redis server, unable to guarantee cache clear for key {}", k); + } else { + LOGGER.error("Error while interacting with the distributed cache", e); + } } + } - @Override - public void clearCache(Class<?> rawType) { - // take advantage of key structuring to do fuzzy key removal - removeFuzzy("", rawType); - } + @Override + public void clearCache(Class<?> rawType) { + // take advantage of key structuring to do fuzzy key removal + removeFuzzy("", rawType); + } - /** - * Standardize the TTL key for use in cache entries. - * - * @param k the cache key for the entry being recorded - * @return the TTL cache key for the given entry - */ - public String getTtlKey(ParameterizedCacheKey k) { - return k.toString() + ":ttl"; - } + /** + * Standardize the TTL key for use in cache entries. + * + * @param k the cache key for the entry being recorded + * @return the TTL cache key for the given entry + */ + public String getTtlKey(ParameterizedCacheKey k) { + return k.toString() + ":ttl"; + } - /** - * Using the existing cache data and TTL, check if the data is available and current. If the data is missing or stale, a new copy should - * be fetched, set to the distributed cache if possible, and returned. - * - * @param <T> the type of data being store in the list cache - * @param k the key associated with the cache entry - * @param value currently available Redis cache value - * @param ttl the projected TTL value for the current cache entry - * @param cacheRef the cache that contains the data for the current type - * @param callable passed function that will retrieve fresh values to populate cache if not present - * @return the wrapper either for the currently valid cache data, or - * @throws Exception if there is an error either interacting with the distributed cache, or an error in fetching the fresh data - */ - @SuppressWarnings("unchecked") - private <T> CacheWrapper<List<T>> buildListCacheWrapper(ParameterizedCacheKey k, List<T> value, String ttl, - ListCommands<String, T> cacheRef, Callable<? extends List<T>> callable) throws Exception { - CacheWrapper<List<T>> wrapper = CacheWrapperBuilder - .<List<T>>builder() - .data(Optional.ofNullable(value)) - .errorType(Optional.empty()) - .build(); - // if the value is empty or the TTL has expired, recalculate the value - if (value == null || ttl == null) { - // if expired, clean out old values first as there is no "set" command for lists - if (value != null) { - long count = cacheRef.llen(k.toString()); - // casting to int should be safe. If we have > 2b entries for a key we have bigger problems - cacheRef.rpop(k.toString(), (int) count); - } - // TTL set before value so that if we have some value, we don't overcall in error states - setTTL(k); + /** + * Using the existing cache data and TTL, check if the data is available and current. If the data is missing or stale, a new copy should + * be fetched, set to the distributed cache if possible, and returned. + * + * @param <T> the type of data being store in the list cache + * @param k the key associated with the cache entry + * @param value currently available Redis cache value + * @param ttl the projected TTL value for the current cache entry + * @param cacheRef the cache that contains the data for the current type + * @param callable passed function that will retrieve fresh values to populate cache if not present + * @return the wrapper either for the currently valid cache data, or + * @throws Exception if there is an error either interacting with the distributed cache, or an error in fetching the fresh data + */ + @SuppressWarnings("unchecked") + private <T> CacheWrapper<List<T>> buildListCacheWrapper(ParameterizedCacheKey k, List<T> value, String ttl, + ListCommands<String, T> cacheRef, Callable<? extends List<T>> callable) throws Exception { + CacheWrapper<List<T>> wrapper = CacheWrapperBuilder + .<List<T>>builder() + .data(Optional.ofNullable(value)) + .errorType(Optional.empty()) + .build(); + // if the value is empty or the TTL has expired, recalculate the value + if (value == null || ttl == null) { + // if expired, clean out old values first as there is no "set" command for lists + if (value != null) { + long count = cacheRef.llen(k.toString()); + // casting to int should be safe. If we have > 2b entries for a key we have bigger problems + cacheRef.rpop(k.toString(), (int) count); + } + // TTL set before value so that if we have some value, we don't overcall in error states + setTTL(k); - // generate the value from the callable when needed - LOGGER.trace("Attempting to fetch fresh value for key {}", k); - value = callable.call(); + // generate the value from the callable when needed + LOGGER.trace("Attempting to fetch fresh value for key {}", k); + value = callable.call(); - // set the cache and the TTL entry for the entry - cacheRef.rpush(k.toString(), value.toArray((T[]) Array.newInstance(k.clazz(), value.size()))); - // max age included on key, as this will stop stagnant keys from polluting instance - keyCommands.expire(k.toString(), maxKeyAge); + // set the cache and the TTL entry for the entry + cacheRef.rpush(k.toString(), value.toArray((T[]) Array.newInstance(k.clazz(), value.size()))); + // max age included on key, as this will stop stagnant keys from polluting instance + keyCommands.expire(k.toString(), maxKeyAge); - // set the value once successfully sent to Redis into the returned wrapper - wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build(); - } - return wrapper; + // set the value once successfully sent to Redis into the returned wrapper + wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build(); } + return wrapper; + } - /** - * Set the TTL key into the Redis cache to be able to handle expirations without immediate revocation. - * - * @param k the cache key of the item being cached - */ - private void setTTL(ParameterizedCacheKey k) { - stringCommands.setex(getTtlKey(k), config.ttl().toSeconds(), DateTimeHelper.toRFC3339(new Date())); - } + /** + * Set the TTL key into the Redis cache to be able to handle expirations without immediate revocation. + * + * @param k the cache key of the item being cached + */ + private void setTTL(ParameterizedCacheKey k) { + stringCommands.setex(getTtlKey(k), config.ttl().toSeconds(), DateTimeHelper.toRFC3339(new Date())); + } - /** - * Retrieve the access object for Redis with the serialization layer prepared for a given list type. - * - * @param <V> incoming rawtype of the inner datatype. - * @param rawType the class reference for the data type being accessed - * @return the Redis access command object used to manipulate cache data for a given type of lists. - */ - @SuppressWarnings("unchecked") - private <V> ValueCommands<String, V> getCommands(Class<V> rawType) { - // relies on the assumption that this is the only modifier to the map to ensure type-safety - return (ValueCommands<String, V>) singleCommands.computeIfAbsent(rawType, ds::value); - } + /** + * Retrieve the access object for Redis with the serialization layer prepared for a given list type. + * + * @param <V> incoming rawtype of the inner datatype. + * @param rawType the class reference for the data type being accessed + * @return the Redis access command object used to manipulate cache data for a given type of lists. + */ + @SuppressWarnings("unchecked") + private <V> ValueCommands<String, V> getCommands(Class<V> rawType) { + // relies on the assumption that this is the only modifier to the map to ensure type-safety + return (ValueCommands<String, V>) singleCommands.computeIfAbsent(rawType, ds::value); + } - /** - * Retrieve the access object for Redis with the serialization layer prepared for a given type. - * - * @param <V> incoming rawtype of the inner datatype. - * @param rawType the class reference for the data type being accessed - * @return the Redis access command object used to manipulate cache data for a given type. - */ - @SuppressWarnings("unchecked") - private <V> ListCommands<String, V> getListCommands(Class<V> rawType) { - // relies on the assumption that this is the only modifier to the map to ensure type-safety - return (ListCommands<String, V>) listCommands.computeIfAbsent(rawType, ds::list); - } + /** + * Retrieve the access object for Redis with the serialization layer prepared for a given type. + * + * @param <V> incoming rawtype of the inner datatype. + * @param rawType the class reference for the data type being accessed + * @return the Redis access command object used to manipulate cache data for a given type. + */ + @SuppressWarnings("unchecked") + private <V> ListCommands<String, V> getListCommands(Class<V> rawType) { + // relies on the assumption that this is the only modifier to the map to ensure type-safety + return (ListCommands<String, V>) listCommands.computeIfAbsent(rawType, ds::list); + } }