This is an automated email from the ASF dual-hosted git repository. jmckenzie pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 09b282d1fd Rate-limit new client connection auth setup to avoid overwhelming bcrypt 09b282d1fd is described below commit 09b282d1fdd7d6d62542137003011d144c0227be Author: Josh McKenzie <jmcken...@apache.org> AuthorDate: Thu Aug 11 14:02:27 2022 -0400 Rate-limit new client connection auth setup to avoid overwhelming bcrypt Patch by Chris Lohfink; reviewed by Caleb Rackliffe, Yifan Cai, and Josh McKenzie for CASSANDRA-17812 Co-authored-by: Chris Lohfink <clohf...@apple.com> Co-authored-by: Josh McKenzie <jmcken...@apache.org> --- CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 16 ++ .../org/apache/cassandra/transport/Dispatcher.java | 47 ++++-- src/java/org/apache/cassandra/utils/Shared.java | 2 +- .../cassandra/transport/MessageDispatcherTest.java | 172 +++++++++++++++++++++ 6 files changed, 228 insertions(+), 12 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 489d2d8845..3aaaf8b38e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Rate-limit new client connection auth setup to avoid overwhelming bcrypt (CASSANDRA-17812) * DataOutputBuffer#scratchBuffer can use off-heap or on-heap memory as a means to control memory allocations (CASSANDRA-16471) * Add ability to read the TTLs and write times of the elements of a collection and/or UDT (CASSANDRA-8877) * Removed Python < 2.7 support from formatting.py (CASSANDRA-17694) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 68091ac90f..bdca0a7df1 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -266,6 +266,8 @@ public class Config public int native_transport_max_threads = 128; @Replaces(oldName = "native_transport_max_frame_size_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true) public DataStorageSpec.IntMebibytesBound native_transport_max_frame_size = new DataStorageSpec.IntMebibytesBound("16MiB"); + /** do bcrypt hashing in a limited pool to prevent cpu load spikes; note: any value < 1 will be set to 1 on init **/ + public int native_transport_max_auth_threads = 4; public volatile long native_transport_max_concurrent_connections = -1L; public volatile long native_transport_max_concurrent_connections_per_ip = -1L; public boolean native_transport_flush_in_batches_legacy = false; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 1ce16052fe..0a9036f632 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2573,6 +2573,22 @@ public class DatabaseDescriptor conf.native_transport_max_threads = max_threads; } + public static Integer getNativeTransportMaxAuthThreads() + { + return conf.native_transport_max_auth_threads; + } + + /** + * If this value is set to <= 0 it will move auth requests to the standard request pool regardless of the current + * size of the {@link org.apache.cassandra.transport.Dispatcher#authExecutor}'s active size. + * + * see {@link org.apache.cassandra.transport.Dispatcher#dispatch} for executor selection + */ + public static void setNativeTransportMaxAuthThreads(int threads) + { + conf.native_transport_max_auth_threads = threads; + } + public static int getNativeTransportMaxFrameSize() { return conf.native_transport_max_frame_size.toBytes(); diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java index 8f8a607c77..f21acc2c6d 100644 --- a/src/java/org/apache/cassandra/transport/Dispatcher.java +++ b/src/java/org/apache/cassandra/transport/Dispatcher.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +52,31 @@ import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED; public class Dispatcher { private static final Logger logger = LoggerFactory.getLogger(Dispatcher.class); - - private static final LocalAwareExecutorPlus requestExecutor = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(), - DatabaseDescriptor::setNativeTransportMaxThreads, - "transport", - "Native-Transport-Requests"); + + @VisibleForTesting + static final LocalAwareExecutorPlus requestExecutor = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(), + DatabaseDescriptor::setNativeTransportMaxThreads, + "transport", + "Native-Transport-Requests"); + + /** CASSANDRA-17812: Rate-limit new client connection setup to avoid overwhelming during bcrypt + * + * authExecutor is a separate thread pool for handling requests on connections that need to be authenticated. + * Calls to AUTHENTICATE can be expensive if the number of rounds for bcrypt is configured to a high value, + * so during a connection storm checking the password hash would starve existing connected clients for CPU and + * trigger timeouts if on the same thread pool as standard requests. + * + * Moving authentication requests to a small, separate pool prevents starvation handling all other + * requests. If the authExecutor pool backs up, it may cause authentication timeouts but the clients should + * back off and retry while the rest of the system continues to make progress. + * + * Setting less than 1 will service auth requests on the standard {@link Dispatcher#requestExecutor} + */ + @VisibleForTesting + static final LocalAwareExecutorPlus authExecutor = SHARED.newExecutor(Math.max(1, DatabaseDescriptor.getNativeTransportMaxAuthThreads()), + DatabaseDescriptor::setNativeTransportMaxAuthThreads, + "transport", + "Native-Transport-Auth-Requests"); private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>(); private final boolean useLegacyFlusher; @@ -80,7 +101,14 @@ public class Dispatcher public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure) { - requestExecutor.submit(new RequestProcessor(channel, request, forFlusher, backpressure)); + // if native_transport_max_auth_threads is < 1, don't delegate to new pool on auth messages + boolean isAuthQuery = DatabaseDescriptor.getNativeTransportMaxAuthThreads() > 0 && + (request.type == Message.Type.AUTH_RESPONSE || request.type == Message.Type.CREDENTIALS); + + // Importantly, the authExecutor will handle the AUTHENTICATE message which may be CPU intensive. + LocalAwareExecutorPlus executor = isAuthQuery ? authExecutor : requestExecutor; + + executor.submit(new RequestProcessor(channel, request, forFlusher, backpressure)); ClientMetrics.instance.markRequestDispatched(); } @@ -233,13 +261,10 @@ public class Dispatcher public static void shutdown() { - if (requestExecutor != null) - { - requestExecutor.shutdown(); - } + requestExecutor.shutdown(); + authExecutor.shutdown(); } - /** * Dispatcher for EventMessages. In {@link Server.ConnectionTracker#send(Event)}, the strategy * for delivering events to registered clients is dependent on protocol version and the configuration diff --git a/src/java/org/apache/cassandra/utils/Shared.java b/src/java/org/apache/cassandra/utils/Shared.java index e576c8676c..6433624911 100644 --- a/src/java/org/apache/cassandra/utils/Shared.java +++ b/src/java/org/apache/cassandra/utils/Shared.java @@ -24,7 +24,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * Tells jvm-dtest that a class should be shared accross all {@link ClassLoader}s. + * Tells jvm-dtest that a class should be shared across all {@link ClassLoader}s. * * Jvm-dtest relies on classloader isolation to run multiple cassandra instances in the same JVM, this makes it * so some classes do not get shared (outside a blesssed set of classes/packages). When the default behavior diff --git a/test/unit/org/apache/cassandra/transport/MessageDispatcherTest.java b/test/unit/org/apache/cassandra/transport/MessageDispatcherTest.java new file mode 100644 index 0000000000..0c70315e25 --- /dev/null +++ b/test/unit/org/apache/cassandra/transport/MessageDispatcherTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.channel.Channel; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.metrics.ClientMetrics; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.messages.AuthResponse; + +public class MessageDispatcherTest +{ + static final Message.Request AUTH_RESPONSE_REQUEST = new AuthResponse(new byte[0]) + { + public Response execute(QueryState queryState, long queryStartNanoTime, boolean traceRequest) + { + return null; + } + }; + + private static AuthTestDispatcher dispatch; + private static int maxAuthThreadsBeforeTests; + + @BeforeClass + public static void init() throws Exception + { + DatabaseDescriptor.daemonInitialization(); + ClientMetrics.instance.init(Collections.emptyList()); + maxAuthThreadsBeforeTests = DatabaseDescriptor.getNativeTransportMaxAuthThreads(); + dispatch = new AuthTestDispatcher(); + } + + @AfterClass + public static void restoreAuthSize() + { + DatabaseDescriptor.setNativeTransportMaxAuthThreads(maxAuthThreadsBeforeTests); + } + + @Test + public void testAuthRateLimiter() throws Exception + { + long startRequests = completedRequests(); + + DatabaseDescriptor.setNativeTransportMaxAuthThreads(1); + long auths = tryAuth(this::completedAuth); + Assert.assertEquals(auths, 1); + + DatabaseDescriptor.setNativeTransportMaxAuthThreads(100); + auths = tryAuth(this::completedAuth); + Assert.assertEquals(auths, 1); + + // Make sure no tasks executed on the regular pool + Assert.assertEquals(startRequests, completedRequests()); + } + + @Test + public void testAuthRateLimiterNotUsed() throws Exception + { + DatabaseDescriptor.setNativeTransportMaxAuthThreads(1); + for (Message.Type type : Message.Type.values()) + { + if (type == Message.Type.AUTH_RESPONSE || type == Message.Type.CREDENTIALS || type.direction != Message.Direction.REQUEST) + continue; + + long auths = completedAuth(); + long requests = tryAuth(this::completedRequests, new Message.Request(type) + { + public Response execute(QueryState queryState, long queryStartNanoTime, boolean traceRequest) + { + return null; + } + }); + Assert.assertEquals(requests, 1); + Assert.assertEquals(completedAuth() - auths, 0); + } + } + + @Test + public void testAuthRateLimiterDisabled() throws Exception + { + long startAuthRequests = completedAuth(); + + DatabaseDescriptor.setNativeTransportMaxAuthThreads(0); + long requests = tryAuth(this::completedRequests); + Assert.assertEquals(requests, 1); + + DatabaseDescriptor.setNativeTransportMaxAuthThreads(-1); + requests = tryAuth(this::completedRequests); + Assert.assertEquals(requests, 1); + + DatabaseDescriptor.setNativeTransportMaxAuthThreads(-1000); + requests = tryAuth(this::completedRequests); + Assert.assertEquals(requests, 1); + + // Make sure no tasks executed on the auth pool + Assert.assertEquals(startAuthRequests, completedAuth()); + } + + private long completedRequests() + { + return Dispatcher.requestExecutor.getCompletedTaskCount(); + } + + private long completedAuth() + { + return Dispatcher.authExecutor.getCompletedTaskCount(); + } + + public long tryAuth(Callable<Long> check) throws Exception + { + return tryAuth(check, AUTH_RESPONSE_REQUEST); + } + + @SuppressWarnings("UnstableApiUsage") + public long tryAuth(Callable<Long> check, Message.Request request) throws Exception + { + long start = check.call(); + dispatch.dispatch(null, request, (channel,req,response) -> null, ClientResourceLimits.Overload.NONE); + + // While this is timeout based, we should be *well below* a full second on any of this processing in any sane environment. + long timeout = System.currentTimeMillis(); + while(start == check.call() && System.currentTimeMillis() - timeout < 1000) + { + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + } + return check.call() - start; + } + + public static class AuthTestDispatcher extends Dispatcher + { + public AuthTestDispatcher() + { + super(false); + } + + @Override + void processRequest(Channel channel, + Message.Request request, + FlushItemConverter forFlusher, + ClientResourceLimits.Overload backpressure, + long approxStartTimeNanos) + { + // noop + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org