This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 5a03898  Prevent client requests from blocking on executor task queue
5a03898 is described below

commit 5a03898c680ed6ada63901e8a4b278ccc8070717
Author: sumanthpasupuleti <sumanth.pasupuleti...@gmail.com>
AuthorDate: Mon Mar 25 08:06:13 2019 -0700

    Prevent client requests from blocking on executor task queue
    
    patch by Sumanth Pasupuleti, reviewed by Benedict for CASSANDRA-15013
---
 CHANGES.txt                                        |   1 +
 doc/native_protocol_v4.spec                        |   4 +
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  31 +++
 .../apache/cassandra/metrics/ClientMetrics.java    |  56 ++++-
 .../org/apache/cassandra/net/ResourceLimits.java   | 245 ++++++++++++++++++++
 .../cassandra/service/NativeTransportService.java  |  26 +--
 .../org/apache/cassandra/transport/Connection.java |  11 +
 src/java/org/apache/cassandra/transport/Frame.java |  12 +-
 .../org/apache/cassandra/transport/Message.java    | 146 +++++++++++-
 .../transport/RequestThreadPoolExecutor.java       |  96 --------
 .../org/apache/cassandra/transport/Server.java     |  64 ++++--
 .../apache/cassandra/transport/SimpleClient.java   |  10 +
 .../transport/messages/StartupMessage.java         |   3 +
 test/unit/org/apache/cassandra/cql3/CQLTester.java |   2 +
 .../service/NativeTransportServiceTest.java        |   3 +-
 .../InflightRequestPayloadTrackerTest.java         | 248 +++++++++++++++++++++
 17 files changed, 811 insertions(+), 150 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c8bd30d..68d309c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.19
+ * Prevent client requests from blocking on executor task queue 
(CASSANDRA-15013)
  * Toughen up column drop/recreate type validations (CASSANDRA-15204)
  * LegacyLayout should handle paging states that cross a collection column 
(CASSANDRA-15201)
  * Prevent RuntimeException when username or password is empty/null 
(CASSANDRA-15198)
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 02802a7..8beb77b 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -275,6 +275,9 @@ Table of Contents
       mode. This mode will make all Thrift and Compact Tables to be exposed as 
if
       they were CQL Tables. This is optional; if not specified, the option will
       not be used.
+    - "THROW_ON_OVERLOAD": In case of server overloaded with too many 
requests, by default the server puts
+            back pressure on the client connection. Instead, the server can 
send an OverloadedException error message back to
+            the client if this option is set to true.
 
 
 4.1.2. AUTH_RESPONSE
@@ -1175,3 +1178,4 @@ Table of Contents
   * The <paging_state> returned in the v4 protocol is not compatible with the 
v3
     protocol. In other words, a <paging_state> returned by a node using 
protocol v4
     should not be used to query a node using protocol v3 (and vice-versa).
+  * Added THROW_ON_OVERLOAD startup option (Section 4.1.1).
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index de158bd..830d3e1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -154,6 +154,9 @@ public class Config
     public volatile Long native_transport_max_concurrent_connections = -1L;
     public volatile Long native_transport_max_concurrent_connections_per_ip = 
-1L;
     public boolean native_transport_flush_in_batches_legacy = true;
+    public volatile long 
native_transport_max_concurrent_requests_in_bytes_per_ip = -1L;
+    public volatile long native_transport_max_concurrent_requests_in_bytes = 
-1L;
+
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index db55c20..8417c39 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -470,6 +470,17 @@ public class DatabaseDescriptor
         {
             throw new ConfigurationException("Missing endpoint_snitch 
directive", false);
         }
+
+        if (conf.native_transport_max_concurrent_requests_in_bytes <= 0)
+        {
+            conf.native_transport_max_concurrent_requests_in_bytes = 
Runtime.getRuntime().maxMemory() / 10;
+        }
+
+        if (conf.native_transport_max_concurrent_requests_in_bytes_per_ip <= 0)
+        {
+            conf.native_transport_max_concurrent_requests_in_bytes_per_ip = 
Runtime.getRuntime().maxMemory() / 40;
+        }
+
         snitch = createEndpointSnitch(conf.endpoint_snitch);
         EndpointSnitchInfo.create();
 
@@ -1524,6 +1535,26 @@ public class DatabaseDescriptor
         conf.commitlog_sync_batch_window_in_ms = windowMillis;
     }
 
+    public static long getNativeTransportMaxConcurrentRequestsInBytesPerIp()
+    {
+        return conf.native_transport_max_concurrent_requests_in_bytes_per_ip;
+    }
+
+    public static void 
setNativeTransportMaxConcurrentRequestsInBytesPerIp(long 
maxConcurrentRequestsInBytes)
+    {
+        conf.native_transport_max_concurrent_requests_in_bytes_per_ip = 
maxConcurrentRequestsInBytes;
+    }
+
+    public static long getNativeTransportMaxConcurrentRequestsInBytes()
+    {
+        return conf.native_transport_max_concurrent_requests_in_bytes;
+    }
+
+    public static void setNativeTransportMaxConcurrentRequestsInBytes(long 
maxConcurrentRequestsInBytes)
+    {
+        conf.native_transport_max_concurrent_requests_in_bytes = 
maxConcurrentRequestsInBytes;
+    }
+
     public static int getCommitLogSyncPeriod()
     {
         return conf.commitlog_sync_period_in_ms;
diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java 
b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
index 4a384eb..08f0531 100644
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@ -18,9 +18,14 @@
  */
 package org.apache.cassandra.metrics;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import org.apache.cassandra.transport.Server;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
@@ -28,13 +33,40 @@ import static 
org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 public class ClientMetrics
 {
     private static final MetricNameFactory factory = new 
DefaultNameFactory("Client");
-    
     public static final ClientMetrics instance = new ClientMetrics();
-    
+
+    private volatile boolean initialized = false;
+
+    private Collection<Server> servers = Collections.emptyList();
+
+    private AtomicInteger pausedConnections;
+    private Gauge<Integer> pausedConnectionsGauge;
+    private Meter requestDiscarded;
+
     private ClientMetrics()
     {
     }
 
+    public void pauseConnection() { pausedConnections.incrementAndGet(); }
+    public void unpauseConnection() { pausedConnections.decrementAndGet(); }
+    public void markRequestDiscarded() { requestDiscarded.mark(); }
+
+    public synchronized void init(Collection<Server> servers)
+    {
+        if (initialized)
+            return;
+
+        this.servers = servers;
+
+        registerGauge("connectedNativeClients", this::countConnectedClients);
+
+        pausedConnections = new AtomicInteger();
+        pausedConnectionsGauge = registerGauge("PausedConnections", 
pausedConnections::get);
+        requestDiscarded = registerMeter("RequestDiscarded");
+
+        initialized = true;
+    }
+
     public void addCounter(String name, final Callable<Integer> provider)
     {
         Metrics.register(factory.createMetricName(name), new Gauge<Integer>()
@@ -51,4 +83,24 @@ public class ClientMetrics
             }
         });
     }
+
+    private int countConnectedClients()
+    {
+        int count = 0;
+
+        for (Server server : servers)
+            count += server.getConnectedClients();
+
+        return count;
+    }
+
+    private <T> Gauge<T> registerGauge(String name, Gauge<T> gauge)
+    {
+        return Metrics.register(factory.createMetricName(name), gauge);
+    }
+
+    private Meter registerMeter(String name)
+    {
+        return Metrics.meter(factory.createMetricName(name));
+    }
 }
diff --git a/src/java/org/apache/cassandra/net/ResourceLimits.java 
b/src/java/org/apache/cassandra/net/ResourceLimits.java
new file mode 100644
index 0000000..f8d24d7
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/ResourceLimits.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+public abstract class ResourceLimits
+{
+    /**
+     * Represents permits to utilise a resource and ways to allocate and 
release them.
+     *
+     * Two implementations are currently provided:
+     * 1. {@link Concurrent}, for shared limits, which is thread-safe;
+     * 2. {@link Basic}, for limits that are not shared between threads, is 
not thread-safe.
+     */
+    public interface Limit
+    {
+        /**
+         * @return total amount of permits represented by this {@link Limit} - 
the capacity
+         */
+        long limit();
+
+        /**
+         * @return remaining, unallocated permit amount
+         */
+        long remaining();
+
+        /**
+         * @return amount of permits currently in use
+         */
+        long using();
+
+        /**
+         * Attempts to allocate an amount of permits from this limit. If 
allocated, <em>MUST</em> eventually
+         * be released back with {@link #release(long)}.
+         *
+         * @return {@code true} if the allocation was successful, {@code 
false} otherwise
+         */
+        boolean tryAllocate(long amount);
+
+        /**
+         * Allocates an amount independent of permits available from this 
limit. <em>MUST</em> eventually
+         * be released back with {@link #release(long)}.
+         *
+         */
+        void allocate(long amount);
+
+        /**
+         * @param amount return the amount of permits back to this limit
+         * @return {@code ABOVE_LIMIT} if there aren't enough permits 
available even after the release, or
+         *         {@code BELOW_LIMIT} if there are enough permits available 
after the releaese.
+         */
+        Outcome release(long amount);
+    }
+
+    /**
+     * A thread-safe permit container.
+     */
+    public static class Concurrent implements Limit
+    {
+        private final long limit;
+
+        private volatile long using;
+        private static final AtomicLongFieldUpdater<Concurrent> usingUpdater =
+            AtomicLongFieldUpdater.newUpdater(Concurrent.class, "using");
+
+        public Concurrent(long limit)
+        {
+            this.limit = limit;
+        }
+
+        public long limit()
+        {
+            return limit;
+        }
+
+        public long remaining()
+        {
+            return limit - using;
+        }
+
+        public long using()
+        {
+            return using;
+        }
+
+        public boolean tryAllocate(long amount)
+        {
+            long current, next;
+            do
+            {
+                current = using;
+                next = current + amount;
+
+                if (next > limit)
+                    return false;
+            }
+            while (!usingUpdater.compareAndSet(this, current, next));
+
+            return true;
+        }
+
+        public void allocate(long amount)
+        {
+            long current, next;
+            do
+            {
+                current = using;
+                next = current + amount;
+            } while (!usingUpdater.compareAndSet(this, current, next));
+        }
+
+        public Outcome release(long amount)
+        {
+            assert amount >= 0;
+            long using = usingUpdater.addAndGet(this, -amount);
+            assert using >= 0;
+            return using >= limit ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT;
+        }
+    }
+
+    /**
+     * A cheaper, thread-unsafe permit container to be used for unshared 
limits.
+     */
+    static class Basic implements Limit
+    {
+        private final long limit;
+        private long using;
+
+        Basic(long limit)
+        {
+            this.limit = limit;
+        }
+
+        public long limit()
+        {
+            return limit;
+        }
+
+        public long remaining()
+        {
+            return limit - using;
+        }
+
+        public long using()
+        {
+            return using;
+        }
+
+        public boolean tryAllocate(long amount)
+        {
+            if (using + amount > limit)
+                return false;
+
+            using += amount;
+            return true;
+        }
+
+        public void allocate(long amount)
+        {
+            using += amount;
+        }
+
+        public Outcome release(long amount)
+        {
+            assert amount >= 0 && amount <= using;
+            using -= amount;
+            return using >= limit ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT;
+        }
+    }
+
+    /**
+     * A convenience class that groups a per-endpoint limit with the global one
+     * to allow allocating/releasing permits from/to both limits as one 
logical operation.
+     */
+    public static class EndpointAndGlobal
+    {
+        final Limit endpoint;
+        final Limit global;
+
+        public EndpointAndGlobal(Limit endpoint, Limit global)
+        {
+            this.endpoint = endpoint;
+            this.global = global;
+        }
+
+        public Limit endpoint()
+        {
+            return endpoint;
+        }
+
+        public Limit global()
+        {
+            return global;
+        }
+
+        /**
+         * @return {@code INSUFFICIENT_GLOBAL} if there weren't enough permits 
in the global limit, or
+         *         {@code INSUFFICIENT_ENDPOINT} if there weren't enough 
permits in the per-endpoint limit, or
+         *         {@code SUCCESS} if there were enough permits to take from 
both.
+         */
+        public Outcome tryAllocate(long amount)
+        {
+            if (!global.tryAllocate(amount))
+                return Outcome.INSUFFICIENT_GLOBAL;
+
+            if (endpoint.tryAllocate(amount))
+                return Outcome.SUCCESS;
+
+            global.release(amount);
+            return Outcome.INSUFFICIENT_ENDPOINT;
+        }
+
+        public void allocate(long amount)
+        {
+            global.allocate(amount);
+            endpoint.allocate(amount);
+        }
+
+        public Outcome release(long amount)
+        {
+            Outcome endpointReleaseOutcome = endpoint.release(amount);
+            Outcome globalReleaseOutcome = global.release(amount);
+            return (endpointReleaseOutcome == Outcome.ABOVE_LIMIT || 
globalReleaseOutcome == Outcome.ABOVE_LIMIT)
+                   ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT;
+        }
+    }
+
+    public enum Outcome { SUCCESS, INSUFFICIENT_ENDPOINT, INSUFFICIENT_GLOBAL, 
BELOW_LIMIT, ABOVE_LIMIT }
+}
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java 
b/src/java/org/apache/cassandra/service/NativeTransportService.java
index 48839f1..2280818 100644
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -31,11 +31,9 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.metrics.ClientMetrics;
-import org.apache.cassandra.transport.RequestThreadPoolExecutor;
+import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.Server;
 
 /**
@@ -50,7 +48,6 @@ public class NativeTransportService
 
     private boolean initialized = false;
     private EventLoopGroup workerGroup;
-    private EventExecutor eventExecutorGroup;
 
     /**
      * Creates netty thread pools and event loops.
@@ -61,9 +58,6 @@ public class NativeTransportService
         if (initialized)
             return;
 
-        // prepare netty resources
-        eventExecutorGroup = new RequestThreadPoolExecutor();
-
         if (useEpoll())
         {
             workerGroup = new EpollEventLoopGroup();
@@ -80,7 +74,6 @@ public class NativeTransportService
         InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
 
         org.apache.cassandra.transport.Server.Builder builder = new 
org.apache.cassandra.transport.Server.Builder()
-                                                                
.withEventExecutor(eventExecutorGroup)
                                                                 
.withEventLoopGroup(workerGroup)
                                                                 
.withHost(nativeAddr);
 
@@ -108,13 +101,7 @@ public class NativeTransportService
         }
 
         // register metrics
-        ClientMetrics.instance.addCounter("connectedNativeClients", () ->
-        {
-            int ret = 0;
-            for (Server server : servers)
-                ret += server.getConnectedClients();
-            return ret;
-        });
+        ClientMetrics.instance.init(servers);
 
         initialized = true;
     }
@@ -147,8 +134,7 @@ public class NativeTransportService
         // shutdown executors used by netty for native transport server
         workerGroup.shutdownGracefully(3, 5, 
TimeUnit.SECONDS).awaitUninterruptibly();
 
-        // shutdownGracefully not implemented yet in RequestThreadPoolExecutor
-        eventExecutorGroup.shutdown();
+        Message.Dispatcher.shutdown();
     }
 
     /**
@@ -177,12 +163,6 @@ public class NativeTransportService
     }
 
     @VisibleForTesting
-    EventExecutor getEventExecutor()
-    {
-        return eventExecutorGroup;
-    }
-
-    @VisibleForTesting
     Collection<Server> getServers()
     {
         return servers;
diff --git a/src/java/org/apache/cassandra/transport/Connection.java 
b/src/java/org/apache/cassandra/transport/Connection.java
index af26557..2966d9b 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -29,6 +29,7 @@ public class Connection
     private final Tracker tracker;
 
     private volatile FrameCompressor frameCompressor;
+    private boolean throwOnOverload;
 
     public Connection(Channel channel, int version, Tracker tracker)
     {
@@ -49,6 +50,16 @@ public class Connection
         return frameCompressor;
     }
 
+    public void setThrowOnOverload(boolean throwOnOverload)
+    {
+        this.throwOnOverload = throwOnOverload;
+    }
+
+    public boolean isThrowOnOverload()
+    {
+        return throwOnOverload;
+    }
+
     public Tracker getTracker()
     {
         return tracker;
diff --git a/src/java/org/apache/cassandra/transport/Frame.java 
b/src/java/org/apache/cassandra/transport/Frame.java
index 3940b47..c28be9f 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -68,7 +68,7 @@ public class Frame
 
     public static Frame create(Message.Type type, int streamId, int version, 
EnumSet<Header.Flag> flags, ByteBuf body)
     {
-        Header header = new Header(version, flags, streamId, type);
+        Header header = new Header(version, flags, streamId, type, 
body.readableBytes());
         return new Frame(header, body);
     }
 
@@ -83,18 +83,20 @@ public class Frame
         public final EnumSet<Flag> flags;
         public final int streamId;
         public final Message.Type type;
+        public final long bodySizeInBytes;
 
-        private Header(int version, int flags, int streamId, Message.Type type)
+        private Header(int version, int flags, int streamId, Message.Type 
type, long bodySizeInBytes)
         {
-            this(version, Flag.deserialize(flags), streamId, type);
+            this(version, Flag.deserialize(flags), streamId, type, 
bodySizeInBytes);
         }
 
-        private Header(int version, EnumSet<Flag> flags, int streamId, 
Message.Type type)
+        private Header(int version, EnumSet<Flag> flags, int streamId, 
Message.Type type, long bodySizeInBytes)
         {
             this.version = version;
             this.flags = flags;
             this.streamId = streamId;
             this.type = type;
+            this.bodySizeInBytes = bodySizeInBytes;
         }
 
         public static enum Flag
@@ -240,7 +242,7 @@ public class Frame
                         streamId);
             }
 
-            results.add(new Frame(new Header(version, flags, streamId, type), 
body));
+            results.add(new Frame(new Header(version, flags, streamId, type, 
bodyLength), body));
         }
 
         private void fail()
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index 0851b19..08a8600 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -42,11 +42,18 @@ import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.LocalAwareExecutorService;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.metrics.ClientMetrics;
+import org.apache.cassandra.net.ResourceLimits;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
+import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
+
 /**
  * A message from the CQL binary protocol.
  */
@@ -404,19 +411,42 @@ public abstract class Message
         }
     }
 
-    @ChannelHandler.Sharable
     public static class Dispatcher extends SimpleChannelInboundHandler<Request>
     {
+        private static final LocalAwareExecutorService requestExecutor = 
SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
+                                                                               
             Integer.MAX_VALUE,
+                                                                               
             "transport",
+                                                                               
             "Native-Transport-Requests");
+
+        /**
+         * Current count of *request* bytes that are live on the channel.
+         *
+         * Note: should only be accessed while on the netty event loop.
+         */
+        private long channelPayloadBytesInFlight;
+
+        private final Server.EndpointPayloadTracker endpointPayloadTracker;
+
+        private boolean paused;
+
         private static class FlushItem
         {
             final ChannelHandlerContext ctx;
             final Object response;
             final Frame sourceFrame;
-            private FlushItem(ChannelHandlerContext ctx, Object response, 
Frame sourceFrame)
+            final Dispatcher dispatcher;
+
+            private FlushItem(ChannelHandlerContext ctx, Object response, 
Frame sourceFrame, Dispatcher dispatcher)
             {
                 this.ctx = ctx;
                 this.sourceFrame = sourceFrame;
                 this.response = response;
+                this.dispatcher = dispatcher;
+            }
+
+            public void release()
+            {
+                dispatcher.releaseItem(this);
             }
         }
 
@@ -472,7 +502,7 @@ public abstract class Message
                     for (ChannelHandlerContext channel : channels)
                         channel.flush();
                     for (FlushItem item : flushed)
-                        item.sourceFrame.release();
+                        item.release();
 
                     channels.clear();
                     flushed.clear();
@@ -524,7 +554,7 @@ public abstract class Message
                     for (ChannelHandlerContext channel : channels)
                         channel.flush();
                     for (FlushItem item : flushed)
-                        item.sourceFrame.release();
+                        item.release();
 
                     channels.clear();
                     flushed.clear();
@@ -536,16 +566,98 @@ public abstract class Message
 
         private final boolean useLegacyFlusher;
 
-        public Dispatcher(boolean useLegacyFlusher)
+        public Dispatcher(boolean useLegacyFlusher, 
Server.EndpointPayloadTracker endpointPayloadTracker)
         {
             super(false);
             this.useLegacyFlusher = useLegacyFlusher;
+            this.endpointPayloadTracker = endpointPayloadTracker;
         }
 
         @Override
         public void channelRead0(ChannelHandlerContext ctx, Request request)
         {
+            // if we decide to handle this message, process it outside of the 
netty event loop
+            if (shouldHandleRequest(ctx, request))
+                requestExecutor.submit(() -> processRequest(ctx, request));
+        }
+
+        /** This check for inflight payload to potentially discard the request 
should have been ideally in one of the
+         * first handlers in the pipeline (Frame::decode()). However, incase 
of any exception thrown between that
+         * handler (where inflight payload is incremented) and this handler 
(Dispatcher::channelRead0) (where inflight
+         * payload in decremented), inflight payload becomes erroneous. 
ExceptionHandler is not sufficient for this
+         * purpose since it does not have the frame associated with the 
exception.
+         *
+         * Note: this method should execute on the netty event loop.
+         */
+        private boolean shouldHandleRequest(ChannelHandlerContext ctx, Request 
request)
+        {
+            long frameSize = request.getSourceFrame().header.bodySizeInBytes;
+
+            ResourceLimits.EndpointAndGlobal endpointAndGlobalPayloadsInFlight 
= endpointPayloadTracker.endpointAndGlobalPayloadsInFlight;
+
+            // check for overloaded state by trying to allocate framesize to 
inflight payload trackers
+            if (endpointAndGlobalPayloadsInFlight.tryAllocate(frameSize) != 
ResourceLimits.Outcome.SUCCESS)
+            {
+                if (request.connection.isThrowOnOverload())
+                {
+                    // discard the request and throw an exception
+                    ClientMetrics.instance.markRequestDiscarded();
+                    logger.trace("Discarded request of size: {}. 
InflightChannelRequestPayload: {}, InflightEndpointRequestPayload: {}, 
InflightOverallRequestPayload: {}, Request: {}",
+                                 frameSize,
+                                 channelPayloadBytesInFlight,
+                                 
endpointAndGlobalPayloadsInFlight.endpoint().using(),
+                                 
endpointAndGlobalPayloadsInFlight.global().using(),
+                                 request);
+                    throw ErrorMessage.wrap(new OverloadedException("Server is 
in overloaded state. Cannot accept more requests at this point"),
+                                            
request.getSourceFrame().header.streamId);
+                }
+                else
+                {
+                    // set backpressure on the channel, and handle the request
+                    endpointAndGlobalPayloadsInFlight.allocate(frameSize);
+                    ctx.channel().config().setAutoRead(false);
+                    ClientMetrics.instance.pauseConnection();
+                    paused = true;
+                }
+            }
+
+            channelPayloadBytesInFlight += frameSize;
+            return true;
+        }
+
+        /**
+         * Note: this method will be used in the {@link Flusher#run()}, which 
executes on the netty event loop
+         * ({@link Dispatcher#flusherLookup}). Thus, we assume the semantics 
and visibility of variables
+         * of being on the event loop.
+         */
+        private void releaseItem(FlushItem item)
+        {
+            long itemSize = item.sourceFrame.header.bodySizeInBytes;
+            item.sourceFrame.release();
+
+            // since the request has been processed, decrement inflight 
payload at channel, endpoint and global levels
+            channelPayloadBytesInFlight -= itemSize;
+            ResourceLimits.Outcome endpointGlobalReleaseOutcome = 
endpointPayloadTracker.endpointAndGlobalPayloadsInFlight.release(itemSize);
+
+            // now check to see if we need to reenable the channel's autoRead.
+            // If the current payload side is zero, we must reenable autoread 
as
+            // 1) we allow no other thread/channel to do it, and
+            // 2) there's no other events following this one (becuase we're at 
zero bytes in flight),
+            // so no successive to trigger the other clause in this if-block
+            ChannelConfig config = item.ctx.channel().config();
+            if (paused && (channelPayloadBytesInFlight == 0 || 
endpointGlobalReleaseOutcome == ResourceLimits.Outcome.BELOW_LIMIT))
+            {
+                paused = false;
+                ClientMetrics.instance.unpauseConnection();
+                config.setAutoRead(true);
+            }
+        }
 
+        /**
+         * Note: this method is not expected to execute on the netty event 
loop.
+         */
+        void processRequest(ChannelHandlerContext ctx, Request request)
+        {
             final Response response;
             final ServerConnection connection;
 
@@ -569,7 +681,7 @@ public abstract class Message
             {
                 JVMStabilityInspector.inspectThrowable(t);
                 UnexpectedChannelExceptionHandler handler = new 
UnexpectedChannelExceptionHandler(ctx.channel(), true);
-                flush(new FlushItem(ctx, ErrorMessage.fromException(t, 
handler).setStreamId(request.getStreamId()), request.getSourceFrame()));
+                flush(new FlushItem(ctx, ErrorMessage.fromException(t, 
handler).setStreamId(request.getStreamId()), request.getSourceFrame(), this));
                 return;
             }
             finally
@@ -578,7 +690,19 @@ public abstract class Message
             }
 
             logger.trace("Responding: {}, v={}", response, 
connection.getVersion());
-            flush(new FlushItem(ctx, response, request.getSourceFrame()));
+            flush(new FlushItem(ctx, response, request.getSourceFrame(), 
this));
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx)
+        {
+            endpointPayloadTracker.release();
+            if (paused)
+            {
+                paused = false;
+                ClientMetrics.instance.unpauseConnection();
+            }
+            ctx.fireChannelInactive();
         }
 
         private void flush(FlushItem item)
@@ -596,6 +720,14 @@ public abstract class Message
             flusher.queued.add(item);
             flusher.start();
         }
+
+        public static void shutdown()
+        {
+            if (requestExecutor != null)
+            {
+                requestExecutor.shutdown();
+            }
+        }
     }
 
     @ChannelHandler.Sharable
diff --git 
a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java 
b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
deleted file mode 100644
index 75dd05d..0000000
--- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.transport;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.util.concurrent.AbstractEventExecutor;
-import io.netty.util.concurrent.EventExecutorGroup;
-import io.netty.util.concurrent.Future;
-import org.apache.cassandra.concurrent.LocalAwareExecutorService;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
-
-public class RequestThreadPoolExecutor extends AbstractEventExecutor
-{
-    private final static int MAX_QUEUED_REQUESTS = 
Integer.getInteger("cassandra.max_queued_native_transport_requests", 128);
-    private final static String THREAD_FACTORY_ID = 
"Native-Transport-Requests";
-    private final LocalAwareExecutorService wrapped = 
SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
-                                                                           
MAX_QUEUED_REQUESTS,
-                                                                           
"transport",
-                                                                           
THREAD_FACTORY_ID);
-
-    public boolean isShuttingDown()
-    {
-        return wrapped.isShutdown();
-    }
-
-    public Future<?> shutdownGracefully(long l, long l2, TimeUnit timeUnit)
-    {
-        throw new IllegalStateException();
-    }
-
-    public Future<?> terminationFuture()
-    {
-        throw new IllegalStateException();
-    }
-
-    @Override
-    public void shutdown()
-    {
-        wrapped.shutdown();
-    }
-
-    @Override
-    public List<Runnable> shutdownNow()
-    {
-        return wrapped.shutdownNow();
-    }
-
-    public boolean isShutdown()
-    {
-        return wrapped.isShutdown();
-    }
-
-    public boolean isTerminated()
-    {
-        return wrapped.isTerminated();
-    }
-
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException
-    {
-        return wrapped.awaitTermination(timeout, unit);
-    }
-
-    public EventExecutorGroup parent()
-    {
-        return null;
-    }
-
-    public boolean inEventLoop(Thread thread)
-    {
-        return false;
-    }
-
-    public void execute(Runnable command)
-    {
-        wrapped.execute(command);
-    }
-}
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index 8c781db..83a676c 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +51,7 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.net.ResourceLimits;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.transport.messages.EventMessage;
@@ -84,7 +87,6 @@ public class Server implements CassandraDaemon.Server
     private final AtomicBoolean isRunning = new AtomicBoolean(false);
 
     private EventLoopGroup workerGroup;
-    private EventExecutor eventExecutorGroup;
 
     private Server (Builder builder)
     {
@@ -101,8 +103,6 @@ public class Server implements CassandraDaemon.Server
             else
                 workerGroup = new NioEventLoopGroup();
         }
-        if (builder.eventExecutorGroup != null)
-            eventExecutorGroup = builder.eventExecutorGroup;
         EventNotifier notifier = new EventNotifier(this);
         StorageService.instance.register(notifier);
         MigrationManager.instance.register(notifier);
@@ -201,12 +201,6 @@ public class Server implements CassandraDaemon.Server
             return this;
         }
 
-        public Builder withEventExecutor(EventExecutor eventExecutor)
-        {
-            this.eventExecutorGroup = eventExecutor;
-            return this;
-        }
-
         public Builder withHost(InetAddress host)
         {
             this.hostAddr = host;
@@ -286,6 +280,49 @@ public class Server implements CassandraDaemon.Server
         }
     }
 
+    // global inflight payload across all channels across all endpoints
+    private static final ResourceLimits.Concurrent 
globalRequestPayloadInFlight = new 
ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+
+    public static class EndpointPayloadTracker
+    {
+        // inflight payload per endpoint across corresponding channels
+        private static final ConcurrentMap<InetAddress, 
EndpointPayloadTracker> requestPayloadInFlightPerEndpoint = new 
ConcurrentHashMap<>();
+
+        private final AtomicInteger refCount = new AtomicInteger(0);
+        private final InetAddress endpoint;
+
+        final ResourceLimits.EndpointAndGlobal 
endpointAndGlobalPayloadsInFlight = new ResourceLimits.EndpointAndGlobal(new 
ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp()),
+                                                                               
                                          globalRequestPayloadInFlight);
+
+        private EndpointPayloadTracker(InetAddress endpoint)
+        {
+            this.endpoint = endpoint;
+        }
+
+        public static EndpointPayloadTracker get(InetAddress endpoint)
+        {
+            while (true)
+            {
+                EndpointPayloadTracker result = 
requestPayloadInFlightPerEndpoint.computeIfAbsent(endpoint, 
EndpointPayloadTracker::new);
+                if (result.acquire())
+                    return result;
+
+                requestPayloadInFlightPerEndpoint.remove(endpoint, result);
+            }
+        }
+
+        private boolean acquire()
+        {
+            return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1);
+        }
+
+        public void release()
+        {
+            if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1))
+                requestPayloadInFlightPerEndpoint.remove(endpoint, this);
+        }
+    }
+
     private static class Initializer extends ChannelInitializer<Channel>
     {
         // Stateless handlers
@@ -295,7 +332,6 @@ public class Server implements CassandraDaemon.Server
         private static final Frame.Compressor frameCompressor = new 
Frame.Compressor();
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.ExceptionHandler exceptionHandler = new 
Message.ExceptionHandler();
-        private static final Message.Dispatcher dispatcher = new 
Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
         private static final ConnectionLimitHandler connectionLimitHandler = 
new ConnectionLimitHandler();
 
         private final Server server;
@@ -328,6 +364,9 @@ public class Server implements CassandraDaemon.Server
             pipeline.addLast("messageDecoder", messageDecoder);
             pipeline.addLast("messageEncoder", messageEncoder);
 
+            pipeline.addLast("executor", new 
Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher(),
+                                                                
EndpointPayloadTracker.get(((InetSocketAddress) 
channel.remoteAddress()).getAddress())));
+
             // The exceptionHandler will take care of handling 
exceptionCaught(...) events while still running
             // on the same EventLoop as all previous added handlers in the 
pipeline. This is important as the used
             // eventExecutorGroup may not enforce strict ordering for channel 
events.
@@ -335,11 +374,6 @@ public class Server implements CassandraDaemon.Server
             // correctly handled before the handler itself is removed.
             // See https://issues.apache.org/jira/browse/CASSANDRA-13649
             pipeline.addLast("exceptionHandler", exceptionHandler);
-
-            if (server.eventExecutorGroup != null)
-                pipeline.addLast(server.eventExecutorGroup, "executor", 
dispatcher);
-            else
-                pipeline.addLast("executor", dispatcher);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java 
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 4759c2a..7916deb 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -115,10 +115,20 @@ public class SimpleClient implements Closeable
 
     public void connect(boolean useCompression) throws IOException
     {
+        connect(useCompression, false);
+    }
+
+    public void connect(boolean useCompression, boolean throwOnOverload) 
throws IOException
+    {
         establishConnection();
 
         Map<String, String> options = new HashMap<>();
         options.put(StartupMessage.CQL_VERSION, "3.0.0");
+
+        if (throwOnOverload)
+            options.put(StartupMessage.THROW_ON_OVERLOAD, "1");
+        connection.setThrowOnOverload(throwOnOverload);
+
         if (useCompression)
         {
             options.put(StartupMessage.COMPRESSION, "snappy");
diff --git 
a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java 
b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 774be6a..92278fa 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -36,6 +36,7 @@ public class StartupMessage extends Message.Request
     public static final String CQL_VERSION = "CQL_VERSION";
     public static final String COMPRESSION = "COMPRESSION";
     public static final String NO_COMPACT = "NO_COMPACT";
+    public static final String THROW_ON_OVERLOAD = "THROW_ON_OVERLOAD";
 
     public static final Message.Codec<StartupMessage> codec = new 
Message.Codec<StartupMessage>()
     {
@@ -101,6 +102,8 @@ public class StartupMessage extends Message.Request
         if (options.containsKey(NO_COMPACT) && 
Boolean.parseBoolean(options.get(NO_COMPACT)))
             state.getClientState().setNoCompactMode();
 
+        
connection.setThrowOnOverload("1".equals(options.get(THROW_ON_OVERLOAD)));
+
         if (DatabaseDescriptor.getAuthenticator().requireAuthentication())
             return new 
AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName());
         else
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 3c0cefc..999404e 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -44,6 +44,7 @@ import com.datastax.driver.core.ResultSet;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.functions.FunctionName;
@@ -334,6 +335,7 @@ public abstract class CQLTester
         SchemaLoader.startGossiper();
 
         server = new 
Server.Builder().withHost(nativeAddr).withPort(nativePort).build();
+        ClientMetrics.instance.init(Collections.singleton(server));
         server.start();
 
         for (int version : PROTOCOL_VERSIONS)
diff --git 
a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java 
b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
index d0e291a..8f2689a 100644
--- a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
@@ -79,8 +79,7 @@ public class NativeTransportServiceTest
     {
         withService((NativeTransportService service) -> {
             Supplier<Boolean> allTerminated = () ->
-                                              
service.getWorkerGroup().isShutdown() && 
service.getWorkerGroup().isTerminated() &&
-                                              
service.getEventExecutor().isShutdown() && 
service.getEventExecutor().isTerminated();
+                                              
service.getWorkerGroup().isShutdown() && 
service.getWorkerGroup().isTerminated();
             assertFalse(allTerminated.get());
             service.destroy();
             assertTrue(allTerminated.get());
diff --git 
a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
 
b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
new file mode 100644
index 0000000..e4d335b
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.transport.messages.QueryMessage;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class InflightRequestPayloadTrackerTest extends CQLTester
+{
+    @BeforeClass
+    public static void setUp()
+    {
+        
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(600);
+        DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(600);
+        requireNetwork();
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(3000000000L);
+        
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(5000000000L);
+    }
+
+    @After
+    public void dropCreatedTable()
+    {
+        try
+        {
+            QueryProcessor.executeOnceInternal("DROP TABLE " + KEYSPACE + 
".atable");
+        }
+        catch (Throwable t)
+        {
+            // ignore
+        }
+    }
+
+    @Test
+    public void testQueryExecutionWithThrowOnOverload() throws Throwable
+    {
+        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                               nativePort,
+                                               Server.CURRENT_VERSION,
+                                               new 
EncryptionOptions.ClientEncryptionOptions());
+
+        try
+        {
+            client.connect(false, true);
+            QueryOptions queryOptions = QueryOptions.create(
+            QueryOptions.DEFAULT.getConsistency(),
+            QueryOptions.DEFAULT.getValues(),
+            QueryOptions.DEFAULT.skipMetadata(),
+            QueryOptions.DEFAULT.getPageSize(),
+            QueryOptions.DEFAULT.getPagingState(),
+            QueryOptions.DEFAULT.getSerialConsistency(),
+            Server.CURRENT_VERSION);
+
+            QueryMessage queryMessage = new QueryMessage(String.format("CREATE 
TABLE %s.atable (pk1 int PRIMARY KEY, v text)", KEYSPACE),
+                                                         queryOptions);
+            client.execute(queryMessage);
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testQueryExecutionWithoutThrowOnOverload() throws Throwable
+    {
+        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                               nativePort,
+                                               Server.CURRENT_VERSION,
+                                               new 
EncryptionOptions.ClientEncryptionOptions());
+
+        try
+        {
+            client.connect(false, false);
+            QueryOptions queryOptions = QueryOptions.create(
+            QueryOptions.DEFAULT.getConsistency(),
+            QueryOptions.DEFAULT.getValues(),
+            QueryOptions.DEFAULT.skipMetadata(),
+            QueryOptions.DEFAULT.getPageSize(),
+            QueryOptions.DEFAULT.getPagingState(),
+            QueryOptions.DEFAULT.getSerialConsistency(),
+            Server.CURRENT_VERSION);
+
+            QueryMessage queryMessage = new QueryMessage(String.format("CREATE 
TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                         queryOptions);
+            client.execute(queryMessage);
+            queryMessage = new QueryMessage(String.format("SELECT * FROM 
%s.atable", KEYSPACE),
+                                            queryOptions);
+            client.execute(queryMessage);
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    @Test
+    public void 
testQueryExecutionWithoutThrowOnOverloadAndInflightLimitedExceeded() throws 
Throwable
+    {
+        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                               nativePort,
+                                               Server.CURRENT_VERSION,
+                                               new 
EncryptionOptions.ClientEncryptionOptions());
+
+        try
+        {
+            client.connect(false, false);
+            QueryOptions queryOptions = QueryOptions.create(
+            QueryOptions.DEFAULT.getConsistency(),
+            QueryOptions.DEFAULT.getValues(),
+            QueryOptions.DEFAULT.skipMetadata(),
+            QueryOptions.DEFAULT.getPageSize(),
+            QueryOptions.DEFAULT.getPagingState(),
+            QueryOptions.DEFAULT.getSerialConsistency(),
+            Server.CURRENT_VERSION);
+
+            QueryMessage queryMessage = new QueryMessage(String.format("CREATE 
TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                         queryOptions);
+            client.execute(queryMessage);
+
+            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                            queryOptions);
+            client.execute(queryMessage);
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testOverloadedExceptionForEndpointInflightLimit() throws 
Throwable
+    {
+        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                               nativePort,
+                                               Server.CURRENT_VERSION,
+                                               new 
EncryptionOptions.ClientEncryptionOptions());
+
+        try
+        {
+            client.connect(false, true);
+            QueryOptions queryOptions = QueryOptions.create(
+            QueryOptions.DEFAULT.getConsistency(),
+            QueryOptions.DEFAULT.getValues(),
+            QueryOptions.DEFAULT.skipMetadata(),
+            QueryOptions.DEFAULT.getPageSize(),
+            QueryOptions.DEFAULT.getPagingState(),
+            QueryOptions.DEFAULT.getSerialConsistency(),
+            Server.CURRENT_VERSION);
+
+            QueryMessage queryMessage = new QueryMessage(String.format("CREATE 
TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                         queryOptions);
+            client.execute(queryMessage);
+
+            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                            queryOptions);
+            try
+            {
+                client.execute(queryMessage);
+                Assert.fail();
+            }
+            catch (RuntimeException e)
+            {
+                Assert.assertTrue(e.getCause() instanceof OverloadedException);
+            }
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testOverloadedExceptionForOverallInflightLimit() throws 
Throwable
+    {
+        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                               nativePort,
+                                               Server.CURRENT_VERSION,
+                                               new 
EncryptionOptions.ClientEncryptionOptions());
+
+        try
+        {
+            client.connect(false, true);
+            QueryOptions queryOptions = QueryOptions.create(
+            QueryOptions.DEFAULT.getConsistency(),
+            QueryOptions.DEFAULT.getValues(),
+            QueryOptions.DEFAULT.skipMetadata(),
+            QueryOptions.DEFAULT.getPageSize(),
+            QueryOptions.DEFAULT.getPagingState(),
+            QueryOptions.DEFAULT.getSerialConsistency(),
+            Server.CURRENT_VERSION);
+
+            QueryMessage queryMessage = new QueryMessage(String.format("CREATE 
TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                         queryOptions);
+            client.execute(queryMessage);
+
+            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                            queryOptions);
+            try
+            {
+                client.execute(queryMessage);
+                Assert.fail();
+            }
+            catch (RuntimeException e)
+            {
+                Assert.assertTrue(e.getCause() instanceof OverloadedException);
+            }
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to