Hi Eric,

Comments inline.

On 24.12.2011 at 10:05, Eric Charles <e...@apache.org> wrote:

> Hi Stephano,
> 
> Opening the discussion to learn more :)
> 
> - Why do you consider 2 threads a criterion for using standard 
> synchronization rather than some atomic fields?
> 
If you only have a small number of concurrent threads, it's not slower to use 
synchronization, as context switching will not happen often.
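
A minimal sketch of the two options with exactly two competing threads. The 
class and method names are made up for illustration and are not part of James. 
It only shows that both variants are correct; which one is faster under low 
contention is something to measure, not something this sketch proves.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical example class, not taken from the James code base.
class TwoThreadCounter {
    private long lockedCount = 0;                         // guarded by "this"
    private final AtomicLong atomicCount = new AtomicLong();

    // Variant 1: plain field protected by the object's monitor.
    synchronized void incrementLocked() { lockedCount++; }
    synchronized long lockedCount() { return lockedCount; }

    // Variant 2: lock-free atomic field.
    void incrementAtomic() { atomicCount.incrementAndGet(); }
    long atomicCount() { return atomicCount.get(); }

    /** Runs the same task on two threads and waits for both to finish. */
    static void runOnTwoThreads(Runnable task) {
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    /** Increments each counter perThread times from each of two threads. */
    static TwoThreadCounter demo(final int perThread) {
        final TwoThreadCounter c = new TwoThreadCounter();
        runOnTwoThreads(() -> {
            for (int i = 0; i < perThread; i++) c.incrementLocked();
        });
        runOnTwoThreads(() -> {
            for (int i = 0; i < perThread; i++) c.incrementAtomic();
        });
        return c;
    }

    public static void main(String[] args) {
        TwoThreadCounter c = demo(100000);
        // Both counters end up at 2 * perThread.
        System.out.println(c.lockedCount() + " " + c.atomicCount());
    }
}
```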


> - I can understand that you replaced a concurrent queue with a 
> non-concurrent one. However, you now have a blocking queue. Is there an 
> impact due to this blocking aspect?

Nope, there is none, as we do not use the blocking methods. We could even 
replace it with a LinkedList.
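
To illustrate: in the quoted diff the queue is only touched via offer() and 
poll(), and always inside a synchronized block. Neither call waits; the 
blocking ones on a BlockingQueue are put(), take() and the timed variants. A 
small sketch (the class name and the drain() helper are hypothetical) that 
behaves the same with either implementation:

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical example class, not taken from the James code base.
class NonBlockingUse {

    /** Enqueues two items, then polls three times, using only offer() and poll(). */
    static String drain(Queue<String> queue) {
        StringBuilder seen = new StringBuilder();
        // External lock, so the queue itself does not need to be thread safe.
        synchronized (queue) {
            queue.offer("a");
            queue.offer("b");
            for (int i = 0; i < 3; i++) {
                // poll() returns null at once when the queue is empty
                seen.append(queue.poll()).append(' ');
            }
        }
        return seen.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(drain(new LinkedBlockingQueue<String>())); // a b null
        System.out.println(drain(new LinkedList<String>()));          // a b null
    }
}
```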

> 
> - You defined isAsync as volatile and sometimes encapsulate access to isAsync 
> in a synchronized block, sometimes not. Why use 2 different thread-safety 
> strategies in this class?
> 
If you only need to access a "status flag" in a concurrent way, then it's 
cheaper to just use a volatile for it. If you need to update more than one 
field in an "atomic" way, you need synchronized. Updating a volatile inside a 
synchronized block is not a problem...
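
A stripped-down sketch of that combination, loosely modelled on the pattern in 
the quoted commit. The class and method names here are hypothetical and this 
is not the actual AbstractProtocolTransport code: the flag alone is read and 
written as a volatile, while anything that touches the flag and the queue 
together goes through synchronized.

```java
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical example class, not taken from the James code base.
class FlagAndQueue {
    private final Queue<String> pending = new LinkedList<String>(); // guarded by "this"
    private volatile boolean async = false;

    /** Plain status read: volatile makes the latest write visible, no lock needed. */
    boolean isAsync() {
        return async;
    }

    /** Single write to the flag: also fine without a lock. */
    void goAsync() {
        async = true;
    }

    /** Check the flag and enqueue as one step, so it cannot interleave with the drain. */
    synchronized boolean enqueueIfAsync(String item) {
        if (async) {
            pending.offer(item);
            return true;
        }
        return false;
    }

    /** Take the next item, or leave async mode if the queue is empty, as one step. */
    synchronized String pollOrLeaveAsync() {
        String next = pending.poll();
        if (next == null) {
            async = false; // writing a volatile inside synchronized is harmless
        }
        return next;
    }

    public static void main(String[] args) {
        FlagAndQueue f = new FlagAndQueue();
        System.out.println(f.enqueueIfAsync("x")); // false, not async yet
        f.goAsync();
        System.out.println(f.enqueueIfAsync("y")); // true
        System.out.println(f.pollOrLeaveAsync());  // y
        System.out.println(f.pollOrLeaveAsync());  // null, and async is reset
        System.out.println(f.isAsync());           // false
    }
}
```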


> Thx,
> 
> Eric
> 
> 
Hope it helps,
Norman


> On 21/12/11 15:47, b...@apache.org wrote:
>> Author: bago
>> Date: Wed Dec 21 14:47:25 2011
>> New Revision: 1221748
>> 
>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>> Log:
>> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved 
>> back to standard synchronization as we only have max 2 threads competing for 
>> the queue so it doesn't make sense to use a non blocking queue. Norman, 
>> please overview, and feel free to revert if you don't like the solution (i 
>> thought it was better to simply commit instead of opening a JIRA to show you 
>> this).
>> 
>> Modified:
>>     
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> 
>> Modified: 
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> URL: 
>> http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>> ==============================================================================
>> --- 
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>  (original)
>> +++ 
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>  Wed Dec 21 14:47:25 2011
>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>  import java.io.InputStream;
>>  import java.io.UnsupportedEncodingException;
>>  import java.util.List;
>> -import java.util.concurrent.ConcurrentLinkedQueue;
>> -import java.util.concurrent.atomic.AtomicBoolean;
>> -
>> +import java.util.Queue;
>> +import java.util.concurrent.LinkedBlockingQueue;
>> 
>>  import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>> 
>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>> 
>> 
>>      // TODO: Should we limit the size ?
>> -    private final ConcurrentLinkedQueue<Response>  responses = new 
>> ConcurrentLinkedQueue<Response>();
>> -    private final AtomicBoolean write = new AtomicBoolean(false);
>> +    private final Queue<Response>  responses = new 
>> LinkedBlockingQueue<Response>();
>> +    private volatile boolean isAsync = false;
>> 
>>      /**
>>       * @see 
>> org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
>>  org.apache.james.protocols.api.ProtocolSession)
>>       */
>>      public final void writeResponse(Response response, final 
>> ProtocolSession session) {
>> -        // just add the response to the queue. We will trigger the write 
>> operation later
>> -        responses.add(response);
>> -
>> -        // trigger the write
>> -        writeQueuedResponses(session);
>> +        // if we already in asynchrnous mode we simply enqueue the response
>> +        // we do this synchronously because we may have a dequeuer thread 
>> working on
>> +        // isAsync and responses.
>> +        boolean enqueued = false;
>> +        synchronized(this) {
>> +            if (isAsync == true) {
>> +                responses.offer(response);
>> +                enqueued = true;
>> +            }
>> +        }
>> +
>> +        // if we didn't enqueue then we check if the response is writable 
>> or we have to
>> +        // set us "asynchrnous" and wait for response to be ready.
>> +        if (!enqueued) {
>> +            if (isResponseWritable(response)) {
>> +                writeResponseToClient(response, session);
>> +            } else {
>> +                addDequeuerListener(response, session);
>> +                isAsync = true;
>> +            }
>> +        }
>>      }
>> 
>>      /**
>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>>       * @param session
>>       */
>>      private  void writeQueuedResponses(final ProtocolSession session) {
>> -        Response queuedResponse = null;
>> 
>> -        if (write.compareAndSet(false, true)){
>> -            boolean listenerAdded = false;
>> -            // dequeue Responses until non is left
>> -            while ((queuedResponse = responses.poll()) != null) {
>> -
>> -                // check if we need to take special care of FutureResponses
>> -                if (queuedResponse instanceof FutureResponse) {
>> -                    FutureResponse futureResponse =(FutureResponse) 
>> queuedResponse;
>> -                    if (futureResponse.isReady()) {
>> -                        // future is ready so we can write it without 
>> blocking the IO-Thread
>> -                        writeResponseToClient(queuedResponse, session);
>> -                    } else {
>> -
>> -                        // future is not ready so we need to write it via a 
>> ResponseListener otherwise we MAY block the IO-Thread
>> -                        futureResponse.addListener(new ResponseListener() {
>> -
>> -                            public void onResponse(FutureResponse response) 
>> {
>> -                                writeResponseToClient(response, session);
>> -                                if (write.compareAndSet(true, false)) {
>> -                                    writeQueuedResponses(session);
>> -                                }
>> -                            }
>> -                        });
>> -                        listenerAdded = true;
>> -                        // just break here as we will trigger the dequeue 
>> later
>> -                        break;
>> -                    }
>> -
>> -                } else {
>> -                    // the Response is not a FutureResponse, so just write 
>> it back the the remote peer
>> -                    writeResponseToClient(queuedResponse, session);
>> +        // dequeue Responses until non is left
>> +        while (true) {
>> +
>> +            Response queuedResponse = null;
>> +
>> +            // synchrnously we check responses and if it is empty we move 
>> back to non asynch
>> +            // behaviour
>> +            synchronized(this) {
>> +                queuedResponse = responses.poll();
>> +                if (queuedResponse == null) {
>> +                    isAsync = false;
>> +                    break;
>>                  }
>> -
>>              }
>> -            // Check if a ResponseListener was added before. If not we can 
>> allow to write
>> -            // responses again. Otherwise the writing will get triggered 
>> from the listener
>> -            if (listenerAdded == false) {
>> -                write.set(false);
>> +
>> +            // if we have something in the queue we continue writing until 
>> we
>> +            // find something asynchronous.
>> +            if (isResponseWritable(queuedResponse)) {
>> +                writeResponseToClient(queuedResponse, session);
>> +            } else {
>> +                addDequeuerListener(queuedResponse, session);
>> +                // no changes to isAsync here, because in this method we 
>> are always already async.
>> +                break;
>>              }
>>          }
>> -
>> -
>> +    }
>> +
>> +    private boolean isResponseWritable(Response response) {
>> +        return !(response instanceof FutureResponse) || ((FutureResponse) 
>> response).isReady();
>> +    }
>> +
>> +    private void addDequeuerListener(Response response, final 
>> ProtocolSession session) {
>> +        ((FutureResponse) response).addListener(new ResponseListener() {
>> +
>> +            public void onResponse(FutureResponse response) {
>> +                writeResponseToClient(response, session);
>> +                writeQueuedResponses(session);
>> +            }
>> +        });
>>      }
>> 
>>      /**
>> 
>> 
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
>> For additional commands, e-mail: server-dev-h...@james.apache.org
>> 
> 
> -- 
> Eric http://about.echarles.net
