ZeroMQ 2.1.0, using the C++ binding, on WinXP with MSVC 2008 (code follows; summary of the problem first).

I'm adapting part of an existing project to see how it performs with ZeroMQ as its networking layer. The test uses pub/sub over TCP. In my initial test case, I queue up a random assortment of messages and publish them without having connected any subscribers. This continues for a short period before I signal the publishing thread to shut down. Both the zmq::context_t and the zmq::socket_t are in local scope and are destroyed automatically when the scope exits. However, the thread never exits.

Pausing execution, I find it stuck in socket_base.cpp:

> libzmq.dll!zmq::socket_base_t::process_commands(bool block_=false, bool throttle_=false) Line 645 C++
  libzmq.dll!zmq::socket_base_t::dezombify() Line 605 C++
  libzmq.dll!zmq::ctx_t::dezombify() Line 318 + 0xf bytes C++
  libzmq.dll!zmq::ctx_t::terminate() Line 150 C++
  libzmq.dll!zmq_term(void * ctx_=0x003d9a40) Line 255 + 0x8 bytes C++
  TradingLink.exe!zmq::context_t::~context_t() Line 183 + 0xe bytes C++
  TradingLink.exe!CZeroMQNode::PUBLISHThread(void * pThreadCTX=0x003d99f8) Line 299 + 0x30 bytes C++

The result of line 645 (rc = mailbox.recv(&cmd, false);) is EAGAIN, which causes the following while loop to break and control to pass back up the chain. Ultimately, control appears never to exit the loop in zmq::ctx_t::dezombify().

What follows is the code snippet. I've wrapped the problem area in a try/catch to force scoping. Execution never reaches TRACE("Test Exit: Success\r\n");

Can someone take a look and tell me what I've messed up?

Thank you!
Martin

CODE----

    try
    {
        zmq::context_t oCtx(1);
        zmq::socket_t o0MQOrderPUB(oCtx, ZMQ_PUB);
        o0MQOrderPUB.connect("tcp://192.168.1.101:15000");
        int nVal = errno;
        if (nVal != 0)
        {
            /* todo error handling */
            DebugBreak();
        }

        bool bContinue = true;
        while (bContinue)
        {
            bContinue = !poContext->poParent->m_bShutdown; // tbb atomic operation
            if (bContinue)
            {
                CZeroMQNode::TPublishRecord * poRecord = NULL;
                while ( poContext->poParent->m_listPublishQueue.try_pop(poRecord) ) // tbb concurrent queue, anything available?
                {
                    zmq::message_t poMsg( sizeof(tradelink::TMessageHeader) + poRecord->nSize);
                    char * poBuff = (char*)poMsg.data();
                    tradelink::TMessageHeader * poHeader = (tradelink::TMessageHeader*)poBuff;
                    poHeader->nSentinel = tradelink::TMessageHeader::nMessageSentinelValue;
                    poHeader->nSourceNode = poContext->poParent->m_oConfig.nApplicationID;
                    poHeader->nPayLoadLength = poRecord->nSize;
                    poHeader->eMsgType = poRecord->eMsgType;
                    poHeader->nTimeStamp = timeGetTime();
                    memcpy(&poBuff[ sizeof(tradelink::TMessageHeader) ], poRecord->poBuffer, poRecord->nSize);
                    if (o0MQOrderPUB.send(poMsg))
                    {
                        int rc = errno;
                        DebugBreak();
                    }
                    delete poRecord;
                }
                ::Sleep(15); // yield
            }
        }
        TRACE("ZMQ Publish Thread exiting.\r\n");
    }
    catch (CException * ex)
    {
        TRACE("Exception");
    }
    TRACE("Test Exit: Success\r\n");
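
For reference, here is a minimal sketch of the scoping the post relies on. It does not use ZeroMQ at all: FakeContext, FakeSocket and run_scope are hypothetical stand-ins, introduced only to show the C++ rule that locals are destroyed in reverse order of declaration. In the snippet above that means the socket's destructor runs first and the context's destructor (where the stack trace shows the thread stuck in zmq_term) runs second, before control can reach the final TRACE.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for zmq::context_t (not the real class).
// It only records when its destructor runs.
struct FakeContext {
    explicit FakeContext(std::vector<std::string>& log) : log_(log) {}
    ~FakeContext() { log_.push_back("context destroyed"); }
    std::vector<std::string>& log_;
};

// Hypothetical stand-in for zmq::socket_t, constructed from a context.
struct FakeSocket {
    FakeSocket(FakeContext& ctx, std::vector<std::string>& log)
        : ctx_(ctx), log_(log) {}
    ~FakeSocket() { log_.push_back("socket destroyed"); }
    FakeContext& ctx_;
    std::vector<std::string>& log_;
};

// Mirrors the scoping in the post: context declared first, socket second,
// both destroyed automatically when the inner block exits.
std::vector<std::string> run_scope() {
    std::vector<std::string> log;
    {
        FakeContext ctx(log);
        FakeSocket pub(ctx, log);
        log.push_back("loop finished");
    }   // pub is destroyed here first, then ctx
    log.push_back("after scope");
    return log;
}
```

Calling run_scope() and printing the returned entries shows the order of events. If the context's destructor blocked, as zmq_term does in the stack trace, the "after scope" entry would never be recorded, which matches the final TRACE never being reached.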