Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 790256825 -> b1c2536cc


Add ability to limit number of native connections

Patch by Norman Maurer; reviewed by jmckenzie for CASSANDRA-8086


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e56d9efb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e56d9efb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e56d9efb

Branch: refs/heads/cassandra-2.1
Commit: e56d9efb7c18138fac9059207568598bbb964eb9
Parents: 2428b9c
Author: Norman Maurer <nor...@apache.org>
Authored: Thu Mar 5 10:37:53 2015 -0600
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Thu Mar 5 10:37:53 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   8 ++
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |  19 ++++
 .../apache/cassandra/service/StorageProxy.java  |   8 +-
 .../cassandra/service/StorageProxyMBean.java    |   3 +
 .../transport/ConnectionLimitHandler.java       | 111 +++++++++++++++++++
 .../org/apache/cassandra/transport/Server.java  |   9 ++
 8 files changed, 160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f98bb3f..4e34c9e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.13:
+ * Add ability to limit number of native connections (CASSANDRA-8086)
  * Add offline tool to relevel sstables (CASSANDRA-8301)
  * Preserve stream ID for more protocol errors (CASSANDRA-8848)
  * Fix combining token() function with multi-column relations on

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 163ae9e..f99ade1 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -332,6 +332,14 @@ native_transport_port: 9042
 # be rejected as invalid. The default is 256MB.
 # native_transport_max_frame_size_in_mb: 256
 
+# The maximum number of concurrent client connections.
+# The default is -1, which means unlimited.
+# native_transport_max_concurrent_connections: -1
+
+# The maximum number of concurrent client connections per source ip.
+# The default is -1, which means unlimited.
+# native_transport_max_concurrent_connections_per_ip: -1
+
 # Whether to start the thrift rpc server.
 start_rpc: true
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 4dd71aa..3c223e3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -112,6 +112,8 @@ public class Config
     public Integer native_transport_port = 9042;
     public Integer native_transport_max_threads = 128;
     public Integer native_transport_max_frame_size_in_mb = 256;
+    public volatile Long native_transport_max_concurrent_connections = -1L;
+    public volatile Long native_transport_max_concurrent_connections_per_ip = 
-1L;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 286014e..b3b10c1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1108,6 +1108,25 @@ public class DatabaseDescriptor
         return conf.native_transport_max_frame_size_in_mb * 1024 * 1024;
     }
 
+    /**
+     * Returns the configured cap on concurrent native-transport client connections,
+     * read from {@code native_transport_max_concurrent_connections}.
+     *
+     * @return the current value of the setting
+     */
+    public static Long getNativeTransportMaxConcurrentConnections()
+    {
+        final Long maxConnections = conf.native_transport_max_concurrent_connections;
+        return maxConnections;
+    }
+
+    /**
+     * Replaces the configured cap on concurrent native-transport client connections
+     * ({@code native_transport_max_concurrent_connections}).
+     *
+     * @param nativeTransportMaxConcurrentConnections the new value of the setting
+     */
+    public static void setNativeTransportMaxConcurrentConnections(long nativeTransportMaxConcurrentConnections)
+    {
+        // The config field is a boxed Long; box explicitly rather than relying on autoboxing.
+        conf.native_transport_max_concurrent_connections = Long.valueOf(nativeTransportMaxConcurrentConnections);
+    }
+
+    public static Long getNativeTransportMaxConcurrentConnectionsPerIp() {
+        return conf.native_transport_max_concurrent_connections_per_ip;
+    }
+
+    /**
+     * Replaces the configured cap on concurrent native-transport client connections per source IP
+     * ({@code native_transport_max_concurrent_connections_per_ip}).
+     *
+     * @param nativeTransportMaxConcurrentConnectionsPerIp the new value of the setting
+     */
+    public static void setNativeTransportMaxConcurrentConnectionsPerIp(long nativeTransportMaxConcurrentConnectionsPerIp)
+    {
+        conf.native_transport_max_concurrent_connections_per_ip = nativeTransportMaxConcurrentConnectionsPerIp;
+    }
+
     public static double getCommitLogSyncBatchWindow()
     {
         return conf.commitlog_sync_batch_window_in_ms;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index fcc9665..d033929 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2125,9 +2125,15 @@ public class StorageProxy implements StorageProxyMBean
 
     public Long getTruncateRpcTimeout() { return 
DatabaseDescriptor.getTruncateRpcTimeout(); }
     public void setTruncateRpcTimeout(Long timeoutInMillis) { 
DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); }
+
+    public Long getNativeTransportMaxConcurrentConnections() { return 
DatabaseDescriptor.getNativeTransportMaxConcurrentConnections(); }
+    public void setNativeTransportMaxConcurrentConnections(Long 
nativeTransportMaxConcurrentConnections) { 
DatabaseDescriptor.setNativeTransportMaxConcurrentConnections(nativeTransportMaxConcurrentConnections);
 }
+
+    public Long getNativeTransportMaxConcurrentConnectionsPerIp() { return 
DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp(); }
+    public void setNativeTransportMaxConcurrentConnectionsPerIp(Long 
nativeTransportMaxConcurrentConnections) { 
DatabaseDescriptor.setNativeTransportMaxConcurrentConnectionsPerIp(nativeTransportMaxConcurrentConnections);
 }
+
     public void reloadTriggerClasses() { 
TriggerExecutor.instance.reloadClasses(); }
 
-    
     public long getReadRepairAttempted() {
         return ReadRepairMetrics.attempted.count();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 203cabe..03b9b58 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -95,6 +95,9 @@ public interface StorageProxyMBean
     public Long getTruncateRpcTimeout();
     public void setTruncateRpcTimeout(Long timeoutInMillis);
 
+    /** Sets the maximum number of concurrent native-transport client connections. */
+    public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections);
+    /** Returns the maximum number of concurrent native-transport client connections. */
+    public Long getNativeTransportMaxConcurrentConnections();
+
+    // The per-IP accessors are implemented by StorageProxy; they must be declared here
+    // as well, otherwise they are not part of the management interface.
+    /** Sets the maximum number of concurrent native-transport client connections per source IP. */
+    public void setNativeTransportMaxConcurrentConnectionsPerIp(Long nativeTransportMaxConcurrentConnectionsPerIp);
+    /** Returns the maximum number of concurrent native-transport client connections per source IP. */
+    public Long getNativeTransportMaxConcurrentConnectionsPerIp();
+
     public void reloadTriggerClasses();
 
     public long getReadRepairAttempted();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java 
b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
new file mode 100644
index 0000000..c45d2cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * {@link SimpleChannelUpstreamHandler} implementation which limits the number of concurrent
+ * connections to the Server, both in total and per source IP address.
+ * Be aware this <strong>MUST</strong> be shared between all child channels, because the
+ * connection counters are fields of the handler instance.
+ *
+ * Both limits are read from {@link DatabaseDescriptor} each time a channel is opened.
+ */
+@ChannelHandler.Sharable
+final class ConnectionLimitHandler extends SimpleChannelUpstreamHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionLimitHandler.class);
+    // Number of currently counted connections per remote address.
+    private final ConcurrentMap<InetAddress, AtomicLong> connectionsPerClient = new ConcurrentHashMap<>();
+    // Number of currently open channels that passed through this handler.
+    private final AtomicLong counter = new AtomicLong(0);
+
+    /**
+     * Counts the new channel and closes it if the total or the per-IP limit is exceeded;
+     * otherwise forwards the event upstream.
+     */
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
+    {
+        final long count = counter.incrementAndGet();
+        long limit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections();
+        // Setting the limit to -1 disables it.
+        if (limit < 0)
+        {
+            limit = Long.MAX_VALUE;
+        }
+        if (count > limit)
+        {
+            // The decrement of the total counter will be done in channelClosed(...).
+            // No per-IP count was taken for this channel, so none is recorded on ctx.
+            logger.warn("Exceeded maximum native connection limit of {} by using {} connections", limit, count);
+            ctx.getChannel().close();
+            return;
+        }
+
+        long perIpLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp();
+        if (perIpLimit > 0)
+        {
+            InetSocketAddress remote = (InetSocketAddress) ctx.getChannel().getRemoteAddress();
+            InetAddress address = remote == null ? null : remote.getAddress();
+
+            // Without a usable address there is nothing to count against (and the map rejects null keys).
+            if (address != null)
+            {
+                AtomicLong perIpCount = connectionsPerClient.get(address);
+                if (perIpCount == null)
+                {
+                    perIpCount = new AtomicLong(0);
+
+                    AtomicLong old = connectionsPerClient.putIfAbsent(address, perIpCount);
+                    if (old != null)
+                    {
+                        perIpCount = old;
+                    }
+                }
+
+                // Remember which address this channel was counted against, so that
+                // channelClosed(...) decrements exactly what was incremented here.
+                // Set before a possible close() so the close event already sees it.
+                ctx.setAttachment(address);
+
+                if (perIpCount.incrementAndGet() > perIpLimit)
+                {
+                    // The decrement will be done in channelClosed(...)
+                    logger.warn("Exceeded maximum native connection limit per ip of {} by using {} connections", perIpLimit, perIpCount);
+                    ctx.getChannel().close();
+                    return;
+                }
+            }
+        }
+        super.channelOpen(ctx, event);
+    }
+
+    /**
+     * Releases the counts taken in {@link #channelOpen} and forwards the event upstream.
+     * The per-IP count is only released for channels that were actually counted per IP.
+     */
+    @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
+    {
+        counter.decrementAndGet();
+
+        // Channels rejected by the total limit, or opened while the per-IP limit was
+        // disabled, never incremented a per-IP counter and carry no attachment.
+        Object counted = ctx.getAttachment();
+        if (counted instanceof InetAddress)
+        {
+            InetAddress address = (InetAddress) counted;
+            ctx.setAttachment(null);
+
+            AtomicLong count = connectionsPerClient.get(address);
+            if (count != null && count.decrementAndGet() <= 0)
+            {
+                connectionsPerClient.remove(address);
+            }
+        }
+        super.channelClosed(ctx, event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index df4f127..30b8a9d 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -240,6 +240,7 @@ public class Server implements CassandraDaemon.Server
         private static final Frame.Compressor frameCompressor = new 
Frame.Compressor();
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.Dispatcher dispatcher = new 
Message.Dispatcher();
+        private static final ConnectionLimitHandler connectionLimitHandler = 
new ConnectionLimitHandler();
 
         private final Server server;
 
@@ -252,6 +253,14 @@ public class Server implements CassandraDaemon.Server
         {
             ChannelPipeline pipeline = Channels.pipeline();
 
+            // Add the ConnectionLimitHandler to the pipeline if configured to 
do so.
+            if 
(DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0
+                    || 
DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0)
+            {
+                // Add as first to the pipeline so the limit is enforced as 
first action.
+                pipeline.addFirst("connectionLimitHandler", 
connectionLimitHandler);
+            }
+
             //pipeline.addLast("debug", new LoggingHandler());
 
             pipeline.addLast("frameDecoder", new 
Frame.Decoder(server.connectionFactory));

Reply via email to