Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 790256825 -> b1c2536cc
Add ability to limit number of native connections Patch by Norman Maurer; reviewed by jmckenzie for CASSANDRA-8086 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e56d9efb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e56d9efb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e56d9efb Branch: refs/heads/cassandra-2.1 Commit: e56d9efb7c18138fac9059207568598bbb964eb9 Parents: 2428b9c Author: Norman Maurer <nor...@apache.org> Authored: Thu Mar 5 10:37:53 2015 -0600 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Thu Mar 5 10:37:53 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 8 ++ .../org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 19 ++++ .../apache/cassandra/service/StorageProxy.java | 8 +- .../cassandra/service/StorageProxyMBean.java | 3 + .../transport/ConnectionLimitHandler.java | 111 +++++++++++++++++++ .../org/apache/cassandra/transport/Server.java | 9 ++ 8 files changed, 160 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f98bb3f..4e34c9e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.13: + * Add ability to limit number of native connections (CASSANDRA-8086) * Add offline tool to relevel sstables (CASSANDRA-8301) * Preserve stream ID for more protocol errors (CASSANDRA-8848) * Fix combining token() function with multi-column relations on http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 163ae9e..f99ade1 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -332,6 +332,14 @@ native_transport_port: 9042 # be rejected as invalid. The default is 256MB. # native_transport_max_frame_size_in_mb: 256 +# The maximum number of concurrent client connections. +# The default is -1, which means unlimited. +# native_transport_max_concurrent_connections: -1 + +# The maximum number of concurrent client connections per source ip. +# The default is -1, which means unlimited. +# native_transport_max_concurrent_connections_per_ip: -1 + # Whether to start the thrift rpc server. start_rpc: true http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 4dd71aa..3c223e3 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -112,6 +112,8 @@ public class Config public Integer native_transport_port = 9042; public Integer native_transport_max_threads = 128; public Integer native_transport_max_frame_size_in_mb = 256; + public volatile Long native_transport_max_concurrent_connections = -1L; + public volatile Long native_transport_max_concurrent_connections_per_ip = -1L; @Deprecated public Integer thrift_max_message_length_in_mb = 16; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 286014e..b3b10c1 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1108,6 +1108,25 @@ public class DatabaseDescriptor return conf.native_transport_max_frame_size_in_mb * 1024 * 1024; } + public static Long getNativeTransportMaxConcurrentConnections() + { + return conf.native_transport_max_concurrent_connections; + } + + public static void setNativeTransportMaxConcurrentConnections(long nativeTransportMaxConcurrentConnections) + { + conf.native_transport_max_concurrent_connections = nativeTransportMaxConcurrentConnections; + } + + public static Long getNativeTransportMaxConcurrentConnectionsPerIp() { + return conf.native_transport_max_concurrent_connections_per_ip; + } + + public static void setNativeTransportMaxConcurrentConnectionsPerIp(long native_transport_max_concurrent_connections_per_ip) + { + conf.native_transport_max_concurrent_connections_per_ip = native_transport_max_concurrent_connections_per_ip; + } + public static double getCommitLogSyncBatchWindow() { return conf.commitlog_sync_batch_window_in_ms; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index fcc9665..d033929 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -2125,9 +2125,15 @@ public class StorageProxy implements StorageProxyMBean public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(); } public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); } + + public Long getNativeTransportMaxConcurrentConnections() { return DatabaseDescriptor.getNativeTransportMaxConcurrentConnections(); } + public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections) { DatabaseDescriptor.setNativeTransportMaxConcurrentConnections(nativeTransportMaxConcurrentConnections); } + + public Long getNativeTransportMaxConcurrentConnectionsPerIp() { return DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp(); } + public void setNativeTransportMaxConcurrentConnectionsPerIp(Long nativeTransportMaxConcurrentConnections) { DatabaseDescriptor.setNativeTransportMaxConcurrentConnectionsPerIp(nativeTransportMaxConcurrentConnections); } + public void reloadTriggerClasses() { TriggerExecutor.instance.reloadClasses(); } - public long getReadRepairAttempted() { return ReadRepairMetrics.attempted.count(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/service/StorageProxyMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java index 203cabe..03b9b58 100644 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@ -95,6 +95,9 @@ public interface StorageProxyMBean public Long getTruncateRpcTimeout(); public void setTruncateRpcTimeout(Long timeoutInMillis); + public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections); + public Long getNativeTransportMaxConcurrentConnections(); + public void reloadTriggerClasses(); public long getReadRepairAttempted(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java new file mode 100644 index 0000000..c45d2cb --- /dev/null +++ b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.transport; + + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.jboss.netty.channel.ChannelHandler; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * {@link SimpleChannelUpstreamHandler} implementation which allows to limit the number of concurrent + * connections to the Server. Be aware this <strong>MUST</strong> be shared between all child channels. + */ +@ChannelHandler.Sharable +final class ConnectionLimitHandler extends SimpleChannelUpstreamHandler +{ + private static final Logger logger = LoggerFactory.getLogger(ConnectionLimitHandler.class); + private final ConcurrentMap<InetAddress, AtomicLong> connectionsPerClient = new ConcurrentHashMap<>(); + private final AtomicLong counter = new AtomicLong(0); + + @Override + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception + { + final long count = counter.incrementAndGet(); + long limit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections(); + // Setting the limit to -1 disables it. + if(limit < 0) + { + limit = Long.MAX_VALUE; + } + if (count > limit) + { + // The decrement will be done in channelClosed(...) + logger.warn("Exceeded maximum native connection limit of {} by using {} connections", limit, count); + ctx.getChannel().close(); + } + else + { + long perIpLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp(); + if (perIpLimit > 0) + { + InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress(); + + AtomicLong perIpCount = connectionsPerClient.get(address); + if (perIpCount == null) + { + perIpCount = new AtomicLong(0); + + AtomicLong old = connectionsPerClient.putIfAbsent(address, perIpCount); + if (old != null) + { + perIpCount = old; + } + } + if (perIpCount.incrementAndGet() > perIpLimit) + { + // The decrement will be done in channelClosed(...) + logger.warn("Exceeded maximum native connection limit per ip of {} by using {} connections", perIpLimit, perIpCount); + ctx.getChannel().close(); + return; + } + } + super.channelOpen(ctx, event); + } + } + + @Override + public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception + { + counter.decrementAndGet(); + InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress(); + + AtomicLong count = connectionsPerClient.get(address); + if (count != null) + { + if (count.decrementAndGet() <= 0) + { + connectionsPerClient.remove(address); + } + } + super.channelClosed(ctx, event); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index df4f127..30b8a9d 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -240,6 +240,7 @@ public class Server implements CassandraDaemon.Server private static final Frame.Compressor frameCompressor = new Frame.Compressor(); private static final Frame.Encoder frameEncoder = new Frame.Encoder(); private static final Message.Dispatcher dispatcher = new Message.Dispatcher(); + private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler(); private final Server server; @@ -252,6 +253,14 @@ public class Server implements CassandraDaemon.Server { ChannelPipeline pipeline = Channels.pipeline(); + // Add the ConnectionLimitHandler to the pipeline if configured to do so. + if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0 + || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0) + { + // Add as first to the pipeline so the limit is enforced as first action. + pipeline.addFirst("connectionLimitHandler", connectionLimitHandler); + } + //pipeline.addLast("debug", new LoggingHandler()); pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));