Author: norman Date: Wed Dec 21 08:36:01 2011 New Revision: 1221644 URL: http://svn.apache.org/viewvc?rev=1221644&view=rev Log: Make sure the Responses are written in the correct order even if FutureResponse's and Response's get mixed. See PROTOCOLS-62
Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221644&r1=1221643&r2=1221644&view=diff ============================================================================== --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original) +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 08:36:01 2011 @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.james.protocols.api.FutureResponse.ResponseListener; @@ -42,7 +43,8 @@ public abstract class AbstractProtocolTr // TODO: Should we limit the size ? private final ConcurrentLinkedQueue<Response> responses = new ConcurrentLinkedQueue<Response>(); - + private final AtomicBoolean write = new AtomicBoolean(false); + /** * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession) */ @@ -64,37 +66,48 @@ public abstract class AbstractProtocolTr */ private void writeQueuedResponses(final ProtocolSession session) { Response queuedResponse = null; - - // dequeue Responses until non is left - while ((queuedResponse = responses.poll()) != null) { - - // check if we need to take special care of FutureResponses - if (queuedResponse instanceof FutureResponse) { - FutureResponse futureResponse =(FutureResponse) queuedResponse; - if (futureResponse.isReady()) { - // future is ready so we can write it without blocking the IO-Thread - writeResponseToClient(queuedResponse, session); - } else { - - // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread - futureResponse.addListener(new ResponseListener() { + + if (write.compareAndSet(false, true)){ + boolean listenerAdded = false; + // dequeue Responses until non is left + while ((queuedResponse = responses.poll()) != null) { + + // check if we need to take special care of FutureResponses + if (queuedResponse instanceof FutureResponse) { + FutureResponse futureResponse =(FutureResponse) queuedResponse; + if (futureResponse.isReady()) { + // future is ready so we can write it without blocking the IO-Thread + writeResponseToClient(queuedResponse, session); + } else { - public void onResponse(FutureResponse response) { - writeResponseToClient(response, session); - writeQueuedResponses(session); - } - }); + // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread + futureResponse.addListener(new ResponseListener() { + + public void onResponse(FutureResponse response) { + writeResponseToClient(response, session); + if (write.compareAndSet(true, false)) { + writeQueuedResponses(session); + } + } + }); + listenerAdded = true; + // just break here as we will trigger the dequeue later + break; + } - // just break here as we will trigger the dequeue later - break; + } else { + // the Response is not a FutureResponse, so just write it back the the remote peer + writeResponseToClient(queuedResponse, session); } - - } else { - // the Response is not a FutureResponse, so just write it back the the remote peer - writeResponseToClient(queuedResponse, session); + + } + // Check if a ResponseListener was added before. If not we can allow to write + // responses again. Otherwise the writing will get triggered from the listener + if (listenerAdded == false) { + write.set(false); } - } + } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org