2011/12/24 Eric Charles <e...@apache.org>: > Hi Stephano, > > Opening the discussion to learn more :) > > - Why are you considering that 2 threads is a criteria to use standard > synchronization rather than some atomic fields.
It is a criteria to not use the ConcurrentLinkedQueue that is a structure thought to handle many concurrent threads and is overkill for 2 threads. > - I can understand you replace a concurrent by a non-concurrent queue. > However, you now have a blocking queue. Is there an impact due to this > blocking aspect? I think the answer is no. That's why I did that. Remember we have 2 threads and they do 2 different things, they simply block each other when they add or remove from the queue. > - You defined isAsync as volatile and sometimes encapsulate access to > isAsync in a synchronized block, sometime not. Why using 2 different > thread-safety strategies in this class? Because some times it needed sincronization, other times I felt it was not needed (the access to the volatile doesn't need synchronization. I just synchronize to ensure that the change to the list happens together with the change in the volatile var). If you can find a better solution you're welcome to provide one. It took a couple of hours to reach a working solution. The previous one was not thread safe at this line: ------ if (listenerAdded == false) { write.set(false); ----- It could happen another thread already added a new item to the queue but skipped to process it because write was true. So we ended up with an item in the queue never written. I don't like too much my solution and I felt it a bit hackish, but that was my best solution for my limited time, so if you can provide a more elegant solution while still being thread safe, I'm more than happy :-) Stefano > > Thx, > > Eric > > > > On 21/12/11 15:47, b...@apache.org wrote: >> >> Author: bago >> Date: Wed Dec 21 14:47:25 2011 >> New Revision: 1221748 >> >> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev >> Log: >> An attempt to refactor AbstractProtocolTransport to be thread safe. I >> moved back to standard synchronization as we only have max 2 threads >> competing for the queue so it doesn't make sense to use a non blocking >> queue. Norman, please overview, and feel free to revert if you don't like >> the solution (i thought it was better to simply commit instead of opening a >> JIRA to show you this). >> >> Modified: >> >> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java >> >> Modified: >> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java >> URL: >> http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff >> >> ============================================================================== >> --- >> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java >> (original) >> +++ >> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java >> Wed Dec 21 14:47:25 2011 >> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api; >> import java.io.InputStream; >> import java.io.UnsupportedEncodingException; >> import java.util.List; >> -import java.util.concurrent.ConcurrentLinkedQueue; >> -import java.util.concurrent.atomic.AtomicBoolean; >> - >> +import java.util.Queue; >> +import java.util.concurrent.LinkedBlockingQueue; >> >> import org.apache.james.protocols.api.FutureResponse.ResponseListener; >> >> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr >> >> >> // TODO: Should we limit the size ? >> - private final ConcurrentLinkedQueue<Response> responses = new >> ConcurrentLinkedQueue<Response>(); >> - private final AtomicBoolean write = new AtomicBoolean(false); >> + private final Queue<Response> responses = new >> LinkedBlockingQueue<Response>(); >> + private volatile boolean isAsync = false; >> >> /** >> * @see >> org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, >> org.apache.james.protocols.api.ProtocolSession) >> */ >> public final void writeResponse(Response response, final >> ProtocolSession session) { >> - // just add the response to the queue. We will trigger the write >> operation later >> - responses.add(response); >> - >> - // trigger the write >> - writeQueuedResponses(session); >> + // if we already in asynchrnous mode we simply enqueue the >> response >> + // we do this synchronously because we may have a dequeuer thread >> working on >> + // isAsync and responses. >> + boolean enqueued = false; >> + synchronized(this) { >> + if (isAsync == true) { >> + responses.offer(response); >> + enqueued = true; >> + } >> + } >> + >> + // if we didn't enqueue then we check if the response is writable >> or we have to >> + // set us "asynchrnous" and wait for response to be ready. >> + if (!enqueued) { >> + if (isResponseWritable(response)) { >> + writeResponseToClient(response, session); >> + } else { >> + addDequeuerListener(response, session); >> + isAsync = true; >> + } >> + } >> } >> >> /** >> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr >> * @param session >> */ >> private void writeQueuedResponses(final ProtocolSession session) { >> - Response queuedResponse = null; >> >> - if (write.compareAndSet(false, true)){ >> - boolean listenerAdded = false; >> - // dequeue Responses until non is left >> - while ((queuedResponse = responses.poll()) != null) { >> - >> - // check if we need to take special care of >> FutureResponses >> - if (queuedResponse instanceof FutureResponse) { >> - FutureResponse futureResponse =(FutureResponse) >> queuedResponse; >> - if (futureResponse.isReady()) { >> - // future is ready so we can write it without >> blocking the IO-Thread >> - writeResponseToClient(queuedResponse, session); >> - } else { >> - >> - // future is not ready so we need to write it via >> a ResponseListener otherwise we MAY block the IO-Thread >> - futureResponse.addListener(new ResponseListener() >> { >> - >> - public void onResponse(FutureResponse >> response) { >> - writeResponseToClient(response, session); >> - if (write.compareAndSet(true, false)) { >> - writeQueuedResponses(session); >> - } >> - } >> - }); >> - listenerAdded = true; >> - // just break here as we will trigger the dequeue >> later >> - break; >> - } >> - >> - } else { >> - // the Response is not a FutureResponse, so just >> write it back the the remote peer >> - writeResponseToClient(queuedResponse, session); >> + // dequeue Responses until non is left >> + while (true) { >> + >> + Response queuedResponse = null; >> + >> + // synchrnously we check responses and if it is empty we move >> back to non asynch >> + // behaviour >> + synchronized(this) { >> + queuedResponse = responses.poll(); >> + if (queuedResponse == null) { >> + isAsync = false; >> + break; >> } >> - >> } >> - // Check if a ResponseListener was added before. If not we >> can allow to write >> - // responses again. Otherwise the writing will get triggered >> from the listener >> - if (listenerAdded == false) { >> - write.set(false); >> + >> + // if we have something in the queue we continue writing >> until we >> + // find something asynchronous. >> + if (isResponseWritable(queuedResponse)) { >> + writeResponseToClient(queuedResponse, session); >> + } else { >> + addDequeuerListener(queuedResponse, session); >> + // no changes to isAsync here, because in this method we >> are always already async. >> + break; >> } >> } >> - >> - >> + } >> + >> + private boolean isResponseWritable(Response response) { >> + return !(response instanceof FutureResponse) || ((FutureResponse) >> response).isReady(); >> + } >> + >> + private void addDequeuerListener(Response response, final >> ProtocolSession session) { >> + ((FutureResponse) response).addListener(new ResponseListener() { >> + >> + public void onResponse(FutureResponse response) { >> + writeResponseToClient(response, session); >> + writeQueuedResponses(session); >> + } >> + }); >> } >> >> /** >> >> >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org >> For additional commands, e-mail: server-dev-h...@james.apache.org >> > > -- > Eric http://about.echarles.net > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org > For additional commands, e-mail: server-dev-h...@james.apache.org > --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org