Hi Marc,

Thanks for the patch. I am currently out of town and will be back on Friday. I'll review and apply your patch then.
Martin

On 10/11/2010, "Marc Rossi" <[email protected]> wrote:

>Here is code that can recreate the problem.  The first pastebin link shows
>changes that need to be made to the socket_base_t::recv() method.  Didn't
>post the entire func but it should be obvious where it fits in.  These
>changes need to be applied to the library linked in the test apps to recreate
>the timing necessary for the bug to surface.
>
>http://pastebin.com/RA2cJrt6
>
>The next two pastebin links are for pub.cpp and sub.cpp, respectively.
>Build and link against libzmq with the modified socket_base_t::recv()
>method.
>
>pub.cpp:  http://pastebin.com/SqqaBnYc
>sub.cpp:  http://pastebin.com/6s0xwtc8
>
>Follow the steps below:
>
>1. Start the pub process in a terminal -- it will create a PUB socket and
>wait for user input before publishing data.
>2. Start the sub process in a different terminal -- it will create a SUB
>socket and connect to the PUB socket from step 1.  It will then wait for
>user input before attempting to read from the socket.
>3. Hit enter in the pub terminal.  This will publish exactly 199 messages
>and then wait for user input before publishing more messages.
>4. Hit enter in the sub terminal.  This will receive exactly 199 messages
>before the modified socket_base.cpp code pauses and waits for user input.
>At this point the sub socket has no messages on the queue and no signals to
>be read but is about to call process_commands().
>5. Hit enter in the pub terminal to allow it to continue publishing
>messages.  The sub client now has both a message in the pipe and a revive
>signal waiting for it when it awakens.
>6. Hit enter in the sub terminal to allow it to process the newly published
>messages.  Unfortunately it will first process the revive signal then call
>process_commands() further down with block = true.  Since there are no more
>signals to be read it will block even though there are messages continuing
>to be added to the queue.
>
>Apply the patch previously provided and try the same test; all should work
>normally.
>
>
>On Tue, Nov 9, 2010 at 2:15 PM, Marc Rossi <[email protected]> wrote:
>
>> Main thread calls recv() and hangs forever (after working fine for a period
>> of time), memory usage grows continuously while io thread pulls data from
>> socket and pushes on the internal queue.  netstat -a shows no data in recv-q
>> because io thread continues to work properly and pull data from the socket.
>>
>> This occurs under the following scenario:
>>
>> User code calls socket_base_t::recv() indirectly through a higher level
>> zeromq API call when there are no messages waiting.  The previous 99
>> (inbound_poll_rate - 1) calls to the recv() function each returned an already
>> waiting message fetched by the xrecv() call at the start of the function.
>>
>> This 100th call to recv(), as stated above, has no messages waiting to be
>> read, so the xrecv() call fails and rc = -1.  Immediately after this call to
>> xrecv() but BEFORE the conditional statement "if (++ticks ==
>> inbound_poll_rate)" a message arrives and is processed by the io thread,
>> resulting in the generation of a revive signal as the new message is pushed
>> onto the queue.  Since ++ticks is now 100 (inbound_poll_rate) the above
>> conditional is true and app_thread_t::process_commands() is called,
>> processing the revive signal.
>>
>> Since this is a BLOCKING socket and rc != 0 we fall down to the loop at the
>> end of the recv() function that unfortunately for us calls the
>> app_thread_t::process_commands() method with block_ = true before calling
>> xrecv().  Since we already read the revive signal above we are now officially
>> hung as there is still a message in the queue and there will be no more
>> revive signals generated by the io thread because of that.
>>
>> To test that this is indeed what is happening I did the following.
>> Added an integer reference as a third parameter to the
>> app_thread_t::process_commands() method that is set to the number of
>> commands received and processed.  Immediately before AND after calling the
>> process_commands() method in the final loop of socket_base_t::recv() I added
>> a debug print statement that is executed ONLY if the prior call to
>> process_commands() returned a value > 0 for the third param.  After running
>> the test code for about an hour the scenario described above occurred with
>> the debug print prior to the process_commands() call being displayed and
>> then the process was hung.
>>
>> Below is the simple patch that seems to fix the problem for me.  This will
>> incur a small penalty when ticks == 0 and there are no messages waiting to
>> be read, as the initial call to process_commands() will return immediately due
>> to block being set to false.  This could be made more efficient if the
>> process_commands() method took a 3rd param as a bool that was set to true if
>> commands were actually processed; then we would ONLY set block = false when
>> the previous call to process_commands() actually did something, rather than
>> rely on the ticks = 0 line in the if/then block.
>>
>>
>>
>>
>>
>> From 8d45a82d9cf7b788a3bed5014420962ea4ca5969 Mon Sep 17 00:00:00 2001
>> From: Marc Rossi <[email protected]>
>> Date: Tue, 9 Nov 2010 13:46:06 -0600
>> Subject: [PATCH] Fix socket_t::recv() hang scenario where initial call to
>>  process_commands() eats signal
>>
>> Added block boolean var to second process_commands() invocation for
>> blocking sockets instead of always using true.  This prevents the
>> process_commands() call from hanging when a message is received with an
>> empty queue after the call to xrecv() but prior to the initial call to
>> process_commands() invoked when ++ticks == inbound_poll_rate.
>>
>> Signed-off-by: Marc Rossi <[email protected]>
>> ---
>>  src/socket_base.cpp |    4 +++-
>>  1 files changed, 3 insertions(+), 1 deletions(-)
>>
>> diff --git a/src/socket_base.cpp b/src/socket_base.cpp
>> index c933954..344b552 100644
>> --- a/src/socket_base.cpp
>> +++ b/src/socket_base.cpp
>> @@ -437,15 +437,17 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
>>
>>      // In blocking scenario, commands are processed over and over again until
>>      // we are able to fetch a message.
>> +    bool block = (ticks != 0);
>>      while (rc != 0) {
>>          if (errno != EAGAIN)
>>              return -1;
>> -        if (unlikely (!app_thread->process_commands (true, false))) {
>> +        if (unlikely (!app_thread->process_commands (block, false))) {
>>              errno = ETERM;
>>              return -1;
>>          }
>>          rc = xrecv (msg_, flags_);
>>          ticks = 0;
>> +        block = true;
>>      }
>>
>>      rcvmore = msg_->flags & ZMQ_MSG_MORE;
>> --
>> 1.7.2.3
>>
>>
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
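
For anyone following the thread, the race described in the quoted report can be modelled in a few lines of single-threaded C++. This is only a toy sketch, not libzmq code: toy_socket_t, its members, the race_window callback and hundredth_recv_completes() are all hypothetical names invented for illustration, and a "hang" is modelled as process_commands() returning false instead of actually blocking. The control flow of recv() is assumed to follow the description in the report (xrecv(), then the ticks check, then the blocking loop).

```cpp
#include <deque>
#include <functional>

// Toy, single-threaded model of the race; none of these types are libzmq code.
struct toy_socket_t {
    static constexpr int inbound_poll_rate = 100;

    std::deque<int> pipe;        // messages queued by the "io thread"
    bool reader_asleep = false;  // set when xrecv() finds the pipe empty
    int pending_revives = 0;     // revive signals not yet consumed
    int ticks = 0;

    // io thread side: queue a message and signal only a sleeping reader.
    void push(int msg) {
        pipe.push_back(msg);
        if (reader_asleep) {
            reader_asleep = false;
            ++pending_revives;
        }
    }

    bool xrecv(int &out) {
        if (pipe.empty()) {
            reader_asleep = true;
            return false;
        }
        out = pipe.front();
        pipe.pop_front();
        return true;
    }

    // Returns false in the case where a real blocking call would never wake up.
    bool process_commands(bool block) {
        if (pending_revives > 0) {
            pending_revives = 0;
            return true;
        }
        return !block;
    }

    // Returns true if a message was fetched, false if the call would hang.
    // race_window runs between xrecv() and the ticks check.
    bool recv(int &out, bool patched, const std::function<void()> &race_window) {
        bool ok = xrecv(out);
        race_window();
        if (++ticks == inbound_poll_rate) {
            process_commands(false);  // may consume the only revive signal
            ticks = 0;
        }
        if (ok)
            return true;
        // Unpatched code always blocks; the patch polls once when ticks == 0.
        bool block = patched ? (ticks != 0) : true;
        while (!ok) {
            if (!process_commands(block))
                return false;  // modelled hang
            ok = xrecv(out);
            ticks = 0;
            block = true;
        }
        return true;
    }
};

// 99 successful reads, then a 100th on an empty pipe with a message
// arriving inside the race window.
bool hundredth_recv_completes(bool patched) {
    toy_socket_t s;
    int msg = 0;
    const int warmup = toy_socket_t::inbound_poll_rate - 1;
    for (int i = 0; i < warmup; ++i)
        s.push(i);
    for (int i = 0; i < warmup; ++i) {
        if (!s.recv(msg, patched, [] {}))
            return false;
    }
    return s.recv(msg, patched, [&] { s.push(warmup); });
}
```

In this model, hundredth_recv_completes(false) reports the hang and hundredth_recv_completes(true) fetches the message, which matches the behaviour the report describes before and after the patch.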
