Author: rgreig
Date: Fri Sep 21 13:31:18 2007
New Revision: 578258

URL: http://svn.apache.org/viewvc?rev=578258&view=rev
Log:
QPID-607: dispatcher threads now poll so that the can die when the connection 
is closed.

Modified:
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=578258&r1=578257&r2=578258&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Fri Sep 21 13:31:18 2007
@@ -108,6 +108,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -2677,30 +2678,33 @@
 
             try
             {
-                while (!_closed.get() && ((message = (UnprocessedMessage) 
_queue.take()) != null))
+                while (!_closed.get())
                 {
-                    synchronized (_lock)
+                    message = (UnprocessedMessage) _queue.poll(1000, 
TimeUnit.MILLISECONDS);
+                    if (message != null)
                     {
-
-                        while (connectionStopped())
+                        synchronized (_lock)
                         {
-                            _lock.wait(2000);
-                        }
 
-                        if (message.getDeliverBody().deliveryTag <= 
_rollbackMark.get())
-                        {
-                            rejectMessage(message, true);
-                        }
-                        else
-                        {
-                            synchronized (_messageDeliveryLock)
+                            while (connectionStopped())
                             {
-                                dispatchMessage(message);
+                                _lock.wait(2000);
                             }
-                        }
 
-                    }
+                            if (message.getDeliverBody().deliveryTag <= 
_rollbackMark.get())
+                            {
+                                rejectMessage(message, true);
+                            }
+                            else
+                            {
+                                synchronized (_messageDeliveryLock)
+                                {
+                                    dispatchMessage(message);
+                                }
+                            }
 
+                        }
+                    }
                 }
             }
             catch (InterruptedException e)

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=578258&r1=578257&r2=578258&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
 Fri Sep 21 13:31:18 2007
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A blocking queue that emits events above a user specified threshold 
allowing the caller to take action (e.g. flow
@@ -69,10 +70,10 @@
         _listener = listener;
     }
 
-    public Object take() throws InterruptedException
+    public Object poll(long time, TimeUnit unit) throws InterruptedException
     {
-        Object o = _queue.take();
-        if (_listener != null)
+        Object o = _queue.poll(time, unit);
+        if (o != null && _listener != null)
         {
             synchronized (_listener)
             {


Reply via email to