I've attached a patch that should resolve this issue.
Let me know if there is anything else I should do to get this fixed in
the MINA codebase.
Thanks,
-adam
On Wed, Aug 12, 2009 at 8:48 AM, Adam Brown<[email protected]> wrote:
> Any news on this?
>
> On Fri, Jul 31, 2009 at 9:44 AM, Adam Brown<[email protected]> wrote:
>> here's some code that i think demonstrates the problem. with the
>> calls to filterChainBuilder.addFirst() to add the SslFilter &
>> FileRegionWriteFilter, MyHandler.messageSent() is never called
>> (slightly different from what i've seen in our actual code, but pretty
>> close).
>>
>> let me know what else i can do to help track this down.
>>
>> -adam
>>
>> public class App {
>> private static class MyHandler extends IoHandlerAdapter {
>> private final IoBuffer ioBuffer = IoBuffer.wrap("test
>> string".getBytes());
>> private FileChannel fileChannel;
>>
>> public MyHandler() {
>> try {
>> fileChannel = new
>> FileInputStream("tmpfile").getChannel();
>> } catch (FileNotFoundException e) {
>> e.printStackTrace();
>> System.exit(1);
>> }
>> }
>>
>> �...@override
>> public void messageReceived(IoSession session, Object message)
>> throws Exception {
>> session.write(ioBuffer);
>> session.write(new DefaultFileRegion(fileChannel));
>> }
>>
>> �...@override
>> public void messageSent(IoSession session, Object message) {
>> System.out.println();
>> System.out.println("messageSent" + message);
>> }
>> }
>>
>> public static void main(String[] args) throws
>> NoSuchAlgorithmException, KeyManagementException, IOException {
>> SSLContext context = SSLContext.getInstance("TLS");
>> context.init(null, null, null);
>>
>> NioSocketAcceptor ioAcceptor = new NioSocketAcceptor(4);
>> ioAcceptor.setReuseAddress(true);
>> ioAcceptor.setBacklog(2048);
>> ioAcceptor.getSessionConfig().setThroughputCalculationInterval(10);
>> ioAcceptor.setCloseOnDeactivation(false);
>>
>> DefaultIoFilterChainBuilder filterChainBuilder = new
>> DefaultIoFilterChainBuilder();
>> filterChainBuilder.addFirst("sslFilter", new SslFilter(context));
>> filterChainBuilder.addFirst("fileRegionFilter", new
>> FileRegionWriteFilter());
>>
>> ioAcceptor.setFilterChainBuilder(filterChainBuilder);
>> ioAcceptor.setHandler(new MyHandler());
>> ioAcceptor.bind(new InetSocketAddress(8080));
>> }
>> }
>>
>>
>> On Fri, Jul 31, 2009 at 8:39 AM, Adam Brown<[email protected]> wrote:
>>> On Thu, Jul 30, 2009 at 3:53 PM, Emmanuel Lecharny<[email protected]>
>>> wrote:
>>>> Adam Brown wrote:
>>>>>
>>>>> i'm currently using a FileRegionWriteFilter 'before' an SslFilter to
>>>>> breakup FileRegion objects into chunks that SslFilter can handle
>>>>> (since SslFilter doesn't handle them directly). however, this filter
>>>>> chain also has IoBuffers written to the same IoSession.
>>>>>
>>>>> the problem i'm seeing appears to be that the IoBuffer is not being
>>>>> sent by MINA (with messageSent() being sent back up the chain and
>>>>> IoHandler) before the first IoBuffer generated by the
>>>>> FileRegionWriteFilter is written. because of this ordering, my
>>>>> IoBuffer (not the ones created by FileRegionWriteFilter) is "lost" to
>>>>> the chain and never has messageSent() called on it. in addition to
>>>>> the messageSent() call made with the FileRegion, some of the IoBuffers
>>>>> generated by FileRegionWriteFilter "escape" and have messageSent()
>>>>> called on them, passing them up the filter chain (and ultimately to my
>>>>> IoHandler).
>>>>>
>>>>> so really, i've got two problems :-).
>>>>>
>>>>
>>>> Are you using an ExecutorFilter? If so, which kind of threadPool are you
>>>> using ?
>>>
>>> no, we are not using an ExecutorFilter on the filter chain. all of
>>> the write()s are occuring in the same thread, just different places in
>>> our code.
>>>
>>>>
>>>> Can you push a very simple piece of code demonstrating the problem so that
>>>> we can debu it and understand what's going on ?
>>>
>>> yeah, i'll try to put together a simple example that exercises this.
>>> i did a little debugging before posting to the list & the general flow
>>> of events is as follows:
>>>
>>> our code:
>>> ioSession.write(myIoBuffer);
>>> ...
>>> ioSession.write(fileRegion);
>>>
>>> yields in the filter chain:
>>>
>>> AbstractStreamWriteFilter.filterWrite(myIoBuffer);
>>> AbstractStreamWriteFilter.filterWrite(fileRegion);
>>> AbstractStreamWriteFilter.messageSent(myIoBuffer); -> gets "captured"
>>> by AbstractStreamWriteFilter
>>> AbstractStreamWriteFilter.messageSent(ioBufferForFileRegion); ->
>>> called a couple of times
>>> ...
>>> => nextFilter.messageSent(fileRegion);
>>> nextFilter.messageSent(ioBufferForFileRegion); -> called at least
>>> once, but this should have been "captured" by
>>> AbstractStreamWriteFilter
>>>
>>>>
>>>> Thanks !
>>>>
>>>> PS: MINA version, etc... That helps !
>>>
>>> right now, we're using 2.0.0-M3. we had switched to -M5 a while back,
>>> but there was a serious performance drop-off caused by -M5 (which we
>>> haven't had a chance to track down yet), so we reverted to -M3.
>>>
>>>>
>>>> --
>>>> --
>>>> cordialement, regards,
>>>> Emmanuel Lécharny
>>>> www.iktek.com
>>>> directory.apache.org
>>>>
>>>>
>>>>
>>>
>>
>
--- /home/abrown/dev/mina/core/src/main/java/org/apache/mina/filter/stream/AbstractStreamWriteFilter.java 2009-07-20 14:26:46.000000000 -0700
+++ AbstractStreamWriteFilter.java 2009-08-24 14:55:53.000000000 -0700
@@ -17,10 +17,13 @@
* under the License.
*
*/
package org.apache.mina.filter.stream;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Queue;
+import java.util.Set;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.IoFilterAdapter;
@@ -49,6 +52,7 @@
protected final AttributeKey WRITE_REQUEST_QUEUE = new AttributeKey(getClass(), "queue");
protected final AttributeKey CURRENT_WRITE_REQUEST = new AttributeKey(getClass(), "writeRequest");
+ protected final AttributeKey WRITE_REQUESTS_FOR_CURRENT_STREAM = new AttributeKey(getClass(), "writeRequestsForCurrentStream");
private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
@@ -91,9 +95,11 @@
} else {
session.setAttribute(CURRENT_STREAM, message);
session.setAttribute(CURRENT_WRITE_REQUEST, writeRequest);
+ session.setAttribute(WRITE_REQUESTS_FOR_CURRENT_STREAM, new HashSet<WriteRequest>());
- nextFilter.filterWrite(session, new DefaultWriteRequest(
- buffer));
+ DefaultWriteRequest bufferWriteRequest = new DefaultWriteRequest(buffer);
+ getWriteRequestsForCurrentStream(session).add(bufferWriteRequest);
+ nextFilter.filterWrite(session, bufferWriteRequest);
}
} else {
@@ -113,21 +119,32 @@
return (Queue<WriteRequest>) session.removeAttribute(WRITE_REQUEST_QUEUE);
}
+ @SuppressWarnings("unchecked")
+ private Set<WriteRequest> getWriteRequestsForCurrentStream(IoSession session) {
+ return (Set<WriteRequest>) session.getAttribute(WRITE_REQUESTS_FOR_CURRENT_STREAM);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Set<WriteRequest> removeWriteRequestsForCurrentStream(IoSession session) {
+ return (Set<WriteRequest>) session.removeAttribute(WRITE_REQUESTS_FOR_CURRENT_STREAM);
+ }
+
@Override
public void messageSent(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
T stream = getMessageClass().cast(session.getAttribute(CURRENT_STREAM));
- if (stream == null) {
+ if (stream == null || !getWriteRequestsForCurrentStream(session).contains(writeRequest)) {
nextFilter.messageSent(session, writeRequest);
} else {
+ getWriteRequestsForCurrentStream(session).remove(writeRequest);
IoBuffer buffer = getNextBuffer(stream);
if (buffer == null) {
// End of stream reached.
session.removeAttribute(CURRENT_STREAM);
- WriteRequest currentWriteRequest = (WriteRequest) session
- .removeAttribute(CURRENT_WRITE_REQUEST);
+ WriteRequest currentWriteRequest = (WriteRequest) session.removeAttribute(CURRENT_WRITE_REQUEST);
+ removeWriteRequestsForCurrentStream(session);
// Write queued WriteRequests.
Queue<WriteRequest> queue = removeWriteRequestQueue(session);
@@ -142,8 +159,9 @@
currentWriteRequest.getFuture().setWritten();
nextFilter.messageSent(session, currentWriteRequest);
} else {
- nextFilter.filterWrite(session, new DefaultWriteRequest(
- buffer));
+ DefaultWriteRequest bufferWriteRequest = new DefaultWriteRequest(buffer);
+ getWriteRequestsForCurrentStream(session).add(bufferWriteRequest);
+ nextFilter.filterWrite(session, bufferWriteRequest);
}
}
}