Author: gsim
Date: Fri Oct 12 07:52:36 2007
New Revision: 584172
URL: http://svn.apache.org/viewvc?rev=584172&view=rev
Log:
Further fixes to locking between queue and semantic state to avoid deadlocking.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=584172&r1=584171&r2=584172&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 12 07:52:36
2007
@@ -124,21 +124,20 @@
return false;
}
-void Queue::requestDispatch(Consumer* c, bool sync){
+void Queue::requestDispatch(Consumer* c){
if (!c || c->preAcquires()) {
- if (sync) {
- Mutex::ScopedLock locker(messageLock);
- dispatch();
- } else {
- serializer.execute(dispatchCallback);
- }
+ serializer.execute(dispatchCallback);
} else {
- //note: this is always done on the callers thread, regardless
- // of sync; browsers of large queues should use flow control!
serviceBrowser(c);
}
}
+void Queue::flush(DispatchCompletion& completion)
+{
+ DispatchFunctor f(*this, &completion);
+ serializer.execute(f);
+}
+
Consumer* Queue::allocate()
{
RWlock::ScopedWlock locker(consumerLock);
@@ -179,9 +178,18 @@
} else {
break;
}
- }
- RWlock::ScopedRlock locker(consumerLock);
- for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++)
{
+ }
+ serviceAllBrowsers();
+}
+
+void Queue::serviceAllBrowsers()
+{
+ Consumers copy;
+ {
+ RWlock::ScopedRlock locker(consumerLock);
+ copy = browsers;
+ }
+ for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) {
serviceBrowser(*i);
}
}
@@ -428,3 +436,4 @@
{
return alternateExchange;
}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=584172&r1=584171&r2=584172&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Oct 12 07:52:36
2007
@@ -48,20 +48,32 @@
using std::string;
+ struct DispatchCompletion
+ {
+ virtual ~DispatchCompletion() {}
+ virtual void completed() = 0;
+ };
+
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
* registered consumers or be stored until dequeued or until one
* or more consumers registers.
*/
- class Queue : public PersistableQueue{
+ class Queue : public PersistableQueue {
typedef std::vector<Consumer*> Consumers;
typedef std::deque<QueuedMessage> Messages;
- struct DispatchFunctor {
+ struct DispatchFunctor
+ {
Queue& queue;
- DispatchFunctor(Queue& q) : queue(q) {}
- void operator()() { queue.dispatch(); }
+ DispatchCompletion* sync;
+ DispatchFunctor(Queue& q, DispatchCompletion* s = 0) :
queue(q), sync(s) {}
+ void operator()()
+ {
+ queue.dispatch();
+ if (sync) sync->completed();
+ }
};
const string name;
@@ -93,6 +105,7 @@
*/
void dispatch();
void cancel(Consumer* c, Consumers& set);
+ void serviceAllBrowsers();
void serviceBrowser(Consumer* c);
Consumer* allocate();
bool seek(QueuedMessage& msg, const framing::SequenceNumber&
position);
@@ -149,7 +162,8 @@
* at any time, so this call schedules the despatch based on
* the serilizer policy.
*/
- void requestDispatch(Consumer* c = 0, bool sync = false);
+ void requestDispatch(Consumer* c = 0);
+ void flush(DispatchCompletion& callback);
void consume(Consumer* c, bool exclusive = false);
void cancel(Consumer* c);
uint32_t purge();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=584172&r1=584171&r2=584172&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 12
07:52:36 2007
@@ -346,38 +346,40 @@
void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible
concurrent delivery
-
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
- } else {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+ {
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with
possible concurrent delivery
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
- //unacked and record it against that transaction
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
+ ack_iterator start = cumulative ? unacked.begin() :
+ find_if(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
+
+ if (cumulative || first != last) {
+ //need to find end (position it just after the last record in
range)
+ end = find_if(start, unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ } else {
+ //just acked single element (move end past it)
+ ++end;
}
- } else {
- for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue),
0));
- unacked.erase(start, end);
- }
+
+ for_each(start, end, boost::bind(&SemanticState::acknowledged, this,
_1));
+
+ if (txBuffer.get()) {
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(cumulative ? accumulatedAck.mark : first,
last);
+
+ if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+ } else {
+ for_each(start, end,
bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(start, end);
+ }
+ }//end of lock scope for delivery lock (TODO this is ugly, make it
prettier)
//if the prefetch limit had previously been reached, or credit
//had expired in windowing mode there may be messages that can
@@ -525,12 +527,10 @@
void SemanticState::ConsumerImpl::flush()
{
//need to prevent delivery after requestDispatch returns but
- //before credit is reduced to zero; TODO: come up with better
- //implementation of flush.
- Mutex::ScopedLock l(lock);
- queue->requestDispatch(this, true);
- byteCredit = 0;
- msgCredit = 0;
+ //before credit is reduced to zero
+ FlushCompletion completion(*this);
+ queue->flush(completion);
+ completion.wait();
}
void SemanticState::ConsumerImpl::stop()
@@ -597,6 +597,21 @@
for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
//need to remove the delivery records as well
unacked.erase(range.start, range.end);
+}
+
+
+void SemanticState::FlushCompletion::wait()
+{
+ Monitor::ScopedLock locker(lock);
+ while (!complete) lock.wait();
+}
+
+void SemanticState::FlushCompletion::completed()
+{
+ Monitor::ScopedLock locker(lock);
+ consumer.stop();
+ complete = true;
+ lock.notifyAll();
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=584172&r1=584171&r2=584172&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Oct 12
07:52:36 2007
@@ -89,6 +89,17 @@
void acknowledged(const DeliveryRecord&);
};
+ struct FlushCompletion : DispatchCompletion
+ {
+ sys::Monitor lock;
+ ConsumerImpl& consumer;
+ bool complete;
+
+ FlushCompletion(ConsumerImpl& c) : consumer(c), complete(false) {}
+ void wait();
+ void completed();
+ };
+
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
SessionState& session;