Author: norman
Date: Wed Sep 28 07:02:53 2011
New Revision: 1176754
URL: http://svn.apache.org/viewvc?rev=1176754&view=rev
Log:
Use synchronized blocks instead, so we are 100% sure the response order is
never mixed up, whatever the case. See PROTOCOLS-36
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java?rev=1176754&r1=1176753&r2=1176754&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
Wed Sep 28 07:02:53 2011
@@ -20,7 +20,7 @@
package org.apache.james.protocols.impl;
import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.LinkedList;
import javax.net.ssl.SSLEngine;
@@ -46,7 +46,7 @@ public class NettyProtocolTransport impl
private int lineHandlerCount = 0;
// TODO: Should we limit the size ?
- private final ConcurrentLinkedQueue<Response> responses = new
ConcurrentLinkedQueue<Response>();
+ private final LinkedList<Response> responses = new LinkedList<Response>();
public NettyProtocolTransport(Channel channel, SSLEngine engine) {
this.channel = channel;
@@ -93,11 +93,14 @@ public class NettyProtocolTransport impl
* @see
org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
org.apache.james.protocols.api.ProtocolSession)
*/
public void writeResponse(Response response, final ProtocolSession
session) {
- // just add the response to the queue. We will trigger the write
operation later
- responses.add(response);
-
- // trigger the write
- writeQueuedResponses(session);
+ synchronized (responses) {
+ // just add the response to the queue. We will trigger the write
operation later
+ responses.add(response);
+
+ // trigger the write
+ writeQueuedResponses(session);
+ }
+
}
/**
@@ -108,38 +111,41 @@ public class NettyProtocolTransport impl
*
* @param session
*/
- private void writeQueuedResponses(final ProtocolSession session) {
- Response queuedResponse = null;
-
- // dequeue Responses until non is left
- while ((queuedResponse = responses.poll()) != null) {
+ private void writeQueuedResponses(final ProtocolSession session) {
+ synchronized (responses) {
+ Response queuedResponse = null;
- // check if we need to take special care of FutureResponses
- if (queuedResponse instanceof FutureResponse) {
- FutureResponse futureResponse =(FutureResponse) queuedResponse;
- if (futureResponse.isReady()) {
- // future is ready so we can write it without blocking the
IO-Thread
- writeResponseToChannel(queuedResponse, session);
- } else {
-
- // future is not ready so we need to write it via a
ResponseListener otherwise we MAY block the IO-Thread
- futureResponse.addListener(new ResponseListener() {
+ // dequeue Responses until non is left
+ while ((queuedResponse = responses.poll()) != null) {
+
+ // check if we need to take special care of FutureResponses
+ if (queuedResponse instanceof FutureResponse) {
+ FutureResponse futureResponse =(FutureResponse)
queuedResponse;
+ if (futureResponse.isReady()) {
+ // future is ready so we can write it without blocking
the IO-Thread
+ writeResponseToChannel(queuedResponse, session);
+ } else {
+
+ // future is not ready so we need to write it via a
ResponseListener otherwise we MAY block the IO-Thread
+ futureResponse.addListener(new ResponseListener() {
+
+ public void onResponse(FutureResponse response) {
+ writeResponseToChannel(response, session);
+ writeQueuedResponses(session);
+ }
+ });
- public void onResponse(FutureResponse response) {
- writeResponseToChannel(response, session);
- writeQueuedResponses(session);
- }
- });
+ // just break here as we will trigger the dequeue later
+ break;
+ }
- // just break here as we will trigger the dequeue later
- break;
+ } else {
+ // the Response is not a FutureResponse, so just write it
back the the remote peer
+ writeResponseToChannel(queuedResponse, session);
}
-
- } else {
- // the Response is not a FutureResponse, so just write it back
the the remote peer
- writeResponseToChannel(queuedResponse, session);
}
}
+
}
private void writeResponseToChannel(Response response, ProtocolSession
session) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]