I am having a problem stopping a busy message listener that is busy. When I attempt to stop subscription/subscription manager for a message listener that is processing messages from a queue with a high rate of message traffic I get random coring.

I have created a message listener thats runs in a separate thread created by calling SubscriptionManager::start(). When I call SubscriptionManager::stop() from another thread my process cores. If the message listener is idle then my program will not.

I discovered this problem as I was trying to figure out how to manually acknowledge a message from the broker. Below I have included the code that I wrote.

To reproduce the problem do the following:

   * Create a direct exchange named "TestExchange"
   * Create a queue named "TestQueue"
   * Bind TestQueue to TestExchange with the binding key "binding_key"
   * Run the sender program. (This will send one message to the
     TestExchange and then exit.
     $ ./sender
   * Run the receiver program, then after it starts receiving message
     press control+c to terminate it.
     $ ./receiver

It might be necessary to run the program a few times to get it to core.
Here is the stack trace:

#0  0x00007f3aa4829fb5 in raise () from /lib/libc.so.6
(gdb) bt
#0  0x00007f3aa4829fb5 in raise () from /lib/libc.so.6
#1  0x00007f3aa482bbc3 in abort () from /lib/libc.so.6
#2  0x00007f3aa4822f09 in __assert_fail () from /lib/libc.so.6
#3 0x00007f3aa577cadd in qpid::sys::Mutex::lock (this=<value optimized out>)
   at ./qpid/sys/posix/Mutex.h:116
#4  0x00007f3aa5387e4e in qpid::client::Dispatcher::run (this=0x25c1da0)
   at ./qpid/sys/Mutex.h:44
#5  0x00007f3aa5773cca in runRunnable (p=0x2518)
   at qpid/sys/posix/Thread.cpp:35
#6  0x00007f3aa3b893ba in start_thread () from /lib/libpthread.so.0
#7  0x00007f3aa48dcfcd in clone () from /lib/libc.so.6
#8  0x0000000000000000 in ?? ()

I am running QPID along with my programs on Ubuntu 9.04.

Since I intentionally made the receiver program release all messages instead of accepting the message, the one message that I sent will remain in the queue indefinitely and the receiver will continually receive the message over and over again until you press control-c.

Can you tell me whether the code that I wrote in the method MyMessageHandler::stop() is calling the various qpid methods in correct sequence?


Thanks,
Jason Jones


receiver.cpp (This is the program that cores)
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/Subscription.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/SubscriptionManager.h>

#include <exception>
#include <iostream>
#include <string>

#include <signal.h>
#include <time.h>

using namespace std;
using namespace qpid::client;

sig_atomic_t shutdown = 0;

void sigIntHandler(int signo)
{
   shutdown = 1;
}

class MyMessageHandler : public MessageListener
{
   public:
       MyMessageHandler();
       ~MyMessageHandler();
       void received(Message& message);
       void start(Session& session, const string& queue);
       void stop();
private:
       MyMessageHandler(const MyMessageHandler&);
       MyMessageHandler& operator=(const MyMessageHandler&);
       Subscription subscription_;
       SubscriptionManager* subscriptionManager_;
};

MyMessageHandler::MyMessageHandler()
   : MessageListener(),
     subscription_(),
     subscriptionManager_(0)
{
}

MyMessageHandler::~MyMessageHandler()
{
   stop();
   delete subscriptionManager_;
   subscriptionManager_ = 0;
}

void
MyMessageHandler::received(Message& message)
{
   cout << "Received message" << endl;
   subscription_.acquire(message);
   subscription_.release(message);
}


void
MyMessageHandler::start(Session& session, const string& queue)
{
   SubscriptionSettings subscriptionSettings;

   subscriptionSettings.autoAck = 0;
   subscriptionSettings.acquireMode = ACQUIRE_MODE_PRE_ACQUIRED;
try
   {
       subscriptionManager_ = new SubscriptionManager(session);
       subscriptionManager_->setAutoStop(true);
subscription_ = subscriptionManager_->subscribe(*this, queue, subscriptionSettings, "jason");
       subscriptionManager_->start();
   }
   catch(...)
   {
       delete subscriptionManager_;
       subscriptionManager_ = 0;
       throw;
   }
}


void
MyMessageHandler::stop()
{
   if(0 != subscriptionManager_)
   {
       // I think order matters on these two lines
       subscriptionManager_->stop();
       subscription_.cancel();
       delete subscriptionManager_;
       subscriptionManager_ = 0;
   }
}


int main(int argc, char* argv[])
{
   Connection connection;
   Session session;
   string host = "localhost";
   int port = 5672;
   string exchange = "TestExchange";
   struct sigaction action = {0};
   struct timespec second = {1, 0};
   MyMessageHandler MyMessageHandler;
action.sa_handler = sigIntHandler;
   action.sa_flags = SA_RESTART;
sigaction(SIGINT, &action, 0); try
   {
       connection.open(host, port);
       session = connection.newSession();
       cout << "Connected to broker" << endl;
       MyMessageHandler.start(session, string("TestQueue"));
while(0 == shutdown)
       {
           nanosleep(&second, 0);
       }
       cout << "Shutting down" << endl;
       MyMessageHandler.stop();
       session.close();
       connection.close();
   }
   catch(const exception& e)
   {
       cerr << "ERROR: " << e.what() << endl;
   }
return 0;
}




sender.cpp
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>

#include <exception>
#include <iostream>
#include <string>


using namespace std;
using namespace qpid::client;


int main(int argc, char* argv[])
{
   Connection connection;
   Session session;
   string host = "eagle";
   int port = 5672;
   string exchange = "TestExchange";
   Message message;
try
   {
       connection.open(host, port);
       session = connection.newSession();
message.getDeliveryProperties().setRoutingKey("routing_key");
       message.setData("Hello World");
cout << "Sending message" << endl;
       session.messageTransfer(exchange, 1, 0, message);
       cout << "Message sent" << endl;
session.close();
       connection.close();
   }
   catch(const exception& e)
   {
       cerr << "ERROR: " << e.what() << endl;
   }
return 0;
}



Makefile
CC = g++
INCLUDES = -I.

SENDER_EXEC = sender
SENDER_MAIN = sender.cpp
RECEIVER_EXEC = receiver
RECEIVER_MAIN = receiver.cpp

LIBRARIES = -lqpidcommon \
           -lqpidclient

all: linkSender linkReceiver

linkSender:
   $(CC) -o $(SENDER_EXEC) $(SENDER_MAIN) $(INCLUDES) $(LIBRARIES)

linkReceiver:
   $(CC) -o $(RECEIVER_EXEC) $(RECEIVER_MAIN) $(INCLUDES) $(LIBRARIES)

clean:
   rm -f ../lib/${LIBRARY} ${OBJECTS} ${SENDER_EXEC}


Reply via email to