Here it is! :)

2011/6/12 Martin Sustrik <[email protected]>:
> On 06/12/2011 07:32 PM, Fabien Niñoles wrote:
>
>> So, here the patch (again).  Feel free to check it or wait for my
>> memento patch (should come soon).
>
> Ok. I'll rather wait for the final version.
>
> Martin
>
From 2ea334c73e73746bb2c88852f85fcb15fdcc0815 Mon Sep 17 00:00:00 2001
From: Fabien Ninoles <[email protected]>
Date: Mon, 23 May 2011 13:33:12 -0400
Subject: [PATCH] Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO.

- Add doc and tests.
- Add options and setup.
- Wait using zmq_poll in mailbox_t::recv.
 (Windows version untested).

Signed-off-by: Fabien Ninoles <[email protected]>
---
 .gitignore              |    1 +
 doc/zmq_getsockopt.txt  |   36 +++++++
 doc/zmq_setsockopt.txt  |   36 +++++++
 include/zmq.h           |    2 +
 src/ctx.cpp             |    2 +-
 src/io_thread.cpp       |    2 +-
 src/mailbox.cpp         |  242 ++++++++++++++++++++++++++++++++---------------
 src/mailbox.hpp         |    8 ++-
 src/options.cpp         |   36 +++++++-
 src/options.hpp         |    4 +
 src/reaper.cpp          |    2 +-
 src/socket_base.cpp     |   53 +++++++---
 src/socket_base.hpp     |    2 +-
 tests/Makefile.am       |    7 +-
 tests/test_timeout.cpp  |  128 +++++++++++++++++++++++++
 tests/test_timeout2.cpp |  135 ++++++++++++++++++++++++++
 16 files changed, 596 insertions(+), 100 deletions(-)
 create mode 100644 tests/test_timeout.cpp
 create mode 100644 tests/test_timeout2.cpp

diff --git a/.gitignore b/.gitignore
index b309c73..abde4cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -57,3 +57,4 @@ foreign/openpgm/*
 !foreign/openpgm/Makefile.am
 zeromq-*.tar.gz
 zeromq-*.zip
+tests/test_timeout
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 97b4032..4450544 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -357,6 +357,42 @@ Default value:: N/A
 Applicable socket types:: all
 
 
+ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for recv operations on the socket.  The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait at most that many
+milliseconds before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
+ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for send operations on the socket.  The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait at most that many
+milliseconds before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 RETURN VALUE
 ------------
 The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index ed3b3a7..a89a7f7 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -309,6 +309,42 @@ Option value unit:: network hops
 Default value:: 1
 Applicable socket types:: all, when using multicast transports
 
+
+ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for recv operations on the socket.  The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait at most that many
+milliseconds before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for send operations on the socket.  The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait at most that many
+milliseconds before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 RETURN VALUE
 ------------
 The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/include/zmq.h b/include/zmq.h
index 40dffd9..5f0fec6 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -180,6 +180,8 @@ ZMQ_EXPORT int zmq_term (void *context);
 #define ZMQ_SNDHWM 23
 #define ZMQ_RCVHWM 24
 #define ZMQ_MULTICAST_HOPS 25
+#define ZMQ_RCVTIMEO 26
+#define ZMQ_SNDTIMEO 27
     
 /*  Send/recv options.                                                        */
 #define ZMQ_DONTWAIT 1
diff --git a/src/ctx.cpp b/src/ctx.cpp
index fb5420d..783bcba 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -142,7 +142,7 @@ int zmq::ctx_t::terminate ()
 
     //  Wait till reaper thread closes all the sockets.
     command_t cmd;
-    int rc = term_mailbox.recv (&cmd, true);
+    int rc = term_mailbox.recv (&cmd, -1);
     if (rc == -1 && errno == EINTR)
         return -1;
     zmq_assert (rc == 0);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 9678392..c6f3880 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -70,7 +70,7 @@ void zmq::io_thread_t::in_event ()
 
         //  Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 221396b..efafafa 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -36,6 +36,13 @@
 #include <sys/socket.h>
 #endif
 
+#include <pthread.h>
+#include <poller.hpp>
+
+#if defined(SO_RCVTIMEO) && !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_HPUX)
+#define ZMQ_HAVE_SO_RCVTIMEO
+#endif
+
 zmq::fd_t zmq::mailbox_t::get_fd ()
 {
     return r;
@@ -55,9 +62,10 @@ zmq::mailbox_t::mailbox_t ()
     wsa_assert (rc != SOCKET_ERROR);
 
     //  Set the reader to non-blocking mode.
-    argp = 1;
-    rc = ioctlsocket (r, FIONBIO, &argp);
-    wsa_assert (rc != SOCKET_ERROR);
+    r_block = true;
+    set_ioblock(false);
+    
+    r_timeout = -1;
 }
 
 zmq::mailbox_t::~mailbox_t ()
@@ -79,44 +87,6 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
-{
-    //  If required, set the reader to blocking mode.
-    if (block_) {
-        unsigned long argp = 0;
-        int rc = ioctlsocket (r, FIONBIO, &argp);
-        wsa_assert (rc != SOCKET_ERROR);
-    }
-
-    //  Attempt to read an entire command. Returns EAGAIN if non-blocking
-    //  and a command is not available. Save value of errno if we wish to pass
-    //  it to caller.
-    int err = 0;
-    int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0);
-    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
-        err = EAGAIN;
-
-    //  Re-set the reader to non-blocking mode.
-    if (block_) {
-        unsigned long argp = 1;
-        int rc = ioctlsocket (r, FIONBIO, &argp);
-        wsa_assert (rc != SOCKET_ERROR);
-    }
-
-    //  If the recv failed, return with the saved errno.
-    if (err != 0) {
-        errno = err;
-        return -1;
-    }
-
-    //  Sanity check for success.
-    wsa_assert (nbytes != SOCKET_ERROR);
-
-    //  Check whether we haven't got half of command.
-    zmq_assert (nbytes == sizeof (command_t));
-
-    return 0;
-}
 
 #else
 
@@ -139,12 +109,18 @@ zmq::mailbox_t::mailbox_t ()
     rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
     errno_assert (rc == 0);
 
+    r_block = true;
 #ifndef MSG_DONTWAIT
     //  Set the reader to non-blocking mode.
-    flags = fcntl (r, F_GETFL, 0);
-    errno_assert (flags >= 0);
-    rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
-    errno_assert (rc == 0);
+    set_ioblock(false);
+#else
+    r_block = true;
+#endif
+
+    r_timeout = -1;
+#ifdef ZMQ_HAVE_SO_RCVTIMEO
+    // Initialize timeout on socket
+    set_rcvtimeo(0);
 #endif
 }
 
@@ -194,52 +170,131 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+#endif
+
+void zmq::mailbox_t::set_rcvtimeo(int timeout_)
 {
-#ifdef MSG_DONTWAIT
+    zmq_assert (r_timeout != timeout_);
 
-    //  Attempt to read an entire command. Returns EAGAIN if non-blocking
-    //  mode is requested and a command is not available.
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
-        block_ ? 0 : MSG_DONTWAIT);
-    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
-        return -1;
+#ifdef ZMQ_HAVE_SO_RCVTIMEO
+    struct timeval rcvtimeout;
+    if (timeout_ <= 0) {
+        rcvtimeout.tv_sec = 0;
+        rcvtimeout.tv_usec = 0;
+    }
+    else {
+        rcvtimeout.tv_sec = timeout_ / 1000;
+        rcvtimeout.tv_usec = (timeout_ % 1000) * 1000;
+    }
+    rcvtimeout.tv_sec = timeout_ / 1000;
+    int rc = setsockopt(r, SOL_SOCKET, SO_RCVTIMEO, &rcvtimeout, sizeof (rcvtimeout));
+    errno_assert (rc != -1);
 #else
+    // Not supported
+    errno = ENOTSUP;
+    errno_assert (false);
+#endif
 
-    //  If required, set the reader to blocking mode.
-    if (block_) {
-        int flags = fcntl (r, F_GETFL, 0);
-        errno_assert (flags >= 0);
-        int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
-        errno_assert (rc == 0);
+    r_timeout = timeout_;
+}
+
+void zmq::mailbox_t::set_ioblock(bool block_)
+{
+    zmq_assert (block_ != r_block);
+
+#ifdef ZMQ_HAVE_WINDOWS
+    unsigned long argp = block_ ? 0 : 1;
+    int rc = ioctlsocket (r, FIONBIO, &argp);
+    wsa_assert (rc != SOCKET_ERROR);
+#else
+    int flags = fcntl (r, F_GETFL, 0);
+    errno_assert (flags >= 0);
+
+    if (block_)
+        flags = flags & ~O_NONBLOCK;
+    else
+        flags = flags | O_NONBLOCK;
+    
+    int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
+    errno_assert (rc == 0);
+#endif
+
+    r_block = block_;
+}
+
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
+{
+#ifdef ZMQ_HAVE_SO_RCVTIMEO
+    if (timeout_ != 0) {
+        // Used the blocking version if we support RCV_TIMEOUT.
+        if (r_timeout != timeout_)
+            set_rcvtimeo(timeout_);
+        return wait_recv(cmd_);
+    }
+#else
+    if (timeout_ == -1) {
+        // optimized version of recv for the special case of infinite
+        // timeout_ == -1.
+        return wait_recv(cmd_);
     }
+#endif
 
-    //  Attempt to read an entire command. Returns EAGAIN if non-blocking
-    //  and a command is not available. Save value of errno if we wish to pass
-    //  it to caller.
-    int err = 0;
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
-    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))  
-        err = errno;
-
-    //  Re-set the reader to non-blocking mode.
-    if (block_) {
-        int flags = fcntl (r, F_GETFL, 0);
-        errno_assert (flags >= 0);
-        int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
-        errno_assert (rc == 0);
+#if defined(MSG_DONTWAIT)
+    int flags = MSG_DONTWAIT;
+#else
+    if (r_block)
+        set_ioblock(false);
+    int flags = 0;
+#endif
+
+    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), flags);
+
+#ifdef ZMQ_HAVE_WINDOWS
+    if (nbytes != SOCKET_ERROR) {
+#else
+    if (nbytes != -1) {
+#endif
+        //  Sanity check for success.
+        zmq_assert (nbytes == sizeof (command_t));
+
+        return 0;
     }
 
-    //  If the recv failed, return with the saved errno if set.
-    if (err != 0) {
-        errno = err;
+    if (timeout_ == 0) {
+#ifdef ZMQ_HAVE_WINDOWS
+        // Winsock error conversion
+        if (WSAGetLastError () == WSAEWOULDBLOCK)
+            errno = EAGAIN;
+#endif
         return -1;
     }
 
-#endif
+    if (timeout_ != -1) {
+        // Elsewhere, poll
+
+        zmq_pollitem_t item;
+        memset(&item, 0, sizeof item);
+        item.fd = r;
+        item.events = ZMQ_POLLIN;
+
+        int rc = zmq_poll (&item, 1, timeout_);
+        if (rc == 0)
+            errno = EAGAIN;
+        if (rc <= 0)
+            return -1;
+    
+        zmq_assert (rc == 1);
+        zmq_assert (item.revents == ZMQ_POLLIN);
+    }
+
+    nbytes = ::recv (r, cmd_, sizeof (command_t), flags);
 
     //  Sanity check for success.
+#ifdef ZMQ_HAVE_WINDOWS
+    wsa_assert (nbytes != SOCKET_ERROR);
+#else
     errno_assert (nbytes != -1);
+#endif
 
     //  Check whether we haven't got half of command.
     zmq_assert (nbytes == sizeof (command_t));
@@ -247,8 +302,41 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
     return 0;
 }
 
+int zmq::mailbox_t::wait_recv(command_t *cmd_)
+{
+#ifndef MSG_DONTWAIT
+    if (!r_block)
+        set_ioblock(true);
+#endif
+
+    // Attempt to read an entire command.  Save value of errno if we
+    // wish to pass it to caller.
+    int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0);
+
+#ifdef ZMQ_HAVE_WINDOWS
+    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
+    {
+        errno = EAGAIN;
+        return -1;
+    }
+#else
+    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
+        return -1;
 #endif
 
+    //  Sanity check for success.
+#ifdef ZMQ_HAVE_WINDOWS
+    wsa_assert (nbytes != SOCKET_ERROR);
+#else
+    errno_assert (nbytes != -1);
+#endif
+
+    //  Check whether we haven't got half of command.
+    zmq_assert (nbytes == sizeof (command_t));
+
+    return 0;
+}
+
 int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
 {
 #if defined ZMQ_HAVE_WINDOWS
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 96bf4eb..183b99c 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -41,13 +41,19 @@ namespace zmq
 
         fd_t get_fd ();
         void send (const command_t &cmd_);
-        int recv (command_t *cmd_, bool block_);
+        int recv (command_t *cmd_, int timeout_);
         
     private:
+        void set_rcvtimeo(int timeout_);
+        void set_ioblock(bool block_);
+        // common case, for optimizing the default case.
+        int wait_recv(command_t *cmd_);
 
         //  Write & read end of the socketpair.
         fd_t w;
         fd_t r;
+        bool r_block;
+        int r_timeout;
 
         //  Platform-dependent function to create a socketpair.
         static int make_socketpair (fd_t *r_, fd_t *w_);
diff --git a/src/options.cpp b/src/options.cpp
index 897e0f5..9a03849 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -40,7 +40,9 @@ zmq::options_t::options_t () :
     maxmsgsize (-1),
     requires_in (false),
     requires_out (false),
-    immediate_connect (true)
+    immediate_connect (true),
+    recv_timeout (-1),
+    send_timeout (-1)
 {
 }
 
@@ -174,6 +176,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
         multicast_hops = *((int*) optval_);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        recv_timeout = *((int*) optval_);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        send_timeout = *((int*) optval_);
+        return 0;
     }
 
     errno = EINVAL;
@@ -319,6 +336,23 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
         *optvallen_ = sizeof (int);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = recv_timeout;
+        *optvallen_ = sizeof (int);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = send_timeout;
+        *optvallen_ = sizeof (int);
+        return 0;
     }
 
     errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 53d0197..d6f6fd4 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -85,6 +85,10 @@ namespace zmq
         //  is not aware of the peer's identity, however, it is able to send
         //  messages straight away.
         bool immediate_connect;
+
+        // The default timeout to wait on send/recv operation for this socket.
+        int recv_timeout;
+        int send_timeout;
     };
 
 }
diff --git a/src/reaper.cpp b/src/reaper.cpp
index d3ebbba..f94f7c1 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -61,7 +61,7 @@ void zmq::reaper_t::in_event ()
 
         //  Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3e104a8..e852c42 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -276,7 +276,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
             errno = EINVAL;
             return -1;
         }
-        int rc = process_commands (false, false);
+        int rc = process_commands (0, false);
         if (rc != 0 && (errno == EINTR || errno == ETERM))
             return -1;
         errno_assert (rc == 0);
@@ -478,7 +478,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
     }
 
     //  Process pending commands, if any.
-    int rc = process_commands (false, true);
+    int rc = process_commands (0, true);
     if (unlikely (rc != 0))
         return -1;
 
@@ -498,12 +498,22 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
 
     //  Oops, we couldn't send the message. Wait for the next
     //  command, process it and try to send the message again.
+    int timeout = options.send_timeout;
+    clock_t clock;
+    uint64_t last = clock.now_ms ();
     while (rc != 0) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (true, false) != 0))
+        if (unlikely (process_commands (timeout, false) != 0))
             return -1;
         rc = xsend (msg_, flags_);
+        if (rc != 0 && timeout != -1 && errno == EAGAIN) {
+            uint64_t now = clock.now_ms ();
+            if (now >= last + timeout)
+                return -1;
+            timeout -= now-last;
+            last = now;
+        }
     }
     return 0;
 }
@@ -535,7 +545,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
     //  described above) from the one used by 'send'. This is because counting
     //  ticks is more efficient than doing RDTSC all the time.
     if (++ticks == inbound_poll_rate) {
-        if (unlikely (process_commands (false, false) != 0))
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
     }
@@ -553,12 +563,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
 
     //  If the message cannot be fetched immediately, there are two scenarios.
     //  For non-blocking recv, commands are processed in case there's an
-    //  activate_reader command already waiting int a command pipe.
+    //  activate_reader command already waiting in a command pipe.
     //  If it's not, return EAGAIN.
     if (flags_ & ZMQ_DONTWAIT) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (false, false) != 0))
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
 
@@ -573,15 +583,25 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
 
     //  In blocking scenario, commands are processed over and over again until
     //  we are able to fetch a message.
-    bool block = (ticks != 0);
+    int timeout = (ticks != 0) ? options.recv_timeout : 0;
+    clock_t clock;
+    uint64_t last = clock.now_ms();
     while (rc != 0) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (block, false) != 0))
+        if (unlikely (process_commands (timeout, false) != 0))
             return -1;
         rc = xrecv (msg_, flags_);
+        if (ticks != 0)
+            timeout = options.recv_timeout;
         ticks = 0;
-        block = true;
+        if (rc != 0 && timeout != -1 && errno == EAGAIN) {
+            uint64_t now = clock.now_ms();
+            if (now >= last + timeout)
+                return -1;
+            timeout -= now-last;
+            last = now;
+        }
     }
 
     rcvmore = msg_->flags () & msg_t::more;
@@ -655,17 +675,18 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
     poller->set_pollin (handle);
 }
 
-int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
 {
     int rc;
     command_t cmd;
-    if (block_) {
-        rc = mailbox.recv (&cmd, true);
-        if (rc == -1 && errno == EINTR)
+    if (timeout_ != 0) {
+        rc = mailbox.recv (&cmd, timeout_);
+        if (rc == -1 && (errno == EINTR || errno == EAGAIN))
             return -1;
         errno_assert (rc == 0);
     }
     else {
+        zmq_assert (timeout_ == 0);
 
         //  Get the CPU's tick counter. If 0, the counter is not available.
         uint64_t tsc = zmq::clock_t::rdtsc ();
@@ -687,7 +708,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
         }
 
         //  Check whether there are any commands pending for this thread.
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
     }
 
     //  Process all the commands available at the moment.
@@ -698,7 +719,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
             return -1;
         errno_assert (rc == 0);
         cmd.destination->process_command (cmd);
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
      }
 
     if (ctx_terminated) {
@@ -777,7 +798,7 @@ void zmq::socket_base_t::in_event ()
 {
     //  Process any commands from other threads/sockets that may be available
     //  at the moment. Ultimately, socket will be destroyed.
-    process_commands (false, false);
+    process_commands (0, false);
     check_destroy ();
 }
 
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 0a5c574..82745e2 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -164,7 +164,7 @@ namespace zmq
         //  set to true, returns only after at least one command was processed.
         //  If throttle argument is true, commands are processed at most once
         //  in a predefined time period.
-        int process_commands (bool block_, bool throttle_);
+        int process_commands (int timeout_, bool throttle_);
 
         //  Handlers for incoming commands.
         void process_stop ();
diff --git a/tests/Makefile.am b/tests/Makefile.am
index ebbc46c..36d6ee6 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,7 +5,9 @@ noinst_PROGRAMS = test_pair_inproc \
                   test_pair_tcp \
                   test_reqrep_inproc \
                   test_reqrep_tcp \
-                  test_hwm
+                  test_hwm \
+                  test_timeout \
+                  test_timeout2
 
 if !ON_MINGW
 noinst_PROGRAMS += test_shutdown_stress \
@@ -21,6 +23,9 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
 
 test_hwm_SOURCES = test_hwm.cpp
 
+test_timeout_SOURCES = test_timeout.cpp
+test_timeout2_SOURCES = test_timeout2.cpp
+
 if !ON_MINGW
 test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
 test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
diff --git a/tests/test_timeout.cpp b/tests/test_timeout.cpp
new file mode 100644
index 0000000..adf633d
--- /dev/null
+++ b/tests/test_timeout.cpp
@@ -0,0 +1,128 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+
+#include "../include/zmq.h"
+#include <stdio.h>
+#include <time.h>
+#include <sys/types.h>
+// Deprecated but portable method
+#include <sys/timeb.h>
+#define TIMEOUT 500
+
+int elapsed_ms(struct timeb& start, struct timeb& end) {
+    return (end.time - start.time) * 1000 + (end.millitm - start.millitm);
+}
+
+int main (int argc, char *argv []) {
+
+    void *ctx = zmq_init (1);
+    assert (ctx);
+
+
+    const char* transport = "inproc://timeout_test";
+
+    void *sb = zmq_socket (ctx, ZMQ_PAIR);
+    assert (sb);
+    int rc = zmq_bind (sb, transport);
+    assert (rc == 0);
+
+    // Recv part
+
+    char buf [32];
+    rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT);
+    assert (rc == -1);
+    assert (zmq_errno() == EAGAIN);
+
+    int timeout = TIMEOUT;
+    size_t timeout_size = sizeof timeout;
+    rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+
+    timeout = -1;
+    rc = zmq_getsockopt(sb, ZMQ_RCVTIMEO, &timeout, &timeout_size);
+    assert (rc == 0);
+    assert (timeout_size == sizeof timeout);
+    assert (timeout == TIMEOUT);
+    
+    struct timeb start;
+    ftime (&start);
+    rc = zmq_recv (sb, buf, 32, 0);
+    struct timeb end;
+    ftime (&end);
+    assert (rc == -1);
+    assert (zmq_errno () == EAGAIN);
+    int elapsed = elapsed_ms(start, end);
+    assert (TIMEOUT*0.99 <= elapsed);
+    assert (elapsed <= TIMEOUT*1.01);
+
+    // Send part
+
+    rc = zmq_send (sb, buf, 32, ZMQ_DONTWAIT);
+    assert (rc == -1);
+    assert (zmq_errno() == EAGAIN);
+
+    rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+
+    timeout = -1;
+    rc = zmq_getsockopt(sb, ZMQ_SNDTIMEO, &timeout, &timeout_size);
+    assert (rc == 0);
+    assert (timeout_size == sizeof timeout);
+    assert (timeout == TIMEOUT);
+    
+    ftime (&start);
+    rc = zmq_send (sb, buf, 32, 0);
+    ftime (&end);
+    assert (rc == -1);
+    assert (zmq_errno () == EAGAIN);
+    elapsed = elapsed_ms(start, end);
+    assert (TIMEOUT*0.99 <= elapsed);
+    assert (elapsed <= TIMEOUT*1.01);
+
+    // Check everything is good otherwise
+
+    void *sc = zmq_socket (ctx, ZMQ_PAIR);
+    assert (sc);
+    rc = zmq_connect (sc, transport);
+    assert (rc == 0);
+
+    const char *content = "12345678ABCDEFGH12345678abcdefgh";
+
+    rc = zmq_send (sc, content, 32, 0);
+    assert (rc == 32);
+    
+    rc = zmq_recv (sb, buf, 32, 0);
+    assert (rc == 32);
+    assert (memcmp (buf, content, 32) == 0);    
+
+    rc = zmq_close (sc);
+    assert (rc == 0);
+
+    rc = zmq_close (sb);
+    assert (rc == 0);
+
+    rc = zmq_term (ctx);
+    assert (rc == 0);
+
+    return 0 ;
+}
diff --git a/tests/test_timeout2.cpp b/tests/test_timeout2.cpp
new file mode 100644
index 0000000..fe53018
--- /dev/null
+++ b/tests/test_timeout2.cpp
@@ -0,0 +1,135 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+
+#include "../include/zmq.h"
+#include "../include/zmq_utils.h"
+#include <stdio.h>
+#include <time.h>
+#include <sys/types.h>
+// Deprecated but portable method
+#include <sys/timeb.h>
+#include <pthread.h>
+
+#define TIMEOUT 2000
+
+const char* transport = "inproc://timeout2";
+
+int elapsed_ms(struct timeb& start, struct timeb& end) {
+    return (end.time - start.time) * 1000 + (end.millitm - start.millitm);
+}
+
+extern "C" {
+    void* worker(void *ctx) {
+
+        void *sb = zmq_socket (ctx, ZMQ_PAIR);
+        assert (sb);
+
+        int timeout = TIMEOUT;
+        int rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, sizeof (timeout));
+        assert (rc == 0);
+        rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, sizeof (timeout));
+        assert (rc == 0);
+
+        int hwm = 1;
+        rc = zmq_setsockopt(sb, ZMQ_SNDHWM, &hwm, sizeof (hwm));
+        assert (rc == 0);
+
+        rc = zmq_bind (sb, transport);
+        assert (rc == 0);
+
+        char buf [32];
+        struct timeb start;
+        struct timeb end;
+
+        ftime (&start);
+        rc = zmq_recv (sb, buf, 32, 0);
+        ftime (&end);
+        assert (rc == -1);
+        assert (zmq_errno () == EAGAIN);
+        int elapsed = elapsed_ms(start, end);
+        assert (TIMEOUT*0.99 <= elapsed);
+        assert (elapsed <= TIMEOUT*1.01);
+
+        const char *content = "12345678ABCDEFGH12345678abcdefgh";
+        do {
+            rc = zmq_send(sb, content, 32, ZMQ_DONTWAIT);
+        } while (rc == 32);
+        assert (rc == -1);
+        assert (errno == EAGAIN);
+
+        ftime (&start);
+        rc = zmq_send (sb, content, 32, 0);
+        ftime (&end);
+        assert (rc == -1);
+        assert (zmq_errno () == EAGAIN);
+        elapsed = elapsed_ms(start, end);
+        assert (TIMEOUT*0.99 <= elapsed);
+        assert (elapsed <= TIMEOUT*1.01);
+
+        rc = zmq_close (sb);
+        assert (rc == 0);
+
+        return NULL;
+    }
+}
+
+int main (int argc, char *argv []) {
+
+    void *ctx = zmq_init (1);
+    assert (ctx);
+
+    pthread_t thread;
+    int rc = pthread_create (&thread, NULL, worker, ctx);
+    assert (rc == 0);
+
+    // let it go a little bit
+    zmq_sleep (1);
+
+    // Connecting to the transport will send a message to the mailbox
+    // of the subscriber, but it shouldn't modify the timeout time.
+    void *sc = zmq_socket (ctx, ZMQ_PAIR);
+    assert (sc);
+    // set hwm to 1 for zmq_send test.
+    int hwm = 1;
+    rc = zmq_setsockopt (sc, ZMQ_RCVHWM, &hwm, sizeof (hwm));
+    assert (rc == 0);
+    rc = zmq_connect (sc, transport);
+    assert (rc == 0);
+
+    // sleep again...
+    zmq_sleep (2);
+
+    // the worker should now be in blocking send.
+    // create a second message by closing the socket.
+    rc = zmq_close (sc);
+    assert (rc == 0);
+
+    rc = pthread_join (thread, NULL);
+    assert (rc == 0);
+
+
+    rc = zmq_term (ctx);
+    assert (rc == 0);
+
+    return 0 ;
+}
-- 
1.7.5.3

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to