Here is another update, in which I removed the block parameter. This way, a timeout value of 0 always behaves like a non-blocking call (I hesitated to do this at first, but I can't see why it should be otherwise).

Fabien

2011/6/5 Fabien Niñoles <[email protected]>:
> Sorry, I forgot to update my patch file; here is the good one.
>
> 2011/6/5 Fabien Niñoles <[email protected]>:
>> Sorry for the long delay before sending a new patch, but here it
>> is. The basic changes are that I renamed ZMQ_TIMEOUT to ZMQ_RCVTIMEO and
>> ZMQ_SNDTIMEO, and I used polling instead of a busy loop in the
>> mailbox_t::recv method. Two things I'm not sure about:
>>
>> - I use zmq_poll since it nicely abstracts the different poll
>> implementations, but it is an external API and calling it internally
>> is usually bad practice. It's probably far worse to duplicate such a
>> complex piece of code, so I kept it this way unless you tell me
>> otherwise.
>>
>> - The Windows version of mailbox_t::recv only differs in the error
>> handling (wsa_assert/SOCKET_ERROR instead of errno_assert/-1), and I
>> merged them for this reason. Shouldn't we have an abstraction for this
>> instead of the ugly ZMQ_HAVE_WINDOWS #ifdef?
>>
>> Thanks,
>> Fabien
>>
>> 2011/5/23 Fabien Niñoles <[email protected]>:
>>> OK, I would go with a little more spec here, to be more in line with
>>> the normal socket behavior.
>>>
>>> - Change ZMQ_TIMEOUT to ZMQ_RCVTIMEO and ZMQ_SNDTIMEO.
>>> - Value is a uint32 in ms (to be compatible with Windows and keep the
>>> implementation simple).
>>> - Default to 0, which would mean block indefinitely.
>>> - On timeout, return EAGAIN.
>>>
>>> Is everything OK with you?
>>>
>>> Fabien
>>>
>>
>
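
To make the three timeout cases (0, -1, positive) easier to follow outside the 0MQ tree, here is a minimal standalone sketch of the same try-then-poll pattern. It is only an illustration under stated assumptions: it targets POSIX, calls poll(2) directly rather than zmq_poll, relies on MSG_DONTWAIT instead of a non-blocking descriptor, and `recv_with_timeout` is a hypothetical name that does not appear in the patch.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>

//  Hypothetical helper (not part of the patch): try to read from fd_,
//  waiting at most timeout_ milliseconds for data to arrive.
//  timeout_ == 0 never waits; timeout_ == -1 waits indefinitely.
//  Returns the number of bytes read, or -1 with errno set
//  (EAGAIN when the timeout expired with nothing to read).
ssize_t recv_with_timeout (int fd_, void *buf_, size_t size_, int timeout_)
{
    //  Optimistic first attempt; MSG_DONTWAIT keeps it non-blocking.
    ssize_t nbytes = ::recv (fd_, buf_, size_, MSG_DONTWAIT);
    if (nbytes != -1)
        return nbytes;
    if (errno != EAGAIN && errno != EWOULDBLOCK)
        return -1;

    //  A zero timeout behaves exactly like a non-blocking call.
    if (timeout_ == 0) {
        errno = EAGAIN;
        return -1;
    }

    //  Otherwise wait for readability, for at most timeout_ ms.
    pollfd item;
    item.fd = fd_;
    item.events = POLLIN;
    item.revents = 0;

    int rc = ::poll (&item, 1, timeout_);
    if (rc == 0)
        errno = EAGAIN;
    if (rc <= 0)
        return -1;
    assert (rc == 1);

    //  Data (or EOF/error) is pending now; read it without blocking.
    return ::recv (fd_, buf_, size_, MSG_DONTWAIT);
}
```

With a connected socketpair and nothing written yet, calling the helper with a timeout of 0 or 50 should fail with EAGAIN, while a call made after the peer has sent data should return the bytes whatever the timeout value is.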

From 37ea0fdbdd57f018977f5422b75b20bef7924a82 Mon Sep 17 00:00:00 2001
From: Fabien Ninoles <[email protected]>
Date: Mon, 23 May 2011 13:33:12 -0400
Subject: [PATCH] Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO.

- Add doc and tests.
- Add options and setup.
- Wait using zmq_pool in mailbox_t::recv.
  (Windows version untested).

Signed-off-by: Fabien Ninoles <[email protected]>
---
 .gitignore             |    1 +
 doc/zmq_getsockopt.txt |   36 +++++++++++++++
 doc/zmq_setsockopt.txt |   36 +++++++++++++++
 include/zmq.h          |    2 +
 src/ctx.cpp            |    2 +-
 src/io_thread.cpp      |    2 +-
 src/mailbox.cpp        |  115 +++++++++++++++++------------------------------
 src/mailbox.hpp        |    2 +-
 src/options.cpp        |   36 +++++++++++++++-
 src/options.hpp        |    4 ++
 src/reaper.cpp         |    2 +-
 src/socket_base.cpp    |   33 +++++++-------
 src/socket_base.hpp    |    2 +-
 tests/Makefile.am      |    5 ++-
 tests/test_timeout.cpp |   85 +++++++++++++++++++++++++++++++++++
 15 files changed, 265 insertions(+), 98 deletions(-)
 create mode 100644 tests/test_timeout.cpp

diff --git a/.gitignore b/.gitignore
index b309c73..abde4cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -57,3 +57,4 @@ foreign/openpgm/*
 !foreign/openpgm/Makefile.am
 zeromq-*.tar.gz
 zeromq-*.zip
+tests/test_timeout
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 97b4032..4450544 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -357,6 +357,42 @@ Default value:: N/A
 Applicable socket types:: all
 
 
+ZMQ_RCVTIMEO: Maximum time before a socket operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for recv operation on the socket. The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with a
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait that value
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
+ZMQ_SNDTIMEO: Maximum time before a socket operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for send operation on the socket. The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with a
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait that value
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 RETURN VALUE
 ------------
 The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index ed3b3a7..a89a7f7 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -309,6 +309,42 @@ Option value unit:: network hops
 Default value:: 1
 Applicable socket types:: all, when using multicast transports
 
+
+ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for recv operation on the socket. The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with a
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait that value
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for send operation on the socket. The value is
+interpreted the same way as the timeout argument of _zmq_poll(3)_: If
+the value is `0`, the operation shall return immediately, with a
+EAGAIN error if no events were present. If the value is `-1`, the
+operation shall block indefinitely until an event has occurred on the
+socket. For all other values, the operation will wait that value
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
 RETURN VALUE
 ------------
 The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/include/zmq.h b/include/zmq.h
index 40dffd9..5f0fec6 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -180,6 +180,8 @@ ZMQ_EXPORT int zmq_term (void *context);
 #define ZMQ_SNDHWM 23
 #define ZMQ_RCVHWM 24
 #define ZMQ_MULTICAST_HOPS 25
+#define ZMQ_RCVTIMEO 26
+#define ZMQ_SNDTIMEO 27
 
 /* Send/recv options. */
 #define ZMQ_DONTWAIT 1
diff --git a/src/ctx.cpp b/src/ctx.cpp
index fb5420d..783bcba 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -142,7 +142,7 @@ int zmq::ctx_t::terminate ()
 
     // Wait till reaper thread closes all the sockets.
     command_t cmd;
-    int rc = term_mailbox.recv (&cmd, true);
+    int rc = term_mailbox.recv (&cmd, -1);
     if (rc == -1 && errno == EINTR)
         return -1;
     zmq_assert (rc == 0);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 9678392..c6f3880 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -70,7 +70,7 @@ void zmq::io_thread_t::in_event ()
 
         // Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 221396b..85b288e 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -23,6 +23,7 @@
 #include "err.hpp"
 #include "fd.hpp"
 #include "ip.hpp"
+#include "clock.hpp"
 
 #if defined ZMQ_HAVE_WINDOWS
 #include "windows.hpp"
@@ -36,6 +37,9 @@
 #include <sys/socket.h>
 #endif
 
+#include <pthread.h>
+#include <poller.hpp>
+
 zmq::fd_t zmq::mailbox_t::get_fd ()
 {
     return r;
@@ -79,44 +83,6 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
-{
-    // If required, set the reader to blocking mode.
-    if (block_) {
-        unsigned long argp = 0;
-        int rc = ioctlsocket (r, FIONBIO, &argp);
-        wsa_assert (rc != SOCKET_ERROR);
-    }
-
-    // Attempt to read an entire command. Returns EAGAIN if non-blocking
-    // and a command is not available. Save value of errno if we wish to pass
-    // it to caller.
-    int err = 0;
-    int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0);
-    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
-        err = EAGAIN;
-
-    // Re-set the reader to non-blocking mode.
-    if (block_) {
-        unsigned long argp = 1;
-        int rc = ioctlsocket (r, FIONBIO, &argp);
-        wsa_assert (rc != SOCKET_ERROR);
-    }
-
-    // If the recv failed, return with the saved errno.
-    if (err != 0) {
-        errno = err;
-        return -1;
-    }
-
-    // Sanity check for success.
-    wsa_assert (nbytes != SOCKET_ERROR);
-
-    // Check whether we haven't got half of command.
-    zmq_assert (nbytes == sizeof (command_t));
-
-    return 0;
-}
 
 #else
 
@@ -139,13 +105,11 @@ zmq::mailbox_t::mailbox_t ()
     rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
     errno_assert (rc == 0);
 
-#ifndef MSG_DONTWAIT
     // Set the reader to non-blocking mode.
     flags = fcntl (r, F_GETFL, 0);
     errno_assert (flags >= 0);
     rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
     errno_assert (rc == 0);
-#endif
 }
 
 zmq::mailbox_t::~mailbox_t ()
@@ -194,52 +158,55 @@ void zmq::mailbox_t::send (const command_t &cmd_)
     zmq_assert (nbytes == sizeof (command_t));
 }
 
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+#endif
+
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
 {
-#ifdef MSG_DONTWAIT
+    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
 
-    // Attempt to read an entire command. Returns EAGAIN if non-blocking
-    // mode is requested and a command is not available.
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
-        block_ ? 0 : MSG_DONTWAIT);
-    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
-        return -1;
+#ifdef ZMQ_HAVE_WINDOWS
+    if (nbytes != SOCKET_ERROR) {
 #else
+    if (nbytes != -1) {
+#endif
+        // Sanity check for success.
+        zmq_assert (nbytes == sizeof (command_t));
 
-    // If required, set the reader to blocking mode.
-    if (block_) {
-        int flags = fcntl (r, F_GETFL, 0);
-        errno_assert (flags >= 0);
-        int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
-        errno_assert (rc == 0);
+        return 0;
     }
 
-    // Attempt to read an entire command. Returns EAGAIN if non-blocking
-    // and a command is not available. Save value of errno if we wish to pass
-    // it to caller.
-    int err = 0;
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
-    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
-        err = errno;
-
-    // Re-set the reader to non-blocking mode.
-    if (block_) {
-        int flags = fcntl (r, F_GETFL, 0);
-        errno_assert (flags >= 0);
-        int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
-        errno_assert (rc == 0);
+    if (timeout_ == 0) {
+#ifdef ZMQ_HAVE_WINDOWS
+        // Winsock error conversion
+        if (WSAGetLastError () == WSAEWOULDBLOCK)
+            errno = EAGAIN;
+#endif
+        return -1;
     }
 
-    // If the recv failed, return with the saved errno if set.
-    if (err != 0) {
-        errno = err;
+    // Elsewhere, poll
+    zmq_pollitem_t item;
+    memset(&item, 0, sizeof item);
+    item.fd = r;
+    item.events = ZMQ_POLLIN;
+
+    int rc = zmq_poll (&item, 1, timeout_);
+    if (rc == 0)
+        errno = EAGAIN;
+    if (rc <= 0)
         return -1;
-    }
+
+    zmq_assert (rc == 1);
+    zmq_assert (item.revents == ZMQ_POLLIN);
 
-#endif
+    nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
 
     // Sanity check for success.
+#ifdef ZMQ_HAVE_WINDOWS
+    wsa_assert (nbytes != SOCKET_ERROR);
+#else
     errno_assert (nbytes != -1);
+#endif
 
     // Check whether we haven't got half of command.
     zmq_assert (nbytes == sizeof (command_t));
@@ -247,8 +214,6 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
     return 0;
 }
 
-#endif
-
 int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
 {
 #if defined ZMQ_HAVE_WINDOWS
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 96bf4eb..5867764 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -41,7 +41,7 @@ namespace zmq
 
         fd_t get_fd ();
         void send (const command_t &cmd_);
-        int recv (command_t *cmd_, bool block_);
+        int recv (command_t *cmd_, int timeout_);
 
     private:
 
diff --git a/src/options.cpp b/src/options.cpp
index 897e0f5..9a03849 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -40,7 +40,9 @@ zmq::options_t::options_t () :
     maxmsgsize (-1),
     requires_in (false),
     requires_out (false),
-    immediate_connect (true)
+    immediate_connect (true),
+    recv_timeout (-1),
+    send_timeout (-1)
 {
 }
 
@@ -174,6 +176,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
         multicast_hops = *((int*) optval_);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        recv_timeout = *((int*) optval_);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (optvallen_ != sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        send_timeout = *((int*) optval_);
+        return 0;
     }
 
     errno = EINVAL;
@@ -319,6 +336,23 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
         *optvallen_ = sizeof (int);
         return 0;
 
+    case ZMQ_RCVTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = recv_timeout;
+        *optvallen_ = sizeof (int);
+        return 0;
+
+    case ZMQ_SNDTIMEO:
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int*) optval_) = send_timeout;
+        *optvallen_ = sizeof (int);
+        return 0;
     }
 
     errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 53d0197..d6f6fd4 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -85,6 +85,10 @@ namespace zmq
         // is not aware of the peer's identity, however, it is able to send
         // messages straight away.
         bool immediate_connect;
+
+        // The default timeout to wait on send/recv operation for this socket.
+        int recv_timeout;
+        int send_timeout;
     };
 
 }
diff --git a/src/reaper.cpp b/src/reaper.cpp
index d3ebbba..f94f7c1 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -61,7 +61,7 @@ void zmq::reaper_t::in_event ()
 
         // Get the next command. If there is none, exit.
         command_t cmd;
-        int rc = mailbox.recv (&cmd, false);
+        int rc = mailbox.recv (&cmd, 0);
         if (rc != 0 && errno == EINTR)
             continue;
         if (rc != 0 && errno == EAGAIN)
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3e104a8..460490c 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -276,7 +276,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
             errno = EINVAL;
             return -1;
         }
-        int rc = process_commands (false, false);
+        int rc = process_commands (0, false);
         if (rc != 0 && (errno == EINTR || errno == ETERM))
             return -1;
         errno_assert (rc == 0);
@@ -478,7 +478,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
     }
 
     // Process pending commands, if any.
-    int rc = process_commands (false, true);
+    int rc = process_commands (0, true);
     if (unlikely (rc != 0))
         return -1;
 
@@ -501,7 +501,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
     while (rc != 0) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (true, false) != 0))
+        if (unlikely (process_commands (options.send_timeout, false) != 0))
             return -1;
         rc = xsend (msg_, flags_);
     }
@@ -535,7 +535,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
     // described above) from the one used by 'send'. This is because counting
     // ticks is more efficient than doing RDTSC all the time.
     if (++ticks == inbound_poll_rate) {
-        if (unlikely (process_commands (false, false) != 0))
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
     }
@@ -553,12 +553,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
 
     // If the message cannot be fetched immediately, there are two scenarios.
     // For non-blocking recv, commands are processed in case there's an
-    // activate_reader command already waiting int a command pipe.
+    // activate_reader command already waiting in a command pipe.
     // If it's not, return EAGAIN.
     if (flags_ & ZMQ_DONTWAIT) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (false, false) != 0))
+        if (unlikely (process_commands (0, false) != 0))
             return -1;
         ticks = 0;
 
@@ -573,15 +573,15 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
 
     // In blocking scenario, commands are processed over and over again until
     // we are able to fetch a message.
-    bool block = (ticks != 0);
+    int timeout = (ticks != 0) ? options.recv_timeout : 0;
     while (rc != 0) {
         if (errno != EAGAIN)
             return -1;
-        if (unlikely (process_commands (block, false) != 0))
+        if (unlikely (process_commands (timeout, false) != 0))
             return -1;
         rc = xrecv (msg_, flags_);
         ticks = 0;
-        block = true;
+        timeout = options.recv_timeout;
     }
 
     rcvmore = msg_->flags () & msg_t::more;
@@ -655,17 +655,18 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
     poller->set_pollin (handle);
 }
 
-int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
 {
     int rc;
     command_t cmd;
-    if (block_) {
-        rc = mailbox.recv (&cmd, true);
-        if (rc == -1 && errno == EINTR)
+    if (timeout_ != 0) {
+        rc = mailbox.recv (&cmd, timeout_);
+        if (rc == -1 && (errno == EINTR || errno == EAGAIN))
             return -1;
         errno_assert (rc == 0);
     }
     else {
+        zmq_assert (timeout_ == 0);
 
         // Get the CPU's tick counter. If 0, the counter is not available.
         uint64_t tsc = zmq::clock_t::rdtsc ();
@@ -687,7 +688,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
         }
 
         // Check whether there are any commands pending for this thread.
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
     }
 
     // Process all the commands available at the moment.
@@ -698,7 +699,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
             return -1;
         errno_assert (rc == 0);
         cmd.destination->process_command (cmd);
-        rc = mailbox.recv (&cmd, false);
+        rc = mailbox.recv (&cmd, 0);
     }
 
     if (ctx_terminated) {
@@ -777,7 +778,7 @@ void zmq::socket_base_t::in_event ()
 {
     // Process any commands from other threads/sockets that may be available
     // at the moment. Ultimately, socket will be destroyed.
-    process_commands (false, false);
+    process_commands (0, false);
     check_destroy ();
 }
 
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 0a5c574..82745e2 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -164,7 +164,7 @@ namespace zmq
         // set to true, returns only after at least one command was processed.
         // If throttle argument is true, commands are processed at most once
         // in a predefined time period.
-        int process_commands (bool block_, bool throttle_);
+        int process_commands (int timeout_, bool throttle_);
 
         // Handlers for incoming commands.
         void process_stop ();
diff --git a/tests/Makefile.am b/tests/Makefile.am
index ebbc46c..c00d7ce 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \
                   test_pair_tcp \
                   test_reqrep_inproc \
                   test_reqrep_tcp \
-                  test_hwm
+                  test_hwm \
+                  test_timeout
 
 if !ON_MINGW
 noinst_PROGRAMS += test_shutdown_stress \
@@ -21,6 +22,8 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
 
 test_hwm_SOURCES = test_hwm.cpp
 
+test_timeout_SOURCES = test_timeout.cpp
+
 if !ON_MINGW
 test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
 test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
diff --git a/tests/test_timeout.cpp b/tests/test_timeout.cpp
new file mode 100644
index 0000000..6d94dd4
--- /dev/null
+++ b/tests/test_timeout.cpp
@@ -0,0 +1,85 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+
+#include "../include/zmq.h"
+
+
+int main (int argc, char *argv [])
+{
+
+    void *ctx = zmq_init (1);
+    assert (ctx);
+
+
+    const char* transport = "inproc://timeout_test";
+
+    void *sb = zmq_socket (ctx, ZMQ_REP);
+    assert (sb);
+    int rc = zmq_bind (sb, transport);
+    assert (rc == 0);
+
+    void *sc = zmq_socket (ctx, ZMQ_REQ);
+    assert (sc);
+    rc = zmq_connect (sc, transport);
+    assert (rc == 0);
+
+    char buf [32];
+    rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT);
+    assert (rc == -1);
+    assert (zmq_errno() == EAGAIN);
+
+    int timeout = 500;
+    size_t timeout_size = sizeof timeout;
+    rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+    assert (rc == 0);
+
+    timeout = -1;
+    rc = zmq_getsockopt(sb, ZMQ_RCVTIMEO, &timeout, &timeout_size);
+    assert (rc == 0);
+    assert (timeout_size == sizeof timeout);
+    assert (timeout == 500);
+
+    rc = zmq_recv (sb, buf, 32, 0);
+    assert (rc == -1);
+    assert (zmq_errno() == EAGAIN);
+
+    const char *content = "12345678ABCDEFGH12345678abcdefgh";
+
+    rc = zmq_send (sc, content, 32, 0);
+    assert (rc == 32);
+
+    rc = zmq_recv (sb, buf, 32, 0);
+    assert (rc == 32);
+    assert (memcmp (buf, content, 32) == 0);
+
+    rc = zmq_close (sc);
+    assert (rc == 0);
+
+    rc = zmq_close (sb);
+    assert (rc == 0);
+
+    rc = zmq_term (ctx);
+    assert (rc == 0);
+
+    return 0 ;
+}
-- 
1.7.4.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
