OK, I added a test for this condition, found a bug in my previous code, and fixed it. Nothing better than a test to check your work :)
Here is the new patch.

Fabien

2011/6/11 Martin Sustrik <[email protected]>:
> On 06/11/2011 05:59 PM, Fabien Niñoles wrote:
>>
>> Here is the new patch. I wasn't able however to make the test fail
>> without the fix. If you can figure out a way to achieve this, you're
>> welcome.
>
> That looks better :)
>
> As for the test case do it this way:
>
> 1. bind a SUB socket
> 2. start receiving with timeout of 10 secs
> 3. after 9 secs connect to the SUB from another application but don't send
> any messages.
> 4. the recv call should timeout after 19 secs (as opposed to 10).
>
> Martin
>
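To make the scenario above concrete, here is a minimal sketch of why a wake-up at 9 secs can stretch a 10 sec timeout to 19 secs, and how subtracting the elapsed time avoids it. It is only an illustration under stated assumptions: `FakeMailbox` and `recv_gives_up_at` are hypothetical names, the clock is simulated, and none of this is the library's code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a blocking wait on the command mailbox.
// Time is simulated: wait() advances now_ms instead of sleeping.
struct FakeMailbox {
    std::vector<uint64_t> wakeups;  // simulated times at which a command arrives
    uint64_t now_ms = 0;
    std::size_t next = 0;

    // Returns true if a command arrived within timeout_ms, false on timeout.
    bool wait(int timeout_ms) {
        const uint64_t deadline = now_ms + static_cast<uint64_t>(timeout_ms);
        if (next < wakeups.size() && wakeups[next] <= deadline) {
            if (wakeups[next] > now_ms)
                now_ms = wakeups[next];
            ++next;
            return true;
        }
        now_ms = deadline;
        return false;
    }
};

// Returns the simulated time at which a recv with no incoming message gives up.
// With track_elapsed == false every wake-up restarts the full timeout;
// with track_elapsed == true the time already spent is subtracted first.
uint64_t recv_gives_up_at(FakeMailbox &mb, int timeout_ms, bool track_elapsed) {
    int remaining = timeout_ms;
    uint64_t last = mb.now_ms;
    for (;;) {
        if (!mb.wait(remaining))
            return mb.now_ms;  // timed out, the caller would see EAGAIN
        // Woken by a command (e.g. a peer connecting), but still no message.
        if (track_elapsed) {
            const uint64_t now = mb.now_ms;
            if (now >= last + static_cast<uint64_t>(remaining))
                return now;
            remaining -= static_cast<int>(now - last);
            last = now;
        }
    }
}
```

With a single wake-up at 9000 ms and a 10000 ms timeout, the untracked variant gives up at 19000 ms and the tracked one at 10000 ms, which matches the 19 vs 10 secs in the steps above.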
From ec78fa599ff6822888bbc5f16a88815b4e8e1340 Mon Sep 17 00:00:00 2001
From: Fabien Ninoles <[email protected]>
Date: Mon, 23 May 2011 13:33:12 -0400
Subject: [PATCH] Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO.

- Add doc and tests.
- Add options and setup.
- Wait using zmq_poll in mailbox_t::recv.
  (Windows version untested).

Signed-off-by: Fabien Ninoles <[email protected]>
---
 .gitignore              |    1 +
 doc/zmq_getsockopt.txt  |   36 +++++++++++++
 doc/zmq_setsockopt.txt  |   36 +++++++++++++
 include/zmq.h           |    2 +
 src/ctx.cpp             |    2 +-
 src/io_thread.cpp       |    2 +-
 src/mailbox.cpp         |  114 ++++++++++++++--------------------------
 src/mailbox.hpp         |    2 +-
 src/options.cpp         |   36 ++++++++++++-
 src/options.hpp         |    4 ++
 src/reaper.cpp          |    2 +-
 src/socket_base.cpp     |   53 +++++++++++++------
 src/socket_base.hpp     |    2 +-
 tests/Makefile.am       |    7 ++-
 tests/test_timeout.cpp  |  128 ++++++++++++++++++++++++++++++++++++++++++++
 tests/test_timeout2.cpp |  135 +++++++++++++++++++++++++++++++++++++++++++++++
 16 files changed, 464 insertions(+), 98 deletions(-)
 create mode 100644 tests/test_timeout.cpp
 create mode 100644 tests/test_timeout2.cpp

diff --git a/.gitignore b/.gitignore
index b309c73..abde4cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -57,3 +57,4 @@ foreign/openpgm/*
 !foreign/openpgm/Makefile.am
 zeromq-*.tar.gz
 zeromq-*.zip
+tests/test_timeout
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 97b4032..4450544 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -357,6 +357,42 @@ Default value:: N/A
 Applicable socket types:: all
 
 
+ZMQ_RCVTIMEO: Maximum time before a socket operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for recv operation on the socket. The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait that value
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
+ZMQ_SNDTIMEO: Maximum time before a socket operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for send operation on the socket. The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait that value
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 RETURN VALUE
 ------------
 The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index ed3b3a7..a89a7f7 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -309,6 +309,42 @@ Option value unit:: network hops
 Default value:: 1
 Applicable socket types:: all, when using multicast transports
 
+
+ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for recv operation on the socket. The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait that value
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for send operation on the socket. The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with an
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait that value
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 RETURN VALUE
 ------------
 The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/include/zmq.h b/include/zmq.h
index 40dffd9..5f0fec6 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -180,6 +180,8 @@ ZMQ_EXPORT int zmq_term (void *context);
 #define ZMQ_SNDHWM 23
 #define ZMQ_RCVHWM 24
 #define ZMQ_MULTICAST_HOPS 25
+#define ZMQ_RCVTIMEO 26
+#define ZMQ_SNDTIMEO 27
 
 /* Send/recv options. */
 #define ZMQ_DONTWAIT 1
diff --git a/src/ctx.cpp b/src/ctx.cpp
index fb5420d..783bcba 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -142,7 +142,7 @@ int zmq::ctx_t::terminate ()
 
     // Wait till reaper thread closes all the sockets.
     command_t cmd;
-    int rc = term_mailbox.recv (&cmd, true);
+    int rc = term_mailbox.recv (&cmd, -1);
     if (rc == -1 && errno == EINTR)
         return -1;
     zmq_assert (rc == 0);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 9678392..c6f3880 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -70,7 +70,7 @@ void zmq::io_thread_t::in_event ()
 
         // Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 221396b..eff1de2 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -36,6 +36,9 @@
 #include <sys/socket.h>
 #endif
 
+#include <pthread.h>
+#include <poller.hpp>
+
 zmq::fd_t zmq::mailbox_t::get_fd ()
 {
     return r;
@@ -79,44 +82,6 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
-{
-    // If required, set the reader to blocking mode.
-    if (block_) {
-        unsigned long argp = 0;
-        int rc = ioctlsocket (r, FIONBIO, &argp);
-        wsa_assert (rc != SOCKET_ERROR);
-    }
-
-    // Attempt to read an entire command. Returns EAGAIN if non-blocking
-    // and a command is not available. Save value of errno if we wish to pass
-    // it to caller.
-    int err = 0;
-    int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0);
-    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
-        err = EAGAIN;
-
-    // Re-set the reader to non-blocking mode.
-    if (block_) {
-        unsigned long argp = 1;
-        int rc = ioctlsocket (r, FIONBIO, &argp);
-        wsa_assert (rc != SOCKET_ERROR);
-    }
-
-    // If the recv failed, return with the saved errno.
-    if (err != 0) {
-        errno = err;
-        return -1;
-    }
-
-    // Sanity check for success.
-    wsa_assert (nbytes != SOCKET_ERROR);
-
-    // Check whether we haven't got half of command.
-    zmq_assert (nbytes == sizeof (command_t));
-
-    return 0;
-}
 
 #else
 
@@ -139,13 +104,11 @@ zmq::mailbox_t::mailbox_t ()
     rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
     errno_assert (rc == 0);
 
-#ifndef MSG_DONTWAIT
     // Set the reader to non-blocking mode.
     flags = fcntl (r, F_GETFL, 0);
     errno_assert (flags >= 0);
     rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
     errno_assert (rc == 0);
-#endif
 }
 
 zmq::mailbox_t::~mailbox_t ()
@@ -194,52 +157,55 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+#endif
+
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
 {
-#ifdef MSG_DONTWAIT
+    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
 
-    // Attempt to read an entire command. Returns EAGAIN if non-blocking
-    // mode is requested and a command is not available.
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
-        block_ ? 0 : MSG_DONTWAIT);
-    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
-        return -1;
+#ifdef ZMQ_HAVE_WINDOWS
+    if (nbytes != SOCKET_ERROR) {
 #else
+    if (nbytes != -1) {
+#endif
+        // Sanity check for success.
+        zmq_assert (nbytes == sizeof (command_t));
 
-    // If required, set the reader to blocking mode.
-    if (block_) {
-        int flags = fcntl (r, F_GETFL, 0);
-        errno_assert (flags >= 0);
-        int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
-        errno_assert (rc == 0);
+        return 0;
     }
 
-    // Attempt to read an entire command. Returns EAGAIN if non-blocking
-    // and a command is not available. Save value of errno if we wish to pass
-    // it to caller.
-    int err = 0;
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
-    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
-        err = errno;
-
-    // Re-set the reader to non-blocking mode.
-    if (block_) {
-        int flags = fcntl (r, F_GETFL, 0);
-        errno_assert (flags >= 0);
-        int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
-        errno_assert (rc == 0);
+    if (timeout_ == 0) {
+#ifdef ZMQ_HAVE_WINDOWS
+        // Winsock error conversion
+        if (WSAGetLastError () == WSAEWOULDBLOCK)
+            errno = EAGAIN;
+#endif
+        return -1;
     }
 
-    // If the recv failed, return with the saved errno if set.
-    if (err != 0) {
-        errno = err;
+    // Otherwise, poll
+    zmq_pollitem_t item;
+    memset(&item, 0, sizeof item);
+    item.fd = r;
+    item.events = ZMQ_POLLIN;
+
+    int rc = zmq_poll (&item, 1, timeout_);
+    if (rc == 0)
+        errno = EAGAIN;
+    if (rc <= 0)
         return -1;
-    }
+
+    zmq_assert (rc == 1);
+    zmq_assert (item.revents == ZMQ_POLLIN);
 
-#endif
+    nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
 
     // Sanity check for success.
+#ifdef ZMQ_HAVE_WINDOWS
+    wsa_assert (nbytes != SOCKET_ERROR);
+#else
     errno_assert (nbytes != -1);
+#endif
 
     // Check whether we haven't got half of command.
     zmq_assert (nbytes == sizeof (command_t));
@@ -247,8 +213,6 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
     return 0;
 }
 
-#endif
-
 int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
 {
 #if defined ZMQ_HAVE_WINDOWS
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 96bf4eb..5867764 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -41,7 +41,7 @@ namespace zmq
 
         fd_t get_fd ();
         void send (const command_t &cmd_);
-        int recv (command_t *cmd_, bool block_);
+        int recv (command_t *cmd_, int timeout_);
 
     private:
 
diff --git a/src/options.cpp b/src/options.cpp
index 897e0f5..9a03849 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -40,7 +40,9 @@ zmq::options_t::options_t () :
     maxmsgsize (-1),
     requires_in (false),
     requires_out (false),
-    immediate_connect (true)
+    immediate_connect (true),
+    recv_timeout (-1),
+    send_timeout (-1)
 {
 }
 
@@ -174,6 +176,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
         multicast_hops = *((int*) optval_);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        recv_timeout = *((int*) optval_);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        send_timeout = *((int*) optval_);
+        return 0;
     }
 
     errno = EINVAL;
@@ -319,6 +336,23 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
         *optvallen_ = sizeof (int);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = recv_timeout;
+        *optvallen_ = sizeof (int);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = send_timeout;
+        *optvallen_ = sizeof (int);
+        return 0;
     }
 
     errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 53d0197..d6f6fd4 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -85,6 +85,10 @@ namespace zmq
         // is not aware of the peer's identity, however, it is able to send
         // messages straight away.
         bool immediate_connect;
+
+        // The default timeout to wait on send/recv operation for this socket.
+        int recv_timeout;
+        int send_timeout;
     };
 
 }
diff --git a/src/reaper.cpp b/src/reaper.cpp
index d3ebbba..f94f7c1 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -61,7 +61,7 @@ void zmq::reaper_t::in_event ()
 
         // Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3e104a8..e852c42 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -276,7 +276,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
             errno = EINVAL;
             return -1;
         }
-        int rc = process_commands (false, false);
+        int rc = process_commands (0, false);
         if (rc != 0 && (errno == EINTR || errno == ETERM))
             return -1;
         errno_assert (rc == 0);
@@ -478,7 +478,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
     }
 
     // Process pending commands, if any.
-    int rc = process_commands (false, true);
+    int rc = process_commands (0, true);
     if (unlikely (rc != 0))
         return -1;
 
@@ -498,12 +498,22 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
 
     // Oops, we couldn't send the message. Wait for the next
     // command, process it and try to send the message again.
+    int timeout = options.send_timeout;
+    clock_t clock;
+    uint64_t last = clock.now_ms ();
     while (rc != 0) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (true, false) != 0))
+        if (unlikely (process_commands (timeout, false) != 0))
             return -1;
         rc = xsend (msg_, flags_);
+        if (rc != 0 && timeout != -1 && errno == EAGAIN) {
+            uint64_t now = clock.now_ms ();
+            if (now >= last + timeout)
+                return -1;
+            timeout -= now-last;
+            last = now;
+        }
     }
     return 0;
 }
@@ -535,7 +545,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
     // described above) from the one used by 'send'. This is because counting
     // ticks is more efficient than doing RDTSC all the time.
     if (++ticks == inbound_poll_rate) {
-        if (unlikely (process_commands (false, false) != 0))
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
     }
@@ -553,12 +563,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
 
     // If the message cannot be fetched immediately, there are two scenarios.
     // For non-blocking recv, commands are processed in case there's an
-    // activate_reader command already waiting int a command pipe.
+    // activate_reader command already waiting in a command pipe.
     // If it's not, return EAGAIN.
     if (flags_ & ZMQ_DONTWAIT) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (false, false) != 0))
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
 
@@ -573,15 +583,25 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
 
     // In blocking scenario, commands are processed over and over again until
     // we are able to fetch a message.
-    bool block = (ticks != 0);
+    int timeout = (ticks != 0) ? options.recv_timeout : 0;
+    clock_t clock;
+    uint64_t last = clock.now_ms();
     while (rc != 0) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (block, false) != 0))
+        if (unlikely (process_commands (timeout, false) != 0))
             return -1;
         rc = xrecv (msg_, flags_);
+        if (ticks != 0)
+            timeout = options.recv_timeout;
         ticks = 0;
-        block = true;
+        if (rc != 0 && timeout != -1 && errno == EAGAIN) {
+            uint64_t now = clock.now_ms();
+            if (now >= last + timeout)
+                return -1;
+            timeout -= now-last;
+            last = now;
+        }
     }
 
     rcvmore = msg_->flags () & msg_t::more;
@@ -655,17 +675,18 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
     poller->set_pollin (handle);
 }
 
-int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
 {
     int rc;
     command_t cmd;
-    if (block_) {
-        rc = mailbox.recv (&cmd, true);
-        if (rc == -1 && errno == EINTR)
+    if (timeout_ != 0) {
+        rc = mailbox.recv (&cmd, timeout_);
+        if (rc == -1 && (errno == EINTR || errno == EAGAIN))
             return -1;
         errno_assert (rc == 0);
     }
     else {
+        zmq_assert (timeout_ == 0);
 
         // Get the CPU's tick counter. If 0, the counter is not available.
         uint64_t tsc = zmq::clock_t::rdtsc ();
@@ -687,7 +708,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
         }
 
         // Check whether there are any commands pending for this thread.
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
     }
 
     // Process all the commands available at the moment.
@@ -698,7 +719,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
             return -1;
         errno_assert (rc == 0);
         cmd.destination->process_command (cmd);
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
     }
 
     if (ctx_terminated) {
@@ -777,7 +798,7 @@ void zmq::socket_base_t::in_event ()
 {
     // Process any commands from other threads/sockets that may be available
     // at the moment. Ultimately, socket will be destroyed.
-    process_commands (false, false);
+    process_commands (0, false);
     check_destroy ();
 }
 
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 0a5c574..82745e2 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -164,7 +164,7 @@ namespace zmq
         // set to true, returns only after at least one command was processed.
         // If throttle argument is true, commands are processed at most once
         // in a predefined time period.
-        int process_commands (bool block_, bool throttle_);
+        int process_commands (int timeout_, bool throttle_);
 
         // Handlers for incoming commands.
         void process_stop ();
diff --git a/tests/Makefile.am b/tests/Makefile.am
index ebbc46c..36d6ee6 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,7 +5,9 @@ noinst_PROGRAMS = test_pair_inproc \
                   test_pair_tcp \
                   test_reqrep_inproc \
                   test_reqrep_tcp \
-                  test_hwm
+                  test_hwm \
+                  test_timeout \
+                  test_timeout2
 
 if !ON_MINGW
 noinst_PROGRAMS += test_shutdown_stress \
@@ -21,6 +23,9 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
 
 test_hwm_SOURCES = test_hwm.cpp
 
+test_timeout_SOURCES = test_timeout.cpp
+test_timeout2_SOURCES = test_timeout2.cpp
+
 if !ON_MINGW
 test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
 test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
diff --git a/tests/test_timeout.cpp b/tests/test_timeout.cpp
new file mode 100644
index 0000000..adf633d
--- /dev/null
+++ b/tests/test_timeout.cpp
@@ -0,0 +1,128 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+
+#include "../include/zmq.h"
+#include <stdio.h>
+#include <time.h>
+#include <sys/types.h>
+// Deprecated but portable method
+#include <sys/timeb.h>
+#define TIMEOUT 500
+
+int elapsed_ms(struct timeb& start, struct timeb& end) {
+    return (end.time - start.time) * 1000 + (end.millitm - start.millitm);
+}
+
+int main (int argc, char *argv []) {
+
+    void *ctx = zmq_init (1);
+    assert (ctx);
+
+
+    const char* transport = "inproc://timeout_test";
+
+    void *sb = zmq_socket (ctx, ZMQ_PAIR);
+    assert (sb);
+    int rc = zmq_bind (sb, transport);
+    assert (rc == 0);
+
+    // Recv part
+
+    char buf [32];
+    rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT);
+    assert (rc == -1);
+    assert (zmq_errno() == EAGAIN);
+
+    int timeout = TIMEOUT;
+    size_t timeout_size = sizeof timeout;
+    rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+
+    timeout = -1;
+    rc = zmq_getsockopt(sb, ZMQ_RCVTIMEO, &timeout, &timeout_size);
+    assert (rc == 0);
+    assert (timeout_size == sizeof timeout);
+    assert (timeout == TIMEOUT);
+
+    struct timeb start;
+    ftime (&start);
+    rc = zmq_recv (sb, buf, 32, 0);
+    struct timeb end;
+    ftime (&end);
+    assert (rc == -1);
+    assert (zmq_errno () == EAGAIN);
+    int elapsed = elapsed_ms(start, end);
+    assert (TIMEOUT*0.99 <= elapsed);
+    assert (elapsed <= TIMEOUT*1.01);
+
+    // Send part
+
+    rc = zmq_send (sb, buf, 32, ZMQ_DONTWAIT);
+    assert (rc == -1);
+    assert (zmq_errno() == EAGAIN);
+
+    rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+
+    timeout = -1;
+    rc = zmq_getsockopt(sb, ZMQ_SNDTIMEO, &timeout, &timeout_size);
+    assert (rc == 0);
+    assert (timeout_size == sizeof timeout);
+    assert (timeout == TIMEOUT);
+
+    ftime (&start);
+    rc = zmq_send (sb, buf, 32, 0);
+    ftime (&end);
+    assert (rc == -1);
+    assert (zmq_errno () == EAGAIN);
+    elapsed = elapsed_ms(start, end);
+    assert (TIMEOUT*0.99 <= elapsed);
+    assert (elapsed <= TIMEOUT*1.01);
+
+    // Check everything is good otherwise
+
+    void *sc = zmq_socket (ctx, ZMQ_PAIR);
+    assert (sc);
+    rc = zmq_connect (sc, transport);
+    assert (rc == 0);
+
+    const char *content = "12345678ABCDEFGH12345678abcdefgh";
+
+    rc = zmq_send (sc, content, 32, 0);
+    assert (rc == 32);
+
+    rc = zmq_recv (sb, buf, 32, 0);
+    assert (rc == 32);
+    assert (memcmp (buf, content, 32) == 0);
+
+    rc = zmq_close (sc);
+    assert (rc == 0);
+
+    rc = zmq_close (sb);
+    assert (rc == 0);
+
+    rc = zmq_term (ctx);
+    assert (rc == 0);
+
+    return 0 ;
+}
diff --git a/tests/test_timeout2.cpp b/tests/test_timeout2.cpp
new file mode 100644
index 0000000..fe53018
--- /dev/null
+++ b/tests/test_timeout2.cpp
@@ -0,0 +1,135 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+
+#include "../include/zmq.h"
+#include "../include/zmq_utils.h"
+#include <stdio.h>
+#include <time.h>
+#include <sys/types.h>
+// Deprecated but portable method
+#include <sys/timeb.h>
+#include <pthread.h>
+
+#define TIMEOUT 2000
+
+const char* transport = "inproc://timeout2";
+
+int elapsed_ms(struct timeb& start, struct timeb& end) {
+    return (end.time - start.time) * 1000 + (end.millitm - start.millitm);
+}
+
+extern "C" {
+    void* worker(void *ctx) {
+
+        void *sb = zmq_socket (ctx, ZMQ_PAIR);
+        assert (sb);
+
+        int timeout = TIMEOUT;
+        int rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, sizeof (timeout));
+        assert (rc == 0);
+        rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, sizeof (timeout));
+        assert (rc == 0);
+
+        int hwm = 1;
+        rc = zmq_setsockopt(sb, ZMQ_SNDHWM, &hwm, sizeof (hwm));
+        assert (rc == 0);
+
+        rc = zmq_bind (sb, transport);
+        assert (rc == 0);
+
+        char buf [32];
+        struct timeb start;
+        struct timeb end;
+
+        ftime (&start);
+        rc = zmq_recv (sb, buf, 32, 0);
+        ftime (&end);
+        assert (rc == -1);
+        assert (zmq_errno () == EAGAIN);
+        int elapsed = elapsed_ms(start, end);
+        assert (TIMEOUT*0.99 <= elapsed);
+        assert (elapsed <= TIMEOUT*1.01);
+
+        const char *content = "12345678ABCDEFGH12345678abcdefgh";
+        do {
+            rc = zmq_send(sb, content, 32, ZMQ_DONTWAIT);
+        } while (rc == 32);
+        assert (rc == -1);
+        assert (errno == EAGAIN);
+
+        ftime (&start);
+        rc = zmq_send (sb, content, 32, 0);
+        ftime (&end);
+        assert (rc == -1);
+        assert (zmq_errno () == EAGAIN);
+        elapsed = elapsed_ms(start, end);
+        assert (TIMEOUT*0.99 <= elapsed);
+        assert (elapsed <= TIMEOUT*1.01);
+
+        rc = zmq_close (sb);
+        assert (rc == 0);
+
+        return NULL;
+    }
+}
+
+int main (int argc, char *argv []) {
+
+    void *ctx = zmq_init (1);
+    assert (ctx);
+
+    pthread_t thread;
+    int rc = pthread_create (&thread, NULL, worker, ctx);
+    assert (rc == 0);
+
+    // let it go a little bit
+    zmq_sleep (1);
+
+    // Connecting to the transport will send a message to the mailbox
+    // of the subscriber but shouldn't modify the timeout.
+    void *sc = zmq_socket (ctx, ZMQ_PAIR);
+    assert (sc);
+    // set hwm to 1 for zmq_send test.
+    int hwm = 1;
+    rc = zmq_setsockopt (sc, ZMQ_RCVHWM, &hwm, sizeof (hwm));
+    assert (rc == 0);
+    rc = zmq_connect (sc, transport);
+    assert (rc == 0);
+
+    // sleep again...
+    zmq_sleep (2);
+
+    // the worker should now be in blocking send.
+    // create a second message by closing the socket.
+    rc = zmq_close (sc);
+    assert (rc == 0);
+
+    rc = pthread_join (thread, NULL);
+    assert (rc == 0);
+
+
+    rc = zmq_term (ctx);
+    assert (rc == 0);
+
+    return 0 ;
+}
-- 
1.7.5.3
