From 0ba09475b249bd3ebf8ab83eb7aebc23d36a1353 Mon Sep 17 00:00:00 2001
From: Martin Lowe <martin.lowe@eclipse-foundation.org>
Date: Mon, 7 Apr 2025 13:05:30 -0400
Subject: [PATCH] fix(cache): Add try-catch for connection errors when clearing
 cache

Previous code had no error handling in the case of Redis service being
unavailable, which made the cache less reliable. Additionally, there was
no cleaing of the underlying in-memory cache for fallback data, which
could result in data not being properly cleared.
---
 .../service/impl/RedisCacheService.java       | 430 +++++++++---------
 1 file changed, 224 insertions(+), 206 deletions(-)

diff --git a/caching/src/main/java/org/eclipsefoundation/caching/service/impl/RedisCacheService.java b/caching/src/main/java/org/eclipsefoundation/caching/service/impl/RedisCacheService.java
index 1c2c0eb..35beb4c 100644
--- a/caching/src/main/java/org/eclipsefoundation/caching/service/impl/RedisCacheService.java
+++ b/caching/src/main/java/org/eclipsefoundation/caching/service/impl/RedisCacheService.java
@@ -42,241 +42,259 @@ import jakarta.ws.rs.core.MultivaluedMap;
 /**
  * Custom Redis cache implementation. This interfaces with Redis through a client, as the cache layer can only be configured to run using
  * caffeine or Redis, and not both at the same time for different regions.
- * 
- * As Redis doesn't support lists natively and instead uses serialized concrete types, any place where we need list entities, we need to
- * pre-serialize to a JSON string and store and retrieve that. While it adds small complexity, list support is critical to caching.
  */
 @ApplicationScoped
 public class RedisCacheService implements DistributedCacheService {
-    private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheService.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheService.class);
 
-    // used to reduce key/data pollution of Redis instance so that the instance can keep itself somewhat clean. Default 1 month.
-    private final Duration maxKeyAge;
-    private final int maxAgeInSeconds;
+  // used to reduce key/data pollution of Redis instance so that the instance can keep itself somewhat clean. Default 1 month.
+  private final Duration maxKeyAge;
+  private final int maxAgeInSeconds;
 
-    // retain datasource so we can lazy load cache commands as needed
-    private RedisDataSource ds;
-    // string keys used as serialized keys are harder to do glob matches on
-    private KeyCommands<String> keyCommands;
-    // used for TTL and raw JSON storage w/o auto-conversion
-    private ValueCommands<String, String> stringCommands;
-    private Map<Class<?>, ValueCommands<String, ?>> singleCommands;
-    private Map<Class<?>, ListCommands<String, ?>> listCommands;
+  // retain datasource so we can lazy load cache commands as needed
+  private RedisDataSource ds;
+  // string keys used as serialized keys are harder to do glob matches on
+  private KeyCommands<String> keyCommands;
+  // used for TTL and raw JSON storage w/o auto-conversion
+  private ValueCommands<String, String> stringCommands;
+  private Map<Class<?>, ValueCommands<String, ?>> singleCommands;
+  private Map<Class<?>, ListCommands<String, ?>> listCommands;
 
-    private RedisCacheConfig config;
-    private final CachingService cache;
+  private RedisCacheConfig config;
+  private final CachingService cache;
 
-    /**
-     * Load the various command objects to interact with Redis, as well as the object mapper used for list caching calls.
-     * 
-     * @param ds the Redis datasource object as created by the system
-     * @param mapper the Jackson mapper object for the application
-     */
-    public RedisCacheService(RedisDataSource ds, RedisCacheConfig config, CachingService cache) {
-        this.ds = ds;
-        this.singleCommands = new ConcurrentHashMap<>();
-        this.listCommands = new ConcurrentHashMap<>();
-        this.keyCommands = ds.key(String.class);
-        this.stringCommands = ds.value(String.class);
-        this.config = config;
-        this.cache = cache;
+  /**
+   * Load the various command objects to interact with Redis, as well as the object mapper used for list caching calls.
+   * 
+   * @param ds the Redis datasource object as created by the system
+   * @param mapper the Jackson mapper object for the application
+   */
+  public RedisCacheService(RedisDataSource ds, RedisCacheConfig config, CachingService cache) {
+    this.ds = ds;
+    this.singleCommands = new ConcurrentHashMap<>();
+    this.listCommands = new ConcurrentHashMap<>();
+    this.keyCommands = ds.key(String.class);
+    this.stringCommands = ds.value(String.class);
+    this.config = config;
+    this.cache = cache;
 
-        // based on config at start, build the max age + shortcut for number of seconds in that duration
-        this.maxKeyAge = ChronoUnit.DAYS
-                .getDuration()
-                .multipliedBy(config.maxAge().getDays())
-                .plus(ChronoUnit.MONTHS.getDuration().multipliedBy(config.maxAge().getMonths()))
-                .plus(ChronoUnit.YEARS.getDuration().multipliedBy(config.maxAge().getYears()));
-        this.maxAgeInSeconds = (int) this.maxKeyAge.toSeconds();
-    }
+    // based on config at start, build the max age + shortcut for number of seconds in that duration
+    this.maxKeyAge = ChronoUnit.DAYS
+        .getDuration()
+        .multipliedBy(config.maxAge().getDays())
+        .plus(ChronoUnit.MONTHS.getDuration().multipliedBy(config.maxAge().getMonths()))
+        .plus(ChronoUnit.YEARS.getDuration().multipliedBy(config.maxAge().getYears()));
+    this.maxAgeInSeconds = (int) this.maxKeyAge.toSeconds();
+  }
 
-    @Override
-    public <T> CacheWrapper<T> get(String id, MultivaluedMap<String, String> params, Class<T> type, Callable<? extends T> callable) {
-        ValueCommands<String, T> cacheRef = getCommands(type);
-        // build the cache key for the given params
-        ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build();
+  @Override
+  public <T> CacheWrapper<T> get(String id, MultivaluedMap<String, String> params, Class<T> type, Callable<? extends T> callable) {
+    ValueCommands<String, T> cacheRef = getCommands(type);
+    // build the cache key for the given params
+    ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build();
 
-        // get the cached value + TTL if it isn't expired
-        LOGGER.trace("Checking redis cache for non-expired value for key {}", k);
-        CacheWrapper<T> wrapper;
-        try {
-            T value = cacheRef.get(k.toString());
-            String ttl = stringCommands.get(getTtlKey(k));
-            // build the default wrapper premptively
-            wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build();
-            // if the value is empty or the TTL has expired, recalculate the value
-            if (value == null || ttl == null) {
-                // generate the value from the callable when needed
-                LOGGER.trace("Attempting to fetch fresh value for key {}", k);
-                value = callable.call();
+    // get the cached value + TTL if it isn't expired
+    LOGGER.trace("Checking redis cache for non-expired value for key {}", k);
+    CacheWrapper<T> wrapper;
+    try {
+      T value = cacheRef.get(k.toString());
+      String ttl = stringCommands.get(getTtlKey(k));
+      // build the default wrapper premptively
+      wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build();
+      // if the value is empty or the TTL has expired, recalculate the value
+      if (value == null || ttl == null) {
+        // generate the value from the callable when needed
+        LOGGER.trace("Attempting to fetch fresh value for key {}", k);
+        value = callable.call();
 
-                // set the cache and the TTL entry for the entry. max age included on key to stop stagnant keys from polluting instance
-                cacheRef.setex(k.toString(), maxAgeInSeconds, value);
-                setTTL(k);
+        // set the cache and the TTL entry for the entry. max age included on key to stop stagnant keys from polluting instance
+        cacheRef.setex(k.toString(), maxAgeInSeconds, value);
+        setTTL(k);
 
-                // set the value once successfully sent to Redis into the returned wrapper
-                wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build();
-            }
-        } catch (CompletionException e) {
-            if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
-                LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k);
-                // call in-memory cache as fallback
-                wrapper = cache.get(id, params, type, callable);
-            } else {
-                wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
-            }
-        } catch (Exception e) {
-            // TODO: We probably want a way to stop oversending for this eventually
-            // catch the error and set into the wrapper
-            LOGGER.error("Error while creating cache entry for key {} and type {}", id, type.getSimpleName(), e);
-            wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
-        }
-        return wrapper;
+        // set the value once successfully sent to Redis into the returned wrapper
+        wrapper = CacheWrapperBuilder.<T>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build();
+      }
+    } catch (CompletionException e) {
+      if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
+        LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k);
+        // call in-memory cache as fallback
+        wrapper = cache.get(id, params, type, callable);
+      } else {
+        wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
+      }
+    } catch (Exception e) {
+      // TODO: We probably want a way to stop oversending for this eventually
+      // catch the error and set into the wrapper
+      LOGGER.error("Error while creating cache entry for key {} and type {}", id, type.getSimpleName(), e);
+      wrapper = CacheWrapperBuilder.<T>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
     }
+    return wrapper;
+  }
 
-    @Override
-    public <T> CacheWrapper<List<T>> getList(String id, MultivaluedMap<String, String> params, Class<T> type,
-            Callable<? extends List<T>> callable) {
-        ListCommands<String, T> cacheRef = getListCommands(type);
-        // build the cache key for the given params
-        ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build();
+  @Override
+  public <T> CacheWrapper<List<T>> getList(String id, MultivaluedMap<String, String> params, Class<T> type,
+      Callable<? extends List<T>> callable) {
+    ListCommands<String, T> cacheRef = getListCommands(type);
+    // build the cache key for the given params
+    ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(type).params(params).build();
 
-        // get the cached value + TTL if it isn't expired
-        LOGGER.trace("Checking redis cache for non-expired value for key {}", k);
-        CacheWrapper<List<T>> wrapper;
-        try {
-            List<T> value = cacheRef.lrange(k.toString(), 0, -1);
-            String ttl = stringCommands.get(getTtlKey(k));
+    // get the cached value + TTL if it isn't expired
+    LOGGER.trace("Checking redis cache for non-expired value for key {}", k);
+    CacheWrapper<List<T>> wrapper;
+    try {
+      List<T> value = cacheRef.lrange(k.toString(), 0, -1);
+      String ttl = stringCommands.get(getTtlKey(k));
 
-            // build the wrapper around the value, fetching fresh values when necessary
-            wrapper = buildListCacheWrapper(k, value, ttl, cacheRef, callable);
-        } catch (CompletionException e) {
-            if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
-                LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k);
-                // call in-memory cache as fallback
-                wrapper = cache.get(id, params, type, callable);
-            } else {
-                LOGGER.error("Error while loading distributed cache results", e);
-                wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
-            }
-        } catch (Exception e) {
-            // TODO: We probably want a way to stop oversending for this eventually
-            // catch the error and set into the wrapper
-            LOGGER.error("Error while creating cache entry for key {}", k, e);
-            wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
-        }
-        return wrapper;
+      // build the wrapper around the value, fetching fresh values when necessary
+      wrapper = buildListCacheWrapper(k, value, ttl, cacheRef, callable);
+    } catch (CompletionException e) {
+      if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
+        LOGGER.warn("Unable to connect to Redis server, falling back to in memory cache for key {}", k);
+        // call in-memory cache as fallback
+        wrapper = cache.get(id, params, type, callable);
+      } else {
+        LOGGER.error("Error while loading distributed cache results", e);
+        wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
+      }
+    } catch (Exception e) {
+      // TODO: We probably want a way to stop oversending for this eventually
+      // catch the error and set into the wrapper
+      LOGGER.error("Error while creating cache entry for key {}", k, e);
+      wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.empty()).errorType(Optional.of(e.getClass())).build();
     }
+    return wrapper;
+  }
 
-    @Override
-    public boolean remove(String id, MultivaluedMap<String, String> params, Class<?> rawType) {
-        return keyCommands.del(ParameterizedCacheKeyBuilder.builder().id(id).clazz(rawType).params(params).build().toString()) == 1;
+  @Override
+  public boolean remove(String id, MultivaluedMap<String, String> params, Class<?> rawType) {
+    ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(id).clazz(rawType).params(params).build();
+    try {
+      // in case we had to fallback, also clear the in-memory cache
+      cache.remove(k);
+      // attempt to clear redis cache,
+      return keyCommands.del(k.toString()) == 1;
+    } catch (CompletionException e) {
+      if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
+        LOGGER.warn("Unable to connect to Redis server, unable to guarantee cache clear for key {}", k);
+      } else {
+        LOGGER.error("Error while interacting with the distributed cache", e);
+      }
     }
+    return false;
+  }
 
-    @Override
-    public void removeFuzzy(String key, Class<?> rawType) {
-        // for each key that matches the type root, remove the data and return the deleted results
-        keyCommands
-                .del(keyCommands
-                        .keys(ParameterizedCacheKeyBuilder.builder().id(key).clazz(rawType).params(null).build().toString() + "*")
-                        .toArray(new String[] {}));
+  @Override
+  public void removeFuzzy(String key, Class<?> rawType) {
+    ParameterizedCacheKey k = ParameterizedCacheKeyBuilder.builder().id(key).clazz(rawType).params(null).build();
+    try {
+      // in case we had to fallback, also clear the in-memory cache
+      cache.fuzzyRemove(key, rawType);
+      // for each key that matches the type root, remove the data and return the deleted results
+      keyCommands.del(keyCommands.keys(k.toString() + "*").toArray(new String[] {}));
+    } catch (CompletionException e) {
+      if (ConnectException.class.isAssignableFrom(e.getCause().getClass())) {
+        LOGGER.warn("Unable to connect to Redis server, unable to guarantee cache clear for key {}", k);
+      } else {
+        LOGGER.error("Error while interacting with the distributed cache", e);
+      }
     }
+  }
 
-    @Override
-    public void clearCache(Class<?> rawType) {
-        // take advantage of key structuring to do fuzzy key removal
-        removeFuzzy("", rawType);
-    }
+  @Override
+  public void clearCache(Class<?> rawType) {
+    // take advantage of key structuring to do fuzzy key removal
+    removeFuzzy("", rawType);
+  }
 
-    /**
-     * Standardize the TTL key for use in cache entries.
-     * 
-     * @param k the cache key for the entry being recorded
-     * @return the TTL cache key for the given entry
-     */
-    public String getTtlKey(ParameterizedCacheKey k) {
-        return k.toString() + ":ttl";
-    }
+  /**
+   * Standardize the TTL key for use in cache entries.
+   * 
+   * @param k the cache key for the entry being recorded
+   * @return the TTL cache key for the given entry
+   */
+  public String getTtlKey(ParameterizedCacheKey k) {
+    return k.toString() + ":ttl";
+  }
 
-    /**
-     * Using the existing cache data and TTL, check if the data is available and current. If the data is missing or stale, a new copy should
-     * be fetched, set to the distributed cache if possible, and returned.
-     * 
-     * @param <T> the type of data being store in the list cache
-     * @param k the key associated with the cache entry
-     * @param value currently available Redis cache value
-     * @param ttl the projected TTL value for the current cache entry
-     * @param cacheRef the cache that contains the data for the current type
-     * @param callable passed function that will retrieve fresh values to populate cache if not present
-     * @return the wrapper either for the currently valid cache data, or
-     * @throws Exception if there is an error either interacting with the distributed cache, or an error in fetching the fresh data
-     */
-    @SuppressWarnings("unchecked")
-    private <T> CacheWrapper<List<T>> buildListCacheWrapper(ParameterizedCacheKey k, List<T> value, String ttl,
-            ListCommands<String, T> cacheRef, Callable<? extends List<T>> callable) throws Exception {
-        CacheWrapper<List<T>> wrapper = CacheWrapperBuilder
-                .<List<T>>builder()
-                .data(Optional.ofNullable(value))
-                .errorType(Optional.empty())
-                .build();
-        // if the value is empty or the TTL has expired, recalculate the value
-        if (value == null || ttl == null) {
-            // if expired, clean out old values first as there is no "set" command for lists
-            if (value != null) {
-                long count = cacheRef.llen(k.toString());
-                // casting to int should be safe. If we have > 2b entries for a key we have bigger problems
-                cacheRef.rpop(k.toString(), (int) count);
-            }
-            // TTL set before value so that if we have some value, we don't overcall in error states
-            setTTL(k);
+  /**
+   * Using the existing cache data and TTL, check if the data is available and current. If the data is missing or stale, a new copy should
+   * be fetched, set to the distributed cache if possible, and returned.
+   * 
+   * @param <T> the type of data being store in the list cache
+   * @param k the key associated with the cache entry
+   * @param value currently available Redis cache value
+   * @param ttl the projected TTL value for the current cache entry
+   * @param cacheRef the cache that contains the data for the current type
+   * @param callable passed function that will retrieve fresh values to populate cache if not present
+   * @return the wrapper either for the currently valid cache data, or
+   * @throws Exception if there is an error either interacting with the distributed cache, or an error in fetching the fresh data
+   */
+  @SuppressWarnings("unchecked")
+  private <T> CacheWrapper<List<T>> buildListCacheWrapper(ParameterizedCacheKey k, List<T> value, String ttl,
+      ListCommands<String, T> cacheRef, Callable<? extends List<T>> callable) throws Exception {
+    CacheWrapper<List<T>> wrapper = CacheWrapperBuilder
+        .<List<T>>builder()
+        .data(Optional.ofNullable(value))
+        .errorType(Optional.empty())
+        .build();
+    // if the value is empty or the TTL has expired, recalculate the value
+    if (value == null || ttl == null) {
+      // if expired, clean out old values first as there is no "set" command for lists
+      if (value != null) {
+        long count = cacheRef.llen(k.toString());
+        // casting to int should be safe. If we have > 2b entries for a key we have bigger problems
+        cacheRef.rpop(k.toString(), (int) count);
+      }
+      // TTL set before value so that if we have some value, we don't overcall in error states
+      setTTL(k);
 
-            // generate the value from the callable when needed
-            LOGGER.trace("Attempting to fetch fresh value for key {}", k);
-            value = callable.call();
+      // generate the value from the callable when needed
+      LOGGER.trace("Attempting to fetch fresh value for key {}", k);
+      value = callable.call();
 
-            // set the cache and the TTL entry for the entry
-            cacheRef.rpush(k.toString(), value.toArray((T[]) Array.newInstance(k.clazz(), value.size())));
-            // max age included on key, as this will stop stagnant keys from polluting instance
-            keyCommands.expire(k.toString(), maxKeyAge);
+      // set the cache and the TTL entry for the entry
+      cacheRef.rpush(k.toString(), value.toArray((T[]) Array.newInstance(k.clazz(), value.size())));
+      // max age included on key, as this will stop stagnant keys from polluting instance
+      keyCommands.expire(k.toString(), maxKeyAge);
 
-            // set the value once successfully sent to Redis into the returned wrapper
-            wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build();
-        }
-        return wrapper;
+      // set the value once successfully sent to Redis into the returned wrapper
+      wrapper = CacheWrapperBuilder.<List<T>>builder().data(Optional.ofNullable(value)).errorType(Optional.empty()).build();
     }
+    return wrapper;
+  }
 
-    /**
-     * Set the TTL key into the Redis cache to be able to handle expirations without immediate revocation.
-     * 
-     * @param k the cache key of the item being cached
-     */
-    private void setTTL(ParameterizedCacheKey k) {
-        stringCommands.setex(getTtlKey(k), config.ttl().toSeconds(), DateTimeHelper.toRFC3339(new Date()));
-    }
+  /**
+   * Set the TTL key into the Redis cache to be able to handle expirations without immediate revocation.
+   * 
+   * @param k the cache key of the item being cached
+   */
+  private void setTTL(ParameterizedCacheKey k) {
+    stringCommands.setex(getTtlKey(k), config.ttl().toSeconds(), DateTimeHelper.toRFC3339(new Date()));
+  }
 
-    /**
-     * Retrieve the access object for Redis with the serialization layer prepared for a given list type.
-     *
-     * @param <V> incoming rawtype of the inner datatype.
-     * @param rawType the class reference for the data type being accessed
-     * @return the Redis access command object used to manipulate cache data for a given type of lists.
-     */
-    @SuppressWarnings("unchecked")
-    private <V> ValueCommands<String, V> getCommands(Class<V> rawType) {
-        // relies on the assumption that this is the only modifier to the map to ensure type-safety
-        return (ValueCommands<String, V>) singleCommands.computeIfAbsent(rawType, ds::value);
-    }
+  /**
+   * Retrieve the access object for Redis with the serialization layer prepared for a given list type.
+   *
+   * @param <V> incoming rawtype of the inner datatype.
+   * @param rawType the class reference for the data type being accessed
+   * @return the Redis access command object used to manipulate cache data for a given type of lists.
+   */
+  @SuppressWarnings("unchecked")
+  private <V> ValueCommands<String, V> getCommands(Class<V> rawType) {
+    // relies on the assumption that this is the only modifier to the map to ensure type-safety
+    return (ValueCommands<String, V>) singleCommands.computeIfAbsent(rawType, ds::value);
+  }
 
-    /**
-     * Retrieve the access object for Redis with the serialization layer prepared for a given type.
-     *
-     * @param <V> incoming rawtype of the inner datatype.
-     * @param rawType the class reference for the data type being accessed
-     * @return the Redis access command object used to manipulate cache data for a given type.
-     */
-    @SuppressWarnings("unchecked")
-    private <V> ListCommands<String, V> getListCommands(Class<V> rawType) {
-        // relies on the assumption that this is the only modifier to the map to ensure type-safety
-        return (ListCommands<String, V>) listCommands.computeIfAbsent(rawType, ds::list);
-    }
+  /**
+   * Retrieve the access object for Redis with the serialization layer prepared for a given type.
+   *
+   * @param <V> incoming rawtype of the inner datatype.
+   * @param rawType the class reference for the data type being accessed
+   * @return the Redis access command object used to manipulate cache data for a given type.
+   */
+  @SuppressWarnings("unchecked")
+  private <V> ListCommands<String, V> getListCommands(Class<V> rawType) {
+    // relies on the assumption that this is the only modifier to the map to ensure type-safety
+    return (ListCommands<String, V>) listCommands.computeIfAbsent(rawType, ds::list);
+  }
 }
-- 
GitLab