I am having a problem stopping a busy message listener that is busy.
When I attempt to stop subscription/subscription manager for a message
listener that is processing messages from a queue with a high rate of
message traffic I get random coring.
I have created a message listener thats runs in a separate thread
created by calling SubscriptionManager::start(). When I call
SubscriptionManager::stop() from another thread my process cores. If
the message listener is idle then my program will not.
I discovered this problem as I was trying to figure out how to manually
acknowledge a message from the broker. Below I have included the code
that I wrote.
To reproduce the problem do the following:
* Create a direct exchange named "TestExchange"
* Create a queue named "TestQueue"
* Bind TestQueue to TestExchange with the binding key "binding_key"
* Run the sender program. (This will send one message to the
TestExchange and then exit.
$ ./sender
* Run the receiver program, then after it starts receiving message
press control+c to terminate it.
$ ./receiver
It might be necessary to run the program a few times to get it to core.
Here is the stack trace:
#0 0x00007f3aa4829fb5 in raise () from /lib/libc.so.6
(gdb) bt
#0 0x00007f3aa4829fb5 in raise () from /lib/libc.so.6
#1 0x00007f3aa482bbc3 in abort () from /lib/libc.so.6
#2 0x00007f3aa4822f09 in __assert_fail () from /lib/libc.so.6
#3 0x00007f3aa577cadd in qpid::sys::Mutex::lock (this=<value optimized
out>)
at ./qpid/sys/posix/Mutex.h:116
#4 0x00007f3aa5387e4e in qpid::client::Dispatcher::run (this=0x25c1da0)
at ./qpid/sys/Mutex.h:44
#5 0x00007f3aa5773cca in runRunnable (p=0x2518)
at qpid/sys/posix/Thread.cpp:35
#6 0x00007f3aa3b893ba in start_thread () from /lib/libpthread.so.0
#7 0x00007f3aa48dcfcd in clone () from /lib/libc.so.6
#8 0x0000000000000000 in ?? ()
I am running QPID along with my programs on Ubuntu 9.04.
Since I intentionally made the receiver program release all messages
instead of accepting the message, the one message that I sent will
remain in the queue indefinitely and the receiver will continually
receive the message over and over again until you press control-c.
Can you tell me whether the code that I wrote in the method
MyMessageHandler::stop() is calling the various qpid methods in correct
sequence?
Thanks,
Jason Jones
receiver.cpp (This is the program that cores)
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/Subscription.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/SubscriptionManager.h>
#include <exception>
#include <iostream>
#include <string>
#include <signal.h>
#include <time.h>
using namespace std;
using namespace qpid::client;
sig_atomic_t shutdown = 0;
void sigIntHandler(int signo)
{
shutdown = 1;
}
class MyMessageHandler : public MessageListener
{
public:
MyMessageHandler();
~MyMessageHandler();
void received(Message& message);
void start(Session& session, const string& queue);
void stop();
private:
MyMessageHandler(const MyMessageHandler&);
MyMessageHandler& operator=(const MyMessageHandler&);
Subscription subscription_;
SubscriptionManager* subscriptionManager_;
};
MyMessageHandler::MyMessageHandler()
: MessageListener(),
subscription_(),
subscriptionManager_(0)
{
}
MyMessageHandler::~MyMessageHandler()
{
stop();
delete subscriptionManager_;
subscriptionManager_ = 0;
}
void
MyMessageHandler::received(Message& message)
{
cout << "Received message" << endl;
subscription_.acquire(message);
subscription_.release(message);
}
void
MyMessageHandler::start(Session& session, const string& queue)
{
SubscriptionSettings subscriptionSettings;
subscriptionSettings.autoAck = 0;
subscriptionSettings.acquireMode = ACQUIRE_MODE_PRE_ACQUIRED;
try
{
subscriptionManager_ = new SubscriptionManager(session);
subscriptionManager_->setAutoStop(true);
subscription_ = subscriptionManager_->subscribe(*this, queue,
subscriptionSettings, "jason");
subscriptionManager_->start();
}
catch(...)
{
delete subscriptionManager_;
subscriptionManager_ = 0;
throw;
}
}
void
MyMessageHandler::stop()
{
if(0 != subscriptionManager_)
{
// I think order matters on these two lines
subscriptionManager_->stop();
subscription_.cancel();
delete subscriptionManager_;
subscriptionManager_ = 0;
}
}
int main(int argc, char* argv[])
{
Connection connection;
Session session;
string host = "localhost";
int port = 5672;
string exchange = "TestExchange";
struct sigaction action = {0};
struct timespec second = {1, 0};
MyMessageHandler MyMessageHandler;
action.sa_handler = sigIntHandler;
action.sa_flags = SA_RESTART;
sigaction(SIGINT, &action, 0);
try
{
connection.open(host, port);
session = connection.newSession();
cout << "Connected to broker" << endl;
MyMessageHandler.start(session, string("TestQueue"));
while(0 == shutdown)
{
nanosleep(&second, 0);
}
cout << "Shutting down" << endl;
MyMessageHandler.stop();
session.close();
connection.close();
}
catch(const exception& e)
{
cerr << "ERROR: " << e.what() << endl;
}
return 0;
}
sender.cpp
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <exception>
#include <iostream>
#include <string>
using namespace std;
using namespace qpid::client;
int main(int argc, char* argv[])
{
Connection connection;
Session session;
string host = "eagle";
int port = 5672;
string exchange = "TestExchange";
Message message;
try
{
connection.open(host, port);
session = connection.newSession();
message.getDeliveryProperties().setRoutingKey("routing_key");
message.setData("Hello World");
cout << "Sending message" << endl;
session.messageTransfer(exchange, 1, 0, message);
cout << "Message sent" << endl;
session.close();
connection.close();
}
catch(const exception& e)
{
cerr << "ERROR: " << e.what() << endl;
}
return 0;
}
Makefile
CC = g++
INCLUDES = -I.
SENDER_EXEC = sender
SENDER_MAIN = sender.cpp
RECEIVER_EXEC = receiver
RECEIVER_MAIN = receiver.cpp
LIBRARIES = -lqpidcommon \
-lqpidclient
all: linkSender linkReceiver
linkSender:
$(CC) -o $(SENDER_EXEC) $(SENDER_MAIN) $(INCLUDES) $(LIBRARIES)
linkReceiver:
$(CC) -o $(RECEIVER_EXEC) $(RECEIVER_MAIN) $(INCLUDES) $(LIBRARIES)
clean:
rm -f ../lib/${LIBRARY} ${OBJECTS} ${SENDER_EXEC}