Hi Peter! Your addition of ReadThrottleFilter looks good to me. It is very creative! :)
Here are my comments: * The class name ends with 'Filter', but it's not a filter. Isn't there a better name? This is very close to IoFilterChainBuilder, but it's too long. Any ideas? * We have to increase the counter by remaining() instead of capacity() to be more accurate. * Isn't it possible to use getTrafficMask() instead of using the session attribute? Was there any reason for doing it that way? That's all. Everything is superb except for those points. :D Cheers, Trustin On 6/28/06, [EMAIL PROTECTED] <[EMAIL PROTECTED]> wrote:
Author: proyal Date: Tue Jun 27 09:31:30 2006 New Revision: 417502 URL: http://svn.apache.org/viewvc?rev=417502&view=rev Log: DIRMINA-206 - Filter that watches the total amount queue'd for a connection in the ThreadPoolFilter and will disable reads for a connection when a configurable (default 1mb) limit is reached. Added: directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java Added: directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java?rev=417502&view=auto ============================================================================== --- directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java (added) +++ directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java Tue Jun 27 09:31:30 2006 @@ -0,0 +1,192 @@ +/* + * @(#) $Id: StreamWriteFilterTest.java 389042 2006-03-27 07:49:41Z trustin $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.mina.filter; + +import java.util.Iterator; +import java.util.List; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.DefaultIoFilterChainBuilder; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.thread.ThreadPoolFilter; + +/** + * This filter will automatically disable reads on an <code>IoSession</code> once the data batched for that session in + * the <code>ThreadPoolFilter</code> reaches a defined threshold (the default is 1mb). It accomplishes this by being in + * the filter chain before <strong>and</strong> after the <code>ThreadPoolFilter</code>. It is possible to subvert the + * behavior of this filter by adding filters immediately after the <code>ThreadPoolFilter</code> after adding this + * filter. Thus, it is recommended to add this filter towards the end of your filter chain construction, if you need to + * ensure that other filters need to be right after the <code>ThreadPoolFilter</code>. + * + * <p>Usage: + * + * <pre><code> + * DefaultFilterChainBuilder builder = ... + * ReadThrottleFilter filter = new ReadThrottleFilter(); + * filter.attach( builder ); + * </code></pre> + * + * or + * + * <pre><code> + * IoFilterChain chain = ... + * ReadThrottleFilter filter = new ReadThrottleFilter(); + * filter.attach( chain ); + * </code></pre> + * + * @author The Apache Directory Project ([email protected]) + * @version $Rev: 406554 $, $Date: 2006-05-15 06:46:02Z $ + */ +public class ReadThrottleFilter +{ + public static final String COUNTER = ReadThrottleFilter.class.getName() + ".counter"; + public static final String SUSPENDED_READS = ReadThrottleFilter.class.getName() + ".suspendedReads"; + + private volatile int maximumConnectionBufferSize = 1024 * 1024; // 1mb + + /** + * Set the maximum amount of data to buffer in the ThreadPoolFilter prior to disabling reads. 
Changing the value + * will only take effect when new data is received for a connection, including existing connections. Default value + * is 1mb + * + * @param maximumConnectionBufferSize New buffer size. Must be > 0 + */ + public void setMaximumConnectionBufferSize( int maximumConnectionBufferSize ) + { + this.maximumConnectionBufferSize = maximumConnectionBufferSize; + } + + /** + * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself + * before and after that filter. + * + * @param chain <code>IoFilterChain</code> to attach self to. + */ + public void attach( IoFilterChain chain ) + { + String name = getThreadPoolFilterEntryName( chain.getAll() ); + + chain.addBefore( name, getClass().getName() + ".add", new Add() ); + chain.addAfter( name, getClass().getName() + ".release", new Release() ); + } + + /** + * Attach this filter to the specified builder. It will search for the ThreadPoolFilter, and attach itself before + * and after that filter. + * + * @param builder <code>DefaultIoFilterChainBuilder</code> to attach self to. 
+ */ + public void attach( DefaultIoFilterChainBuilder builder ) + { + String name = getThreadPoolFilterEntryName( builder.getAll() ); + + builder.addBefore( name, getClass().getName() + ".add", new Add() ); + builder.addAfter( name, getClass().getName() + ".release", new Release() ); + } + + private String getThreadPoolFilterEntryName( List entries ) + { + Iterator i = entries.iterator(); + + while( i.hasNext() ) + { + IoFilterChain.Entry entry = (IoFilterChain.Entry)i.next(); + + if( entry.getFilter().getClass().isAssignableFrom( ThreadPoolFilter.class ) ) + { + return entry.getName(); + } + } + + throw new IllegalStateException( "Chain does not contain a ThreadPoolFilter" ); + } + + private void add( IoSession session, int size ) + { + synchronized( session ) + { + int counter = getCounter( session ) + size; + + session.setAttribute( COUNTER, new Integer( counter ) ); + + if( counter >= maximumConnectionBufferSize && session.getTrafficMask().isReadable() ) + { + session.suspendRead(); + session.setAttribute( SUSPENDED_READS ); + } + } + } + + private void release( IoSession session, int size ) + { + synchronized( session ) + { + int counter = Math.max( 0, getCounter( session ) - size ); + + session.setAttribute( COUNTER, new Integer( counter ) ); + + if( counter < maximumConnectionBufferSize && isSuspendedReads( session ) ) + { + session.resumeRead(); + } + + session.removeAttribute( SUSPENDED_READS ); + } + } + + private boolean isSuspendedReads( IoSession session ) + { + Boolean flag = (Boolean)session.getAttribute( SUSPENDED_READS ); + + return null != flag && flag.booleanValue(); + } + + private int getCounter( IoSession session ) + { + Integer i = (Integer)session.getAttribute( COUNTER ); + return null == i ? 
0 : i.intValue(); + } + + private class Add extends IoFilterAdapter + { + public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( message instanceof ByteBuffer ) + { + add( session, ( (ByteBuffer)message ).capacity() ); + } + + nextFilter.messageReceived( session, message ); + } + } + + private class Release extends IoFilterAdapter + { + public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( message instanceof ByteBuffer ) + { + release( session, ( (ByteBuffer)message ).capacity() ); + } + + nextFilter.messageReceived( session, message ); + } + } +}
-- what we call human nature is actually human habit -- http://gleamynode.net/ -- PGP key fingerprints: * E167 E6AF E73A CBCE EE41 4A29 544D DE48 FE95 4E7E * B693 628E 6047 4F8F CFA4 455E 1C62 A7DC 0255 ECA6
