This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eae7b9c  Actively update auth caches in the background
eae7b9c is described below

commit eae7b9c3ade386f28c5f0c7ee015b0d0445388ac
Author: Blake Eggleston <bdeggles...@gmail.com>
AuthorDate: Wed Sep 15 10:52:46 2021 -0400

    Actively update auth caches in the background
    
    Patch by Blake Eggleston; reviewed by Sam Tunnicliffe, Jason Brown, and 
Caleb Rackliffe for CASSANDRA-16957
    
    Co-authored-by: Blake Eggleston <bdeggles...@gmail.com>
    Co-authored-by: Josh McKenzie <jmcken...@apache.org>
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  33 +++++
 src/java/org/apache/cassandra/auth/AuthCache.java  | 157 +++++++++++++++++----
 .../org/apache/cassandra/auth/AuthCacheMBean.java  |   4 +
 .../org/apache/cassandra/auth/CacheRefresher.java  |  94 ++++++++++++
 .../cassandra/auth/NetworkPermissionsCache.java    |   2 +
 .../cassandra/auth/PasswordAuthenticator.java      |  39 ++++-
 .../apache/cassandra/auth/PermissionsCache.java    |   2 +
 src/java/org/apache/cassandra/auth/RolesCache.java |   2 +
 .../cassandra/auth/jmx/AuthorizationProxy.java     |   2 +
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  30 ++++
 .../apache/cassandra/service/StorageService.java   |   5 +
 .../org/apache/cassandra/auth/AuthCacheTest.java   |   2 +
 .../apache/cassandra/auth/CacheRefresherTest.java  |  88 ++++++++++++
 15 files changed, 429 insertions(+), 35 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f436917..1cfb1fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Actively update auth cache in the background (CASSANDRA-16957)
  * Add unix time conversion functions (CASSANDRA-17029)
  * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap 
OOMs rather than only supporting direct only (CASSANDRA-17128)
  * Forbid other Future implementations with checkstyle (CASSANDRA-17055)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 92a91c6..12221ca 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -170,6 +170,8 @@ network_authorizer: AllowAllNetworkAuthorizer
 # after the period specified here, become eligible for (async) reload.
 # Defaults to 2000, set to 0 to disable caching entirely.
 # Will be disabled automatically for AllowAllAuthenticator.
+# For a long-running cache using roles_cache_active_update, consider
+# setting to something longer such as a daily validation: 86400000
 roles_validity_in_ms: 2000
 
 # Refresh interval for roles cache (if enabled).
@@ -177,13 +179,24 @@ roles_validity_in_ms: 2000
 # access, an async reload is scheduled and the old value returned until it
 # completes. If roles_validity_in_ms is non-zero, then this must be
 # also.
+# This setting is also used to inform the interval of auto-updating if
+# using roles_cache_active_update.
 # Defaults to the same value as roles_validity_in_ms.
+# For a long-running cache, consider setting this to 60000 (1 hour) etc.
 # roles_update_interval_in_ms: 2000
 
+# If true, cache contents are actively updated by a background task at the
+# interval set by roles_update_interval_in_ms. If false, cache entries
+# become eligible for refresh after their update interval. Upon next access,
+# an async reload is scheduled and the old value returned until it completes.
+# roles_cache_active_update: false
+
 # Validity period for permissions cache (fetching permissions can be an
 # expensive operation depending on the authorizer, CassandraAuthorizer is
 # one example). Defaults to 2000, set to 0 to disable.
 # Will be disabled automatically for AllowAllAuthorizer.
+# For a long-running cache using permissions_cache_active_update, consider
+# setting to something longer such as a daily validation: 86400000
 permissions_validity_in_ms: 2000
 
 # Refresh interval for permissions cache (if enabled).
@@ -191,9 +204,18 @@ permissions_validity_in_ms: 2000
 # access, an async reload is scheduled and the old value returned until it
 # completes. If permissions_validity_in_ms is non-zero, then this must be
 # also.
+# This setting is also used to inform the interval of auto-updating if
+# using permissions_cache_active_update.
 # Defaults to the same value as permissions_validity_in_ms.
+# For a longer-running permissions cache, consider setting to update hourly 
(60000)
 # permissions_update_interval_in_ms: 2000
 
+# If true, cache contents are actively updated by a background task at the
+# interval set by permissions_update_interval_in_ms. If false, cache entries
+# become eligible for refresh after their update interval. Upon next access,
+# an async reload is scheduled and the old value returned until it completes.
+# permissions_cache_active_update: false
+
 # Validity period for credentials cache. This cache is tightly coupled to
 # the provided PasswordAuthenticator implementation of IAuthenticator. If
 # another IAuthenticator implementation is configured, this cache will not
@@ -203,6 +225,8 @@ permissions_validity_in_ms: 2000
 # underlying table, it may not  bring a significant reduction in the
 # latency of individual authentication attempts.
 # Defaults to 2000, set to 0 to disable credentials caching.
+# For a long-running cache using credentials_cache_active_update, consider
+# setting to something longer such as a daily validation: 86400000
 credentials_validity_in_ms: 2000
 
 # Refresh interval for credentials cache (if enabled).
@@ -210,9 +234,18 @@ credentials_validity_in_ms: 2000
 # access, an async reload is scheduled and the old value returned until it
 # completes. If credentials_validity_in_ms is non-zero, then this must be
 # also.
+# This setting is also used to inform the interval of auto-updating if
+# using credentials_cache_active_update.
 # Defaults to the same value as credentials_validity_in_ms.
+# For a longer-running permissions cache, consider setting to update hourly 
(60000)
 # credentials_update_interval_in_ms: 2000
 
+# If true, cache contents are actively updated by a background task at the
+# interval set by credentials_update_interval_in_ms. If false (default), cache 
entries
+# become eligible for refresh after their update interval. Upon next access,
+# an async reload is scheduled and the old value returned until it completes.
+# credentials_cache_active_update: false
+
 # The partitioner is responsible for distributing groups of rows (by
 # partition key) across nodes in the cluster. The partitioner can NOT be
 # changed without reloading all data.  If you are adding nodes or upgrading,
diff --git a/src/java/org/apache/cassandra/auth/AuthCache.java 
b/src/java/org/apache/cassandra/auth/AuthCache.java
index e69cd6c..a70afba 100644
--- a/src/java/org/apache/cassandra/auth/AuthCache.java
+++ b/src/java/org/apache/cassandra/auth/AuthCache.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.auth;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiPredicate;
 import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntConsumer;
 import java.util.function.IntSupplier;
@@ -32,7 +35,9 @@ import org.slf4j.LoggerFactory;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Policy;
 import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.MBeanWrapper;
@@ -46,6 +51,8 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
 
     public static final String MBEAN_NAME_BASE = 
"org.apache.cassandra.auth:type=";
 
+    private volatile ScheduledFuture cacheRefresher = null;
+
     // Keep a handle on created instances so their executors can be terminated 
cleanly
     private static final Set<Shutdownable> REGISTRY = new HashSet<>(4);
 
@@ -60,15 +67,23 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
     protected volatile LoadingCache<K, V> cache;
     private ExecutorPlus cacheRefreshExecutor;
 
-    private String name;
-    private IntConsumer setValidityDelegate;
-    private IntSupplier getValidityDelegate;
-    private IntConsumer setUpdateIntervalDelegate;
-    private IntSupplier getUpdateIntervalDelegate;
-    private IntConsumer setMaxEntriesDelegate;
-    private IntSupplier getMaxEntriesDelegate;
-    private Function<K, V> loadFunction;
-    private BooleanSupplier enableCache;
+    private final String name;
+    private final IntConsumer setValidityDelegate;
+    private final IntSupplier getValidityDelegate;
+    private final IntConsumer setUpdateIntervalDelegate;
+    private final IntSupplier getUpdateIntervalDelegate;
+    private final IntConsumer setMaxEntriesDelegate;
+    private final IntSupplier getMaxEntriesDelegate;
+    private final Consumer<Boolean> setActiveUpdate;
+    private final BooleanSupplier getActiveUpdate;
+    private final Function<K, V> loadFunction;
+    private final BooleanSupplier enableCache;
+
+    // Determines whether the presence of a specific value should trigger the 
invalidation of
+    // the supplied key. Used by CredentialsCache & CacheRefresher to identify 
when the
+    // credentials for a role couldn't be loaded without throwing an exception 
or serving stale
+    // values until the natural expiry time.
+    private final BiPredicate<K, V> invalidateCondition;
 
     /**
      * @param name Used for MBean
@@ -78,6 +93,8 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
      * @param getUpdateIntervalDelegate Getter for update interval
      * @param setMaxEntriesDelegate Used to set max # entries in cache. See 
{@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)}
      * @param getMaxEntriesDelegate Getter for max entries.
+     * @param setActiveUpdate Method to process config to actively update the 
auth cache prior to configured cache expiration
+     * @param getActiveUpdate Getter for active update
      * @param loadFunction Function to load the cache. Called on {@link 
#get(Object)}
      * @param cacheEnabledDelegate Used to determine if cache is enabled.
      */
@@ -88,9 +105,53 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
                         IntSupplier getUpdateIntervalDelegate,
                         IntConsumer setMaxEntriesDelegate,
                         IntSupplier getMaxEntriesDelegate,
+                        Consumer<Boolean> setActiveUpdate,
+                        BooleanSupplier getActiveUpdate,
                         Function<K, V> loadFunction,
                         BooleanSupplier cacheEnabledDelegate)
     {
+        this(name,
+             setValidityDelegate,
+             getValidityDelegate,
+             setUpdateIntervalDelegate,
+             getUpdateIntervalDelegate,
+             setMaxEntriesDelegate,
+             getMaxEntriesDelegate,
+             setActiveUpdate,
+             getActiveUpdate,
+             loadFunction,
+             cacheEnabledDelegate,
+             (k, v) -> false);
+    }
+
+    /**
+     * @param name Used for MBean
+     * @param setValidityDelegate Used to set cache validity period. See 
{@link Policy#expireAfterWrite()}
+     * @param getValidityDelegate Getter for validity period
+     * @param setUpdateIntervalDelegate Used to set cache update interval. See 
{@link Policy#refreshAfterWrite()}
+     * @param getUpdateIntervalDelegate Getter for update interval
+     * @param setMaxEntriesDelegate Used to set max # entries in cache. See 
{@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)}
+     * @param getMaxEntriesDelegate Getter for max entries.
+     * @param setActiveUpdate Actively update the cache before expiry
+     * @param getActiveUpdate Getter for active update
+     * @param loadFunction Function to load the cache. Called on {@link 
#get(Object)}
+     * @param cacheEnabledDelegate Used to determine if cache is enabled.
+     * @param invalidationCondition Used during active updates to determine if 
a refreshed value indicates a missing
+     *                              entry in the underlying table. If 
satisfied, the key will be invalidated.
+     */
+    protected AuthCache(String name,
+                        IntConsumer setValidityDelegate,
+                        IntSupplier getValidityDelegate,
+                        IntConsumer setUpdateIntervalDelegate,
+                        IntSupplier getUpdateIntervalDelegate,
+                        IntConsumer setMaxEntriesDelegate,
+                        IntSupplier getMaxEntriesDelegate,
+                        Consumer<Boolean> setActiveUpdate,
+                        BooleanSupplier getActiveUpdate,
+                        Function<K, V> loadFunction,
+                        BooleanSupplier cacheEnabledDelegate,
+                        BiPredicate<K, V> invalidationCondition)
+    {
         this.name = checkNotNull(name);
         this.setValidityDelegate = checkNotNull(setValidityDelegate);
         this.getValidityDelegate = checkNotNull(getValidityDelegate);
@@ -98,8 +159,11 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
         this.getUpdateIntervalDelegate = 
checkNotNull(getUpdateIntervalDelegate);
         this.setMaxEntriesDelegate = checkNotNull(setMaxEntriesDelegate);
         this.getMaxEntriesDelegate = checkNotNull(getMaxEntriesDelegate);
+        this.setActiveUpdate = checkNotNull(setActiveUpdate);
+        this.getActiveUpdate = checkNotNull(getActiveUpdate);
         this.loadFunction = checkNotNull(loadFunction);
         this.enableCache = checkNotNull(cacheEnabledDelegate);
+        this.invalidateCondition = checkNotNull(invalidationCondition);
         init();
     }
 
@@ -143,7 +207,7 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
     /**
      * Invalidate the entire cache.
      */
-    public void invalidate()
+    public synchronized void invalidate()
     {
         cache = initCache(null);
     }
@@ -162,7 +226,7 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
      * Time in milliseconds that a value in the cache will expire after.
      * @param validityPeriod in milliseconds
      */
-    public void setValidity(int validityPeriod)
+    public synchronized void setValidity(int validityPeriod)
     {
         if 
(Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration"))
             throw new UnsupportedOperationException("Remote configuration of 
auth caches is disabled");
@@ -180,7 +244,7 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
      * Time in milliseconds after which an entry in the cache should be 
refreshed (it's load function called again)
      * @param updateInterval in milliseconds
      */
-    public void setUpdateInterval(int updateInterval)
+    public synchronized void setUpdateInterval(int updateInterval)
     {
         if 
(Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration"))
             throw new UnsupportedOperationException("Remote configuration of 
auth caches is disabled");
@@ -198,7 +262,7 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
      * Set maximum number of entries in the cache.
      * @param maxEntries
      */
-    public void setMaxEntries(int maxEntries)
+    public synchronized void setMaxEntries(int maxEntries)
     {
         if 
(Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration"))
             throw new UnsupportedOperationException("Remote configuration of 
auth caches is disabled");
@@ -212,6 +276,20 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
         return getMaxEntriesDelegate.getAsInt();
     }
 
+    public boolean getActiveUpdate()
+    {
+        return getActiveUpdate.getAsBoolean();
+    }
+
+    public synchronized void setActiveUpdate(boolean update)
+    {
+        if 
(Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration"))
+            throw new UnsupportedOperationException("Remote configuration of 
auth caches is disabled");
+
+        setActiveUpdate.accept(update);
+        cache = initCache(cache);
+    }
+
     /**
      * (Re-)initialise the underlying cache. Will update validity, max 
entries, and update interval if
      * any have changed. The underlying {@link LoadingCache} will be initiated 
based on the provided {@code loadFunction}.
@@ -227,26 +305,45 @@ public class AuthCache<K, V> implements AuthCacheMBean, 
Shutdownable
         if (getValidity() <= 0)
             return null;
 
-        logger.info("(Re)initializing {} (validity period/update interval/max 
entries) ({}/{}/{})",
-                    name, getValidity(), getUpdateInterval(), getMaxEntries());
+        boolean activeUpdate = getActiveUpdate();
+        logger.info("(Re)initializing {} (validity period/update interval/max 
entries/active update) ({}/{}/{}/{})",
+                    name, getValidity(), getUpdateInterval(), getMaxEntries(), 
activeUpdate);
+        LoadingCache<K, V> updatedCache;
+
+        if (existing == null)
+        {
+            updatedCache = 
Caffeine.newBuilder().refreshAfterWrite(activeUpdate ? getValidity() : 
getUpdateInterval(), TimeUnit.MILLISECONDS)
+                                   .expireAfterWrite(getValidity(), 
TimeUnit.MILLISECONDS)
+                                   .maximumSize(getMaxEntries())
+                                   .executor(cacheRefreshExecutor)
+                                   .build(loadFunction::apply);
+        }
+        else
+        {
+            updatedCache = cache;
+            // Always set as mandatory
+            cache.policy().refreshAfterWrite().ifPresent(policy ->
+                policy.setExpiresAfter(activeUpdate ? getValidity() : 
getUpdateInterval(), TimeUnit.MILLISECONDS));
+            cache.policy().expireAfterWrite().ifPresent(policy -> 
policy.setExpiresAfter(getValidity(), TimeUnit.MILLISECONDS));
+            cache.policy().eviction().ifPresent(policy -> 
policy.setMaximum(getMaxEntries()));
+        }
 
-        if (existing == null) {
-          return Caffeine.newBuilder()
-                         .refreshAfterWrite(getUpdateInterval(), 
TimeUnit.MILLISECONDS)
-                         .expireAfterWrite(getValidity(), 
TimeUnit.MILLISECONDS)
-                         .maximumSize(getMaxEntries())
-                         .executor(cacheRefreshExecutor)
-                         .build(loadFunction::apply);
+        if (cacheRefresher != null)
+        {
+            cacheRefresher.cancel(false); // permit the two refreshers to race 
until the old one dies, should be harmless.
+            cacheRefresher = null;
         }
 
-        // Always set as mandatory
-        cache.policy().refreshAfterWrite().ifPresent(policy ->
-            policy.setExpiresAfter(getUpdateInterval(), 
TimeUnit.MILLISECONDS));
-        cache.policy().expireAfterWrite().ifPresent(policy ->
-            policy.setExpiresAfter(getValidity(), TimeUnit.MILLISECONDS));
-        cache.policy().eviction().ifPresent(policy ->
-            policy.setMaximum(getMaxEntries()));
-        return cache;
+        if (activeUpdate)
+        {
+            cacheRefresher = 
ScheduledExecutors.optionalTasks.scheduleAtFixedRate(CacheRefresher.create(name,
+                                                                               
                         updatedCache,
+                                                                               
                         invalidateCondition),
+                                                                               
   getUpdateInterval(),
+                                                                               
   getUpdateInterval(),
+                                                                               
   TimeUnit.MILLISECONDS);
+        }
+        return updatedCache;
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/auth/AuthCacheMBean.java 
b/src/java/org/apache/cassandra/auth/AuthCacheMBean.java
index 43fb88e..3f6247a 100644
--- a/src/java/org/apache/cassandra/auth/AuthCacheMBean.java
+++ b/src/java/org/apache/cassandra/auth/AuthCacheMBean.java
@@ -33,4 +33,8 @@ public interface AuthCacheMBean
     public void setMaxEntries(int maxEntries);
 
     public int getMaxEntries();
+
+    public boolean getActiveUpdate();
+
+    public void setActiveUpdate(boolean update);
 }
diff --git a/src/java/org/apache/cassandra/auth/CacheRefresher.java 
b/src/java/org/apache/cassandra/auth/CacheRefresher.java
new file mode 100644
index 0000000..a199601
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/CacheRefresher.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.util.Set;
+import java.util.function.BiPredicate;
+import java.util.function.BooleanSupplier;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.service.StorageService;
+
+public class CacheRefresher<K, V> implements Runnable
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CacheRefresher.class);
+
+    private final String name;
+    private final LoadingCache<K, V> cache;
+    private final BiPredicate<K, V> invalidationCondition;
+    private final BooleanSupplier skipCondition;
+
+    private CacheRefresher(String name, LoadingCache<K, V> cache,  
BiPredicate<K, V> invalidationCondition, BooleanSupplier skipCondition)
+    {
+        this.name = name;
+        this.cache = cache;
+        this.invalidationCondition = invalidationCondition;
+        this.skipCondition = skipCondition;
+    }
+
+    public void run()
+    {
+        if (skipCondition.getAsBoolean())
+        {
+            logger.debug("Skipping {} cache refresh", name);
+            return;
+        }
+
+        try
+        {
+            logger.debug("Refreshing {} cache", name);
+            Set<K> ks = cache.asMap().keySet();
+            for (K key : ks)
+            {
+                cache.refresh(key);
+                V value = cache.getIfPresent(key);
+                if (invalidationCondition.test(key, value))
+                {
+                    logger.debug("Invalidating key");
+                    cache.invalidate(key);
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            logger.error("Unexpected exception refreshing {} cache", name, e);
+        }
+    }
+
+    @VisibleForTesting
+    public static <K, V> CacheRefresher<K, V> create(String name,
+                                                     LoadingCache<K, V> cache,
+                                                     BiPredicate<K, V> 
invalidationCondition,
+                                                     BooleanSupplier 
skipCondition)
+    {
+        logger.info("Creating CacheRefresher for {}", name);
+        return new CacheRefresher<>(name, cache, invalidationCondition, 
skipCondition);
+    }
+
+    public static <K, V> CacheRefresher<K, V> create(String name, 
LoadingCache<K, V> cache, BiPredicate<K, V> invalidationCondition)
+    {
+        // By default we skip cache refreshes if the node has been decommed
+        return create(name, cache, invalidationCondition, 
StorageService.instance::isDecommissioned);
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java 
b/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java
index 72817a9..b2e8707 100644
--- a/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java
+++ b/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java
@@ -32,6 +32,8 @@ public class NetworkPermissionsCache extends 
AuthCache<RoleResource, DCPermissio
               DatabaseDescriptor::getRolesUpdateInterval,
               DatabaseDescriptor::setRolesCacheMaxEntries,
               DatabaseDescriptor::getRolesCacheMaxEntries,
+              DatabaseDescriptor::setRolesCacheActiveUpdate,
+              DatabaseDescriptor::getRolesCacheActiveUpdate,
               authorizer::authorize,
               () -> 
DatabaseDescriptor.getAuthenticator().requireAuthentication());
 
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java 
b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 098ed9f..ac1dbb9 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -60,6 +60,9 @@ public class PasswordAuthenticator implements IAuthenticator
 {
     private static final Logger logger = 
LoggerFactory.getLogger(PasswordAuthenticator.class);
 
+    /** We intentionally use an empty string sentinel to allow object equality 
comparison */
+    private static final String NO_SUCH_CREDENTIAL = "";
+
     // name of the hash column.
     private static final String SALTED_HASH = "salted_hash";
 
@@ -95,6 +98,27 @@ public class PasswordAuthenticator implements IAuthenticator
     private AuthenticatedUser authenticate(String username, String password) 
throws AuthenticationException
     {
         String hash = cache.get(username);
+
+        // intentional use of object equality
+        if (hash == NO_SUCH_CREDENTIAL)
+        {
+            // The cache was unable to load credentials via 
queryHashedPassword, probably because the supplied
+            // rolename doesn't exist. If caching is enabled we will have now 
cached the sentinel value for that key
+            // so we should invalidate it otherwise the cache will continue to 
serve that until it expires which
+            // will be a problem if the role is added in the meantime.
+            //
+            // We can't just throw the AuthenticationException directly from 
queryHashedPassword for a similar reason:
+            // if an existing role is dropped and active updates are enabled 
for the cache, the refresh in
+            // CacheRefresher::run will log and swallow the exception and keep 
serving the stale credentials until they
+            // eventually expire.
+            //
+            // So whenever we encounter the sentinal value, here and also in 
CacheRefresher (if active updates are
+            // enabled), we manually expunge the key from the cache. If 
caching is not enabled, AuthCache::invalidate
+            // is a safe no-op.
+            cache.invalidateCredentials(username);
+            throw new AuthenticationException(String.format("Provided username 
%s and/or password are incorrect", username));
+        }
+
         if (!checkpw(password, hash))
             throw new AuthenticationException(String.format("Provided username 
%s and/or password are incorrect", username));
 
@@ -111,14 +135,15 @@ public class PasswordAuthenticator implements 
IAuthenticator
             ResultMessage.Rows rows = select(authenticateStatement, options);
 
             // If either a non-existent role name was supplied, or no 
credentials
-            // were found for that role we don't want to cache the result so 
we throw
-            // an exception.
+            // were found for that role, we don't want to cache the result so 
we
+            // return a sentinel value. On receiving the sentinel, the caller 
can
+            // invalidate the cache and throw an appropriate exception.
             if (rows.result.isEmpty())
-                throw new AuthenticationException(String.format("Provided 
username %s and/or password are incorrect", username));
+                return NO_SUCH_CREDENTIAL;
 
             UntypedResultSet result = UntypedResultSet.create(rows.result);
             if (!result.one().has(SALTED_HASH))
-                throw new AuthenticationException(String.format("Provided 
username %s and/or password are incorrect", username));
+                return NO_SUCH_CREDENTIAL;
 
             return result.one().getString(SALTED_HASH);
         }
@@ -257,8 +282,12 @@ public class PasswordAuthenticator implements 
IAuthenticator
                   DatabaseDescriptor::getCredentialsUpdateInterval,
                   DatabaseDescriptor::setCredentialsCacheMaxEntries,
                   DatabaseDescriptor::getCredentialsCacheMaxEntries,
+                  DatabaseDescriptor::setCredentialsCacheActiveUpdate,
+                  DatabaseDescriptor::getCredentialsCacheActiveUpdate,
                   authenticator::queryHashedPassword,
-                  () -> true);
+                  () -> true,
+                  (k,v) -> NO_SUCH_CREDENTIAL == v); // use a known object as 
a sentinel value. CacheRefresher will
+                                                     // invalidate the key if 
the sentinel is loaded during a refresh
         }
 
         public void invalidateCredentials(String roleName)
diff --git a/src/java/org/apache/cassandra/auth/PermissionsCache.java 
b/src/java/org/apache/cassandra/auth/PermissionsCache.java
index a649c35..fd1fce8 100644
--- a/src/java/org/apache/cassandra/auth/PermissionsCache.java
+++ b/src/java/org/apache/cassandra/auth/PermissionsCache.java
@@ -34,6 +34,8 @@ public class PermissionsCache extends 
AuthCache<Pair<AuthenticatedUser, IResourc
               DatabaseDescriptor::getPermissionsUpdateInterval,
               DatabaseDescriptor::setPermissionsCacheMaxEntries,
               DatabaseDescriptor::getPermissionsCacheMaxEntries,
+              DatabaseDescriptor::setPermissionsCacheActiveUpdate,
+              DatabaseDescriptor::getPermissionsCacheActiveUpdate,
               (p) -> authorizer.authorize(p.left, p.right),
               () -> DatabaseDescriptor.getAuthorizer().requireAuthorization());
     }
diff --git a/src/java/org/apache/cassandra/auth/RolesCache.java 
b/src/java/org/apache/cassandra/auth/RolesCache.java
index 62fecfb..05a5759 100644
--- a/src/java/org/apache/cassandra/auth/RolesCache.java
+++ b/src/java/org/apache/cassandra/auth/RolesCache.java
@@ -34,6 +34,8 @@ public class RolesCache extends AuthCache<RoleResource, 
Set<Role>> implements Ro
               DatabaseDescriptor::getRolesUpdateInterval,
               DatabaseDescriptor::setRolesCacheMaxEntries,
               DatabaseDescriptor::getRolesCacheMaxEntries,
+              DatabaseDescriptor::setRolesCacheActiveUpdate,
+              DatabaseDescriptor::getRolesCacheActiveUpdate,
               roleManager::getRoleDetails,
               enableCache);
     }
diff --git a/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java 
b/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java
index 9179062..77bd5c0 100644
--- a/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java
+++ b/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java
@@ -489,6 +489,8 @@ public class AuthorizationProxy implements InvocationHandler
                   DatabaseDescriptor::getPermissionsUpdateInterval,
                   DatabaseDescriptor::setPermissionsCacheMaxEntries,
                   DatabaseDescriptor::getPermissionsCacheMaxEntries,
+                  DatabaseDescriptor::setPermissionsCacheActiveUpdate,
+                  DatabaseDescriptor::getPermissionsCacheActiveUpdate,
                   AuthorizationProxy::loadPermissions,
                   () -> true);
 
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 0dc4180..5a1d048 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -59,12 +59,15 @@ public class Config
     public volatile int permissions_validity_in_ms = 2000;
     public volatile int permissions_cache_max_entries = 1000;
     public volatile int permissions_update_interval_in_ms = -1;
+    public volatile boolean permissions_cache_active_update = false;
     public volatile int roles_validity_in_ms = 2000;
     public volatile int roles_cache_max_entries = 1000;
     public volatile int roles_update_interval_in_ms = -1;
+    public volatile boolean roles_cache_active_update = false;
     public volatile int credentials_validity_in_ms = 2000;
     public volatile int credentials_cache_max_entries = 1000;
     public volatile int credentials_update_interval_in_ms = -1;
+    public volatile boolean credentials_cache_active_update = false;
 
     /* Hashing strategy Random or OPHF */
     public String partitioner;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d246fc7..3ef1603 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1286,6 +1286,16 @@ public class DatabaseDescriptor
         return conf.permissions_cache_max_entries = maxEntries;
     }
 
+    public static boolean getPermissionsCacheActiveUpdate()
+    {
+        return conf.permissions_cache_active_update;
+    }
+
+    public static void setPermissionsCacheActiveUpdate(boolean update)
+    {
+        conf.permissions_cache_active_update = update;
+    }
+
     public static int getRolesValidity()
     {
         return conf.roles_validity_in_ms;
@@ -1303,6 +1313,16 @@ public class DatabaseDescriptor
              : conf.roles_update_interval_in_ms;
     }
 
+    public static void setRolesCacheActiveUpdate(boolean update)
+    {
+        conf.roles_cache_active_update = update;
+    }
+
+    public static boolean getRolesCacheActiveUpdate()
+    {
+        return conf.roles_cache_active_update;
+    }
+
     public static void setRolesUpdateInterval(int interval)
     {
         conf.roles_update_interval_in_ms = interval;
@@ -1350,6 +1370,16 @@ public class DatabaseDescriptor
         return conf.credentials_cache_max_entries = maxEntries;
     }
 
+    public static boolean getCredentialsCacheActiveUpdate()
+    {
+        return conf.credentials_cache_active_update;
+    }
+
+    public static void setCredentialsCacheActiveUpdate(boolean update)
+    {
+        conf.credentials_cache_active_update = update;
+    }
+
     public static int getMaxValueSize()
     {
         return conf.max_value_size_in_mb * 1024 * 1024;
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 5015f75..c1ad9cb 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4974,6 +4974,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return operationMode == Mode.NORMAL;
     }
 
+    public boolean isDecommissioned()
+    {
+        return operationMode == Mode.DECOMMISSIONED;
+    }
+
     public String getDrainProgress()
     {
         return String.format("Drained %s/%s ColumnFamilies", remainingCFs, 
totalCFs);
diff --git a/test/unit/org/apache/cassandra/auth/AuthCacheTest.java 
b/test/unit/org/apache/cassandra/auth/AuthCacheTest.java
index da97225..15e6b1f 100644
--- a/test/unit/org/apache/cassandra/auth/AuthCacheTest.java
+++ b/test/unit/org/apache/cassandra/auth/AuthCacheTest.java
@@ -258,6 +258,8 @@ public class AuthCacheTest
                   () -> 1000,
                   (maxEntries) -> {},
                   () -> 10,
+                  (updateActiveUpdate) -> {},
+                  () -> false,
                   loadFunction,
                   cacheEnabledDelegate);
         }
diff --git a/test/unit/org/apache/cassandra/auth/CacheRefresherTest.java 
b/test/unit/org/apache/cassandra/auth/CacheRefresherTest.java
new file mode 100644
index 0000000..3340d82
--- /dev/null
+++ b/test/unit/org/apache/cassandra/auth/CacheRefresherTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CacheRefresherTest
+{
+    @Test
+    public void refresh() throws Exception
+    {
+        Map<String, String> src = new HashMap<>();
+        CacheLoader<String, String> loader = src::get;
+
+        // Supply the directExecutor so the refresh() call executes within the 
refresher task like AuthCache (rather than async)
+        LoadingCache<String, String> cache = Caffeine.newBuilder()
+                                                     
.executor(MoreExecutors.directExecutor())
+                                                     .build(loader);
+
+        AtomicBoolean skipRefresh = new AtomicBoolean(false);
+        BooleanSupplier skipCondition = skipRefresh::get;
+
+        CacheRefresher<String, String> refresher = 
CacheRefresher.create("test", cache, (k, v) -> v.equals("removed"), 
skipCondition);
+        src.put("some", "thing");
+        Assert.assertEquals("thing", cache.get("some"));
+
+        // Cache should still have old value...
+        src.put("some", "one");
+        Assert.assertEquals("thing", cache.get("some"));
+
+        // ... but refresher should update it
+        refresher.run();
+        Assert.assertEquals("one", cache.get("some"));
+
+        // If we just remove the value from the src, the cache should still 
contain it
+        src.remove("some");
+        Assert.assertEquals("one", cache.get("some"));
+
+        // If we insert the special sentinel value into the src, the refresher 
will invalidate it from the cache.
+        // This time when it's removed from the underlying storage, it's not 
returned from the cache
+        src.put("some", "removed");
+        refresher.run();
+        src.remove("some");
+
+        // Remove from src
+        Assert.assertNull(cache.get("some"));
+
+        // If the skip condition returns true, don't refresh
+        src.put("some", "one");
+        Assert.assertEquals("one", cache.get("some"));
+        skipRefresh.set(true);
+        src.put("some", "body");
+        refresher.run();
+        Assert.assertEquals("one", cache.get("some"));
+
+        // Change the skip condition back to false and refresh
+        skipRefresh.set(false);
+        refresher.run();
+        Assert.assertEquals("body", cache.get("some"));
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to