Hi Peter!

Your addition of ReadThrottleFilter looks good to me.  It is very creative!
:)

Here are my comments:

* The class name ends with 'Filter', but the class itself is not a filter.
Isn't there a better name?  It behaves much like an IoFilterChainBuilder, but
a name along those lines would be too long.  Any ideas?
* We have to increase the counter by remaining() instead of capacity() to be
more accurate.
* Isn't it possible to use getTrafficMask() instead of the session
attribute?  Was there a reason for using the attribute?
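
To illustrate the remaining() point, here is a minimal sketch.  It uses plain
java.nio.ByteBuffer as a stand-in, on the assumption that MINA's ByteBuffer
reports position, limit and capacity the same way.  The class and method names
are made up for this example and are not part of the commit.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of MINA: compares the two ways of sizing a message.
class RemainingVsCapacity {
    // Size of the whole backing buffer, regardless of how much data it holds.
    public static int countByCapacity(ByteBuffer buf) {
        return buf.capacity();
    }

    // Number of bytes between position and limit, i.e. the data still to be read.
    public static int countByRemaining(ByteBuffer buf) {
        return buf.remaining();
    }

    public static void main(String[] args) {
        // A 1024-byte buffer that carries only 100 bytes of received data.
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.put(new byte[100]);
        buf.flip(); // switch from writing to reading: position = 0, limit = 100

        System.out.println("capacity  = " + countByCapacity(buf));
        System.out.println("remaining = " + countByRemaining(buf));
    }
}
```

With capacity() a mostly empty buffer is counted at its full allocated size, so
the limit could be reached well before that much data is actually queued.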

That's all.  Everything else is superb. :D

Cheers,
Trustin

On 6/28/06, [EMAIL PROTECTED] <[EMAIL PROTECTED]> wrote:

Author: proyal
Date: Tue Jun 27 09:31:30 2006
New Revision: 417502

URL: http://svn.apache.org/viewvc?rev=417502&view=rev
Log:
DIRMINA-206 - Filter that watches the total amount queued for a
connection in the ThreadPoolFilter and will disable reads for a connection
when a configurable (default 1mb) limit is reached.

Added:
    directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java?rev=417502&view=auto

==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java Tue Jun 27 09:31:30 2006
@@ -0,0 +1,192 @@
+/*
+ * @(#) $Id: StreamWriteFilterTest.java 389042 2006-03-27 07:49:41Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mina.filter;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.thread.ThreadPoolFilter;
+
+/**
+ * This filter will automatically disable reads on an <code>IoSession</code> once the data batched for that session in
+ * the <code>ThreadPoolFilter</code> reaches a defined threshold (the default is 1mb). It accomplishes this by being in
+ * the filter chain before <strong>and</strong> after the <code>ThreadPoolFilter</code>. It is possible to subvert the
+ * behavior of this filter by adding filters immediately after the <code>ThreadPoolFilter</code> after adding this
+ * filter. Thus, it is recommended to add this filter towards the end of your filter chain construction, if you need to
+ * ensure that other filters need to be right after the <code>ThreadPoolFilter</code>.
+ *
+ * <p>Usage:
+ *
+ * <pre><code>
+ * DefaultFilterChainBuilder builder = ...
+ * ReadThrottleFilter filter = new ReadThrottleFilter();
+ * filter.attach( builder );
+ * </code></pre>
+ *
+ * or
+ *
+ * <pre><code>
+ * IoFilterChain chain = ...
+ * ReadThrottleFilter filter = new ReadThrottleFilter();
+ * filter.attach( chain );
+ * </code></pre>
+ *
+ * @author The Apache Directory Project ([email protected])
+ * @version $Rev: 406554 $, $Date: 2006-05-15 06:46:02Z $
+ */
+public class ReadThrottleFilter
+{
+    public static final String COUNTER = ReadThrottleFilter.class.getName() + ".counter";
+    public static final String SUSPENDED_READS = ReadThrottleFilter.class.getName() + ".suspendedReads";
+
+    private volatile int maximumConnectionBufferSize = 1024 * 1024; // 1mb
+
+    /**
+     * Set the maximum amount of data to buffer in the ThreadPoolFilter prior to disabling reads. Changing the value
+     * will only take effect when new data is received for a connection, including existing connections. Default value
+     * is 1mb
+     *
+     * @param maximumConnectionBufferSize New buffer size. Must be > 0
+     */
+    public void setMaximumConnectionBufferSize( int maximumConnectionBufferSize )
+    {
+        this.maximumConnectionBufferSize = maximumConnectionBufferSize;
+    }
+
+    /**
+     * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself
+     * before and after that filter.
+     *
+     * @param chain <code>IoFilterChain</code> to attach self to.
+     */
+    public void attach( IoFilterChain chain )
+    {
+        String name = getThreadPoolFilterEntryName( chain.getAll() );
+
+        chain.addBefore( name, getClass().getName() + ".add", new Add() );
+        chain.addAfter( name, getClass().getName() + ".release", new Release() );
+    }
+
+    /**
+     * Attach this filter to the specified builder. It will search for the ThreadPoolFilter, and attach itself before
+     * and after that filter.
+     *
+     * @param builder <code>DefaultIoFilterChainBuilder</code> to attach self to.
+     */
+    public void attach( DefaultIoFilterChainBuilder builder )
+    {
+        String name = getThreadPoolFilterEntryName( builder.getAll() );
+
+        builder.addBefore( name, getClass().getName() + ".add", new Add() );
+        builder.addAfter( name, getClass().getName() + ".release", new Release() );
+    }
+
+    private String getThreadPoolFilterEntryName( List entries )
+    {
+        Iterator i = entries.iterator();
+
+        while( i.hasNext() )
+        {
+            IoFilterChain.Entry entry = (IoFilterChain.Entry)i.next();
+
+            if( entry.getFilter().getClass().isAssignableFrom( ThreadPoolFilter.class ) )
+            {
+                return entry.getName();
+            }
+        }
+
+        throw new IllegalStateException( "Chain does not contain a ThreadPoolFilter" );
+    }
+
+    private void add( IoSession session, int size )
+    {
+        synchronized( session )
+        {
+            int counter = getCounter( session ) + size;
+
+            session.setAttribute( COUNTER, new Integer( counter ) );
+
+            if( counter >= maximumConnectionBufferSize && session.getTrafficMask().isReadable() )
+            {
+                session.suspendRead();
+                session.setAttribute( SUSPENDED_READS );
+            }
+        }
+    }
+
+    private void release( IoSession session, int size )
+    {
+        synchronized( session )
+        {
+            int counter = Math.max( 0, getCounter( session ) - size );
+
+            session.setAttribute( COUNTER, new Integer( counter ) );
+
+            if( counter < maximumConnectionBufferSize && isSuspendedReads( session ) )
+            {
+                session.resumeRead();
+            }
+
+            session.removeAttribute( SUSPENDED_READS );
+        }
+    }
+
+    private boolean isSuspendedReads( IoSession session )
+    {
+        Boolean flag = (Boolean)session.getAttribute( SUSPENDED_READS );
+
+        return null != flag && flag.booleanValue();
+    }
+
+    private int getCounter( IoSession session )
+    {
+        Integer i = (Integer)session.getAttribute( COUNTER );
+        return null == i ? 0 : i.intValue();
+    }
+
+    private class Add extends IoFilterAdapter
+    {
+        public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+        {
+            if( message instanceof ByteBuffer )
+            {
+                add( session, ( (ByteBuffer)message ).capacity() );
+            }
+
+            nextFilter.messageReceived( session, message );
+        }
+    }
+
+    private class Release extends IoFilterAdapter
+    {
+        public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+        {
+            if( message instanceof ByteBuffer )
+            {
+                release( session, ( (ByteBuffer)message ).capacity() );
+            }
+
+            nextFilter.messageReceived( session, message );
+        }
+    }
+}





--
what we call human nature is actually human habit
--
http://gleamynode.net/
--
PGP key fingerprints:
* E167 E6AF E73A CBCE EE41  4A29 544D DE48 FE95 4E7E
* B693 628E 6047 4F8F CFA4  455E 1C62 A7DC 0255 ECA6
