Hi Fabien,

Here's an improved version of the patch, which waits using poll() or select().

Martin
>From d7923f08cab62ef40027a92f596ff45428870838 Mon Sep 17 00:00:00 2001
From: Fabien Ninoles <[email protected]>
Date: Fri, 17 Jun 2011 12:22:02 +0200
Subject: [PATCH] Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO.

- Add doc and tests
- Add options and setup
- Wait using poll/select

Signed-off-by: Fabien Ninoles <[email protected]>
Signed-off-by: Martin Sustrik <[email protected]>
---
 .gitignore             |    1 +
 doc/zmq_getsockopt.txt |   35 ++++++++++++++-
 doc/zmq_setsockopt.txt |   32 +++++++++++++
 include/zmq.h          |    2 +
 perf/remote_thr.cpp    |    2 +
 src/ctx.cpp            |    2 +-
 src/io_thread.cpp      |    2 +-
 src/mailbox.cpp        |  106 ++++++++++++++++++++++++++++++++++++++++---
 src/mailbox.hpp        |    7 +++-
 src/options.cpp        |   36 +++++++++++++++
 src/options.hpp        |    4 ++
 src/reaper.cpp         |    2 +-
 src/socket_base.cpp    |  100 ++++++++++++++++++++++++++++--------------
 src/socket_base.hpp    |    6 +-
 tests/Makefile.am      |    5 ++-
 tests/test_timeo.cpp   |  115 ++++++++++++++++++++++++++++++++++++++++++++++++
 16 files changed, 407 insertions(+), 50 deletions(-)
 create mode 100644 tests/test_timeo.cpp

diff --git a/.gitignore b/.gitignore
index b309c73..2adeb22 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,6 +28,7 @@ tests/test_reqrep_ipc
 tests/test_reqrep_tcp
 tests/test_shutdown_stress
 tests/test_hwm
+tests/test_timeo
 src/platform.hpp*
 src/stamp-h1
 devices/zmq_forwarder/zmq_forwarder
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index b48b06b..270ea6f 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -255,7 +255,8 @@ interval for the specified 'socket'.  This is the maximum period 0MQ shall wait
 between attempts to reconnect. On each reconnect attempt, the previous interval 
 shall be doubled untill ZMQ_RECONNECT_IVL_MAX is reached. This allows for 
 exponential backoff strategy. Default value means no exponential backoff is 
-performed and reconnect interval calculations are only based on ZMQ_RECONNECT_IVL.
+performed and reconnect interval calculations are only based on
+ZMQ_RECONNECT_IVL.
 
 NOTE:  Values less than ZMQ_RECONNECT_IVL will be ignored.
 
@@ -324,6 +325,38 @@ Default value:: 1
 Applicable socket types:: ZMQ_SUB, ZMQ_XSUB
 
 
+ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for recv operation on the socket.  If the value is `0`,
+_zmq_recv(3)_ will return immediately, with an EAGAIN error if there is no
+message to receive. If the value is `-1`, it will block until a message is
+available. For all other values, it will wait for a message for that amount
+of time before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
+ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for send operation on the socket. If the value is `0`,
+_zmq_send(3)_ will return immediately, with an EAGAIN error if the message
+cannot be sent. If the value is `-1`, it will block until the message is sent.
+For all other values, it will try to send the message for that amount of time
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 ZMQ_FD: Retrieve file descriptor associated with the socket
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_FD' option shall retrieve the file descriptor associated with the
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 4b639c5..0093085 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -328,6 +328,38 @@ Default value:: 1
 Applicable socket types:: ZMQ_SUB, ZMQ_XSUB
 
 
+ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for receive operation on the socket. If the value is `0`,
+_zmq_recv(3)_ will return immediately, with an EAGAIN error if there is no
+message to receive. If the value is `-1`, it will block until a message is
+available. For all other values, it will wait for a message for that amount
+of time before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
+ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for send operation on the socket. If the value is `0`,
+_zmq_send(3)_ will return immediately, with an EAGAIN error if the message
+cannot be sent. If the value is `-1`, it will block until the message is sent.
+For all other values, it will try to send the message for that amount of time
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 RETURN VALUE
 ------------
 The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/include/zmq.h b/include/zmq.h
index 8d1d57b..ca34e58 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -181,6 +181,8 @@ ZMQ_EXPORT int zmq_term (void *context);
 #define ZMQ_RCVHWM 24
 #define ZMQ_MULTICAST_HOPS 25
 #define ZMQ_FILTER 26
+#define ZMQ_RCVTIMEO 27
+#define ZMQ_SNDTIMEO 28
     
 /*  Send/recv options.                                                        */
 #define ZMQ_DONTWAIT 1
diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp
index ba36b98..363ae7c 100644
--- a/perf/remote_thr.cpp
+++ b/perf/remote_thr.cpp
@@ -88,6 +88,8 @@ int main (int argc, char *argv [])
         }
     }
 
+zmq_sleep (2);
+
     rc = zmq_close (s);
     if (rc != 0) {
         printf ("error in zmq_close: %s\n", zmq_strerror (errno));
diff --git a/src/ctx.cpp b/src/ctx.cpp
index fb5420d..783bcba 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -142,7 +142,7 @@ int zmq::ctx_t::terminate ()
 
     //  Wait till reaper thread closes all the sockets.
     command_t cmd;
-    int rc = term_mailbox.recv (&cmd, true);
+    int rc = term_mailbox.recv (&cmd, -1);
     if (rc == -1 && errno == EINTR)
         return -1;
     zmq_assert (rc == 0);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 9678392..c6f3880 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -70,7 +70,7 @@ void zmq::io_thread_t::in_event ()
 
         //  Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 221396b..402d025 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -18,8 +18,33 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
-#include "mailbox.hpp"
 #include "platform.hpp"
+
+#if defined ZMQ_FORCE_SELECT
+#define ZMQ_RCVTIMEO_BASED_ON_SELECT
+#elif defined ZMQ_FORCE_POLL
+#define ZMQ_RCVTIMEO_BASED_ON_POLL
+#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
+    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
+    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
+    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
+    defined ZMQ_HAVE_NETBSD
+#define ZMQ_RCVTIMEO_BASED_ON_POLL
+#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+#define ZMQ_RCVTIMEO_BASED_ON_SELECT
+#endif
+
+//  On AIX, poll.h has to be included before zmq.h to get consistent
+//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
+//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
+//  names to AIX-specific names).
+#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
+#include <poll.h>
+#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
+#include <sys/select.h>
+#endif
+
+#include "mailbox.hpp"
 #include "err.hpp"
 #include "fd.hpp"
 #include "ip.hpp"
@@ -79,10 +104,14 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
 {
+    //  If there's a finite timeout, poll on the fd.
+    if (timeout_ > 0)
+        return recv_timeout (cmd_, timeout_);
+
     //  If required, set the reader to blocking mode.
-    if (block_) {
+    if (timeout_ < 0) {
         unsigned long argp = 0;
         int rc = ioctlsocket (r, FIONBIO, &argp);
         wsa_assert (rc != SOCKET_ERROR);
@@ -97,7 +126,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
         err = EAGAIN;
 
     //  Re-set the reader to non-blocking mode.
-    if (block_) {
+    if (timeout_ < 0) {
         unsigned long argp = 1;
         int rc = ioctlsocket (r, FIONBIO, &argp);
         wsa_assert (rc != SOCKET_ERROR);
@@ -194,20 +223,24 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
 {
+    //  If there's a finite timeout, poll on the fd.
+    if (timeout_ > 0)
+        return recv_timeout (cmd_, timeout_);
+
 #ifdef MSG_DONTWAIT
 
     //  Attempt to read an entire command. Returns EAGAIN if non-blocking
     //  mode is requested and a command is not available.
     ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
-        block_ ? 0 : MSG_DONTWAIT);
+        timeout_ < 0 ? 0 : MSG_DONTWAIT);
     if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
         return -1;
 #else
 
     //  If required, set the reader to blocking mode.
-    if (block_) {
+    if (timeout_ < 0) {
         int flags = fcntl (r, F_GETFL, 0);
         errno_assert (flags >= 0);
         int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
@@ -223,7 +256,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
         err = errno;
 
     //  Re-set the reader to non-blocking mode.
-    if (block_) {
+    if (timeout_ < 0) {
         int flags = fcntl (r, F_GETFL, 0);
         errno_assert (flags >= 0);
         int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
@@ -380,3 +413,60 @@ int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
 #endif
 }
 
+int zmq::mailbox_t::recv_timeout (command_t *cmd_, int timeout_)
+{
+    //  Waits at most timeout_ milliseconds for a command. Returns 0 on
+    //  success, -1 with errno set to EAGAIN (timeout) or EINTR (signal).
+#ifdef ZMQ_RCVTIMEO_BASED_ON_POLL
+    struct pollfd pfd;
+    pfd.fd = r;
+    pfd.events = POLLIN;
+    int rc = poll (&pfd, 1, timeout_);
+    if (unlikely (rc < 0)) {
+        zmq_assert (errno == EINTR);
+        return -1;
+    }
+    else if (unlikely (rc == 0)) {
+        errno = EAGAIN;
+        return -1;
+    }
+    zmq_assert (rc == 1);
+    zmq_assert (pfd.revents & POLLIN);
+
+#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
+    fd_set fds;
+    FD_ZERO (&fds);
+    FD_SET (r, &fds);
+    struct timeval timeout;
+    timeout.tv_sec = timeout_ / 1000;
+    timeout.tv_usec = timeout_ % 1000 * 1000;
+    int rc = select (r + 1, &fds, NULL, NULL, &timeout);
+    if (unlikely (rc < 0)) {
+        zmq_assert (errno == EINTR);
+        return -1;
+    }
+    else if (unlikely (rc == 0)) {
+        errno = EAGAIN;
+        return -1;
+    }
+    zmq_assert (rc == 1);
+#else
+#error
+#endif
+
+    //  The file descriptor is ready for reading. Extract one command out of it.
+    //  Interruption is detected on recv's own result, not on the wait's rc.
+    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
+    if (unlikely (nbytes == -1 && errno == EINTR))
+        return -1;
+    zmq_assert (nbytes == sizeof (command_t));
+    return 0;
+}
+
+#if defined ZMQ_RCVTIMEO_BASED_ON_SELECT
+#undef ZMQ_RCVTIMEO_BASED_ON_SELECT
+#endif
+#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
+#undef ZMQ_RCVTIMEO_BASED_ON_POLL
+#endif
+
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 96bf4eb..1b54aac 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -41,7 +41,7 @@ namespace zmq
 
         fd_t get_fd ();
         void send (const command_t &cmd_);
-        int recv (command_t *cmd_, bool block_);
+        int recv (command_t *cmd_, int timeout_);
         
     private:
 
@@ -52,6 +52,11 @@ namespace zmq
         //  Platform-dependent function to create a socketpair.
         static int make_socketpair (fd_t *r_, fd_t *w_);
 
+        //  Receives a command with the specific timeout.
+        //  This function is not to be used for non-blocking or infinitely
+        //  blocking recvs.
+        int recv_timeout (command_t *cmd_, int timeout_);
+
         //  Disable copying of mailbox_t object.
         mailbox_t (const mailbox_t&);
         const mailbox_t &operator = (const mailbox_t&);
diff --git a/src/options.cpp b/src/options.cpp
index 29cf023..80ab294 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -39,6 +39,8 @@ zmq::options_t::options_t () :
     backlog (100),
     maxmsgsize (-1),
     filter (1),
+    rcvtimeo (-1),
+    sndtimeo (-1),
     immediate_connect (true)
 {
 }
@@ -182,6 +184,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
         filter = *((int*) optval_);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        rcvtimeo = *((int*) optval_);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        sndtimeo = *((int*) optval_);
+        return 0;
+
     }
 
     errno = EINVAL;
@@ -336,6 +354,24 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
         *optvallen_ = sizeof (int);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = rcvtimeo;
+        *optvallen_ = sizeof (int);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = sndtimeo;
+        *optvallen_ = sizeof (int);
+        return 0;
+
     }
 
     errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index e055919..858ec2e 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -78,6 +78,10 @@ namespace zmq
         //  If 1, (X)SUB socket should filter the messages. If 0, it should not.
         int filter;
 
+        // The timeout for send/recv operations for this socket.
+        int rcvtimeo;
+        int sndtimeo;
+
         //  If true, when connecting, pipes are created immediately without
         //  waiting for the connection to be established. That way the socket
         //  is not aware of the peer's identity, however, it is able to send
diff --git a/src/reaper.cpp b/src/reaper.cpp
index 0295137..4c67b37 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -61,7 +61,7 @@ void zmq::reaper_t::in_event ()
 
         //  Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 2b1d8af..dc6b5f5 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -288,7 +288,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
             errno = EINVAL;
             return -1;
         }
-        int rc = process_commands (false, false);
+        int rc = process_commands (0, false);
         if (rc != 0 && (errno == EINTR || errno == ETERM))
             return -1;
         errno_assert (rc == 0);
@@ -475,7 +475,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
     }
 
     //  Process pending commands, if any.
-    int rc = process_commands (false, true);
+    int rc = process_commands (0, true);
     if (unlikely (rc != 0))
         return -1;
 
@@ -487,20 +487,38 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
     rc = xsend (msg_, flags_);
     if (rc == 0)
         return 0;
+    if (unlikely (errno != EAGAIN))
+        return -1;
 
     //  In case of non-blocking send we'll simply propagate
-    //  the error - including EAGAIN - upwards.
-    if (flags_ & ZMQ_DONTWAIT)
+    //  the error - including EAGAIN - up the stack.
+    if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0)
         return -1;
 
+    //  Compute the time when the timeout should occur.
+    //  If the timeout is infinite, don't care.
+    clock_t clock ;
+    int timeout = options.sndtimeo;
+    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
     //  Oops, we couldn't send the message. Wait for the next
     //  command, process it and try to send the message again.
-    while (rc != 0) {
-        if (errno != EAGAIN)
-            return -1;
-        if (unlikely (process_commands (true, false) != 0))
+    //  If timeout is reached in the meantime, return EAGAIN.
+    while (true) {
+        if (unlikely (process_commands (timeout, false) != 0))
             return -1;
         rc = xsend (msg_, flags_);
+        if (rc == 0)
+            break;
+        if (unlikely (errno != EAGAIN))
+            return -1;
+        if (timeout > 0) {
+            timeout = end - clock.now_ms ();
+            if (timeout <= 0) {
+                errno = EAGAIN;
+                return -1;
+            }
+        }
     }
     return 0;
 }
@@ -521,7 +539,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
 
     //  Get the message.
     int rc = xrecv (msg_, flags_);
-    int err = errno;
+    if (unlikely (rc != 0 && errno != EAGAIN))
+        return -1;
 
     //  Once every inbound_poll_rate messages check for signals and process
     //  incoming commands. This happens only if we are not polling altogether
@@ -532,7 +551,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
     //  described above) from the one used by 'send'. This is because counting
     //  ticks is more efficient than doing RDTSC all the time.
     if (++ticks == inbound_poll_rate) {
-        if (unlikely (process_commands (false, false) != 0))
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
     }
@@ -545,17 +564,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
         return 0;
     }
 
-    //  If we don't have the message, restore the original cause of the problem.
-    errno = err;
-
     //  If the message cannot be fetched immediately, there are two scenarios.
     //  For non-blocking recv, commands are processed in case there's an
     //  activate_reader command already waiting int a command pipe.
     //  If it's not, return EAGAIN.
-    if (flags_ & ZMQ_DONTWAIT) {
-        if (errno != EAGAIN)
-            return -1;
-        if (unlikely (process_commands (false, false) != 0))
+    if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
 
@@ -568,17 +582,33 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
         return rc;
     }
 
+    //  Compute the time when the timeout should occur.
+    //  If the timeout is infinite, don't care.
+    clock_t clock ;
+    int timeout = options.rcvtimeo;
+    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
     //  In blocking scenario, commands are processed over and over again until
     //  we are able to fetch a message.
     bool block = (ticks != 0);
-    while (rc != 0) {
-        if (errno != EAGAIN)
-            return -1;
-        if (unlikely (process_commands (block, false) != 0))
+    while (true) {
+        if (unlikely (process_commands (block ? timeout : 0, false) != 0))
             return -1;
         rc = xrecv (msg_, flags_);
-        ticks = 0;
+        if (rc == 0) {
+            ticks = 0;
+            break;
+        }
+        if (unlikely (errno != EAGAIN))
+            return -1;
         block = true;
+        if (timeout > 0) {
+            timeout = end - clock.now_ms ();
+            if (timeout <= 0) {
+                errno = EAGAIN;
+                return -1;
+            }
+        }
     }
 
     rcvmore = msg_->flags () & msg_t::more;
@@ -658,18 +688,20 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
     check_destroy ();
 }
 
-int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
 {
     int rc;
     command_t cmd;
-    if (block_) {
-        rc = mailbox.recv (&cmd, true);
-        if (rc == -1 && errno == EINTR)
-            return -1;
-        errno_assert (rc == 0);
+    if (timeout_ != 0) {
+
+        //  If we are asked to wait, simply ask mailbox to wait.
+        rc = mailbox.recv (&cmd, timeout_);
     }
     else {
 
+        //  If we are asked not to wait, check whether we haven't processed
+        //  commands recently, so that we can throttle the new commands.
+
         //  Get the CPU's tick counter. If 0, the counter is not available.
         uint64_t tsc = zmq::clock_t::rdtsc ();
 
@@ -690,7 +722,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
         }
 
         //  Check whether there are any commands pending for this thread.
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
     }
 
     //  Process all the commands available at the moment.
@@ -701,7 +733,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
             return -1;
         errno_assert (rc == 0);
         cmd.destination->process_command (cmd);
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
      }
 
     if (ctx_terminated) {
@@ -797,9 +829,11 @@ void zmq::socket_base_t::xhiccuped (pipe_t *pipe_)
 
 void zmq::socket_base_t::in_event ()
 {
-    //  Process any commands from other threads/sockets that may be available
-    //  at the moment. Ultimately, socket will be destroyed.
-    process_commands (false, false);
+    //  This function is invoked only once the socket is running in the context
+    //  of the reaper thread. Process any commands from other threads/sockets
+    //  that may be available at the moment. Ultimately, the socket will
+    //  be destroyed.
+    process_commands (0, false);
     check_destroy ();
 }
 
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index ed5620c..69a8aac 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -160,11 +160,11 @@ namespace zmq
         //  Register the pipe with this socket.
         void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
 
-        //  Processes commands sent to this socket (if any). If 'block' is
-        //  set to true, returns only after at least one command was processed.
+        //  Processes commands sent to this socket (if any). If timeout is -1,
+        //  returns only after at least one command was processed.
         //  If throttle argument is true, commands are processed at most once
         //  in a predefined time period.
-        int process_commands (bool block_, bool throttle_);
+        int process_commands (int timeout_, bool throttle_);
 
         //  Handlers for incoming commands.
         void process_stop ();
diff --git a/tests/Makefile.am b/tests/Makefile.am
index ebbc46c..b26cccc 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \
                   test_pair_tcp \
                   test_reqrep_inproc \
                   test_reqrep_tcp \
-                  test_hwm
+                  test_hwm \
+                  test_timeo
 
 if !ON_MINGW
 noinst_PROGRAMS += test_shutdown_stress \
@@ -21,6 +22,8 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
 
 test_hwm_SOURCES = test_hwm.cpp
 
+test_timeo_SOURCES = test_timeo.cpp
+
 if !ON_MINGW
 test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
 test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
diff --git a/tests/test_timeo.cpp b/tests/test_timeo.cpp
new file mode 100644
index 0000000..a8a3fc0
--- /dev/null
+++ b/tests/test_timeo.cpp
@@ -0,0 +1,115 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+#include <pthread.h>
+
+#include "../include/zmq.h"
+#include "../include/zmq_utils.h"
+
+extern "C"
+{
+    void *worker(void *ctx)
+    {
+        //  Thread routine: sleep 1 second, connect a PUSH socket, then wait 1
+        //  more second so that async connect can succeed. No message is sent.
+        zmq_sleep (1);
+        void *sc = zmq_socket (ctx, ZMQ_PUSH);
+        assert (sc);
+        int rc = zmq_connect (sc, "inproc://timeout_test");
+        assert (rc == 0);
+        zmq_sleep (1);
+        rc = zmq_close (sc);
+        assert (rc == 0);
+        return NULL;
+    }
+}
+
+int main (int argc, char *argv [])
+{
+    void *ctx = zmq_init (1);
+    assert (ctx);
+
+    //  Create a bound PULL socket with no peer connected yet.
+    void *sb = zmq_socket (ctx, ZMQ_PULL);
+    assert (sb);
+    int rc = zmq_bind (sb, "inproc://timeout_test");
+    assert (rc == 0);
+
+    //  Check whether non-blocking recv returns immediately.
+    char buf [] = "12345678ABCDEFGH12345678abcdefgh";
+    rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT);
+    assert (rc == -1);
+    assert (zmq_errno() == EAGAIN);
+
+    //  Check whether a 500 ms recv timeout is honoured (bounds are in us).
+    int timeout = 500;
+    size_t timeout_size = sizeof timeout;
+    rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+    assert (rc == 0);    
+    void *watch = zmq_stopwatch_start ();
+    rc = zmq_recv (sb, buf, 32, 0);
+    assert (rc == -1);
+    assert (zmq_errno () == EAGAIN);
+    unsigned long elapsed = zmq_stopwatch_stop (watch);
+    assert (elapsed > 440000 && elapsed < 550000);
+
+    //  Check that a peer connecting mid-wait doesn't distort the 2 s timeout.
+    timeout = 2000;
+    rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+    pthread_t thread;
+    rc = pthread_create (&thread, NULL, worker, ctx);
+    assert (rc == 0);
+    watch = zmq_stopwatch_start ();
+    rc = zmq_recv (sb, buf, 32, 0);
+    assert (rc == -1);
+    assert (zmq_errno () == EAGAIN);
+    elapsed = zmq_stopwatch_stop (watch);
+    assert (elapsed > 1900000 && elapsed < 2100000);
+    rc = pthread_join (thread, NULL);
+    assert (rc == 0);
+
+    //  Check that timeouts don't break normal message transfer.
+    void *sc = zmq_socket (ctx, ZMQ_PUSH);
+    assert (sc);
+    rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+    rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+    rc = zmq_connect (sc, "inproc://timeout_test");
+    assert (rc == 0);
+    rc = zmq_send (sc, buf, 32, 0);
+    assert (rc == 32);
+    rc = zmq_recv (sb, buf, 32, 0);
+    assert (rc == 32);
+
+    //  Clean-up.
+    rc = zmq_close (sc);
+    assert (rc == 0);
+    rc = zmq_close (sb);
+    assert (rc == 0);
+    rc = zmq_term (ctx);
+    assert (rc == 0);
+
+    return 0 ;
+}
+
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to