Hi Stephano,

Opening the discussion to learn more :)

- Why are you considering that 2 threads is a criteria to use standard synchronization rather than some atomic fields.

- I can understand you replace a concurrent by a non-concurrent queue. However, you now have a blocking queue. Is there an impact due to this blocking aspect?

- You defined isAsync as volatile and sometimes encapsulate access to isAsync in a synchronized block, sometime not. Why using 2 different thread-safety strategies in this class?

Thx,

Eric


On 21/12/11 15:47, b...@apache.org wrote:
Author: bago
Date: Wed Dec 21 14:47:25 2011
New Revision: 1221748

URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
Log:
An attempt to refactor AbstractProtocolTransport to be thread safe. I moved 
back to standard synchronization as we only have max 2 threads competing for 
the queue so it doesn't make sense to use a non blocking queue. Norman, please 
overview, and feel free to revert if you don't like the solution (i thought it 
was better to simply commit instead of opening a JIRA to show you this).

Modified:
     
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Modified: 
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
URL: 
http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
==============================================================================
--- 
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
 (original)
+++ 
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
 Wed Dec 21 14:47:25 2011
@@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
  import java.io.InputStream;
  import java.io.UnsupportedEncodingException;
  import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;

  import org.apache.james.protocols.api.FutureResponse.ResponseListener;

@@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr


      // TODO: Should we limit the size ?
-    private final ConcurrentLinkedQueue<Response>  responses = new 
ConcurrentLinkedQueue<Response>();
-    private final AtomicBoolean write = new AtomicBoolean(false);
+    private final Queue<Response>  responses = new 
LinkedBlockingQueue<Response>();
+    private volatile boolean isAsync = false;

      /**
       * @see 
org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
 org.apache.james.protocols.api.ProtocolSession)
       */
      public final void writeResponse(Response response, final ProtocolSession 
session) {
-        // just add the response to the queue. We will trigger the write 
operation later
-        responses.add(response);
-
-        // trigger the write
-        writeQueuedResponses(session);
+        // if we already in asynchrnous mode we simply enqueue the response
+        // we do this synchronously because we may have a dequeuer thread 
working on
+        // isAsync and responses.
+        boolean enqueued = false;
+        synchronized(this) {
+            if (isAsync == true) {
+                responses.offer(response);
+                enqueued = true;
+            }
+        }
+
+        // if we didn't enqueue then we check if the response is writable or 
we have to
+        // set us "asynchrnous" and wait for response to be ready.
+        if (!enqueued) {
+            if (isResponseWritable(response)) {
+                writeResponseToClient(response, session);
+            } else {
+                addDequeuerListener(response, session);
+                isAsync = true;
+            }
+        }
      }

      /**
@@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
       * @param session
       */
      private  void writeQueuedResponses(final ProtocolSession session) {
-        Response queuedResponse = null;

-        if (write.compareAndSet(false, true)){
-            boolean listenerAdded = false;
-            // dequeue Responses until non is left
-            while ((queuedResponse = responses.poll()) != null) {
-
-                // check if we need to take special care of FutureResponses
-                if (queuedResponse instanceof FutureResponse) {
-                    FutureResponse futureResponse =(FutureResponse) 
queuedResponse;
-                    if (futureResponse.isReady()) {
-                        // future is ready so we can write it without blocking 
the IO-Thread
-                        writeResponseToClient(queuedResponse, session);
-                    } else {
-
-                        // future is not ready so we need to write it via a 
ResponseListener otherwise we MAY block the IO-Thread
-                        futureResponse.addListener(new ResponseListener() {
-
-                            public void onResponse(FutureResponse response) {
-                                writeResponseToClient(response, session);
-                                if (write.compareAndSet(true, false)) {
-                                    writeQueuedResponses(session);
-                                }
-                            }
-                        });
-                        listenerAdded = true;
-                        // just break here as we will trigger the dequeue later
-                        break;
-                    }
-
-                } else {
-                    // the Response is not a FutureResponse, so just write it 
back the the remote peer
-                    writeResponseToClient(queuedResponse, session);
+        // dequeue Responses until non is left
+        while (true) {
+
+            Response queuedResponse = null;
+
+            // synchrnously we check responses and if it is empty we move back 
to non asynch
+            // behaviour
+            synchronized(this) {
+                queuedResponse = responses.poll();
+                if (queuedResponse == null) {
+                    isAsync = false;
+                    break;
                  }
-
              }
-            // Check if a ResponseListener was added before. If not we can 
allow to write
-            // responses again. Otherwise the writing will get triggered from 
the listener
-            if (listenerAdded == false) {
-                write.set(false);
+
+            // if we have something in the queue we continue writing until we
+            // find something asynchronous.
+            if (isResponseWritable(queuedResponse)) {
+                writeResponseToClient(queuedResponse, session);
+            } else {
+                addDequeuerListener(queuedResponse, session);
+                // no changes to isAsync here, because in this method we are 
always already async.
+                break;
              }
          }
-
-
+    }
+
+    private boolean isResponseWritable(Response response) {
+        return !(response instanceof FutureResponse) || ((FutureResponse) 
response).isReady();
+    }
+
+    private void addDequeuerListener(Response response, final ProtocolSession 
session) {
+        ((FutureResponse) response).addListener(new ResponseListener() {
+
+            public void onResponse(FutureResponse response) {
+                writeResponseToClient(response, session);
+                writeQueuedResponses(session);
+            }
+        });
      }

      /**



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org


--
Eric http://about.echarles.net

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to