Hello there,
We are using qpid0.8 c++. We have a typical client/server setup where server
gets requests from a queue and process it. We use the qpid callback mechanism
for that purpose. The problem is that from time to time, a bad request showed
up and crashed our server in processing. But the request is still considered
sitting on the queue. Therefore another instance of the server tries to do the
same thing and in turn crashes again. Leaving our servers in a not useable
state. Is there a way to work around this or rather what mistakes I have made
here? What server does is kinds of complicated and it uses some legacy and
3party libraries therefore not easy to prevent it from crashing. Crashing
once is fine as long as the subsequent good requests get processed normally.
But what we see is that the bad request will be picked up by the next server
and we can not get out of that state. Ideally after the message layer
delivers the message to the application level, the message itself is considered
consumed. Whatever stupid and bad things happen in the application codes
should not matter much. After all what qpid does should be delivery only. As
long as this is done, it is done. Why we still see the message sitting there?
So my question is, when a message in a queue is marked as consumed? Ideally
it should be right before the received(.) callback is invoked. But from what I
see looks like it is not.
As a workaround, can I call some routine to tell the broker to remove the
message on queue?
Thank you very much for your help, really appreciate it!
In Qpid::client::Dispatcher
void Dispatcher::run()
{
Mutex::ScopedLock l(lock);
if (running)
throw Exception("Dispatcher is already running.");
state_saver<bool> reset(running); // Reset to false on exit.
running = true;
try {
while (!queue->isClosed()) {
Mutex::ScopedUnlock u(lock);
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
Message msg(new MessageImpl(*content));
boost::intrusive_ptr<SubscriptionImpl> listener =
find(msg.getDestination());
if (!listener) {
QPID_LOG(error, "No listener found for destination " <<
msg.getDestination());
} else {
assert(listener);
listener->received(msg);
}
} else {
if (handler.get()) {
handler->handle(*content);
} else {
QPID_LOG(warning, "No handler found for " <<
*(content->getMethod()));
}
}
}
session.sync(); // Make sure all our acks are received before
returning.
}
catch (const ClosedException&) {
QPID_LOG(debug, QPID_MSG(session.getId() << ": closed by peer"));
}
catch (const TransportFailure&) {
QPID_LOG(info, QPID_MSG(session.getId() << ": transport failure"));
throw;
}
catch (const std::exception& e) {
if ( failoverHandler ) {
QPID_LOG(debug, QPID_MSG(session.getId() << " failover: " <<
e.what()));
failoverHandler();
} else {
QPID_LOG(error, session.getId() << " error: " << e.what());
throw;
}
}
Inside qpid::client::SubscriptionImpl::received() :
void SubscriptionImpl::received(Message& m) {
Mutex::ScopedLock l(lock);
MessageImpl& mi = *MessageImpl::get(m);
if (mi.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
unacquired.add(m.getId());
else if (mi.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
unaccepted.add(m.getId());
if (listener) {
Mutex::ScopedUnlock u(lock);
listener->received(m); <--if crashes here,what will happen? the
message considered consumed and removed from queue yet, no?
}
if (settings.completionMode == COMPLETE_ON_DELIVERY) {
manager.getSession().markCompleted(m.getId(), false, false);
}
if (settings.autoAck) {
if (unaccepted.size() >= settings.autoAck) {
async(manager.getSession()).messageAccept(unaccepted);
switch (settings.completionMode) {
case COMPLETE_ON_ACCEPT:
manager.getSession().markCompleted(unaccepted, true);
break;
case COMPLETE_ON_DELIVERY:
manager.getSession().sendCompletion();
break;
default://do nothing
break;
}
unaccepted.clear();
}
}
}
This email is confidential and subject to important disclaimers and
conditions including on offers for the purchase or sale of
securities, accuracy and completeness of information, viruses,
confidentiality, legal privilege, and legal entity disclaimers,
available at http://www.jpmorgan.com/pages/disclosures/email.