This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 09b282d1fd Rate-limit new client connection auth setup to avoid 
overwhelming bcrypt
09b282d1fd is described below

commit 09b282d1fdd7d6d62542137003011d144c0227be
Author: Josh McKenzie <jmcken...@apache.org>
AuthorDate: Thu Aug 11 14:02:27 2022 -0400

    Rate-limit new client connection auth setup to avoid overwhelming bcrypt
    
    Patch by Chris Lohfink; reviewed by Caleb Rackliffe, Yifan Cai, and Josh 
McKenzie for CASSANDRA-17812
    
    Co-authored-by: Chris Lohfink <clohf...@apple.com>
    Co-authored-by: Josh McKenzie <jmcken...@apache.org>
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  16 ++
 .../org/apache/cassandra/transport/Dispatcher.java |  47 ++++--
 src/java/org/apache/cassandra/utils/Shared.java    |   2 +-
 .../cassandra/transport/MessageDispatcherTest.java | 172 +++++++++++++++++++++
 6 files changed, 228 insertions(+), 12 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 489d2d8845..3aaaf8b38e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Rate-limit new client connection auth setup to avoid overwhelming bcrypt 
(CASSANDRA-17812)
  * DataOutputBuffer#scratchBuffer can use off-heap or on-heap memory as a 
means to control memory allocations (CASSANDRA-16471)
  * Add ability to read the TTLs and write times of the elements of a 
collection and/or UDT (CASSANDRA-8877)
  * Removed Python < 2.7 support from formatting.py (CASSANDRA-17694)
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 68091ac90f..bdca0a7df1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -266,6 +266,8 @@ public class Config
     public int native_transport_max_threads = 128;
     @Replaces(oldName = "native_transport_max_frame_size_in_mb", converter = 
Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true)
     public DataStorageSpec.IntMebibytesBound native_transport_max_frame_size = 
new DataStorageSpec.IntMebibytesBound("16MiB");
+    /**
+     * do bcrypt hashing in a limited pool to prevent cpu load spikes; note: the auth pool is always created with at
+     * least 1 thread, and any value < 1 sends auth requests to the standard request pool instead.
+     * volatile: read on every native request dispatch and may be changed at runtime via
+     * DatabaseDescriptor#setNativeTransportMaxAuthThreads.
+     */
+    public volatile int native_transport_max_auth_threads = 4;
     public volatile long native_transport_max_concurrent_connections = -1L;
     public volatile long native_transport_max_concurrent_connections_per_ip = 
-1L;
     public boolean native_transport_flush_in_batches_legacy = false;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1ce16052fe..0a9036f632 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2573,6 +2573,22 @@ public class DatabaseDescriptor
         conf.native_transport_max_threads = max_threads;
     }
 
+    /**
+     * @return the configured native_transport_max_auth_threads; a value < 1 means auth requests are served by the
+     * standard native transport request pool rather than the dedicated auth pool
+     */
+    public static int getNativeTransportMaxAuthThreads()
+    {
+        return conf.native_transport_max_auth_threads;
+    }
+
+    /**
+     * Updates native_transport_max_auth_threads in the loaded configuration.
+     * <p>
+     * Any value below 1 makes {@link org.apache.cassandra.transport.Dispatcher#dispatch} route auth requests to the
+     * standard request pool, no matter how large the
+     * {@link org.apache.cassandra.transport.Dispatcher#authExecutor} currently is. This method only assigns the
+     * configuration field; it does not itself resize any executor.
+     * <p>
+     * NOTE(review): also passed to SHARED.newExecutor as the auth pool's resize callback; presumably invoked when
+     * that pool's maximum size is changed — confirm.
+     *
+     * @param maxAuthThreads the new maximum number of auth threads
+     */
+    public static void setNativeTransportMaxAuthThreads(int maxAuthThreads)
+    {
+        conf.native_transport_max_auth_threads = maxAuthThreads;
+    }
+
     public static int getNativeTransportMaxFrameSize()
     {
         return conf.native_transport_max_frame_size.toBytes();
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java 
b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 8f8a607c77..f21acc2c6d 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,11 +52,31 @@ import static 
org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
 public class Dispatcher
 {
     private static final Logger logger = 
LoggerFactory.getLogger(Dispatcher.class);
-    
-    private static final LocalAwareExecutorPlus requestExecutor = 
SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
-                                                                               
      DatabaseDescriptor::setNativeTransportMaxThreads,
-                                                                               
      "transport",
-                                                                               
      "Native-Transport-Requests");
+
+    @VisibleForTesting
+    static final LocalAwareExecutorPlus requestExecutor = 
SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
+                                                                             
DatabaseDescriptor::setNativeTransportMaxThreads,
+                                                                             
"transport",
+                                                                             
"Native-Transport-Requests");
+
+    /** CASSANDRA-17812: Rate-limit new client connection setup so that bcrypt hashing cannot overwhelm the node
+     *
+     * authExecutor is a separate thread pool for handling requests on connections that need to be authenticated.
+     * Checking credentials (AUTH_RESPONSE / CREDENTIALS requests) can be expensive if the number of rounds for
+     * bcrypt is configured to a high value, so during a connection storm, checking password hashes on the same
+     * thread pool as standard requests would starve existing connected clients of CPU and trigger timeouts.
+     *
+     * Moving authentication requests to a small, separate pool prevents them from starving the handling of all
+     * other requests. If the authExecutor pool backs up, it may cause authentication timeouts, but clients should
+     * back off and retry while the rest of the system continues to make progress.
+     *
+     * Setting native_transport_max_auth_threads to less than 1 services auth requests on the standard
+     * {@link Dispatcher#requestExecutor}.
+     */
+    @VisibleForTesting
+    static final LocalAwareExecutorPlus authExecutor = 
SHARED.newExecutor(Math.max(1, 
DatabaseDescriptor.getNativeTransportMaxAuthThreads()),
+                                                                          
DatabaseDescriptor::setNativeTransportMaxAuthThreads,
+                                                                          
"transport",
+                                                                          
"Native-Transport-Auth-Requests");
 
     private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new 
ConcurrentHashMap<>();
     private final boolean useLegacyFlusher;
@@ -80,7 +101,14 @@ public class Dispatcher
 
     public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
     {
-        requestExecutor.submit(new RequestProcessor(channel, request, forFlusher, backpressure));
+        // AUTH_RESPONSE and CREDENTIALS are the request types whose credential check (bcrypt) may be CPU intensive,
+        // so they are routed to the small dedicated authExecutor to avoid starving regular requests.
+        // If native_transport_max_auth_threads is < 1, the dedicated pool is bypassed and requestExecutor is used.
+        boolean isAuthRequest = DatabaseDescriptor.getNativeTransportMaxAuthThreads() > 0 &&
+                                (request.type == Message.Type.AUTH_RESPONSE || request.type == Message.Type.CREDENTIALS);
+
+        LocalAwareExecutorPlus executor = isAuthRequest ? authExecutor : requestExecutor;
+
+        executor.submit(new RequestProcessor(channel, request, forFlusher, backpressure));
         ClientMetrics.instance.markRequestDispatched();
     }
 
@@ -233,13 +261,10 @@ public class Dispatcher
 
+    /**
+     * Calls {@code shutdown()} on the standard request executor and then on the dedicated auth executor.
+     * Both are {@code static final} fields initialized when the class loads, so no null check is needed.
+     */
     public static void shutdown()
     {
-        if (requestExecutor != null)
-        {
-            requestExecutor.shutdown();
-        }
+        requestExecutor.shutdown();
+        authExecutor.shutdown();
     }
 
-
     /**
      * Dispatcher for EventMessages. In {@link 
Server.ConnectionTracker#send(Event)}, the strategy
      * for delivering events to registered clients is dependent on protocol 
version and the configuration
diff --git a/src/java/org/apache/cassandra/utils/Shared.java 
b/src/java/org/apache/cassandra/utils/Shared.java
index e576c8676c..6433624911 100644
--- a/src/java/org/apache/cassandra/utils/Shared.java
+++ b/src/java/org/apache/cassandra/utils/Shared.java
@@ -24,7 +24,7 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
- * Tells jvm-dtest that a class should be shared accross all {@link 
ClassLoader}s.
+ * Tells jvm-dtest that a class should be shared across all {@link 
ClassLoader}s.
  *
  * Jvm-dtest relies on classloader isolation to run multiple cassandra 
instances in the same JVM, this makes it
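- * so some classes do not get shared (outside a blesssed set of classes/packages). When the default behavior
+ * so some classes do not get shared (outside a blessed set of classes/packages). When the default behavior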
diff --git 
a/test/unit/org/apache/cassandra/transport/MessageDispatcherTest.java 
b/test/unit/org/apache/cassandra/transport/MessageDispatcherTest.java
new file mode 100644
index 0000000000..0c70315e25
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/MessageDispatcherTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.channel.Channel;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.ClientMetrics;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.messages.AuthResponse;
+
+/**
+ * Verifies how {@link Dispatcher#dispatch} chooses between {@link Dispatcher#authExecutor} and
+ * {@link Dispatcher#requestExecutor} depending on the request type and native_transport_max_auth_threads.
+ * The test dispatcher's processRequest is a no-op, so only the executors' completed-task counts are observed.
+ */
+public class MessageDispatcherTest
+{
+    static final Message.Request AUTH_RESPONSE_REQUEST = new AuthResponse(new byte[0])
+    {
+        public Response execute(QueryState queryState, long queryStartNanoTime, boolean traceRequest)
+        {
+            return null;
+        }
+    };
+
+    /** Upper bound on how long to wait for a dispatched task to complete; real processing should be far faster. */
+    private static final long DISPATCH_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1);
+
+    private static AuthTestDispatcher dispatch;
+    private static int maxAuthThreadsBeforeTests;
+
+    @BeforeClass
+    public static void init() throws Exception
+    {
+        DatabaseDescriptor.daemonInitialization();
+        ClientMetrics.instance.init(Collections.emptyList());
+        maxAuthThreadsBeforeTests = DatabaseDescriptor.getNativeTransportMaxAuthThreads();
+        dispatch = new AuthTestDispatcher();
+    }
+
+    @AfterClass
+    public static void restoreAuthSize()
+    {
+        DatabaseDescriptor.setNativeTransportMaxAuthThreads(maxAuthThreadsBeforeTests);
+    }
+
+    @Test
+    public void testAuthRateLimiter() throws Exception
+    {
+        long startRequests = completedRequests();
+
+        DatabaseDescriptor.setNativeTransportMaxAuthThreads(1);
+        long auths = tryAuth(this::completedAuth);
+        Assert.assertEquals(1, auths);
+
+        DatabaseDescriptor.setNativeTransportMaxAuthThreads(100);
+        auths = tryAuth(this::completedAuth);
+        Assert.assertEquals(1, auths);
+
+        // Make sure no tasks executed on the regular pool
+        Assert.assertEquals(startRequests, completedRequests());
+    }
+
+    @Test
+    public void testAuthRateLimiterNotUsed() throws Exception
+    {
+        DatabaseDescriptor.setNativeTransportMaxAuthThreads(1);
+        for (Message.Type type : Message.Type.values())
+        {
+            // only non-auth request types are expected to bypass the auth pool
+            if (type == Message.Type.AUTH_RESPONSE || type == Message.Type.CREDENTIALS || type.direction != Message.Direction.REQUEST)
+                continue;
+
+            long auths = completedAuth();
+            long requests = tryAuth(this::completedRequests, new Message.Request(type)
+            {
+                public Response execute(QueryState queryState, long queryStartNanoTime, boolean traceRequest)
+                {
+                    return null;
+                }
+            });
+            Assert.assertEquals(1, requests);
+            Assert.assertEquals(0, completedAuth() - auths);
+        }
+    }
+
+    @Test
+    public void testAuthRateLimiterDisabled() throws Exception
+    {
+        long startAuthRequests = completedAuth();
+
+        DatabaseDescriptor.setNativeTransportMaxAuthThreads(0);
+        long requests = tryAuth(this::completedRequests);
+        Assert.assertEquals(1, requests);
+
+        DatabaseDescriptor.setNativeTransportMaxAuthThreads(-1);
+        requests = tryAuth(this::completedRequests);
+        Assert.assertEquals(1, requests);
+
+        DatabaseDescriptor.setNativeTransportMaxAuthThreads(-1000);
+        requests = tryAuth(this::completedRequests);
+        Assert.assertEquals(1, requests);
+
+        // Make sure no tasks executed on the auth pool
+        Assert.assertEquals(startAuthRequests, completedAuth());
+    }
+
+    private long completedRequests()
+    {
+        return Dispatcher.requestExecutor.getCompletedTaskCount();
+    }
+
+    private long completedAuth()
+    {
+        return Dispatcher.authExecutor.getCompletedTaskCount();
+    }
+
+    /** Dispatches {@link #AUTH_RESPONSE_REQUEST}; see {@link #tryAuth(Callable, Message.Request)}. */
+    private long tryAuth(Callable<Long> check) throws Exception
+    {
+        return tryAuth(check, AUTH_RESPONSE_REQUEST);
+    }
+
+    /**
+     * Dispatches {@code request} and waits, up to {@link #DISPATCH_TIMEOUT_NANOS}, for the counter read by
+     * {@code check} to change.
+     *
+     * @param check   reads the completed-task counter of the executor expected to run the request
+     * @param request the request to dispatch
+     * @return how far the counter advanced (0 if it did not change before the timeout)
+     */
+    @SuppressWarnings("UnstableApiUsage")
+    private long tryAuth(Callable<Long> check, Message.Request request) throws Exception
+    {
+        long start = check.call();
+        dispatch.dispatch(null, request, (channel, req, response) -> null, ClientResourceLimits.Overload.NONE);
+
+        // While this is timeout based, we should be *well below* a full second on any of this processing in any sane
+        // environment. nanoTime is used because, unlike currentTimeMillis, it is not affected by wall-clock changes.
+        long deadline = System.nanoTime() + DISPATCH_TIMEOUT_NANOS;
+        while (start == check.call() && System.nanoTime() - deadline < 0)
+        {
+            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+        }
+        return check.call() - start;
+    }
+
+    /** Dispatcher whose request processing is a no-op, so tests observe only which executor ran the task. */
+    public static class AuthTestDispatcher extends Dispatcher
+    {
+        public AuthTestDispatcher()
+        {
+            super(false);
+        }
+
+        @Override
+        void processRequest(Channel channel,
+                            Message.Request request,
+                            FlushItemConverter forFlusher,
+                            ClientResourceLimits.Overload backpressure,
+                            long approxStartTimeNanos)
+        {
+            // noop
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to