Author: norman
Date: Tue Jan 25 12:07:51 2011
New Revision: 1063245
URL: http://svn.apache.org/viewvc?rev=1063245&view=rev
Log:
Use ConcurrentHashMap to remove the synchronized blocks and make it possible to
change the limits at runtime. These methods are not synchronized, as I don't
think it is worth the performance decrease.
Modified:
james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionLimitUpstreamHandler.java
james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionPerIpLimitUpstreamHandler.java
Modified:
james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionLimitUpstreamHandler.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionLimitUpstreamHandler.java?rev=1063245&r1=1063244&r2=1063245&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionLimitUpstreamHandler.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionLimitUpstreamHandler.java
Tue Jan 25 12:07:51 2011
@@ -37,18 +37,26 @@ import org.jboss.netty.channel.SimpleCha
public class ConnectionLimitUpstreamHandler extends
SimpleChannelUpstreamHandler{
private final AtomicInteger connections = new AtomicInteger(0);
- private final int maxConnections;
+ private int maxConnections;
public ConnectionLimitUpstreamHandler(int maxConnections) {
this.maxConnections = maxConnections;
}
+ public int getConnections() {
+ return connections.get();
+ }
+
+ public void setMaxConnections(int maxConnections) {
+ this.maxConnections = maxConnections;
+ }
+
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (maxConnections > 0) {
int currentCount = connections.getAndIncrement();
- if (currentCount + 1 > maxConnections) {
+ if (currentCount > maxConnections) {
ctx.getChannel().close();
connections.decrementAndGet();
}
Modified:
james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionPerIpLimitUpstreamHandler.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionPerIpLimitUpstreamHandler.java?rev=1063245&r1=1063244&r2=1063245&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionPerIpLimitUpstreamHandler.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/jboss/netty/handler/connection/ConnectionPerIpLimitUpstreamHandler.java
Tue Jan 25 12:07:51 2011
@@ -19,8 +19,9 @@
package org.jboss.netty.handler.connection;
import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
@@ -38,36 +39,50 @@ import org.jboss.netty.channel.SimpleCha
*/
public class ConnectionPerIpLimitUpstreamHandler extends
SimpleChannelUpstreamHandler{
- private final Map<String, Integer> connections = new HashMap<String,
Integer>();
- private final int maxConnectionsPerIp;
+ private final ConcurrentMap<String, AtomicInteger> connections = new
ConcurrentHashMap<String, AtomicInteger>();
+ private int maxConnectionsPerIp;
public ConnectionPerIpLimitUpstreamHandler(int maxConnectionsPerIp) {
this.maxConnectionsPerIp = maxConnectionsPerIp;
}
+ public int getConnections(String ip) {
+ AtomicInteger count = connections.get(ip);
+ if (count == null) {
+ return 0;
+ } else {
+ return count.get();
+ }
+ }
+
+ public void setMaxConnectionsPerIp(int maxConnectionsPerIp) {
+ this.maxConnectionsPerIp = maxConnectionsPerIp;
+ }
+
+
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (maxConnectionsPerIp > 0) {
InetSocketAddress remoteAddress = (InetSocketAddress)
ctx.getChannel().getRemoteAddress();
String remoteIp = remoteAddress.getAddress().getHostAddress();
- synchronized (connections) {
- Integer count = connections.get(remoteIp);
+
+ AtomicInteger atomicCount = connections.get(remoteIp);
- if (count == null) {
- count = new Integer(1);
- connections.put(remoteIp, count);
- } else {
- count++;
- if (count > maxConnectionsPerIp) {
- ctx.getChannel().close();
- count--;
- }
- connections.put(remoteIp, count);
+ if (atomicCount == null) {
+ atomicCount = new AtomicInteger(1);
+ AtomicInteger oldAtomicCount =
connections.putIfAbsent(remoteIp, atomicCount);
+ // if another thread put a new counter for this ip, we must
use the other one.
+ if (oldAtomicCount != null) {
+ atomicCount = oldAtomicCount;
+ }
+ } else {
+ Integer count = atomicCount.incrementAndGet();
+ if (count > maxConnectionsPerIp) {
+ ctx.getChannel().close();
+ atomicCount.decrementAndGet();
}
-
}
-
}
super.channelOpen(ctx, e);
@@ -77,14 +92,13 @@ public class ConnectionPerIpLimitUpstrea
if (maxConnectionsPerIp > 0) {
InetSocketAddress remoteAddress = (InetSocketAddress)
ctx.getChannel().getRemoteAddress();
String remoteIp = remoteAddress.getAddress().getHostAddress();
- synchronized (connections) {
- Integer count = connections.get(remoteIp);
- if (count != null) {
- count--;
- connections.put(remoteIp, count);
- }
- }
+
+ AtomicInteger atomicCount = connections.get(remoteIp);
+ if (atomicCount != null) {
+ atomicCount.decrementAndGet();
+ }
+
}
super.channelClosed(ctx, e);
}
-}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]