Author: cctrieloff
Date: Tue Nov 13 13:49:39 2007
New Revision: 594655

URL: http://svn.apache.org/viewvc?rev=594655&view=rev
Log:
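- Fixed a deadlock in sync mode: Queue now sets a `dispatching` flag for the
  duration of Queue::dispatch() and does not re-submit dispatchCallback to the
  serializer while that flag is set; Serializer now calls notifyWorker()
  before running a task.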


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=594655&r1=594654&r2=594655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Nov 13 13:49:39 2007
@@ -46,7 +46,8 @@
              const ConnectionToken* const _owner,
              Manageable* parent) :
 
-    name(_name), 
+    dispatching(false),
+       name(_name), 
     autodelete(_autodelete),
     store(_store),
     owner(_owner), 
@@ -75,7 +76,8 @@
 {
     // signal SemanticHander to ack completed dequeues
     // then dispatch to ack...
-    serializer.execute(dispatchCallback);
+       if (!dispatching)
+        serializer.execute(dispatchCallback);
 }
 
 
@@ -100,7 +102,8 @@
             push(msg);
         }
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << 
this << "]");
-        serializer.execute(dispatchCallback);
+        if (!dispatching)
+                   serializer.execute(dispatchCallback);
     }
 }
 
@@ -127,7 +130,8 @@
     push(msg);
     if (mgmtObject != 0)
         mgmtObject->enqueue (msg->contentSize (), mask);
-    serializer.execute(dispatchCallback);
+    if (!dispatching)
+        serializer.execute(dispatchCallback);
    
 }
 
@@ -137,7 +141,8 @@
         msg.payload->enqueueComplete(); // mark the message as enqueued
         messages.push_front(msg);
     }
-    serializer.execute(dispatchCallback);
+    if (!dispatching)
+           serializer.execute(dispatchCallback);
 }
 
 bool Queue::acquire(const QueuedMessage& msg) {
@@ -153,7 +158,8 @@
 
 void Queue::requestDispatch(Consumer::ptr c){
     if (!c || c->preAcquires()) {
-        serializer.execute(dispatchCallback);
+        if (!dispatching)
+                   serializer.execute(dispatchCallback);
     } else {
         DispatchFunctor f(*this, c);
         serializer.execute(f);
@@ -229,6 +235,7 @@
 
 void Queue::dispatch()
 {
+       dispatching = true;
      QueuedMessage msg(this);
      while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
          if (dispatch(msg)) {
@@ -242,6 +249,7 @@
          }        
      }
      serviceAllBrowsers();
+        dispatching = false;
 }
 
 void Queue::serviceAllBrowsers()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=594655&r1=594654&r2=594655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Nov 13 13:49:39 2007
@@ -74,7 +74,8 @@
                 DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion* 
s = 0) : queue(q), consumer(c), sync(s) {}
                 void operator()();
             };
-                
+
+                       bool dispatching;                
             const string name;
             const bool autodelete;
             MessageStore* const store;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h?rev=594655&r1=594654&r2=594655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h Tue Nov 13 13:49:39 2007
@@ -163,6 +163,7 @@
     assert(state == EXECUTING || state == DISPATCHING);
     Mutex::ScopedUnlock u(lock);
     // No exceptions allowed in task.
+       notifyWorker();
     try { task(); } catch (...) { assert(0); }
 }
 


Reply via email to