Hi Eric, comments inline...
Am 24.12.2011 um 10:05 schrieb Eric Charles <e...@apache.org>: > Hi Stephano, > > Opening the discussion to learn more :) > > - Why are you considering that 2 threads is a criteria to use standard > synchronization rather than some atomic fields. > If you only have a small count of concurrent threads it's not slower to use synchronization, as the context switching will not happen often. > - I can understand you replace a concurrent by a non-concurrent queue. > However, you now have a blocking queue. Is there an impact due to this > blocking aspect? Nope, there is not, as we do not use the blocking methods. We could even replace it with a LinkedList. > > - You defined isAsync as volatile and sometimes encapsulate access to isAsync > in a synchronized block, sometime not. Why using 2 different thread-safety > strategies in this class? > If you only need to access a "status flag" in a concurrent way then it's cheaper to just use a volatile for it. If you need to update more than one field in an "atomic" way you need synchronized. Updating a volatile inside a synchronized block is not a problem. > Thx, > > Eric > > Hope it helps, Norman > On 21/12/11 15:47, b...@apache.org wrote: >> Author: bago >> Date: Wed Dec 21 14:47:25 2011 >> New Revision: 1221748 >> >> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev >> Log: >> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved >> back to standard synchronization as we only have max 2 threads competing for >> the queue so it doesn't make sense to use a non blocking queue. Norman, >> please overview, and feel free to revert if you don't like the solution (i >> thought it was better to simply commit instead of opening a JIRA to show you >> this). 
>> >> Modified: >> >> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java >> >> Modified: >> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java >> URL: >> http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff >> ============================================================================== >> --- >> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java >> (original) >> +++ >> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java >> Wed Dec 21 14:47:25 2011 >> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api; >> import java.io.InputStream; >> import java.io.UnsupportedEncodingException; >> import java.util.List; >> -import java.util.concurrent.ConcurrentLinkedQueue; >> -import java.util.concurrent.atomic.AtomicBoolean; >> - >> +import java.util.Queue; >> +import java.util.concurrent.LinkedBlockingQueue; >> >> import org.apache.james.protocols.api.FutureResponse.ResponseListener; >> >> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr >> >> >> // TODO: Should we limit the size ? >> - private final ConcurrentLinkedQueue<Response> responses = new >> ConcurrentLinkedQueue<Response>(); >> - private final AtomicBoolean write = new AtomicBoolean(false); >> + private final Queue<Response> responses = new >> LinkedBlockingQueue<Response>(); >> + private volatile boolean isAsync = false; >> >> /** >> * @see >> org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, >> org.apache.james.protocols.api.ProtocolSession) >> */ >> public final void writeResponse(Response response, final >> ProtocolSession session) { >> - // just add the response to the queue. 
We will trigger the write >> operation later >> - responses.add(response); >> - >> - // trigger the write >> - writeQueuedResponses(session); >> + // if we already in asynchrnous mode we simply enqueue the response >> + // we do this synchronously because we may have a dequeuer thread >> working on >> + // isAsync and responses. >> + boolean enqueued = false; >> + synchronized(this) { >> + if (isAsync == true) { >> + responses.offer(response); >> + enqueued = true; >> + } >> + } >> + >> + // if we didn't enqueue then we check if the response is writable >> or we have to >> + // set us "asynchrnous" and wait for response to be ready. >> + if (!enqueued) { >> + if (isResponseWritable(response)) { >> + writeResponseToClient(response, session); >> + } else { >> + addDequeuerListener(response, session); >> + isAsync = true; >> + } >> + } >> } >> >> /** >> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr >> * @param session >> */ >> private void writeQueuedResponses(final ProtocolSession session) { >> - Response queuedResponse = null; >> >> - if (write.compareAndSet(false, true)){ >> - boolean listenerAdded = false; >> - // dequeue Responses until non is left >> - while ((queuedResponse = responses.poll()) != null) { >> - >> - // check if we need to take special care of FutureResponses >> - if (queuedResponse instanceof FutureResponse) { >> - FutureResponse futureResponse =(FutureResponse) >> queuedResponse; >> - if (futureResponse.isReady()) { >> - // future is ready so we can write it without >> blocking the IO-Thread >> - writeResponseToClient(queuedResponse, session); >> - } else { >> - >> - // future is not ready so we need to write it via a >> ResponseListener otherwise we MAY block the IO-Thread >> - futureResponse.addListener(new ResponseListener() { >> - >> - public void onResponse(FutureResponse response) >> { >> - writeResponseToClient(response, session); >> - if (write.compareAndSet(true, false)) { >> - writeQueuedResponses(session); >> - } >> - } 
>> - }); >> - listenerAdded = true; >> - // just break here as we will trigger the dequeue >> later >> - break; >> - } >> - >> - } else { >> - // the Response is not a FutureResponse, so just write >> it back the the remote peer >> - writeResponseToClient(queuedResponse, session); >> + // dequeue Responses until non is left >> + while (true) { >> + >> + Response queuedResponse = null; >> + >> + // synchrnously we check responses and if it is empty we move >> back to non asynch >> + // behaviour >> + synchronized(this) { >> + queuedResponse = responses.poll(); >> + if (queuedResponse == null) { >> + isAsync = false; >> + break; >> } >> - >> } >> - // Check if a ResponseListener was added before. If not we can >> allow to write >> - // responses again. Otherwise the writing will get triggered >> from the listener >> - if (listenerAdded == false) { >> - write.set(false); >> + >> + // if we have something in the queue we continue writing until >> we >> + // find something asynchronous. >> + if (isResponseWritable(queuedResponse)) { >> + writeResponseToClient(queuedResponse, session); >> + } else { >> + addDequeuerListener(queuedResponse, session); >> + // no changes to isAsync here, because in this method we >> are always already async. 
>> + break; >> } >> } >> - >> - >> + } >> + >> + private boolean isResponseWritable(Response response) { >> + return !(response instanceof FutureResponse) || ((FutureResponse) >> response).isReady(); >> + } >> + >> + private void addDequeuerListener(Response response, final >> ProtocolSession session) { >> + ((FutureResponse) response).addListener(new ResponseListener() { >> + >> + public void onResponse(FutureResponse response) { >> + writeResponseToClient(response, session); >> + writeQueuedResponses(session); >> + } >> + }); >> } >> >> /** >> >> >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org >> For additional commands, e-mail: server-dev-h...@james.apache.org >> > > -- > Eric http://about.echarles.net > > --------------------------------------------------------------------- > To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org > For additional commands, e-mail: server-dev-h...@james.apache.org > --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org