Author: cctrieloff
Date: Tue Nov 13 13:49:39 2007
New Revision: 594655
URL: http://svn.apache.org/viewvc?rev=594655&view=rev
Log:
- fixed sync mode deadlock
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=594655&r1=594654&r2=594655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Nov 13 13:49:39
2007
@@ -46,7 +46,8 @@
const ConnectionToken* const _owner,
Manageable* parent) :
- name(_name),
+ dispatching(false),
+ name(_name),
autodelete(_autodelete),
store(_store),
owner(_owner),
@@ -75,7 +76,8 @@
{
// signal SemanticHander to ack completed dequeues
// then dispatch to ack...
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
@@ -100,7 +102,8 @@
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" <<
this << "]");
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
}
@@ -127,7 +130,8 @@
push(msg);
if (mgmtObject != 0)
mgmtObject->enqueue (msg->contentSize (), mask);
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
@@ -137,7 +141,8 @@
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
}
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -153,7 +158,8 @@
void Queue::requestDispatch(Consumer::ptr c){
if (!c || c->preAcquires()) {
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
} else {
DispatchFunctor f(*this, c);
serializer.execute(f);
@@ -229,6 +235,7 @@
void Queue::dispatch()
{
+ dispatching = true;
QueuedMessage msg(this);
while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
if (dispatch(msg)) {
@@ -242,6 +249,7 @@
}
}
serviceAllBrowsers();
+ dispatching = false;
}
void Queue::serviceAllBrowsers()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=594655&r1=594654&r2=594655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Nov 13 13:49:39
2007
@@ -74,7 +74,8 @@
DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion*
s = 0) : queue(q), consumer(c), sync(s) {}
void operator()();
};
-
+
+ bool dispatching;
const string name;
const bool autodelete;
MessageStore* const store;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h?rev=594655&r1=594654&r2=594655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h Tue Nov 13 13:49:39
2007
@@ -163,6 +163,7 @@
assert(state == EXECUTING || state == DISPATCHING);
Mutex::ScopedUnlock u(lock);
// No exceptions allowed in task.
+ notifyWorker();
try { task(); } catch (...) { assert(0); }
}