This is an automated email from the ASF dual-hosted git repository. jmckenzie pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new eae7b9c Actively update auth caches in the background eae7b9c is described below commit eae7b9c3ade386f28c5f0c7ee015b0d0445388ac Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Wed Sep 15 10:52:46 2021 -0400 Actively update auth caches in the background Patch by Blake Eggleston; reviewed by Sam Tunnicliffe, Jason Brown, and Caleb Rackliffe for CASSANDRA-16957 Co-authored-by: Blake Eggleston <bdeggles...@gmail.com> Co-authored-by: Josh McKenzie <jmcken...@apache.org> --- CHANGES.txt | 1 + conf/cassandra.yaml | 33 +++++ src/java/org/apache/cassandra/auth/AuthCache.java | 157 +++++++++++++++++---- .../org/apache/cassandra/auth/AuthCacheMBean.java | 4 + .../org/apache/cassandra/auth/CacheRefresher.java | 94 ++++++++++++ .../cassandra/auth/NetworkPermissionsCache.java | 2 + .../cassandra/auth/PasswordAuthenticator.java | 39 ++++- .../apache/cassandra/auth/PermissionsCache.java | 2 + src/java/org/apache/cassandra/auth/RolesCache.java | 2 + .../cassandra/auth/jmx/AuthorizationProxy.java | 2 + src/java/org/apache/cassandra/config/Config.java | 3 + .../cassandra/config/DatabaseDescriptor.java | 30 ++++ .../apache/cassandra/service/StorageService.java | 5 + .../org/apache/cassandra/auth/AuthCacheTest.java | 2 + .../apache/cassandra/auth/CacheRefresherTest.java | 88 ++++++++++++ 15 files changed, 429 insertions(+), 35 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index f436917..1cfb1fe 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * Actively update auth cache in the background (CASSANDRA-16957) * Add unix time conversion functions (CASSANDRA-17029) * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than only supporting direct only (CASSANDRA-17128) * Forbid other Future implementations with checkstyle (CASSANDRA-17055) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 92a91c6..12221ca 100644 --- a/conf/cassandra.yaml +++ 
b/conf/cassandra.yaml @@ -170,6 +170,8 @@ network_authorizer: AllowAllNetworkAuthorizer # after the period specified here, become eligible for (async) reload. # Defaults to 2000, set to 0 to disable caching entirely. # Will be disabled automatically for AllowAllAuthenticator. +# For a long-running cache using roles_cache_active_update, consider +# setting to something longer such as a daily validation: 86400000 roles_validity_in_ms: 2000 # Refresh interval for roles cache (if enabled). @@ -177,13 +179,24 @@ roles_validity_in_ms: 2000 # access, an async reload is scheduled and the old value returned until it # completes. If roles_validity_in_ms is non-zero, then this must be # also. +# This setting is also used to inform the interval of auto-updating if +# using roles_cache_active_update. # Defaults to the same value as roles_validity_in_ms. +# For a long-running cache, consider setting this to 3600000 (1 hour) etc. # roles_update_interval_in_ms: 2000 +# If true, cache contents are actively updated by a background task at the +# interval set by roles_update_interval_in_ms. If false, cache entries +# become eligible for refresh after their update interval. Upon next access, +# an async reload is scheduled and the old value returned until it completes. +# roles_cache_active_update: false + # Validity period for permissions cache (fetching permissions can be an # expensive operation depending on the authorizer, CassandraAuthorizer is # one example). Defaults to 2000, set to 0 to disable. # Will be disabled automatically for AllowAllAuthorizer. +# For a long-running cache using permissions_cache_active_update, consider +# setting to something longer such as a daily validation: 86400000 permissions_validity_in_ms: 2000 # Refresh interval for permissions cache (if enabled). @@ -191,9 +204,18 @@ permissions_validity_in_ms: 2000 # access, an async reload is scheduled and the old value returned until it # completes. 
If permissions_validity_in_ms is non-zero, then this must be # also. +# This setting is also used to inform the interval of auto-updating if +# using permissions_cache_active_update. # Defaults to the same value as permissions_validity_in_ms. +# For a longer-running permissions cache, consider setting to update hourly (3600000) # permissions_update_interval_in_ms: 2000 +# If true, cache contents are actively updated by a background task at the +# interval set by permissions_update_interval_in_ms. If false, cache entries +# become eligible for refresh after their update interval. Upon next access, +# an async reload is scheduled and the old value returned until it completes. +# permissions_cache_active_update: false + # Validity period for credentials cache. This cache is tightly coupled to # the provided PasswordAuthenticator implementation of IAuthenticator. If # another IAuthenticator implementation is configured, this cache will not @@ -203,6 +225,8 @@ permissions_validity_in_ms: 2000 # underlying table, it may not bring a significant reduction in the # latency of individual authentication attempts. # Defaults to 2000, set to 0 to disable credentials caching. +# For a long-running cache using credentials_cache_active_update, consider +# setting to something longer such as a daily validation: 86400000 credentials_validity_in_ms: 2000 # Refresh interval for credentials cache (if enabled). @@ -210,9 +234,18 @@ credentials_validity_in_ms: 2000 # access, an async reload is scheduled and the old value returned until it # completes. If credentials_validity_in_ms is non-zero, then this must be # also. +# This setting is also used to inform the interval of auto-updating if +# using credentials_cache_active_update. # Defaults to the same value as credentials_validity_in_ms. 
+# For a longer-running credentials cache, consider setting to update hourly (3600000) # credentials_update_interval_in_ms: 2000 +# If true, cache contents are actively updated by a background task at the +# interval set by credentials_update_interval_in_ms. If false (default), cache entries +# become eligible for refresh after their update interval. Upon next access, +# an async reload is scheduled and the old value returned until it completes. +# credentials_cache_active_update: false + # The partitioner is responsible for distributing groups of rows (by # partition key) across nodes in the cluster. The partitioner can NOT be # changed without reloading all data. If you are adding nodes or upgrading, diff --git a/src/java/org/apache/cassandra/auth/AuthCache.java b/src/java/org/apache/cassandra/auth/AuthCache.java index e69cd6c..a70afba 100644 --- a/src/java/org/apache/cassandra/auth/AuthCache.java +++ b/src/java/org/apache/cassandra/auth/AuthCache.java @@ -20,9 +20,12 @@ package org.apache.cassandra.auth; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiPredicate; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.IntSupplier; @@ -32,7 +35,9 @@ import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Policy; import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Shutdownable; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.MBeanWrapper; @@ -46,6 +51,8 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable public 
static final String MBEAN_NAME_BASE = "org.apache.cassandra.auth:type="; + private volatile ScheduledFuture cacheRefresher = null; + // Keep a handle on created instances so their executors can be terminated cleanly private static final Set<Shutdownable> REGISTRY = new HashSet<>(4); @@ -60,15 +67,23 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable protected volatile LoadingCache<K, V> cache; private ExecutorPlus cacheRefreshExecutor; - private String name; - private IntConsumer setValidityDelegate; - private IntSupplier getValidityDelegate; - private IntConsumer setUpdateIntervalDelegate; - private IntSupplier getUpdateIntervalDelegate; - private IntConsumer setMaxEntriesDelegate; - private IntSupplier getMaxEntriesDelegate; - private Function<K, V> loadFunction; - private BooleanSupplier enableCache; + private final String name; + private final IntConsumer setValidityDelegate; + private final IntSupplier getValidityDelegate; + private final IntConsumer setUpdateIntervalDelegate; + private final IntSupplier getUpdateIntervalDelegate; + private final IntConsumer setMaxEntriesDelegate; + private final IntSupplier getMaxEntriesDelegate; + private final Consumer<Boolean> setActiveUpdate; + private final BooleanSupplier getActiveUpdate; + private final Function<K, V> loadFunction; + private final BooleanSupplier enableCache; + + // Determines whether the presence of a specific value should trigger the invalidation of + // the supplied key. Used by CredentialsCache & CacheRefresher to identify when the + // credentials for a role couldn't be loaded without throwing an exception or serving stale + // values until the natural expiry time. + private final BiPredicate<K, V> invalidateCondition; /** * @param name Used for MBean @@ -78,6 +93,8 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable * @param getUpdateIntervalDelegate Getter for update interval * @param setMaxEntriesDelegate Used to set max # entries in cache. 
See {@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)} * @param getMaxEntriesDelegate Getter for max entries. + * @param setActiveUpdate Method to process config to actively update the auth cache prior to configured cache expiration + * @param getActiveUpdate Getter for active update * @param loadFunction Function to load the cache. Called on {@link #get(Object)} * @param cacheEnabledDelegate Used to determine if cache is enabled. */ @@ -88,9 +105,53 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable IntSupplier getUpdateIntervalDelegate, IntConsumer setMaxEntriesDelegate, IntSupplier getMaxEntriesDelegate, + Consumer<Boolean> setActiveUpdate, + BooleanSupplier getActiveUpdate, Function<K, V> loadFunction, BooleanSupplier cacheEnabledDelegate) { + this(name, + setValidityDelegate, + getValidityDelegate, + setUpdateIntervalDelegate, + getUpdateIntervalDelegate, + setMaxEntriesDelegate, + getMaxEntriesDelegate, + setActiveUpdate, + getActiveUpdate, + loadFunction, + cacheEnabledDelegate, + (k, v) -> false); + } + + /** + * @param name Used for MBean + * @param setValidityDelegate Used to set cache validity period. See {@link Policy#expireAfterWrite()} + * @param getValidityDelegate Getter for validity period + * @param setUpdateIntervalDelegate Used to set cache update interval. See {@link Policy#refreshAfterWrite()} + * @param getUpdateIntervalDelegate Getter for update interval + * @param setMaxEntriesDelegate Used to set max # entries in cache. See {@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)} + * @param getMaxEntriesDelegate Getter for max entries. + * @param setActiveUpdate Actively update the cache before expiry + * @param getActiveUpdate Getter for active update + * @param loadFunction Function to load the cache. Called on {@link #get(Object)} + * @param cacheEnabledDelegate Used to determine if cache is enabled. 
+ * @param invalidationCondition Used during active updates to determine if a refreshed value indicates a missing + * entry in the underlying table. If satisfied, the key will be invalidated. + */ + protected AuthCache(String name, + IntConsumer setValidityDelegate, + IntSupplier getValidityDelegate, + IntConsumer setUpdateIntervalDelegate, + IntSupplier getUpdateIntervalDelegate, + IntConsumer setMaxEntriesDelegate, + IntSupplier getMaxEntriesDelegate, + Consumer<Boolean> setActiveUpdate, + BooleanSupplier getActiveUpdate, + Function<K, V> loadFunction, + BooleanSupplier cacheEnabledDelegate, + BiPredicate<K, V> invalidationCondition) + { this.name = checkNotNull(name); this.setValidityDelegate = checkNotNull(setValidityDelegate); this.getValidityDelegate = checkNotNull(getValidityDelegate); @@ -98,8 +159,11 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable this.getUpdateIntervalDelegate = checkNotNull(getUpdateIntervalDelegate); this.setMaxEntriesDelegate = checkNotNull(setMaxEntriesDelegate); this.getMaxEntriesDelegate = checkNotNull(getMaxEntriesDelegate); + this.setActiveUpdate = checkNotNull(setActiveUpdate); + this.getActiveUpdate = checkNotNull(getActiveUpdate); this.loadFunction = checkNotNull(loadFunction); this.enableCache = checkNotNull(cacheEnabledDelegate); + this.invalidateCondition = checkNotNull(invalidationCondition); init(); } @@ -143,7 +207,7 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable /** * Invalidate the entire cache. */ - public void invalidate() + public synchronized void invalidate() { cache = initCache(null); } @@ -162,7 +226,7 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable * Time in milliseconds that a value in the cache will expire after. 
* @param validityPeriod in milliseconds */ - public void setValidity(int validityPeriod) + public synchronized void setValidity(int validityPeriod) { if (Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration")) throw new UnsupportedOperationException("Remote configuration of auth caches is disabled"); @@ -180,7 +244,7 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable * Time in milliseconds after which an entry in the cache should be refreshed (it's load function called again) * @param updateInterval in milliseconds */ - public void setUpdateInterval(int updateInterval) + public synchronized void setUpdateInterval(int updateInterval) { if (Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration")) throw new UnsupportedOperationException("Remote configuration of auth caches is disabled"); @@ -198,7 +262,7 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable * Set maximum number of entries in the cache. * @param maxEntries */ - public void setMaxEntries(int maxEntries) + public synchronized void setMaxEntries(int maxEntries) { if (Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration")) throw new UnsupportedOperationException("Remote configuration of auth caches is disabled"); @@ -212,6 +276,20 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable return getMaxEntriesDelegate.getAsInt(); } + public boolean getActiveUpdate() + { + return getActiveUpdate.getAsBoolean(); + } + + public synchronized void setActiveUpdate(boolean update) + { + if (Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration")) + throw new UnsupportedOperationException("Remote configuration of auth caches is disabled"); + + setActiveUpdate.accept(update); + cache = initCache(cache); + } + /** * (Re-)initialise the underlying cache. Will update validity, max entries, and update interval if * any have changed. 
The underlying {@link LoadingCache} will be initiated based on the provided {@code loadFunction}. @@ -227,26 +305,45 @@ public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable if (getValidity() <= 0) return null; - logger.info("(Re)initializing {} (validity period/update interval/max entries) ({}/{}/{})", - name, getValidity(), getUpdateInterval(), getMaxEntries()); + boolean activeUpdate = getActiveUpdate(); + logger.info("(Re)initializing {} (validity period/update interval/max entries/active update) ({}/{}/{}/{})", + name, getValidity(), getUpdateInterval(), getMaxEntries(), activeUpdate); + LoadingCache<K, V> updatedCache; + + if (existing == null) + { + updatedCache = Caffeine.newBuilder().refreshAfterWrite(activeUpdate ? getValidity() : getUpdateInterval(), TimeUnit.MILLISECONDS) + .expireAfterWrite(getValidity(), TimeUnit.MILLISECONDS) + .maximumSize(getMaxEntries()) + .executor(cacheRefreshExecutor) + .build(loadFunction::apply); + } + else + { + updatedCache = cache; + // Always set as mandatory + cache.policy().refreshAfterWrite().ifPresent(policy -> + policy.setExpiresAfter(activeUpdate ? getValidity() : getUpdateInterval(), TimeUnit.MILLISECONDS)); + cache.policy().expireAfterWrite().ifPresent(policy -> policy.setExpiresAfter(getValidity(), TimeUnit.MILLISECONDS)); + cache.policy().eviction().ifPresent(policy -> policy.setMaximum(getMaxEntries())); + } - if (existing == null) { - return Caffeine.newBuilder() - .refreshAfterWrite(getUpdateInterval(), TimeUnit.MILLISECONDS) - .expireAfterWrite(getValidity(), TimeUnit.MILLISECONDS) - .maximumSize(getMaxEntries()) - .executor(cacheRefreshExecutor) - .build(loadFunction::apply); + if (cacheRefresher != null) + { + cacheRefresher.cancel(false); // permit the two refreshers to race until the old one dies, should be harmless. 
+ cacheRefresher = null; } - // Always set as mandatory - cache.policy().refreshAfterWrite().ifPresent(policy -> - policy.setExpiresAfter(getUpdateInterval(), TimeUnit.MILLISECONDS)); - cache.policy().expireAfterWrite().ifPresent(policy -> - policy.setExpiresAfter(getValidity(), TimeUnit.MILLISECONDS)); - cache.policy().eviction().ifPresent(policy -> - policy.setMaximum(getMaxEntries())); - return cache; + if (activeUpdate) + { + cacheRefresher = ScheduledExecutors.optionalTasks.scheduleAtFixedRate(CacheRefresher.create(name, + updatedCache, + invalidateCondition), + getUpdateInterval(), + getUpdateInterval(), + TimeUnit.MILLISECONDS); + } + return updatedCache; } @Override diff --git a/src/java/org/apache/cassandra/auth/AuthCacheMBean.java b/src/java/org/apache/cassandra/auth/AuthCacheMBean.java index 43fb88e..3f6247a 100644 --- a/src/java/org/apache/cassandra/auth/AuthCacheMBean.java +++ b/src/java/org/apache/cassandra/auth/AuthCacheMBean.java @@ -33,4 +33,8 @@ public interface AuthCacheMBean public void setMaxEntries(int maxEntries); public int getMaxEntries(); + + public boolean getActiveUpdate(); + + public void setActiveUpdate(boolean update); } diff --git a/src/java/org/apache/cassandra/auth/CacheRefresher.java b/src/java/org/apache/cassandra/auth/CacheRefresher.java new file mode 100644 index 0000000..a199601 --- /dev/null +++ b/src/java/org/apache/cassandra/auth/CacheRefresher.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.auth; + +import java.util.Set; +import java.util.function.BiPredicate; +import java.util.function.BooleanSupplier; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.annotations.VisibleForTesting; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.service.StorageService; + +public class CacheRefresher<K, V> implements Runnable +{ + private static final Logger logger = LoggerFactory.getLogger(CacheRefresher.class); + + private final String name; + private final LoadingCache<K, V> cache; + private final BiPredicate<K, V> invalidationCondition; + private final BooleanSupplier skipCondition; + + private CacheRefresher(String name, LoadingCache<K, V> cache, BiPredicate<K, V> invalidationCondition, BooleanSupplier skipCondition) + { + this.name = name; + this.cache = cache; + this.invalidationCondition = invalidationCondition; + this.skipCondition = skipCondition; + } + + public void run() + { + if (skipCondition.getAsBoolean()) + { + logger.debug("Skipping {} cache refresh", name); + return; + } + + try + { + logger.debug("Refreshing {} cache", name); + Set<K> ks = cache.asMap().keySet(); + for (K key : ks) + { + cache.refresh(key); + V value = cache.getIfPresent(key); + if (invalidationCondition.test(key, value)) + { + logger.debug("Invalidating key"); + cache.invalidate(key); + } + } + } + catch (Exception e) + { + logger.error("Unexpected exception refreshing {} cache", name, e); + } + } + + @VisibleForTesting + public static <K, V> 
CacheRefresher<K, V> create(String name, + LoadingCache<K, V> cache, + BiPredicate<K, V> invalidationCondition, + BooleanSupplier skipCondition) + { + logger.info("Creating CacheRefresher for {}", name); + return new CacheRefresher<>(name, cache, invalidationCondition, skipCondition); + } + + public static <K, V> CacheRefresher<K, V> create(String name, LoadingCache<K, V> cache, BiPredicate<K, V> invalidationCondition) + { + // By default we skip cache refreshes if the node has been decommed + return create(name, cache, invalidationCondition, StorageService.instance::isDecommissioned); + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java b/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java index 72817a9..b2e8707 100644 --- a/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java +++ b/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java @@ -32,6 +32,8 @@ public class NetworkPermissionsCache extends AuthCache<RoleResource, DCPermissio DatabaseDescriptor::getRolesUpdateInterval, DatabaseDescriptor::setRolesCacheMaxEntries, DatabaseDescriptor::getRolesCacheMaxEntries, + DatabaseDescriptor::setRolesCacheActiveUpdate, + DatabaseDescriptor::getRolesCacheActiveUpdate, authorizer::authorize, () -> DatabaseDescriptor.getAuthenticator().requireAuthentication()); diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java index 098ed9f..ac1dbb9 100644 --- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java +++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java @@ -60,6 +60,9 @@ public class PasswordAuthenticator implements IAuthenticator { private static final Logger logger = LoggerFactory.getLogger(PasswordAuthenticator.class); + /** We intentionally use an empty string sentinel to allow object equality comparison */ + private static final String NO_SUCH_CREDENTIAL = ""; + // name 
of the hash column. private static final String SALTED_HASH = "salted_hash"; @@ -95,6 +98,27 @@ public class PasswordAuthenticator implements IAuthenticator private AuthenticatedUser authenticate(String username, String password) throws AuthenticationException { String hash = cache.get(username); + + // intentional use of object equality + if (hash == NO_SUCH_CREDENTIAL) + { + // The cache was unable to load credentials via queryHashedPassword, probably because the supplied + // rolename doesn't exist. If caching is enabled we will have now cached the sentinel value for that key + // so we should invalidate it otherwise the cache will continue to serve that until it expires which + // will be a problem if the role is added in the meantime. + // + // We can't just throw the AuthenticationException directly from queryHashedPassword for a similar reason: + // if an existing role is dropped and active updates are enabled for the cache, the refresh in + // CacheRefresher::run will log and swallow the exception and keep serving the stale credentials until they + // eventually expire. + // + // So whenever we encounter the sentinel value, here and also in CacheRefresher (if active updates are + // enabled), we manually expunge the key from the cache. If caching is not enabled, AuthCache::invalidate + // is a safe no-op. + cache.invalidateCredentials(username); + throw new AuthenticationException(String.format("Provided username %s and/or password are incorrect", username)); + } + if (!checkpw(password, hash)) throw new AuthenticationException(String.format("Provided username %s and/or password are incorrect", username)); @@ -111,14 +135,15 @@ public class PasswordAuthenticator implements IAuthenticator ResultMessage.Rows rows = select(authenticateStatement, options); // If either a non-existent role name was supplied, or no credentials - // were found for that role we don't want to cache the result so we throw - // an exception. 
+ // were found for that role, we don't want to cache the result so we + // return a sentinel value. On receiving the sentinel, the caller can + // invalidate the cache and throw an appropriate exception. if (rows.result.isEmpty()) - throw new AuthenticationException(String.format("Provided username %s and/or password are incorrect", username)); + return NO_SUCH_CREDENTIAL; UntypedResultSet result = UntypedResultSet.create(rows.result); if (!result.one().has(SALTED_HASH)) - throw new AuthenticationException(String.format("Provided username %s and/or password are incorrect", username)); + return NO_SUCH_CREDENTIAL; return result.one().getString(SALTED_HASH); } @@ -257,8 +282,12 @@ public class PasswordAuthenticator implements IAuthenticator DatabaseDescriptor::getCredentialsUpdateInterval, DatabaseDescriptor::setCredentialsCacheMaxEntries, DatabaseDescriptor::getCredentialsCacheMaxEntries, + DatabaseDescriptor::setCredentialsCacheActiveUpdate, + DatabaseDescriptor::getCredentialsCacheActiveUpdate, authenticator::queryHashedPassword, - () -> true); + () -> true, + (k,v) -> NO_SUCH_CREDENTIAL == v); // use a known object as a sentinel value. 
CacheRefresher will + // invalidate the key if the sentinel is loaded during a refresh } public void invalidateCredentials(String roleName) diff --git a/src/java/org/apache/cassandra/auth/PermissionsCache.java b/src/java/org/apache/cassandra/auth/PermissionsCache.java index a649c35..fd1fce8 100644 --- a/src/java/org/apache/cassandra/auth/PermissionsCache.java +++ b/src/java/org/apache/cassandra/auth/PermissionsCache.java @@ -34,6 +34,8 @@ public class PermissionsCache extends AuthCache<Pair<AuthenticatedUser, IResourc DatabaseDescriptor::getPermissionsUpdateInterval, DatabaseDescriptor::setPermissionsCacheMaxEntries, DatabaseDescriptor::getPermissionsCacheMaxEntries, + DatabaseDescriptor::setPermissionsCacheActiveUpdate, + DatabaseDescriptor::getPermissionsCacheActiveUpdate, (p) -> authorizer.authorize(p.left, p.right), () -> DatabaseDescriptor.getAuthorizer().requireAuthorization()); } diff --git a/src/java/org/apache/cassandra/auth/RolesCache.java b/src/java/org/apache/cassandra/auth/RolesCache.java index 62fecfb..05a5759 100644 --- a/src/java/org/apache/cassandra/auth/RolesCache.java +++ b/src/java/org/apache/cassandra/auth/RolesCache.java @@ -34,6 +34,8 @@ public class RolesCache extends AuthCache<RoleResource, Set<Role>> implements Ro DatabaseDescriptor::getRolesUpdateInterval, DatabaseDescriptor::setRolesCacheMaxEntries, DatabaseDescriptor::getRolesCacheMaxEntries, + DatabaseDescriptor::setRolesCacheActiveUpdate, + DatabaseDescriptor::getRolesCacheActiveUpdate, roleManager::getRoleDetails, enableCache); } diff --git a/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java b/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java index 9179062..77bd5c0 100644 --- a/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java +++ b/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java @@ -489,6 +489,8 @@ public class AuthorizationProxy implements InvocationHandler DatabaseDescriptor::getPermissionsUpdateInterval, 
DatabaseDescriptor::setPermissionsCacheMaxEntries, DatabaseDescriptor::getPermissionsCacheMaxEntries, + DatabaseDescriptor::setPermissionsCacheActiveUpdate, + DatabaseDescriptor::getPermissionsCacheActiveUpdate, AuthorizationProxy::loadPermissions, () -> true); diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 0dc4180..5a1d048 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -59,12 +59,15 @@ public class Config public volatile int permissions_validity_in_ms = 2000; public volatile int permissions_cache_max_entries = 1000; public volatile int permissions_update_interval_in_ms = -1; + public volatile boolean permissions_cache_active_update = false; public volatile int roles_validity_in_ms = 2000; public volatile int roles_cache_max_entries = 1000; public volatile int roles_update_interval_in_ms = -1; + public volatile boolean roles_cache_active_update = false; public volatile int credentials_validity_in_ms = 2000; public volatile int credentials_cache_max_entries = 1000; public volatile int credentials_update_interval_in_ms = -1; + public volatile boolean credentials_cache_active_update = false; /* Hashing strategy Random or OPHF */ public String partitioner; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index d246fc7..3ef1603 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1286,6 +1286,16 @@ public class DatabaseDescriptor return conf.permissions_cache_max_entries = maxEntries; } + public static boolean getPermissionsCacheActiveUpdate() + { + return conf.permissions_cache_active_update; + } + + public static void setPermissionsCacheActiveUpdate(boolean update) + { + conf.permissions_cache_active_update = update; + } + public static int 
getRolesValidity() { return conf.roles_validity_in_ms; @@ -1303,6 +1313,16 @@ public class DatabaseDescriptor : conf.roles_update_interval_in_ms; } + public static void setRolesCacheActiveUpdate(boolean update) + { + conf.roles_cache_active_update = update; + } + + public static boolean getRolesCacheActiveUpdate() + { + return conf.roles_cache_active_update; + } + public static void setRolesUpdateInterval(int interval) { conf.roles_update_interval_in_ms = interval; @@ -1350,6 +1370,16 @@ public class DatabaseDescriptor return conf.credentials_cache_max_entries = maxEntries; } + public static boolean getCredentialsCacheActiveUpdate() + { + return conf.credentials_cache_active_update; + } + + public static void setCredentialsCacheActiveUpdate(boolean update) + { + conf.credentials_cache_active_update = update; + } + public static int getMaxValueSize() { return conf.max_value_size_in_mb * 1024 * 1024; diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 5015f75..c1ad9cb 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -4974,6 +4974,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return operationMode == Mode.NORMAL; } + public boolean isDecommissioned() + { + return operationMode == Mode.DECOMMISSIONED; + } + public String getDrainProgress() { return String.format("Drained %s/%s ColumnFamilies", remainingCFs, totalCFs); diff --git a/test/unit/org/apache/cassandra/auth/AuthCacheTest.java b/test/unit/org/apache/cassandra/auth/AuthCacheTest.java index da97225..15e6b1f 100644 --- a/test/unit/org/apache/cassandra/auth/AuthCacheTest.java +++ b/test/unit/org/apache/cassandra/auth/AuthCacheTest.java @@ -258,6 +258,8 @@ public class AuthCacheTest () -> 1000, (maxEntries) -> {}, () -> 10, + (updateActiveUpdate) -> {}, + () -> false, loadFunction, cacheEnabledDelegate); } 
diff --git a/test/unit/org/apache/cassandra/auth/CacheRefresherTest.java b/test/unit/org/apache/cassandra/auth/CacheRefresherTest.java new file mode 100644 index 0000000..3340d82 --- /dev/null +++ b/test/unit/org/apache/cassandra/auth/CacheRefresherTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.auth; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; + +import com.google.common.util.concurrent.MoreExecutors; + +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import org.junit.Assert; +import org.junit.Test; + +public class CacheRefresherTest +{ + @Test + public void refresh() throws Exception + { + Map<String, String> src = new HashMap<>(); + CacheLoader<String, String> loader = src::get; + + // Supply the directExecutor so the refresh() call executes within the refresher task like AuthCache (rather than async) + LoadingCache<String, String> cache = Caffeine.newBuilder() + .executor(MoreExecutors.directExecutor()) + .build(loader); + + AtomicBoolean skipRefresh = new AtomicBoolean(false); + BooleanSupplier skipCondition = skipRefresh::get; + + CacheRefresher<String, String> refresher = CacheRefresher.create("test", cache, (k, v) -> v.equals("removed"), skipCondition); + src.put("some", "thing"); + Assert.assertEquals("thing", cache.get("some")); + + // Cache should still have old value... + src.put("some", "one"); + Assert.assertEquals("thing", cache.get("some")); + + // ... but refresher should update it + refresher.run(); + Assert.assertEquals("one", cache.get("some")); + + // If we just remove the value from the src, the cache should still contain it + src.remove("some"); + Assert.assertEquals("one", cache.get("some")); + + // If we insert the special sentinel value into the src, the refresher will invalidate it from the cache. 
+ // This time when it's removed from the underlying storage, it's not returned from the cache + src.put("some", "removed"); + refresher.run(); + src.remove("some"); + + // Remove from src + Assert.assertNull(cache.get("some")); + + // If the skip condition returns true, don't refresh + src.put("some", "one"); + Assert.assertEquals("one", cache.get("some")); + skipRefresh.set(true); + src.put("some", "body"); + refresher.run(); + Assert.assertEquals("one", cache.get("some")); + + // Change the skip condition back to false and refresh + skipRefresh.set(false); + refresher.run(); + Assert.assertEquals("body", cache.get("some")); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org