This makes sense! Thanks for taking care. I will cut a new release now that this is fixed. I think I will call it 1.6.0-RC1 as we are really close ;)
Bye, Norman -- Norman Maurer Am Mittwoch, 21. Dezember 2011 um 15:47 schrieb b...@apache.org: > Author: bago > Date: Wed Dec 21 14:47:25 2011 > New Revision: 1221748 > > URL: http://svn.apache.org/viewvc?rev=1221748&view=rev > Log: > An attempt to refactor AbstractProtocolTransport to be thread safe. I moved > back to standard synchronization as we only have max 2 threads competing for > the queue so it doesn't make sense to use a non blocking queue. Norman, > please overview, and feel free to revert if you don't like the solution (i > thought it was better to simply commit instead of opening a JIRA to show you > this). > > Modified: > james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java > > Modified: > james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java > URL: > http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff > ============================================================================== > --- > james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java > (original) > +++ > james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java > Wed Dec 21 14:47:25 2011 > @@ -22,9 +22,8 @@ package org.apache.james.protocols.api; > import java.io.InputStream; > import java.io.UnsupportedEncodingException; > import java.util.List; > -import java.util.concurrent.ConcurrentLinkedQueue; > -import java.util.concurrent.atomic.AtomicBoolean; > - > +import java.util.Queue; > +import java.util.concurrent.LinkedBlockingQueue; > > import org.apache.james.protocols.api.FutureResponse.ResponseListener; > > @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr > > > // TODO: Should we limit the size ? > - private final ConcurrentLinkedQueue<Response> responses = new > ConcurrentLinkedQueue<Response>(); > - private final AtomicBoolean write = new AtomicBoolean(false); > + private final Queue<Response> responses = new > LinkedBlockingQueue<Response>(); > + private volatile boolean isAsync = false; > > /** > * @see > org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, > org.apache.james.protocols.api.ProtocolSession) > */ > public final void writeResponse(Response response, final ProtocolSession > session) { > - // just add the response to the queue. We will trigger the write operation > later > - responses.add(response); > - > - // trigger the write > - writeQueuedResponses(session); > + // if we already in asynchrnous mode we simply enqueue the response > + // we do this synchronously because we may have a dequeuer thread working on > + // isAsync and responses. > + boolean enqueued = false; > + synchronized(this) { > + if (isAsync == true) { > + responses.offer(response); > + enqueued = true; > + } > + } > + > + // if we didn't enqueue then we check if the response is writable or we > have to > + // set us "asynchrnous" and wait for response to be ready. > + if (!enqueued) { > + if (isResponseWritable(response)) { > + writeResponseToClient(response, session); > + } else { > + addDequeuerListener(response, session); > + isAsync = true; > + } > + } > } > > /** > @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr > * @param session > */ > private void writeQueuedResponses(final ProtocolSession session) { > - Response queuedResponse = null; > > - if (write.compareAndSet(false, true)){ > - boolean listenerAdded = false; > - // dequeue Responses until non is left > - while ((queuedResponse = responses.poll()) != null) { > - > - // check if we need to take special care of FutureResponses > - if (queuedResponse instanceof FutureResponse) { > - FutureResponse futureResponse =(FutureResponse) queuedResponse; > - if (futureResponse.isReady()) { > - // future is ready so we can write it without blocking the IO-Thread > - writeResponseToClient(queuedResponse, session); > - } else { > - > - // future is not ready so we need to write it via a ResponseListener > otherwise we MAY block the IO-Thread > - futureResponse.addListener(new ResponseListener() { > - > - public void onResponse(FutureResponse response) { > - writeResponseToClient(response, session); > - if (write.compareAndSet(true, false)) { > - writeQueuedResponses(session); > - } > - } > - }); > - listenerAdded = true; > - // just break here as we will trigger the dequeue later > - break; > - } > - > - } else { > - // the Response is not a FutureResponse, so just write it back the the > remote peer > - writeResponseToClient(queuedResponse, session); > + // dequeue Responses until non is left > + while (true) { > + > + Response queuedResponse = null; > + > + // synchrnously we check responses and if it is empty we move back to non > asynch > + // behaviour > + synchronized(this) { > + queuedResponse = responses.poll(); > + if (queuedResponse == null) { > + isAsync = false; > + break; > } > - > } > - // Check if a ResponseListener was added before. If not we can allow to > write > - // responses again. Otherwise the writing will get triggered from the > listener > - if (listenerAdded == false) { > - write.set(false); > + > + // if we have something in the queue we continue writing until we > + // find something asynchronous. > + if (isResponseWritable(queuedResponse)) { > + writeResponseToClient(queuedResponse, session); > + } else { > + addDequeuerListener(queuedResponse, session); > + // no changes to isAsync here, because in this method we are always already > async. > + break; > } > } > - > - > + } > + > + private boolean isResponseWritable(Response response) { > + return !(response instanceof FutureResponse) || ((FutureResponse) > response).isReady(); > + } > + > + private void addDequeuerListener(Response response, final ProtocolSession > session) { > + ((FutureResponse) response).addListener(new ResponseListener() { > + > + public void onResponse(FutureResponse response) { > + writeResponseToClient(response, session); > + writeQueuedResponses(session); > + } > + }); > } > > /** > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org > (mailto:server-dev-unsubscr...@james.apache.org) > For additional commands, e-mail: server-dev-h...@james.apache.org > (mailto:server-dev-h...@james.apache.org) > >