
fix(cache): Add try-catch for connection errors when clearing cache

Merged Martin Lowe requested to merge malowe/main/1.2.4-release into main
1 file changed  +224  −206
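The fix relies on the behaviour already used in get() and getList(): when Redis is unreachable, the client surfaces the failure as a CompletionException whose cause is a ConnectException, which the service unwraps to decide between falling back to the in-memory cache and reporting an error. A minimal, self-contained illustration of that unwrapping pattern (standard JDK types only, not taken from the diff):

import java.net.ConnectException;
import java.util.concurrent.CompletionException;

public class ConnectionFallbackSketch {
    public static void main(String[] args) {
        try {
            // stand-in for a Redis command (e.g. keyCommands.del(...)) failing to connect
            throw new CompletionException(new ConnectException("Connection refused"));
        } catch (CompletionException e) {
            if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
                // connection-level failure: log a warning and degrade gracefully
                System.out.println("Redis unreachable, falling back to the in-memory cache");
            } else {
                // any other cause is treated as a cache error
                System.out.println("Error while interacting with the distributed cache: " + e.getCause());
            }
        }
    }
}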
@@ -42,241 +42,259 @@ import jakarta.ws.rs.core.MultivaluedMap;
/**
* Custom Redis cache implementation. This interfaces with Redis through a client, as the cache layer can only be configured to run using
* caffeine or Redis, and not both at the same time for different regions.
*
* As Redis doesn't support lists natively and instead uses serialized concrete types, any place where we need list entities, we need to
* pre-serialize to a JSON string and store and retrieve that. While this adds some complexity, list support is critical to caching.
*/
@ApplicationScoped
public class RedisCacheService implements DistributedCacheService {
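// Storage layout used throughout this class: each cached value lives under its parameterized key with a
// long max age (maxKeyAge), and a companion "<key>:ttl" string entry (see getTtlKey/setTTL) records when
// the value was written; a missing TTL entry marks the value as stale and triggers a refresh on read.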
private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheService.class);
// used to reduce key/data pollution of Redis instance so that the instance can keep itself somewhat clean. Default 1 month.
private final Duration maxKeyAge;
private final int maxAgeInSeconds;
// retain datasource so we can lazy load cache commands as needed
private RedisDataSource ds;
// string keys used as serialized keys are harder to do glob matches on
private KeyCommands<String> keyCommands;
// used for TTL and raw JSON storage w/o auto-conversion
private ValueCommands<String, String> stringCommands;
private Map<Class<?>, ValueCommands<String, ?>> singleCommands;
private Map<Class<?>, ListCommands<String, ?>> listCommands;
private RedisCacheConfig config;
private final CachingService cache;
/**
* Load the various command objects used to interact with Redis, along with the cache configuration and the in-memory fallback cache.
*
* @param ds the Redis datasource object as created by the system
* @param config the Redis cache configuration for the application
* @param cache the in-memory caching service used as a fallback when Redis is unreachable
*/
public RedisCacheService(RedisDataSource ds, RedisCacheConfig config, CachingService cache) {
this.ds = ds;
this.singleCommands = new ConcurrentHashMap<>();
this.listCommands = new ConcurrentHashMap<>();
this.keyCommands = ds.key(String.class);
this.stringCommands = ds.value(String.class);
this.config = config;
this.cache = cache;
// based on config at start, build the max age + shortcut for number of seconds in that duration
this.maxKeyAge = ChronoUnit.DAYS
.getDuration()
.multipliedBy(config.maxAge().getDays())
.plus(ChronoUnit.MONTHS.getDuration().multipliedBy(config.maxAge().getMonths()))
.plus(ChronoUnit.YEARS.getDuration().multipliedBy(config.maxAge().getYears()));
this.maxAgeInSeconds = (int) this.maxKeyAge.toSeconds();
}
@Override
public <T> CacheWrapper<T> get(String id, MultivaluedMap<String, String> params, Class<T> type, Callable<? extends T> callable) {
ValueCommands<String, T> cacheRef = getCommands(type);
// build the cache key for the given params
ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build();
// get the cached value + TTL if it isn't expired
LOGGER.trace("Checking redis cache for non-expired value for key {}", k);
CacheWrapper<T> wrapper;
try {
T value = cacheRef.get(k.toString());
String ttl = stringCommands.get(getTtlKey(k));
// build the default wrapper preemptively
wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build();
// if the value is empty or the TTL has expired, recalculate the value
if (value == null || ttl == null) {
// generate the value from the callable when needed
LOGGER.trace("Attempting to fetch fresh value for key {}", k);
value = callable.call();
// set the cache and the TTL entry for the entry. max age included on key to stop stagnant keys from polluting instance
cacheRef.setex(k.toString(), maxAgeInSeconds, value);
setTTL(k);
// set the value once successfully sent to Redis into the returned wrapper
wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build();
}
} catch (CompletionException e) {
if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k);
// call in-memory cache as fallback
wrapper = cache.get(id, params, type, callable);
} else {
wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
}
} catch (Exception e) {
// TODO: We probably want a way to stop oversending for this eventually
// catch the error and set into the wrapper
LOGGER.error("Error while creating cache entry for key {} and type {}", id, type.getSimpleName(), e);
wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
}
return wrapper;
}
@Override
public <T> CacheWrapper<List<T>> getList(String id, MultivaluedMap<String, String> params, Class<T> type,
Callable<? extends List<T>> callable) {
ListCommands<String, T> cacheRef = getListCommands(type);
// build the cache key for the given params
ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build();
// get the cached value + TTL if it isn't expired
LOGGER.trace("Checking redis cache for non-expired value for key {}", k);
CacheWrapper<List<T>> wrapper;
try {
List<T> value = cacheRef.lrange(k.toString(), 0, -1);
String ttl = stringCommands.get(getTtlKey(k));
// build the wrapper around the value, fetching fresh values when necessary
wrapper = buildListCacheWrapper(k, value, ttl, cacheRef, callable);
} catch (CompletionException e) {
if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k);
// call in-memory cache as fallback
wrapper = cache.get(id, params, type, callable);
} else {
LOGGER.error("Error while loading distributed cache results", e);
wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
}
} catch (Exception e) {
// TODO: We probably want a way to stop oversending for this eventually
// catch the error and set into the wrapper
LOGGER.error("Error while creating cache entry for key {}", k, e);
wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
}
return wrapper;
}
@Override
public boolean remove(String id, MultivaluedMap<String, String> params, Class<?> rawType) {
ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(rawType).params(params).build();
try {
// in case we had to fallback, also clear the in-memory cache
cache.remove(k);
// attempt to clear the Redis cache
return keyCommands.del(k.toString()) == 1;
} catch (CompletionException e) {
if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
LOGGER.warn("Unable to connect to Redis server, unable to guarantee cache clear for key {}", k);
} else {
LOGGER.error("Error while interacting with the distributed cache", e);
}
}
return false;
}
@Override
public void removeFuzzy(String key, Class<?> rawType) {
ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(key).clazz(rawType).params(null).build();
try {
// in case we had to fallback, also clear the in-memory cache
cache.fuzzyRemove(key, rawType);
// for each key that matches the type root, remove the data and return the deleted results
keyCommands.del(keyCommands.keys(k.toString() + "*").toArray(new String[] {}));
} catch (CompletionException e) {
if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
LOGGER.warn("Unable to connect to Redis server, unable to guarantee cache clear for key {}", k);
} else {
LOGGER.error("Error while interacting with the distributed cache", e);
}
}
}
@Override
public void clearCache(Class<?> rawType) {
// take advantage of key structuring to do fuzzy key removal
removeFuzzy("", rawType);
}
/**
* Standardize the TTL key for use in cache entries.
*
* @param k the cache key for the entry being recorded
* @return the TTL cache key for the given entry
*/
public String getTtlKey(ParameterizedCacheKey k) {
return k.toString() + ":ttl";
}
/**
* Using the existing cache data and TTL, check if the data is available and current. If the data is missing or stale, a new copy should
* be fetched, set to the distributed cache if possible, and returned.
*
* @param <T> the type of data being stored in the list cache
* @param k the key associated with the cache entry
* @param value currently available Redis cache value
* @param ttl the projected TTL value for the current cache entry
* @param cacheRef the cache that contains the data for the current type
* @param callable passed function that will retrieve fresh values to populate cache if not present
* @return the wrapper either for the currently valid cache data, or for the freshly fetched data when the entry was missing or stale
* @throws Exception if there is an error either interacting with the distributed cache, or an error in fetching the fresh data
*/
@SuppressWarnings("unchecked")
private <T> CacheWrapper<List<T>> buildListCacheWrapper(ParameterizedCacheKey k, List<T> value, String ttl,
ListCommands<String, T> cacheRef, Callable<? extends List<T>> callable) throws Exception {
CacheWrapper<List<T>> wrapper = CacheWrapperBuilder
.<List<T>>builder()
.data(Optional.ofNullable(value))
.errorType(Optional.empty())
.build();
// if the value is empty or the TTL has expired, recalculate the value
if (value == null || ttl == null) {
// if expired, clean out old values first as there is no "set" command for lists
if (value != null) {
long count = cacheRef.llen(k.toString());
// casting to int should be safe. If we have > 2b entries for a key we have bigger problems
cacheRef.rpop(k.toString(), (int) count);
}
// TTL set before value so that if we have some value, we don't overcall in error states
setTTL(k);
// generate the value from the callable when needed
LOGGER.trace("Attempting to fetch fresh value for key {}", k);
value = callable.call();
// set the cache and the TTL entry for the entry
cacheRef.rpush(k.toString(), value.toArray((T[]) Array.newInstance(k.clazz(), value.size())));
// max age included on key, as this will stop stagnant keys from polluting instance
keyCommands.expire(k.toString(), maxKeyAge);
// set the value once successfully sent to Redis into the returned wrapper
wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build();
}
return wrapper;
}
/**
* Set the TTL key into the Redis cache to be able to handle expirations without immediate revocation.
*
* @param k the cache key of the item being cached
*/
private void setTTL(ParameterizedCacheKey k) {
stringCommands.setex(getTtlKey(k), config.ttl().toSeconds(), DateTimeHelper.toRFC3339(new Date()));
}
/**
* Retrieve the access object for Redis with the serialization layer prepared for a given type.
*
* @param <V> incoming raw type of the inner datatype.
* @param rawType the class reference for the data type being accessed
* @return the Redis access command object used to manipulate cache data for a given type.
*/
@SuppressWarnings("unchecked")
private <V> ValueCommands<String, V> getCommands(Class<V> rawType) {
// relies on the assumption that this is the only modifier to the map to ensure type-safety
return (ValueCommands<String, V>) singleCommands.computeIfAbsent(rawType, ds::value);
}
/**
* Retrieve the access object for Redis with the serialization layer prepared for a given list type.
*
* @param <V> incoming raw type of the inner datatype.
* @param rawType the class reference for the data type being accessed
* @return the Redis access command object used to manipulate cache data for lists of a given type.
*/
@SuppressWarnings("unchecked")
private <V> ListCommands<String, V> getListCommands(Class<V> rawType) {
// relies on the assumption that this is the only modifier to the map to ensure type-safety
return (ListCommands<String, V>) listCommands.computeIfAbsent(rawType, ds::list);
}
}
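A caller-side sketch of how the service is typically consumed; ProjectService, ProjectData and loadProjects(...) are hypothetical names, and it assumes CacheWrapper exposes its payload through a data() accessor (an assumption, not confirmed by this MR):

// Hypothetical caller; the names and the CacheWrapper.data() accessor are assumptions for illustration.
@ApplicationScoped
public class ProjectService {

    private final DistributedCacheService cacheService;

    public ProjectService(DistributedCacheService cacheService) {
        this.cacheService = cacheService;
    }

    public List<ProjectData> getProjects(MultivaluedMap<String, String> params) {
        // the callable only runs when the Redis entry is missing or its ":ttl" companion key has expired
        CacheWrapper<List<ProjectData>> wrapper = cacheService
                .getList("all-projects", params, ProjectData.class, () -> loadProjects(params));
        // assumed accessor: the builder sets data(Optional<T>), so a matching data() getter is presumed here
        return wrapper.data().orElse(List.of());
    }

    private List<ProjectData> loadProjects(MultivaluedMap<String, String> params) {
        // stand-in for the real data source call
        return List.of();
    }
}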