Author: bago Date: Wed Dec 21 14:47:25 2011 New Revision: 1221748 URL: http://svn.apache.org/viewvc?rev=1221748&view=rev Log: An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back to standard synchronization as we only have max 2 threads competing for the queue so it doesn't make sense to use a non blocking queue. Norman, please overview, and feel free to revert if you don't like the solution (i thought it was better to simply commit instead of opening a JIRA to show you this).
Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff ============================================================================== --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original) +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 14:47:25 2011 @@ -22,9 +22,8 @@ package org.apache.james.protocols.api; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.james.protocols.api.FutureResponse.ResponseListener; @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr // TODO: Should we limit the size ? - private final ConcurrentLinkedQueue<Response> responses = new ConcurrentLinkedQueue<Response>(); - private final AtomicBoolean write = new AtomicBoolean(false); + private final Queue<Response> responses = new LinkedBlockingQueue<Response>(); + private volatile boolean isAsync = false; /** * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession) */ public final void writeResponse(Response response, final ProtocolSession session) { - // just add the response to the queue. We will trigger the write operation later - responses.add(response); - - // trigger the write - writeQueuedResponses(session); + // if we already in asynchrnous mode we simply enqueue the response + // we do this synchronously because we may have a dequeuer thread working on + // isAsync and responses. + boolean enqueued = false; + synchronized(this) { + if (isAsync == true) { + responses.offer(response); + enqueued = true; + } + } + + // if we didn't enqueue then we check if the response is writable or we have to + // set us "asynchrnous" and wait for response to be ready. + if (!enqueued) { + if (isResponseWritable(response)) { + writeResponseToClient(response, session); + } else { + addDequeuerListener(response, session); + isAsync = true; + } + } } /** @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr * @param session */ private void writeQueuedResponses(final ProtocolSession session) { - Response queuedResponse = null; - if (write.compareAndSet(false, true)){ - boolean listenerAdded = false; - // dequeue Responses until non is left - while ((queuedResponse = responses.poll()) != null) { - - // check if we need to take special care of FutureResponses - if (queuedResponse instanceof FutureResponse) { - FutureResponse futureResponse =(FutureResponse) queuedResponse; - if (futureResponse.isReady()) { - // future is ready so we can write it without blocking the IO-Thread - writeResponseToClient(queuedResponse, session); - } else { - - // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread - futureResponse.addListener(new ResponseListener() { - - public void onResponse(FutureResponse response) { - writeResponseToClient(response, session); - if (write.compareAndSet(true, false)) { - writeQueuedResponses(session); - } - } - }); - listenerAdded = true; - // just break here as we will trigger the dequeue later - break; - } - - } else { - // the Response is not a FutureResponse, so just write it back the the remote peer - writeResponseToClient(queuedResponse, session); + // dequeue Responses until non is left + while (true) { + + Response queuedResponse = null; + + // synchrnously we check responses and if it is empty we move back to non asynch + // behaviour + synchronized(this) { + queuedResponse = responses.poll(); + if (queuedResponse == null) { + isAsync = false; + break; } - } - // Check if a ResponseListener was added before. If not we can allow to write - // responses again. Otherwise the writing will get triggered from the listener - if (listenerAdded == false) { - write.set(false); + + // if we have something in the queue we continue writing until we + // find something asynchronous. + if (isResponseWritable(queuedResponse)) { + writeResponseToClient(queuedResponse, session); + } else { + addDequeuerListener(queuedResponse, session); + // no changes to isAsync here, because in this method we are always already async. + break; } } - - + } + + private boolean isResponseWritable(Response response) { + return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady(); + } + + private void addDequeuerListener(Response response, final ProtocolSession session) { + ((FutureResponse) response).addListener(new ResponseListener() { + + public void onResponse(FutureResponse response) { + writeResponseToClient(response, session); + writeQueuedResponses(session); + } + }); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org