This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push: new 5a03898 Prevent client requests from blocking on executor task queue 5a03898 is described below commit 5a03898c680ed6ada63901e8a4b278ccc8070717 Author: sumanthpasupuleti <sumanth.pasupuleti...@gmail.com> AuthorDate: Mon Mar 25 08:06:13 2019 -0700 Prevent client requests from blocking on executor task queue patch by Sumanth Pasupuleti, reviewed by Benedict for CASSANDRA-15013 --- CHANGES.txt | 1 + doc/native_protocol_v4.spec | 4 + src/java/org/apache/cassandra/config/Config.java | 3 + .../cassandra/config/DatabaseDescriptor.java | 31 +++ .../apache/cassandra/metrics/ClientMetrics.java | 56 ++++- .../org/apache/cassandra/net/ResourceLimits.java | 245 ++++++++++++++++++++ .../cassandra/service/NativeTransportService.java | 26 +-- .../org/apache/cassandra/transport/Connection.java | 11 + src/java/org/apache/cassandra/transport/Frame.java | 12 +- .../org/apache/cassandra/transport/Message.java | 146 +++++++++++- .../transport/RequestThreadPoolExecutor.java | 96 -------- .../org/apache/cassandra/transport/Server.java | 64 ++++-- .../apache/cassandra/transport/SimpleClient.java | 10 + .../transport/messages/StartupMessage.java | 3 + test/unit/org/apache/cassandra/cql3/CQLTester.java | 2 + .../service/NativeTransportServiceTest.java | 3 +- .../InflightRequestPayloadTrackerTest.java | 248 +++++++++++++++++++++ 17 files changed, 811 insertions(+), 150 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c8bd30d..68d309c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.19 + * Prevent client requests from blocking on executor task queue (CASSANDRA-15013) * Toughen up column drop/recreate type validations (CASSANDRA-15204) * LegacyLayout should handle paging states that cross a collection column (CASSANDRA-15201) * Prevent RuntimeException when username or password is empty/null (CASSANDRA-15198) diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec index 
02802a7..8beb77b 100644 --- a/doc/native_protocol_v4.spec +++ b/doc/native_protocol_v4.spec @@ -275,6 +275,9 @@ Table of Contents mode. This mode will make all Thrift and Compact Tables to be exposed as if they were CQL Tables. This is optional; if not specified, the option will not be used. + - "THROW_ON_OVERLOAD": In case of server overloaded with too many requests, by default the server puts + back pressure on the client connection. Instead, the server can send an OverloadedException error message back to + the client if this option is set to true. 4.1.2. AUTH_RESPONSE @@ -1175,3 +1178,4 @@ Table of Contents * The <paging_state> returned in the v4 protocol is not compatible with the v3 protocol. In other words, a <paging_state> returned by a node using protocol v4 should not be used to query a node using protocol v3 (and vice-versa). + * Added THROW_ON_OVERLOAD startup option (Section 4.1.1). diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index de158bd..830d3e1 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -154,6 +154,9 @@ public class Config public volatile Long native_transport_max_concurrent_connections = -1L; public volatile Long native_transport_max_concurrent_connections_per_ip = -1L; public boolean native_transport_flush_in_batches_legacy = true; + public volatile long native_transport_max_concurrent_requests_in_bytes_per_ip = -1L; + public volatile long native_transport_max_concurrent_requests_in_bytes = -1L; + @Deprecated public Integer thrift_max_message_length_in_mb = 16; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index db55c20..8417c39 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -470,6 +470,17 @@ public class 
DatabaseDescriptor { throw new ConfigurationException("Missing endpoint_snitch directive", false); } + + if (conf.native_transport_max_concurrent_requests_in_bytes <= 0) + { + conf.native_transport_max_concurrent_requests_in_bytes = Runtime.getRuntime().maxMemory() / 10; + } + + if (conf.native_transport_max_concurrent_requests_in_bytes_per_ip <= 0) + { + conf.native_transport_max_concurrent_requests_in_bytes_per_ip = Runtime.getRuntime().maxMemory() / 40; + } + snitch = createEndpointSnitch(conf.endpoint_snitch); EndpointSnitchInfo.create(); @@ -1524,6 +1535,26 @@ public class DatabaseDescriptor conf.commitlog_sync_batch_window_in_ms = windowMillis; } + public static long getNativeTransportMaxConcurrentRequestsInBytesPerIp() + { + return conf.native_transport_max_concurrent_requests_in_bytes_per_ip; + } + + public static void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long maxConcurrentRequestsInBytes) + { + conf.native_transport_max_concurrent_requests_in_bytes_per_ip = maxConcurrentRequestsInBytes; + } + + public static long getNativeTransportMaxConcurrentRequestsInBytes() + { + return conf.native_transport_max_concurrent_requests_in_bytes; + } + + public static void setNativeTransportMaxConcurrentRequestsInBytes(long maxConcurrentRequestsInBytes) + { + conf.native_transport_max_concurrent_requests_in_bytes = maxConcurrentRequestsInBytes; + } + public static int getCommitLogSyncPeriod() { return conf.commitlog_sync_period_in_ms; diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMetrics.java index 4a384eb..08f0531 100644 --- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java @@ -18,9 +18,14 @@ */ package org.apache.cassandra.metrics; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import com.codahale.metrics.Gauge; 
+import com.codahale.metrics.Meter; +import org.apache.cassandra.transport.Server; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -28,13 +33,40 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; public class ClientMetrics { private static final MetricNameFactory factory = new DefaultNameFactory("Client"); - public static final ClientMetrics instance = new ClientMetrics(); - + + private volatile boolean initialized = false; + + private Collection<Server> servers = Collections.emptyList(); + + private AtomicInteger pausedConnections; + private Gauge<Integer> pausedConnectionsGauge; + private Meter requestDiscarded; + private ClientMetrics() { } + public void pauseConnection() { pausedConnections.incrementAndGet(); } + public void unpauseConnection() { pausedConnections.decrementAndGet(); } + public void markRequestDiscarded() { requestDiscarded.mark(); } + + public synchronized void init(Collection<Server> servers) + { + if (initialized) + return; + + this.servers = servers; + + registerGauge("connectedNativeClients", this::countConnectedClients); + + pausedConnections = new AtomicInteger(); + pausedConnectionsGauge = registerGauge("PausedConnections", pausedConnections::get); + requestDiscarded = registerMeter("RequestDiscarded"); + + initialized = true; + } + public void addCounter(String name, final Callable<Integer> provider) { Metrics.register(factory.createMetricName(name), new Gauge<Integer>() @@ -51,4 +83,24 @@ public class ClientMetrics } }); } + + private int countConnectedClients() + { + int count = 0; + + for (Server server : servers) + count += server.getConnectedClients(); + + return count; + } + + private <T> Gauge<T> registerGauge(String name, Gauge<T> gauge) + { + return Metrics.register(factory.createMetricName(name), gauge); + } + + private Meter registerMeter(String name) + { + return Metrics.meter(factory.createMetricName(name)); + } } diff --git 
a/src/java/org/apache/cassandra/net/ResourceLimits.java b/src/java/org/apache/cassandra/net/ResourceLimits.java new file mode 100644 index 0000000..f8d24d7 --- /dev/null +++ b/src/java/org/apache/cassandra/net/ResourceLimits.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +public abstract class ResourceLimits +{ + /** + * Represents permits to utilise a resource and ways to allocate and release them. + * + * Two implementations are currently provided: + * 1. {@link Concurrent}, for shared limits, which is thread-safe; + * 2. {@link Basic}, for limits that are not shared between threads, is not thread-safe. + */ + public interface Limit + { + /** + * @return total amount of permits represented by this {@link Limit} - the capacity + */ + long limit(); + + /** + * @return remaining, unallocated permit amount + */ + long remaining(); + + /** + * @return amount of permits currently in use + */ + long using(); + + /** + * Attempts to allocate an amount of permits from this limit. If allocated, <em>MUST</em> eventually + * be released back with {@link #release(long)}. 
+ * + * @return {@code true} if the allocation was successful, {@code false} otherwise + */ + boolean tryAllocate(long amount); + + /** + * Allocates an amount independent of permits available from this limit. <em>MUST</em> eventually + * be released back with {@link #release(long)}. + * + */ + void allocate(long amount); + + /** + * @param amount return the amount of permits back to this limit + * @return {@code ABOVE_LIMIT} if there aren't enough permits available even after the release, or + * {@code BELOW_LIMIT} if there are enough permits available after the release. + */ + Outcome release(long amount); + } + + /** + * A thread-safe permit container. + */ + public static class Concurrent implements Limit + { + private final long limit; + + private volatile long using; + private static final AtomicLongFieldUpdater<Concurrent> usingUpdater = + AtomicLongFieldUpdater.newUpdater(Concurrent.class, "using"); + + public Concurrent(long limit) + { + this.limit = limit; + } + + public long limit() + { + return limit; + } + + public long remaining() + { + return limit - using; + } + + public long using() + { + return using; + } + + public boolean tryAllocate(long amount) + { + long current, next; + do + { + current = using; + next = current + amount; + + if (next > limit) + return false; + } + while (!usingUpdater.compareAndSet(this, current, next)); + + return true; + } + + public void allocate(long amount) + { + long current, next; + do + { + current = using; + next = current + amount; + } while (!usingUpdater.compareAndSet(this, current, next)); + } + + public Outcome release(long amount) + { + assert amount >= 0; + long using = usingUpdater.addAndGet(this, -amount); + assert using >= 0; + return using >= limit ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT; + } + } + + /** + * A cheaper, thread-unsafe permit container to be used for unshared limits. 
+ */ + static class Basic implements Limit + { + private final long limit; + private long using; + + Basic(long limit) + { + this.limit = limit; + } + + public long limit() + { + return limit; + } + + public long remaining() + { + return limit - using; + } + + public long using() + { + return using; + } + + public boolean tryAllocate(long amount) + { + if (using + amount > limit) + return false; + + using += amount; + return true; + } + + public void allocate(long amount) + { + using += amount; + } + + public Outcome release(long amount) + { + assert amount >= 0 && amount <= using; + using -= amount; + return using >= limit ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT; + } + } + + /** + * A convenience class that groups a per-endpoint limit with the global one + * to allow allocating/releasing permits from/to both limits as one logical operation. + */ + public static class EndpointAndGlobal + { + final Limit endpoint; + final Limit global; + + public EndpointAndGlobal(Limit endpoint, Limit global) + { + this.endpoint = endpoint; + this.global = global; + } + + public Limit endpoint() + { + return endpoint; + } + + public Limit global() + { + return global; + } + + /** + * @return {@code INSUFFICIENT_GLOBAL} if there weren't enough permits in the global limit, or + * {@code INSUFFICIENT_ENDPOINT} if there weren't enough permits in the per-endpoint limit, or + * {@code SUCCESS} if there were enough permits to take from both. 
+ */ + public Outcome tryAllocate(long amount) + { + if (!global.tryAllocate(amount)) + return Outcome.INSUFFICIENT_GLOBAL; + + if (endpoint.tryAllocate(amount)) + return Outcome.SUCCESS; + + global.release(amount); + return Outcome.INSUFFICIENT_ENDPOINT; + } + + public void allocate(long amount) + { + global.allocate(amount); + endpoint.allocate(amount); + } + + public Outcome release(long amount) + { + Outcome endpointReleaseOutcome = endpoint.release(amount); + Outcome globalReleaseOutcome = global.release(amount); + return (endpointReleaseOutcome == Outcome.ABOVE_LIMIT || globalReleaseOutcome == Outcome.ABOVE_LIMIT) + ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT; + } + } + + public enum Outcome { SUCCESS, INSUFFICIENT_ENDPOINT, INSUFFICIENT_GLOBAL, BELOW_LIMIT, ABOVE_LIMIT } +} diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java index 48839f1..2280818 100644 --- a/src/java/org/apache/cassandra/service/NativeTransportService.java +++ b/src/java/org/apache/cassandra/service/NativeTransportService.java @@ -31,11 +31,9 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Future; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.metrics.ClientMetrics; -import org.apache.cassandra.transport.RequestThreadPoolExecutor; +import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.Server; /** @@ -50,7 +48,6 @@ public class NativeTransportService private boolean initialized = false; private EventLoopGroup workerGroup; - private EventExecutor eventExecutorGroup; /** * Creates netty thread pools and event loops. 
@@ -61,9 +58,6 @@ public class NativeTransportService if (initialized) return; - // prepare netty resources - eventExecutorGroup = new RequestThreadPoolExecutor(); - if (useEpoll()) { workerGroup = new EpollEventLoopGroup(); @@ -80,7 +74,6 @@ public class NativeTransportService InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress(); org.apache.cassandra.transport.Server.Builder builder = new org.apache.cassandra.transport.Server.Builder() - .withEventExecutor(eventExecutorGroup) .withEventLoopGroup(workerGroup) .withHost(nativeAddr); @@ -108,13 +101,7 @@ public class NativeTransportService } // register metrics - ClientMetrics.instance.addCounter("connectedNativeClients", () -> - { - int ret = 0; - for (Server server : servers) - ret += server.getConnectedClients(); - return ret; - }); + ClientMetrics.instance.init(servers); initialized = true; } @@ -147,8 +134,7 @@ public class NativeTransportService // shutdown executors used by netty for native transport server workerGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS).awaitUninterruptibly(); - // shutdownGracefully not implemented yet in RequestThreadPoolExecutor - eventExecutorGroup.shutdown(); + Message.Dispatcher.shutdown(); } /** @@ -177,12 +163,6 @@ public class NativeTransportService } @VisibleForTesting - EventExecutor getEventExecutor() - { - return eventExecutorGroup; - } - - @VisibleForTesting Collection<Server> getServers() { return servers; diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java index af26557..2966d9b 100644 --- a/src/java/org/apache/cassandra/transport/Connection.java +++ b/src/java/org/apache/cassandra/transport/Connection.java @@ -29,6 +29,7 @@ public class Connection private final Tracker tracker; private volatile FrameCompressor frameCompressor; + private boolean throwOnOverload; public Connection(Channel channel, int version, Tracker tracker) { @@ -49,6 +50,16 @@ public class Connection return 
frameCompressor; } + public void setThrowOnOverload(boolean throwOnOverload) + { + this.throwOnOverload = throwOnOverload; + } + + public boolean isThrowOnOverload() + { + return throwOnOverload; + } + public Tracker getTracker() { return tracker; diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 3940b47..c28be9f 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -68,7 +68,7 @@ public class Frame public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body) { - Header header = new Header(version, flags, streamId, type); + Header header = new Header(version, flags, streamId, type, body.readableBytes()); return new Frame(header, body); } @@ -83,18 +83,20 @@ public class Frame public final EnumSet<Flag> flags; public final int streamId; public final Message.Type type; + public final long bodySizeInBytes; - private Header(int version, int flags, int streamId, Message.Type type) + private Header(int version, int flags, int streamId, Message.Type type, long bodySizeInBytes) { - this(version, Flag.deserialize(flags), streamId, type); + this(version, Flag.deserialize(flags), streamId, type, bodySizeInBytes); } - private Header(int version, EnumSet<Flag> flags, int streamId, Message.Type type) + private Header(int version, EnumSet<Flag> flags, int streamId, Message.Type type, long bodySizeInBytes) { this.version = version; this.flags = flags; this.streamId = streamId; this.type = type; + this.bodySizeInBytes = bodySizeInBytes; } public static enum Flag @@ -240,7 +242,7 @@ public class Frame streamId); } - results.add(new Frame(new Header(version, flags, streamId, type), body)); + results.add(new Frame(new Header(version, flags, streamId, type, bodyLength), body)); } private void fail() diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java index 0851b19..08a8600 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -42,11 +42,18 @@ import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.LocalAwareExecutorService; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.metrics.ClientMetrics; +import org.apache.cassandra.net.ResourceLimits; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.transport.messages.*; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.utils.JVMStabilityInspector; +import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED; + /** * A message from the CQL binary protocol. */ @@ -404,19 +411,42 @@ public abstract class Message } } - @ChannelHandler.Sharable public static class Dispatcher extends SimpleChannelInboundHandler<Request> { + private static final LocalAwareExecutorService requestExecutor = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(), + Integer.MAX_VALUE, + "transport", + "Native-Transport-Requests"); + + /** + * Current count of *request* bytes that are live on the channel. + * + * Note: should only be accessed while on the netty event loop. 
+ */ + private long channelPayloadBytesInFlight; + + private final Server.EndpointPayloadTracker endpointPayloadTracker; + + private boolean paused; + private static class FlushItem { final ChannelHandlerContext ctx; final Object response; final Frame sourceFrame; - private FlushItem(ChannelHandlerContext ctx, Object response, Frame sourceFrame) + final Dispatcher dispatcher; + + private FlushItem(ChannelHandlerContext ctx, Object response, Frame sourceFrame, Dispatcher dispatcher) { this.ctx = ctx; this.sourceFrame = sourceFrame; this.response = response; + this.dispatcher = dispatcher; + } + + public void release() + { + dispatcher.releaseItem(this); } } @@ -472,7 +502,7 @@ public abstract class Message for (ChannelHandlerContext channel : channels) channel.flush(); for (FlushItem item : flushed) - item.sourceFrame.release(); + item.release(); channels.clear(); flushed.clear(); @@ -524,7 +554,7 @@ public abstract class Message for (ChannelHandlerContext channel : channels) channel.flush(); for (FlushItem item : flushed) - item.sourceFrame.release(); + item.release(); channels.clear(); flushed.clear(); @@ -536,16 +566,98 @@ public abstract class Message private final boolean useLegacyFlusher; - public Dispatcher(boolean useLegacyFlusher) + public Dispatcher(boolean useLegacyFlusher, Server.EndpointPayloadTracker endpointPayloadTracker) { super(false); this.useLegacyFlusher = useLegacyFlusher; + this.endpointPayloadTracker = endpointPayloadTracker; } @Override public void channelRead0(ChannelHandlerContext ctx, Request request) { + // if we decide to handle this message, process it outside of the netty event loop + if (shouldHandleRequest(ctx, request)) + requestExecutor.submit(() -> processRequest(ctx, request)); + } + + /** This check for inflight payload to potentially discard the request should have been ideally in one of the + * first handlers in the pipeline (Frame::decode()). 
However, in case of any exception thrown between that + * handler (where inflight payload is incremented) and this handler (Dispatcher::channelRead0) (where inflight + * payload is decremented), inflight payload becomes erroneous. ExceptionHandler is not sufficient for this + * purpose since it does not have the frame associated with the exception. + * + * Note: this method should execute on the netty event loop. + */ + private boolean shouldHandleRequest(ChannelHandlerContext ctx, Request request) + { + long frameSize = request.getSourceFrame().header.bodySizeInBytes; + + ResourceLimits.EndpointAndGlobal endpointAndGlobalPayloadsInFlight = endpointPayloadTracker.endpointAndGlobalPayloadsInFlight; + + // check for overloaded state by trying to allocate framesize to inflight payload trackers + if (endpointAndGlobalPayloadsInFlight.tryAllocate(frameSize) != ResourceLimits.Outcome.SUCCESS) + { + if (request.connection.isThrowOnOverload()) + { + // discard the request and throw an exception + ClientMetrics.instance.markRequestDiscarded(); + logger.trace("Discarded request of size: {}. InflightChannelRequestPayload: {}, InflightEndpointRequestPayload: {}, InflightOverallRequestPayload: {}, Request: {}", + frameSize, + channelPayloadBytesInFlight, + endpointAndGlobalPayloadsInFlight.endpoint().using(), + endpointAndGlobalPayloadsInFlight.global().using(), + request); + throw ErrorMessage.wrap(new OverloadedException("Server is in overloaded state. 
Cannot accept more requests at this point"), + request.getSourceFrame().header.streamId); + } + else + { + // set backpressure on the channel, and handle the request + endpointAndGlobalPayloadsInFlight.allocate(frameSize); + ctx.channel().config().setAutoRead(false); + ClientMetrics.instance.pauseConnection(); + paused = true; + } + } + + channelPayloadBytesInFlight += frameSize; + return true; + } + + /** + * Note: this method will be used in the {@link Flusher#run()}, which executes on the netty event loop + * ({@link Dispatcher#flusherLookup}). Thus, we assume the semantics and visibility of variables + * of being on the event loop. + */ + private void releaseItem(FlushItem item) + { + long itemSize = item.sourceFrame.header.bodySizeInBytes; + item.sourceFrame.release(); + + // since the request has been processed, decrement inflight payload at channel, endpoint and global levels + channelPayloadBytesInFlight -= itemSize; + ResourceLimits.Outcome endpointGlobalReleaseOutcome = endpointPayloadTracker.endpointAndGlobalPayloadsInFlight.release(itemSize); + + // now check to see if we need to reenable the channel's autoRead. + // If the current payload size is zero, we must reenable autoread as + // 1) we allow no other thread/channel to do it, and + // 2) there's no other events following this one (because we're at zero bytes in flight), + // so no successive to trigger the other clause in this if-block + ChannelConfig config = item.ctx.channel().config(); + if (paused && (channelPayloadBytesInFlight == 0 || endpointGlobalReleaseOutcome == ResourceLimits.Outcome.BELOW_LIMIT)) + { + paused = false; + ClientMetrics.instance.unpauseConnection(); + config.setAutoRead(true); + } + } + /** + * Note: this method is not expected to execute on the netty event loop. 
+ */ + void processRequest(ChannelHandlerContext ctx, Request request) + { final Response response; final ServerConnection connection; @@ -569,7 +681,7 @@ public abstract class Message { JVMStabilityInspector.inspectThrowable(t); UnexpectedChannelExceptionHandler handler = new UnexpectedChannelExceptionHandler(ctx.channel(), true); - flush(new FlushItem(ctx, ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId()), request.getSourceFrame())); + flush(new FlushItem(ctx, ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId()), request.getSourceFrame(), this)); return; } finally @@ -578,7 +690,19 @@ public abstract class Message } logger.trace("Responding: {}, v={}", response, connection.getVersion()); - flush(new FlushItem(ctx, response, request.getSourceFrame())); + flush(new FlushItem(ctx, response, request.getSourceFrame(), this)); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) + { + endpointPayloadTracker.release(); + if (paused) + { + paused = false; + ClientMetrics.instance.unpauseConnection(); + } + ctx.fireChannelInactive(); } private void flush(FlushItem item) @@ -596,6 +720,14 @@ public abstract class Message flusher.queued.add(item); flusher.start(); } + + public static void shutdown() + { + if (requestExecutor != null) + { + requestExecutor.shutdown(); + } + } } @ChannelHandler.Sharable diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java deleted file mode 100644 index 75dd05d..0000000 --- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.transport; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import io.netty.util.concurrent.AbstractEventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; -import io.netty.util.concurrent.Future; -import org.apache.cassandra.concurrent.LocalAwareExecutorService; -import org.apache.cassandra.config.DatabaseDescriptor; - -import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED; - -public class RequestThreadPoolExecutor extends AbstractEventExecutor -{ - private final static int MAX_QUEUED_REQUESTS = Integer.getInteger("cassandra.max_queued_native_transport_requests", 128); - private final static String THREAD_FACTORY_ID = "Native-Transport-Requests"; - private final LocalAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(), - MAX_QUEUED_REQUESTS, - "transport", - THREAD_FACTORY_ID); - - public boolean isShuttingDown() - { - return wrapped.isShutdown(); - } - - public Future<?> shutdownGracefully(long l, long l2, TimeUnit timeUnit) - { - throw new IllegalStateException(); - } - - public Future<?> terminationFuture() - { - throw new IllegalStateException(); - } - - @Override - public void shutdown() - { - wrapped.shutdown(); - } - - @Override - public List<Runnable> shutdownNow() - { - return wrapped.shutdownNow(); - } - - public boolean isShutdown() - { - return 
wrapped.isShutdown(); - } - - public boolean isTerminated() - { - return wrapped.isTerminated(); - } - - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException - { - return wrapped.awaitTermination(timeout, unit); - } - - public EventExecutorGroup parent() - { - return null; - } - - public boolean inEventLoop(Thread thread) - { - return false; - } - - public void execute(Runnable command) - { - wrapped.execute(command); - } -} diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 8c781db..83a676c 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +51,7 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.net.ResourceLimits; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.*; import org.apache.cassandra.transport.messages.EventMessage; @@ -84,7 +87,6 @@ public class Server implements CassandraDaemon.Server private final AtomicBoolean isRunning = new AtomicBoolean(false); private EventLoopGroup workerGroup; - private EventExecutor eventExecutorGroup; private Server (Builder builder) { @@ -101,8 +103,6 @@ public class Server implements CassandraDaemon.Server else workerGroup = new NioEventLoopGroup(); } - if (builder.eventExecutorGroup != null) - eventExecutorGroup = builder.eventExecutorGroup; EventNotifier notifier = new 
EventNotifier(this); StorageService.instance.register(notifier); MigrationManager.instance.register(notifier); @@ -201,12 +201,6 @@ public class Server implements CassandraDaemon.Server return this; } - public Builder withEventExecutor(EventExecutor eventExecutor) - { - this.eventExecutorGroup = eventExecutor; - return this; - } - public Builder withHost(InetAddress host) { this.hostAddr = host; @@ -286,6 +280,49 @@ public class Server implements CassandraDaemon.Server } } + // global inflight payload across all channels across all endpoints + private static final ResourceLimits.Concurrent globalRequestPayloadInFlight = new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes()); + + public static class EndpointPayloadTracker + { + // inflight payload per endpoint across corresponding channels + private static final ConcurrentMap<InetAddress, EndpointPayloadTracker> requestPayloadInFlightPerEndpoint = new ConcurrentHashMap<>(); + + private final AtomicInteger refCount = new AtomicInteger(0); + private final InetAddress endpoint; + + final ResourceLimits.EndpointAndGlobal endpointAndGlobalPayloadsInFlight = new ResourceLimits.EndpointAndGlobal(new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp()), + globalRequestPayloadInFlight); + + private EndpointPayloadTracker(InetAddress endpoint) + { + this.endpoint = endpoint; + } + + public static EndpointPayloadTracker get(InetAddress endpoint) + { + while (true) + { + EndpointPayloadTracker result = requestPayloadInFlightPerEndpoint.computeIfAbsent(endpoint, EndpointPayloadTracker::new); + if (result.acquire()) + return result; + + requestPayloadInFlightPerEndpoint.remove(endpoint, result); + } + } + + private boolean acquire() + { + return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1); + } + + public void release() + { + if (-1 == refCount.updateAndGet(i -> i == 1 ? 
-1 : i - 1)) + requestPayloadInFlightPerEndpoint.remove(endpoint, this); + } + } + private static class Initializer extends ChannelInitializer<Channel> { // Stateless handlers @@ -295,7 +332,6 @@ public class Server implements CassandraDaemon.Server private static final Frame.Compressor frameCompressor = new Frame.Compressor(); private static final Frame.Encoder frameEncoder = new Frame.Encoder(); private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler(); - private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher()); private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler(); private final Server server; @@ -328,6 +364,9 @@ public class Server implements CassandraDaemon.Server pipeline.addLast("messageDecoder", messageDecoder); pipeline.addLast("messageEncoder", messageEncoder); + pipeline.addLast("executor", new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher(), + EndpointPayloadTracker.get(((InetSocketAddress) channel.remoteAddress()).getAddress()))); + // The exceptionHandler will take care of handling exceptionCaught(...) events while still running // on the same EventLoop as all previous added handlers in the pipeline. This is important as the used // eventExecutorGroup may not enforce strict ordering for channel events. @@ -335,11 +374,6 @@ public class Server implements CassandraDaemon.Server // correctly handled before the handler itself is removed. 
// See https://issues.apache.org/jira/browse/CASSANDRA-13649 pipeline.addLast("exceptionHandler", exceptionHandler); - - if (server.eventExecutorGroup != null) - pipeline.addLast(server.eventExecutorGroup, "executor", dispatcher); - else - pipeline.addLast("executor", dispatcher); } } diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index 4759c2a..7916deb 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -115,10 +115,20 @@ public class SimpleClient implements Closeable public void connect(boolean useCompression) throws IOException { + connect(useCompression, false); + } + + public void connect(boolean useCompression, boolean throwOnOverload) throws IOException + { establishConnection(); Map<String, String> options = new HashMap<>(); options.put(StartupMessage.CQL_VERSION, "3.0.0"); + + if (throwOnOverload) + options.put(StartupMessage.THROW_ON_OVERLOAD, "1"); + connection.setThrowOnOverload(throwOnOverload); + if (useCompression) { options.put(StartupMessage.COMPRESSION, "snappy"); diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java index 774be6a..92278fa 100644 --- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java @@ -36,6 +36,7 @@ public class StartupMessage extends Message.Request public static final String CQL_VERSION = "CQL_VERSION"; public static final String COMPRESSION = "COMPRESSION"; public static final String NO_COMPACT = "NO_COMPACT"; + public static final String THROW_ON_OVERLOAD = "THROW_ON_OVERLOAD"; public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>() { @@ -101,6 +102,8 @@ public class StartupMessage extends Message.Request if 
(options.containsKey(NO_COMPACT) && Boolean.parseBoolean(options.get(NO_COMPACT))) state.getClientState().setNoCompactMode(); + connection.setThrowOnOverload("1".equals(options.get(THROW_ON_OVERLOAD))); + if (DatabaseDescriptor.getAuthenticator().requireAuthentication()) return new AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName()); else diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 3c0cefc..999404e 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -44,6 +44,7 @@ import com.datastax.driver.core.ResultSet; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.functions.FunctionName; @@ -334,6 +335,7 @@ public abstract class CQLTester SchemaLoader.startGossiper(); server = new Server.Builder().withHost(nativeAddr).withPort(nativePort).build(); + ClientMetrics.instance.init(Collections.singleton(server)); server.start(); for (int version : PROTOCOL_VERSIONS) diff --git a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java index d0e291a..8f2689a 100644 --- a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java +++ b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java @@ -79,8 +79,7 @@ public class NativeTransportServiceTest { withService((NativeTransportService service) -> { Supplier<Boolean> allTerminated = () -> - service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated() && - service.getEventExecutor().isShutdown() && service.getEventExecutor().isTerminated(); + 
service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated(); assertFalse(allTerminated.get()); service.destroy(); assertTrue(allTerminated.get()); diff --git a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java new file mode 100644 index 0000000..e4d335b --- /dev/null +++ b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.transport; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.transport.messages.QueryMessage; + +@RunWith(OrderedJUnit4ClassRunner.class) +public class InflightRequestPayloadTrackerTest extends CQLTester +{ + @BeforeClass + public static void setUp() + { + DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(600); + DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(600); + requireNetwork(); + } + + @AfterClass + public static void tearDown() + { + DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(3000000000L); + DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(5000000000L); + } + + @After + public void dropCreatedTable() + { + try + { + QueryProcessor.executeOnceInternal("DROP TABLE " + KEYSPACE + ".atable"); + } + catch (Throwable t) + { + // ignore + } + } + + @Test + public void testQueryExecutionWithThrowOnOverload() throws Throwable + { + SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), + nativePort, + Server.CURRENT_VERSION, + new EncryptionOptions.ClientEncryptionOptions()); + + try + { + client.connect(false, true); + QueryOptions queryOptions = QueryOptions.create( + QueryOptions.DEFAULT.getConsistency(), + QueryOptions.DEFAULT.getValues(), + QueryOptions.DEFAULT.skipMetadata(), + QueryOptions.DEFAULT.getPageSize(), + QueryOptions.DEFAULT.getPagingState(), + QueryOptions.DEFAULT.getSerialConsistency(), + 
Server.CURRENT_VERSION); + + QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk1 int PRIMARY KEY, v text)", KEYSPACE), + queryOptions); + client.execute(queryMessage); + } + finally + { + client.close(); + } + } + + @Test + public void testQueryExecutionWithoutThrowOnOverload() throws Throwable + { + SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), + nativePort, + Server.CURRENT_VERSION, + new EncryptionOptions.ClientEncryptionOptions()); + + try + { + client.connect(false, false); + QueryOptions queryOptions = QueryOptions.create( + QueryOptions.DEFAULT.getConsistency(), + QueryOptions.DEFAULT.getValues(), + QueryOptions.DEFAULT.skipMetadata(), + QueryOptions.DEFAULT.getPageSize(), + QueryOptions.DEFAULT.getPagingState(), + QueryOptions.DEFAULT.getSerialConsistency(), + Server.CURRENT_VERSION); + + QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE), + queryOptions); + client.execute(queryMessage); + queryMessage = new QueryMessage(String.format("SELECT * FROM %s.atable", KEYSPACE), + queryOptions); + client.execute(queryMessage); + } + finally + { + client.close(); + } + } + + @Test + public void testQueryExecutionWithoutThrowOnOverloadAndInflightLimitedExceeded() throws Throwable + { + SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), + nativePort, + Server.CURRENT_VERSION, + new EncryptionOptions.ClientEncryptionOptions()); + + try + { + client.connect(false, false); + QueryOptions queryOptions = QueryOptions.create( + QueryOptions.DEFAULT.getConsistency(), + QueryOptions.DEFAULT.getValues(), + QueryOptions.DEFAULT.skipMetadata(), + QueryOptions.DEFAULT.getPageSize(), + QueryOptions.DEFAULT.getPagingState(), + QueryOptions.DEFAULT.getSerialConsistency(), + Server.CURRENT_VERSION); + + QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE), + 
queryOptions); + client.execute(queryMessage); + + queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...] + queryOptions); + client.execute(queryMessage); + } + finally + { + client.close(); + } + } + + @Test + public void testOverloadedExceptionForEndpointInflightLimit() throws Throwable + { + SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), + nativePort, + Server.CURRENT_VERSION, + new EncryptionOptions.ClientEncryptionOptions()); + + try + { + client.connect(false, true); + QueryOptions queryOptions = QueryOptions.create( + QueryOptions.DEFAULT.getConsistency(), + QueryOptions.DEFAULT.getValues(), + QueryOptions.DEFAULT.skipMetadata(), + QueryOptions.DEFAULT.getPageSize(), + QueryOptions.DEFAULT.getPagingState(), + QueryOptions.DEFAULT.getSerialConsistency(), + Server.CURRENT_VERSION); + + QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE), + queryOptions); + client.execute(queryMessage); + + queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...] 
+ queryOptions); + try + { + client.execute(queryMessage); + Assert.fail(); + } + catch (RuntimeException e) + { + Assert.assertTrue(e.getCause() instanceof OverloadedException); + } + } + finally + { + client.close(); + } + } + + @Test + public void testOverloadedExceptionForOverallInflightLimit() throws Throwable + { + SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), + nativePort, + Server.CURRENT_VERSION, + new EncryptionOptions.ClientEncryptionOptions()); + + try + { + client.connect(false, true); + QueryOptions queryOptions = QueryOptions.create( + QueryOptions.DEFAULT.getConsistency(), + QueryOptions.DEFAULT.getValues(), + QueryOptions.DEFAULT.skipMetadata(), + QueryOptions.DEFAULT.getPageSize(), + QueryOptions.DEFAULT.getPagingState(), + QueryOptions.DEFAULT.getSerialConsistency(), + Server.CURRENT_VERSION); + + QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE), + queryOptions); + client.execute(queryMessage); + + queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...] + queryOptions); + try + { + client.execute(queryMessage); + Assert.fail(); + } + catch (RuntimeException e) + { + Assert.assertTrue(e.getCause() instanceof OverloadedException); + } + } + finally + { + client.close(); + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org