I've attached a patch that should resolve this issue.
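
In case it helps with review, here is a minimal standalone sketch of the idea behind the patch. It is not MINA code: the class, its fields, and the string "messages" are all hypothetical stand-ins. The filter remembers which chunks it generated for the stream in progress, and messageSent() only swallows an event if it belongs to one of those chunks. Anything else (such as an IoBuffer written earlier by the handler) is passed up the chain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Standalone model of the tracking idea; every name here is hypothetical.
public class StreamFilterSketch {
    // Chunks this filter wrote for the stream in progress and is still waiting on.
    private final Set<String> ownChunks = new HashSet<>();
    // Chunks of the current stream that have not been written yet.
    private final Queue<String> remainingChunks = new ArrayDeque<>();
    // Events forwarded up the chain, i.e. what the handler would see.
    private final List<String> sentToHandler = new ArrayList<>();
    private String currentStream;

    // Start streaming: remember the stream and write its first chunk.
    void writeStream(String stream, List<String> chunks) {
        currentStream = stream;
        remainingChunks.addAll(chunks);
        writeNextChunk();
    }

    private void writeNextChunk() {
        // Record the chunk as ours before handing it to the next filter.
        ownChunks.add(remainingChunks.poll());
    }

    // Called when the layer below reports that a message was sent.
    void messageSent(String message) {
        if (currentStream == null || !ownChunks.remove(message)) {
            // Not one of our chunks: pass the event up untouched.
            sentToHandler.add(message);
        } else if (remainingChunks.isEmpty()) {
            // Last chunk done: report the stream itself, not the chunk.
            sentToHandler.add(currentStream);
            currentStream = null;
        } else {
            writeNextChunk();
        }
    }

    static List<String> demo() {
        StreamFilterSketch filter = new StreamFilterSketch();
        filter.writeStream("fileRegion", Arrays.asList("chunk1", "chunk2"));
        // A buffer written before the file region completes while streaming.
        filter.messageSent("myIoBuffer");
        filter.messageSent("chunk1");
        filter.messageSent("chunk2");
        return filter.sentToHandler;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [myIoBuffer, fileRegion]
    }
}
```

Without the ownChunks check, the "myIoBuffer" event would be treated as a chunk completion and never reach the handler, which is the first of the two problems described below.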

Let me know if there is anything else I should do to get this fixed in
the MINA codebase.

Thanks,
-adam

On Wed, Aug 12, 2009 at 8:48 AM, Adam Brown<[email protected]> wrote:
> Any news on this?
>
> On Fri, Jul 31, 2009 at 9:44 AM, Adam Brown<[email protected]> wrote:
>> here's some code that i think demonstrates the problem.  with the
>> calls to filterChainBuilder.addFirst() to add the SslFilter &
>> FileRegionWriteFilter, MyHandler.messageSent() is never called
>> (slightly different from what i've seen in our actual code, but pretty
>> close).
>>
>> let me know what else i can do to help track this down.
>>
>> -adam
>>
>> public class App {
>>        private static class MyHandler extends IoHandlerAdapter {
>>                private final IoBuffer ioBuffer = IoBuffer.wrap("test string".getBytes());
>>                private FileChannel fileChannel;
>>
>>                public MyHandler() {
>>                        try {
>>                                fileChannel = new FileInputStream("tmpfile").getChannel();
>>                        } catch (FileNotFoundException e) {
>>                                e.printStackTrace();
>>                                System.exit(1);
>>                        }
>>                }
>>
>>                @Override
>>                public void messageReceived(IoSession session, Object message)
>> throws Exception {
>>                        session.write(ioBuffer);
>>                        session.write(new DefaultFileRegion(fileChannel));
>>                }
>>
>>                @Override
>>                public void messageSent(IoSession session, Object message) {
>>                        System.out.println();
>>                        System.out.println("messageSent" + message);
>>                }
>>        }
>>
>>        public static void main(String[] args) throws
>> NoSuchAlgorithmException, KeyManagementException, IOException {
>>        SSLContext context = SSLContext.getInstance("TLS");
>>        context.init(null, null, null);
>>
>>        NioSocketAcceptor ioAcceptor = new NioSocketAcceptor(4);
>>        ioAcceptor.setReuseAddress(true);
>>        ioAcceptor.setBacklog(2048);
>>        ioAcceptor.getSessionConfig().setThroughputCalculationInterval(10);
>>        ioAcceptor.setCloseOnDeactivation(false);
>>
>>        DefaultIoFilterChainBuilder filterChainBuilder = new
>> DefaultIoFilterChainBuilder();
>>        filterChainBuilder.addFirst("sslFilter", new SslFilter(context));
>>        filterChainBuilder.addFirst("fileRegionFilter", new
>> FileRegionWriteFilter());
>>
>>        ioAcceptor.setFilterChainBuilder(filterChainBuilder);
>>                ioAcceptor.setHandler(new MyHandler());
>>                ioAcceptor.bind(new InetSocketAddress(8080));
>>        }
>> }
>>
>>
>> On Fri, Jul 31, 2009 at 8:39 AM, Adam Brown<[email protected]> wrote:
>>> On Thu, Jul 30, 2009 at 3:53 PM, Emmanuel Lecharny<[email protected]> 
>>> wrote:
>>>> Adam Brown wrote:
>>>>>
>>>>> i'm currently using a FileRegionWriteFilter 'before' an SslFilter to
>>>>> break up FileRegion objects into chunks that SslFilter can handle
>>>>> (since SslFilter doesn't handle them directly).  however, this filter
>>>>> chain also has IoBuffers written to the same IoSession.
>>>>>
>>>>> the problem i'm seeing appears to be that the IoBuffer is not being
>>>>> sent by MINA (with messageSent() being sent back up the chain and
>>>>> IoHandler) before the first IoBuffer generated by the
>>>>> FileRegionWriteFilter is written.  because of this ordering, my
>>>>> IoBuffer (not the ones created by FileRegionWriteFilter) is "lost" to
>>>>> the chain and never has messageSent() called on it.  in addition to
>>>>> the messageSent() call made with the FileRegion, some of the IoBuffers
>>>>> generated by FileRegionWriteFilter "escape" and have messageSent()
>>>>> called on them, passing them up the filter chain (and ultimately to my
>>>>> IoHandler).
>>>>>
>>>>> so really, i've got two problems :-).
>>>>>
>>>>
>>>> Are you using an ExecutorFilter? If so, which kind of threadPool are you
>>>> using ?
>>>
>>> no, we are not using an ExecutorFilter on the filter chain.  all of
>>> the write()s are occurring in the same thread, just different places in
>>> our code.
>>>
>>>>
>>>> Can you push a very simple piece of code demonstrating the problem so that
>>>> we can debug it and understand what's going on ?
>>>
>>> yeah, i'll try to put together a simple example that exercises this.
>>> i did a little debugging before posting to the list & the general flow
>>> of events is as follows:
>>>
>>> our code:
>>> ioSession.write(myIoBuffer);
>>> ...
>>> ioSession.write(fileRegion);
>>>
>>> yields in the filter chain:
>>>
>>> AbstractStreamWriteFilter.filterWrite(myIoBuffer);
>>> AbstractStreamWriteFilter.filterWrite(fileRegion);
>>> AbstractStreamWriteFilter.messageSent(myIoBuffer);  -> gets "captured"
>>> by AbstractStreamWriteFilter
>>> AbstractStreamWriteFilter.messageSent(ioBufferForFileRegion);  ->
>>> called a couple of times
>>> ...
>>> => nextFilter.messageSent(fileRegion);
>>> nextFilter.messageSent(ioBufferForFileRegion);  -> called at least
>>> once, but this should have been "captured" by
>>> AbstractStreamWriteFilter
>>>
>>>>
>>>> Thanks !
>>>>
>>>> PS: MINA version, etc... That helps !
>>>
>>> right now, we're using 2.0.0-M3.  we had switched to -M5 a while back,
>>> but there was a serious performance drop-off caused by -M5 (which we
>>> haven't had a chance to track down yet), so we reverted to -M3.
>>>
>>>>
>>>> --
>>>> --
>>>> cordialement, regards,
>>>> Emmanuel Lécharny
>>>> www.iktek.com
>>>> directory.apache.org
>>>>
>>>>
>>>>
>>>
>>
>
--- /home/abrown/dev/mina/core/src/main/java/org/apache/mina/filter/stream/AbstractStreamWriteFilter.java	2009-07-20 14:26:46.000000000 -0700
+++ AbstractStreamWriteFilter.java	2009-08-24 14:55:53.000000000 -0700
@@ -17,10 +17,13 @@
  *  under the License.
  *
  */
 package org.apache.mina.filter.stream;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Queue;
+import java.util.Set;
 
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.filterchain.IoFilterAdapter;
@@ -49,6 +52,7 @@
 
     protected final AttributeKey WRITE_REQUEST_QUEUE = new AttributeKey(getClass(), "queue");
     protected final AttributeKey CURRENT_WRITE_REQUEST = new AttributeKey(getClass(), "writeRequest");
+    protected final AttributeKey WRITE_REQUESTS_FOR_CURRENT_STREAM = new AttributeKey(getClass(), "writeRequestsForCurrentStream");
 
     private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
 
@@ -91,9 +95,11 @@
             } else {
                 session.setAttribute(CURRENT_STREAM, message);
                 session.setAttribute(CURRENT_WRITE_REQUEST, writeRequest);
+                session.setAttribute(WRITE_REQUESTS_FOR_CURRENT_STREAM, new HashSet<WriteRequest>());
 
-                nextFilter.filterWrite(session, new DefaultWriteRequest(
-                        buffer));
+                DefaultWriteRequest bufferWriteRequest = new DefaultWriteRequest(buffer);
+                getWriteRequestsForCurrentStream(session).add(bufferWriteRequest);
+                nextFilter.filterWrite(session, bufferWriteRequest);
             }
 
         } else {
@@ -113,21 +119,32 @@
         return (Queue<WriteRequest>) session.removeAttribute(WRITE_REQUEST_QUEUE);
     }
     
+    @SuppressWarnings("unchecked")
+    private Set<WriteRequest> getWriteRequestsForCurrentStream(IoSession session) {
+        return (Set<WriteRequest>) session.getAttribute(WRITE_REQUESTS_FOR_CURRENT_STREAM);
+    }
+    
+    @SuppressWarnings("unchecked")
+    private Set<WriteRequest> removeWriteRequestsForCurrentStream(IoSession session) {
+        return (Set<WriteRequest>) session.removeAttribute(WRITE_REQUESTS_FOR_CURRENT_STREAM);
+    }
+    
     @Override
     public void messageSent(NextFilter nextFilter, IoSession session,
                             WriteRequest writeRequest) throws Exception {
         T stream = getMessageClass().cast(session.getAttribute(CURRENT_STREAM));
 
-        if (stream == null) {
+        if (stream == null || !getWriteRequestsForCurrentStream(session).contains(writeRequest)) {
             nextFilter.messageSent(session, writeRequest);
         } else {
+            getWriteRequestsForCurrentStream(session).remove(writeRequest);
             IoBuffer buffer = getNextBuffer(stream);
 
             if (buffer == null) {
                 // End of stream reached.
                 session.removeAttribute(CURRENT_STREAM);
-                WriteRequest currentWriteRequest = (WriteRequest) session
-                        .removeAttribute(CURRENT_WRITE_REQUEST);
+                WriteRequest currentWriteRequest = (WriteRequest) session.removeAttribute(CURRENT_WRITE_REQUEST);
+                removeWriteRequestsForCurrentStream(session);
 
                 // Write queued WriteRequests.
                 Queue<WriteRequest> queue = removeWriteRequestQueue(session);
@@ -142,8 +159,9 @@
                 currentWriteRequest.getFuture().setWritten();
                 nextFilter.messageSent(session, currentWriteRequest);
             } else {
-                nextFilter.filterWrite(session, new DefaultWriteRequest(
-                        buffer));
+                DefaultWriteRequest bufferWriteRequest = new DefaultWriteRequest(buffer);
+                getWriteRequestsForCurrentStream(session).add(bufferWriteRequest);
+                nextFilter.filterWrite(session, bufferWriteRequest);
             }
         }
     }
