Hi Martin,

> One more idea: Even ioctl() implementation can be optimised if mailbox_t
> remembered the actual state of socket (blocking vs. non-blocking) and issued
> ioctl() only if there was a need to change the state.

Probably not that hard, and certainly a good thing: a variable lookup is
certainly faster than any syscall, and since we are dealing with a very
obvious case (a single entry point, a single socket and only two sets of
flags), it is not really hard to implement.  I would still try to keep it
clear.  At worst, we will just push the resetting back to a later time.
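
For illustration, a minimal sketch of the state-caching idea, assuming a
POSIX descriptor and fcntl() (the Windows side would use ioctlsocket()
instead).  The class name blocking_state_t and its members are hypothetical
and are not part of the patch below:

```cpp
#include <cassert>
#include <fcntl.h>

//  Hypothetical helper, not part of the patch: remembers whether the
//  descriptor is currently blocking and calls fcntl() only on a change.
class blocking_state_t
{
public:
    //  Assumes fd_ is a valid descriptor; reads its current mode once.
    explicit blocking_state_t (int fd_) : fd (fd_), syscalls (0)
    {
        int flags = fcntl (fd, F_GETFL, 0);
        assert (flags >= 0);
        blocking = !(flags & O_NONBLOCK);
    }

    //  Switch mode; returns true only if the mode really had to change.
    bool set_blocking (bool block_)
    {
        if (block_ == blocking)
            return false;
        int flags = fcntl (fd, F_GETFL, 0);
        assert (flags >= 0);
        flags = block_ ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
        int rc = fcntl (fd, F_SETFL, flags);
        assert (rc == 0);
        blocking = block_;
        ++syscalls;
        return true;
    }

    bool is_blocking () const { return blocking; }

    //  Number of mode changes that actually reached the kernel.
    int changes () const { return syscalls; }

private:
    int fd;
    bool blocking;
    int syscalls;
};
```

With something like this, a recv that wants blocking mode would call
set_blocking (true) and simply skip the switch back, so that the next
non-blocking call pays for the reset only if it is really needed.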

>> So here the patch for the old blocking case behavior.  I would see if
>> I can implement the RCV_TIMEO options now.  Someone else would have to
>> check it on Windows however.
>
> Yes. Not even speaking of platforms such as AIX or HP-UX etc. The code is a
> bit fragile here. You should take care to keep the code as is for platforms
> you cannot test on.

I put a ZMQ_HAVE_SO_RCVTIMEO define around the changes.  HP-UX is
already excluded since the operation seems to be a no-op on it, as is
Windows (until the doc is proved wrong).
AIX seems to support it, however.
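
As a rough sketch of what sits behind that define: SO_RCVTIMEO takes a
struct timeval, and an all-zero timeval means "no timeout", so a
millisecond value has to be converted and the 0 ("return immediately")
case has to be handled by the caller some other way.  The helper name
ms_to_timeval is hypothetical, used here for illustration only:

```cpp
#include <sys/time.h>

//  Hypothetical helper: convert a 0MQ-style timeout in milliseconds to
//  the timeval that SO_RCVTIMEO expects.  Values <= 0 yield an all-zero
//  timeval, which SO_RCVTIMEO treats as "block indefinitely"; a caller
//  wanting a non-blocking read (timeout 0) must not rely on this path.
struct timeval ms_to_timeval (int timeout_ms_)
{
    struct timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = 0;
    if (timeout_ms_ > 0) {
        tv.tv_sec = timeout_ms_ / 1000;
        tv.tv_usec = (timeout_ms_ % 1000) * 1000;
    }
    return tv;
}
```

The result would then be handed to setsockopt (fd, SOL_SOCKET,
SO_RCVTIMEO, &tv, sizeof tv) on platforms where the option is honoured.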

So, here is the patch (again).  Feel free to check it or wait for my
memento patch (should come soon).

Fabien
From abb67daad41f87132424797e9dbde938a46c1f7d Mon Sep 17 00:00:00 2001
From: Fabien Ninoles <[email protected]>
Date: Mon, 23 May 2011 13:33:12 -0400
Subject: [PATCH] Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO.

- Add doc and tests.
- Add options and setup.
- Wait using zmq_poll in mailbox_t::recv.
 (Windows version untested).

Signed-off-by: Fabien Ninoles <[email protected]>
---
 .gitignore              |    1 +
 doc/zmq_getsockopt.txt  |   36 ++++++++
 doc/zmq_setsockopt.txt  |   36 ++++++++
 include/zmq.h           |    2 +
 src/ctx.cpp             |    2 +-
 src/io_thread.cpp       |    2 +-
 src/mailbox.cpp         |  218 +++++++++++++++++++++++++++++++---------------
 src/mailbox.hpp         |    6 +-
 src/options.cpp         |   36 ++++++++-
 src/options.hpp         |    4 +
 src/reaper.cpp          |    2 +-
 src/socket_base.cpp     |   53 ++++++++----
 src/socket_base.hpp     |    2 +-
 tests/Makefile.am       |    7 ++-
 tests/test_timeout.cpp  |  128 +++++++++++++++++++++++++++
 tests/test_timeout2.cpp |  135 +++++++++++++++++++++++++++++
 16 files changed, 576 insertions(+), 94 deletions(-)
 create mode 100644 tests/test_timeout.cpp
 create mode 100644 tests/test_timeout2.cpp

diff --git a/.gitignore b/.gitignore
index b309c73..abde4cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -57,3 +57,4 @@ foreign/openpgm/*
 !foreign/openpgm/Makefile.am
 zeromq-*.tar.gz
 zeromq-*.zip
+tests/test_timeout
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 97b4032..4450544 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -357,6 +357,42 @@ Default value:: N/A
 Applicable socket types:: all
 
 
+ZMQ_RCVTIMEO: Maximum time before a socket operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for recv operations on the socket.  The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation shall wait up to that many
+milliseconds before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
+ZMQ_SNDTIMEO: Maximum time before a socket operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for send operations on the socket.  The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation shall wait up to that many
+milliseconds before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 RETURN VALUE
 ------------
 The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index ed3b3a7..a89a7f7 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -309,6 +309,42 @@ Option value unit:: network hops
 Default value:: 1
 Applicable socket types:: all, when using multicast transports
 
+
+ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for recv operations on the socket.  The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation shall wait up to that many
+milliseconds before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for send operations on the socket.  The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation shall wait up to that many
+milliseconds before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 RETURN VALUE
 ------------
 The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/include/zmq.h b/include/zmq.h
index 40dffd9..5f0fec6 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -180,6 +180,8 @@ ZMQ_EXPORT int zmq_term (void *context);
 #define ZMQ_SNDHWM 23
 #define ZMQ_RCVHWM 24
 #define ZMQ_MULTICAST_HOPS 25
+#define ZMQ_RCVTIMEO 26
+#define ZMQ_SNDTIMEO 27
     
 /*  Send/recv options.                                                        */
 #define ZMQ_DONTWAIT 1
diff --git a/src/ctx.cpp b/src/ctx.cpp
index fb5420d..783bcba 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -142,7 +142,7 @@ int zmq::ctx_t::terminate ()
 
     //  Wait till reaper thread closes all the sockets.
     command_t cmd;
-    int rc = term_mailbox.recv (&cmd, true);
+    int rc = term_mailbox.recv (&cmd, -1);
     if (rc == -1 && errno == EINTR)
         return -1;
     zmq_assert (rc == 0);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 9678392..c6f3880 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -70,7 +70,7 @@ void zmq::io_thread_t::in_event ()
 
         //  Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 221396b..d4fa81d 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -36,6 +36,13 @@
 #include <sys/socket.h>
 #endif
 
+#include <pthread.h>
+#include <poller.hpp>
+
+#if defined(SO_RCVTIMEO) && !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_HPUX)
+#define ZMQ_HAVE_SO_RCVTIMEO
+#endif
+
 zmq::fd_t zmq::mailbox_t::get_fd ()
 {
     return r;
@@ -79,44 +86,6 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
-{
-    //  If required, set the reader to blocking mode.
-    if (block_) {
-        unsigned long argp = 0;
-        int rc = ioctlsocket (r, FIONBIO, &argp);
-        wsa_assert (rc != SOCKET_ERROR);
-    }
-
-    //  Attempt to read an entire command. Returns EAGAIN if non-blocking
-    //  and a command is not available. Save value of errno if we wish to pass
-    //  it to caller.
-    int err = 0;
-    int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0);
-    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
-        err = EAGAIN;
-
-    //  Re-set the reader to non-blocking mode.
-    if (block_) {
-        unsigned long argp = 1;
-        int rc = ioctlsocket (r, FIONBIO, &argp);
-        wsa_assert (rc != SOCKET_ERROR);
-    }
-
-    //  If the recv failed, return with the saved errno.
-    if (err != 0) {
-        errno = err;
-        return -1;
-    }
-
-    //  Sanity check for success.
-    wsa_assert (nbytes != SOCKET_ERROR);
-
-    //  Check whether we haven't got half of command.
-    zmq_assert (nbytes == sizeof (command_t));
-
-    return 0;
-}
 
 #else
 
@@ -139,8 +108,8 @@ zmq::mailbox_t::mailbox_t ()
     rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
     errno_assert (rc == 0);
 
-#ifndef MSG_DONTWAIT
     //  Set the reader to non-blocking mode.
+#ifndef MSG_DONTWAIT
     flags = fcntl (r, F_GETFL, 0);
     errno_assert (flags >= 0);
     rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
@@ -194,52 +163,161 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+#endif
+
+void zmq::mailbox_t::set_rcvtimeo(int timeout_)
 {
-#ifdef MSG_DONTWAIT
+#ifdef ZMQ_HAVE_SO_RCVTIMEO
+    //  A zeroed timeval disables SO_RCVTIMEO, i.e. recv blocks indefinitely.
+    struct timeval rcvtimeout;
+    if (timeout_ <= 0) {
+        rcvtimeout.tv_sec = 0;
+        rcvtimeout.tv_usec = 0;
+    }
+    else {
+        rcvtimeout.tv_sec = timeout_ / 1000;
+        rcvtimeout.tv_usec = (timeout_ % 1000) * 1000;
+    }
+    int rc = setsockopt(r, SOL_SOCKET, SO_RCVTIMEO, &rcvtimeout, sizeof (rcvtimeout));
+    errno_assert (rc != -1);
+#else
+    // Not supported
+    errno = ENOTSUP;
+    errno_assert (false);
+#endif
+}
 
-    //  Attempt to read an entire command. Returns EAGAIN if non-blocking
-    //  mode is requested and a command is not available.
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
-        block_ ? 0 : MSG_DONTWAIT);
-    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
-        return -1;
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
+{
+#ifdef ZMQ_HAVE_SO_RCVTIMEO
+    if (timeout_ != 0) {
+        // Use the blocking version if we support SO_RCVTIMEO.
+        set_rcvtimeo(timeout_);
+        return wait_recv(cmd_);
+    }
 #else
+    if (timeout_ == -1) {
+        // optimized version of recv for the special case of infinite
+        // timeout_ == -1.
+        return wait_recv(cmd_);
+    }
+#endif
 
-    //  If required, set the reader to blocking mode.
-    if (block_) {
-        int flags = fcntl (r, F_GETFL, 0);
-        errno_assert (flags >= 0);
-        int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
-        errno_assert (rc == 0);
+#if defined(MSG_DONTWAIT)
+    int flags = MSG_DONTWAIT;
+#else
+    int flags = 0;
+#endif
+
+    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), flags);
+
+#ifdef ZMQ_HAVE_WINDOWS
+    if (nbytes != SOCKET_ERROR) {
+#else
+    if (nbytes != -1) {
+#endif
+        //  Sanity check for success.
+        zmq_assert (nbytes == sizeof (command_t));
+
+        return 0;
     }
 
-    //  Attempt to read an entire command. Returns EAGAIN if non-blocking
-    //  and a command is not available. Save value of errno if we wish to pass
-    //  it to caller.
-    int err = 0;
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
-    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))  
-        err = errno;
-
-    //  Re-set the reader to non-blocking mode.
-    if (block_) {
-        int flags = fcntl (r, F_GETFL, 0);
-        errno_assert (flags >= 0);
-        int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
-        errno_assert (rc == 0);
+    if (timeout_ == 0) {
+#ifdef ZMQ_HAVE_WINDOWS
+        // Winsock error conversion
+        if (WSAGetLastError () == WSAEWOULDBLOCK)
+            errno = EAGAIN;
+#endif
+        return -1;
     }
 
-    //  If the recv failed, return with the saved errno if set.
+    if (timeout_ != -1) {
+        // Otherwise, poll
+
+        zmq_pollitem_t item;
+        memset(&item, 0, sizeof item);
+        item.fd = r;
+        item.events = ZMQ_POLLIN;
+
+        int rc = zmq_poll (&item, 1, timeout_);
+        if (rc == 0)
+            errno = EAGAIN;
+        if (rc <= 0)
+            return -1;
+    
+        zmq_assert (rc == 1);
+        zmq_assert (item.revents == ZMQ_POLLIN);
+    }
+
+    nbytes = ::recv (r, cmd_, sizeof (command_t), flags);
+
+    //  Sanity check for success.
+#ifdef ZMQ_HAVE_WINDOWS
+    wsa_assert (nbytes != SOCKET_ERROR);
+#else
+    errno_assert (nbytes != -1);
+#endif
+
+    //  Check whether we haven't got half of command.
+    zmq_assert (nbytes == sizeof (command_t));
+
+    return 0;
+}
+
+int zmq::mailbox_t::wait_recv(command_t *cmd_)
+{
+#ifndef MSG_DONTWAIT
+    // set the socket in blocking mode
+#ifdef ZMQ_HAVE_WINDOWS
+    unsigned long argp = 0;
+    int rc = ioctlsocket (r, FIONBIO, &argp);
+    wsa_assert (rc != SOCKET_ERROR);
+#else
+    int flags = fcntl (r, F_GETFL, 0);
+    errno_assert (flags >= 0);
+    int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
+    errno_assert (rc == 0);
+#endif
+#endif
+
+    // Attempt to read an entire command.  Save value of errno if we
+    // wish to pass it to caller.
+    int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0);
+
+#ifndef MSG_DONTWAIT
+    // save the errno value, but only if recv actually failed
+    int err = (nbytes == -1) ? errno : 0;
+
+    // reset the socket in non-blocking mode
+#ifdef ZMQ_HAVE_WINDOWS
+    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
+        err = EAGAIN;
+    argp = 1;
+    rc = ioctlsocket (r, FIONBIO, &argp);
+    wsa_assert (rc != SOCKET_ERROR);
+#else
+    flags = fcntl (r, F_GETFL, 0);
+    errno_assert (flags >= 0);
+    rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
+    errno_assert (rc == 0);
+#endif
+
+    //  If the recv failed, return with the saved errno.
     if (err != 0) {
         errno = err;
         return -1;
     }
-
+#elif defined(ZMQ_HAVE_SO_RCVTIMEO)
+    if (nbytes == -1)
+        return -1;
 #endif
 
     //  Sanity check for success.
+#ifdef ZMQ_HAVE_WINDOWS
+    wsa_assert (nbytes != SOCKET_ERROR);
+#else
     errno_assert (nbytes != -1);
+#endif
 
     //  Check whether we haven't got half of command.
     zmq_assert (nbytes == sizeof (command_t));
@@ -247,8 +325,6 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
     return 0;
 }
 
-#endif
-
 int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
 {
 #if defined ZMQ_HAVE_WINDOWS
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 96bf4eb..7f05dd5 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -41,9 +41,13 @@ namespace zmq
 
         fd_t get_fd ();
         void send (const command_t &cmd_);
-        int recv (command_t *cmd_, bool block_);
+        int recv (command_t *cmd_, int timeout_);
         
     private:
+        // only usable where SO_RCVTIMEO is supported.
+        void set_rcvtimeo(int timeout_);
+        // blocking receive; optimizes the default (infinite timeout) case.
+        int wait_recv(command_t *cmd_);
 
         //  Write & read end of the socketpair.
         fd_t w;
diff --git a/src/options.cpp b/src/options.cpp
index 897e0f5..9a03849 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -40,7 +40,9 @@ zmq::options_t::options_t () :
     maxmsgsize (-1),
     requires_in (false),
     requires_out (false),
-    immediate_connect (true)
+    immediate_connect (true),
+    recv_timeout (-1),
+    send_timeout (-1)
 {
 }
 
@@ -174,6 +176,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
         multicast_hops = *((int*) optval_);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        recv_timeout = *((int*) optval_);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        send_timeout = *((int*) optval_);
+        return 0;
     }
 
     errno = EINVAL;
@@ -319,6 +336,23 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
         *optvallen_ = sizeof (int);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = recv_timeout;
+        *optvallen_ = sizeof (int);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = send_timeout;
+        *optvallen_ = sizeof (int);
+        return 0;
     }
 
     errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 53d0197..d6f6fd4 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -85,6 +85,10 @@ namespace zmq
         //  is not aware of the peer's identity, however, it is able to send
         //  messages straight away.
         bool immediate_connect;
+
+        // The default timeout to wait on send/recv operation for this socket.
+        int recv_timeout;
+        int send_timeout;
     };
 
 }
diff --git a/src/reaper.cpp b/src/reaper.cpp
index d3ebbba..f94f7c1 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -61,7 +61,7 @@ void zmq::reaper_t::in_event ()
 
         //  Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3e104a8..e852c42 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -276,7 +276,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
             errno = EINVAL;
             return -1;
         }
-        int rc = process_commands (false, false);
+        int rc = process_commands (0, false);
         if (rc != 0 && (errno == EINTR || errno == ETERM))
             return -1;
         errno_assert (rc == 0);
@@ -478,7 +478,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
     }
 
     //  Process pending commands, if any.
-    int rc = process_commands (false, true);
+    int rc = process_commands (0, true);
     if (unlikely (rc != 0))
         return -1;
 
@@ -498,12 +498,22 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
 
     //  Oops, we couldn't send the message. Wait for the next
     //  command, process it and try to send the message again.
+    int timeout = options.send_timeout;
+    clock_t clock;
+    uint64_t last = clock.now_ms ();
     while (rc != 0) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (true, false) != 0))
+        if (unlikely (process_commands (timeout, false) != 0))
             return -1;
         rc = xsend (msg_, flags_);
+        if (rc != 0 && timeout != -1 && errno == EAGAIN) {
+            uint64_t now = clock.now_ms ();
+            if (now >= last + timeout)
+                return -1;
+            timeout -= now-last;
+            last = now;
+        }
     }
     return 0;
 }
@@ -535,7 +545,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
     //  described above) from the one used by 'send'. This is because counting
     //  ticks is more efficient than doing RDTSC all the time.
     if (++ticks == inbound_poll_rate) {
-        if (unlikely (process_commands (false, false) != 0))
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
     }
@@ -553,12 +563,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
 
     //  If the message cannot be fetched immediately, there are two scenarios.
     //  For non-blocking recv, commands are processed in case there's an
-    //  activate_reader command already waiting int a command pipe.
+    //  activate_reader command already waiting in a command pipe.
     //  If it's not, return EAGAIN.
     if (flags_ & ZMQ_DONTWAIT) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (false, false) != 0))
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
 
@@ -573,15 +583,25 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
 
     //  In blocking scenario, commands are processed over and over again until
     //  we are able to fetch a message.
-    bool block = (ticks != 0);
+    int timeout = (ticks != 0) ? options.recv_timeout : 0;
+    clock_t clock;
+    uint64_t last = clock.now_ms();
     while (rc != 0) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (block, false) != 0))
+        if (unlikely (process_commands (timeout, false) != 0))
             return -1;
         rc = xrecv (msg_, flags_);
+        if (timeout == 0)
+            timeout = options.recv_timeout;
         ticks = 0;
-        block = true;
+        if (rc != 0 && timeout != -1 && errno == EAGAIN) {
+            uint64_t now = clock.now_ms();
+            if (now >= last + timeout)
+                return -1;
+            timeout -= now-last;
+            last = now;
+        }
     }
 
     rcvmore = msg_->flags () & msg_t::more;
@@ -655,17 +675,18 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
     poller->set_pollin (handle);
 }
 
-int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
 {
     int rc;
     command_t cmd;
-    if (block_) {
-        rc = mailbox.recv (&cmd, true);
-        if (rc == -1 && errno == EINTR)
+    if (timeout_ != 0) {
+        rc = mailbox.recv (&cmd, timeout_);
+        if (rc == -1 && (errno == EINTR || errno == EAGAIN))
             return -1;
         errno_assert (rc == 0);
     }
     else {
+        zmq_assert (timeout_ == 0);
 
         //  Get the CPU's tick counter. If 0, the counter is not available.
         uint64_t tsc = zmq::clock_t::rdtsc ();
@@ -687,7 +708,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
         }
 
         //  Check whether there are any commands pending for this thread.
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
     }
 
     //  Process all the commands available at the moment.
@@ -698,7 +719,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
             return -1;
         errno_assert (rc == 0);
         cmd.destination->process_command (cmd);
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
      }
 
     if (ctx_terminated) {
@@ -777,7 +798,7 @@ void zmq::socket_base_t::in_event ()
 {
     //  Process any commands from other threads/sockets that may be available
     //  at the moment. Ultimately, socket will be destroyed.
-    process_commands (false, false);
+    process_commands (0, false);
     check_destroy ();
 }
 
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 0a5c574..82745e2 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -164,7 +164,7 @@ namespace zmq
         //  set to true, returns only after at least one command was processed.
         //  If throttle argument is true, commands are processed at most once
         //  in a predefined time period.
-        int process_commands (bool block_, bool throttle_);
+        int process_commands (int timeout_, bool throttle_);
 
         //  Handlers for incoming commands.
         void process_stop ();
diff --git a/tests/Makefile.am b/tests/Makefile.am
index ebbc46c..36d6ee6 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,7 +5,9 @@ noinst_PROGRAMS = test_pair_inproc \
                   test_pair_tcp \
                   test_reqrep_inproc \
                   test_reqrep_tcp \
-                  test_hwm
+                  test_hwm \
+                  test_timeout \
+                  test_timeout2
 
 if !ON_MINGW
 noinst_PROGRAMS += test_shutdown_stress \
@@ -21,6 +23,9 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
 
 test_hwm_SOURCES = test_hwm.cpp
 
+test_timeout_SOURCES = test_timeout.cpp
+test_timeout2_SOURCES = test_timeout2.cpp
+
 if !ON_MINGW
 test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
 test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
diff --git a/tests/test_timeout.cpp b/tests/test_timeout.cpp
new file mode 100644
index 0000000..adf633d
--- /dev/null
+++ b/tests/test_timeout.cpp
@@ -0,0 +1,128 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+
+#include "../include/zmq.h"
+#include <stdio.h>
+#include <time.h>
+#include <sys/types.h>
+// Deprecated but portable method
+#include <sys/timeb.h>
+#define TIMEOUT 500
+
+int elapsed_ms(struct timeb& start, struct timeb& end) {
+    return (end.time - start.time) * 1000 + (end.millitm - start.millitm);
+}
+
+int main (int argc, char *argv []) {
+
+    void *ctx = zmq_init (1);
+    assert (ctx);
+
+
+    const char* transport = "inproc://timeout_test";
+
+    void *sb = zmq_socket (ctx, ZMQ_PAIR);
+    assert (sb);
+    int rc = zmq_bind (sb, transport);
+    assert (rc == 0);
+
+    // Recv part
+
+    char buf [32];
+    rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT);
+    assert (rc == -1);
+    assert (zmq_errno() == EAGAIN);
+
+    int timeout = TIMEOUT;
+    size_t timeout_size = sizeof timeout;
+    rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+
+    timeout = -1;
+    rc = zmq_getsockopt(sb, ZMQ_RCVTIMEO, &timeout, &timeout_size);
+    assert (rc == 0);
+    assert (timeout_size == sizeof timeout);
+    assert (timeout == TIMEOUT);
+    
+    struct timeb start;
+    ftime (&start);
+    rc = zmq_recv (sb, buf, 32, 0);
+    struct timeb end;
+    ftime (&end);
+    assert (rc == -1);
+    assert (zmq_errno () == EAGAIN);
+    int elapsed = elapsed_ms(start, end);
+    assert (TIMEOUT*0.99 <= elapsed);
+    assert (elapsed <= TIMEOUT*1.01);
+
+    // Send part
+
+    rc = zmq_send (sb, buf, 32, ZMQ_DONTWAIT);
+    assert (rc == -1);
+    assert (zmq_errno() == EAGAIN);
+
+    rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+
+    timeout = -1;
+    rc = zmq_getsockopt(sb, ZMQ_SNDTIMEO, &timeout, &timeout_size);
+    assert (rc == 0);
+    assert (timeout_size == sizeof timeout);
+    assert (timeout == TIMEOUT);
+    
+    ftime (&start);
+    rc = zmq_send (sb, buf, 32, 0);
+    ftime (&end);
+    assert (rc == -1);
+    assert (zmq_errno () == EAGAIN);
+    elapsed = elapsed_ms(start, end);
+    assert (TIMEOUT*0.99 <= elapsed);
+    assert (elapsed <= TIMEOUT*1.01);
+
+    // Check everything is good otherwise
+
+    void *sc = zmq_socket (ctx, ZMQ_PAIR);
+    assert (sc);
+    rc = zmq_connect (sc, transport);
+    assert (rc == 0);
+
+    const char *content = "12345678ABCDEFGH12345678abcdefgh";
+
+    rc = zmq_send (sc, content, 32, 0);
+    assert (rc == 32);
+    
+    rc = zmq_recv (sb, buf, 32, 0);
+    assert (rc == 32);
+    assert (memcmp (buf, content, 32) == 0);    
+
+    rc = zmq_close (sc);
+    assert (rc == 0);
+
+    rc = zmq_close (sb);
+    assert (rc == 0);
+
+    rc = zmq_term (ctx);
+    assert (rc == 0);
+
+    return 0 ;
+}
diff --git a/tests/test_timeout2.cpp b/tests/test_timeout2.cpp
new file mode 100644
index 0000000..fe53018
--- /dev/null
+++ b/tests/test_timeout2.cpp
@@ -0,0 +1,135 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+
+#include "../include/zmq.h"
+#include "../include/zmq_utils.h"
+#include <stdio.h>
+#include <time.h>
+#include <sys/types.h>
+// Deprecated but portable method
+#include <sys/timeb.h>
+#include <pthread.h>
+
+#define TIMEOUT 2000
+
+const char* transport = "inproc://timeout2";
+
+int elapsed_ms(struct timeb& start, struct timeb& end) {
+    return (end.time - start.time) * 1000 + (end.millitm - start.millitm);
+}
+
+extern "C" {
+    void* worker(void *ctx) {
+
+        void *sb = zmq_socket (ctx, ZMQ_PAIR);
+        assert (sb);
+
+        int timeout = TIMEOUT;
+        int rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, sizeof (timeout));
+        assert (rc == 0);
+        rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, sizeof (timeout));
+        assert (rc == 0);
+
+        int hwm = 1;
+        rc = zmq_setsockopt(sb, ZMQ_SNDHWM, &hwm, sizeof (hwm));
+        assert (rc == 0);
+
+        rc = zmq_bind (sb, transport);
+        assert (rc == 0);
+
+        char buf [32];
+        struct timeb start;
+        struct timeb end;
+
+        ftime (&start);
+        rc = zmq_recv (sb, buf, 32, 0);
+        ftime (&end);
+        assert (rc == -1);
+        assert (zmq_errno () == EAGAIN);
+        int elapsed = elapsed_ms(start, end);
+        assert (TIMEOUT*0.99 <= elapsed);
+        assert (elapsed <= TIMEOUT*1.01);
+
+        const char *content = "12345678ABCDEFGH12345678abcdefgh";
+        do {
+            rc = zmq_send(sb, content, 32, ZMQ_DONTWAIT);
+        } while (rc == 32);
+        assert (rc == -1);
+        assert (errno == EAGAIN);
+
+        ftime (&start);
+        rc = zmq_send (sb, content, 32, 0);
+        ftime (&end);
+        assert (rc == -1);
+        assert (zmq_errno () == EAGAIN);
+        elapsed = elapsed_ms(start, end);
+        assert (TIMEOUT*0.99 <= elapsed);
+        assert (elapsed <= TIMEOUT*1.01);
+
+        rc = zmq_close (sb);
+        assert (rc == 0);
+
+        return NULL;
+    }
+}
+
+int main (int argc, char *argv []) {
+
+    void *ctx = zmq_init (1);
+    assert (ctx);
+
+    pthread_t thread;
+    int rc = pthread_create (&thread, NULL, worker, ctx);
+    assert (rc == 0);
+
+    // let it go a little bit
+    zmq_sleep (1);
+
+    // Connecting to the transport will send a message to the mailbox
+    // of the worker's socket but shouldn't modify its timeout.
+    void *sc = zmq_socket (ctx, ZMQ_PAIR);
+    assert (sc);
+    // set hwm to 1 for zmq_send test.
+    int hwm = 1;
+    rc = zmq_setsockopt (sc, ZMQ_RCVHWM, &hwm, sizeof (hwm));
+    assert (rc == 0);
+    rc = zmq_connect (sc, transport);
+    assert (rc == 0);
+
+    // sleep again...
+    zmq_sleep (2);
+
+    // the worker should now be in blocking send.
+    // create a second message by closing the socket.
+    rc = zmq_close (sc);
+    assert (rc == 0);
+
+    rc = pthread_join (thread, NULL);
+    assert (rc == 0);
+
+
+    rc = zmq_term (ctx);
+    assert (rc == 0);
+
+    return 0 ;
+}
-- 
1.7.5.3
