>From c9ee04f04da42e2f7149c35aff78c932918b1e11 Mon Sep 17 00:00:00 2001 From: Martin Sustrik <[email protected]> Date: Sun, 10 Jul 2011 08:03:32 +0200 Subject: [PATCH] Implementation of zmq_device() re-introduced
Signed-off-by: Martin Sustrik <[email protected]> --- doc/Makefile.am | 3 +- doc/zmq_device.txt | 83 +++++++++++++++++++++++++++++++++++++++++++++ include/zmq.h | 6 +++ src/zmq.cpp | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 1 deletions(-) create mode 100644 doc/zmq_device.txt diff --git a/doc/Makefile.am b/doc/Makefile.am index eec7209..4349464 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -2,7 +2,8 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ - zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 + zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ + zmq_device.3 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 MAN_DOC = $(MAN1) $(MAN3) $(MAN7) diff --git a/doc/zmq_device.txt b/doc/zmq_device.txt new file mode 100644 index 0000000..f20c0df --- /dev/null +++ b/doc/zmq_device.txt @@ -0,0 +1,83 @@ +zmq_device(3) +============= + +NAME +---- +zmq_device - start a simple built-in 0MQ device + + +SYNOPSIS +-------- +*int zmq_device (void *s1, void *s2);* + + +DESCRIPTION +----------- +The _zmq_device()_ function starts a simple built-in 0MQ device. + +The device connects the two sockets supplied. Messages from one are passed to +the other and vice versa. + +Before calling _zmq_device()_ you must set any socket options, and connect or +bind both sockets. + +_zmq_device()_ runs in the current thread and returns only if/when the current +context is closed or an error occurs. + + +RETURN VALUE +------------ +The _zmq_device()_ function is an infinite loop and returns only in the case +of error. It returns `-1` and sets 'errno' to one of the values defined below. + +ERRORS +------ +*ETERM*:: +The 0MQ 'context' associated with the sockets was terminated. 
+*EINTR*:: +The device was interrupted by delivery of a signal. + + +EXAMPLE +------- +.Creating a request/reply broker +---- +// Create frontend and backend sockets +void *frontend = zmq_socket (context, ZMQ_XREP); +assert (frontend); +void *backend = zmq_socket (context, ZMQ_XREQ); +assert (backend); +// Bind both sockets to TCP ports +assert (zmq_bind (frontend, "tcp://*:5555") == 0); +assert (zmq_bind (backend, "tcp://*:5556") == 0); +// Start a queue device +zmq_device (frontend, backend); +---- + + +SEE ALSO +-------- +linkzmq:zmq_bind[3] +linkzmq:zmq_connect[3] +linkzmq:zmq_socket[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This 0MQ manual page was written by Pieter Hintjens <[email protected]> + + +RESOURCES +--------- +Main web site: <http://www.zeromq.org/> + +Report bugs to the 0MQ development mailing list: <[email protected]> + + +COPYING +------- +Free use of this software is granted under the terms of the GNU Lesser General +Public License (LGPL). For details see the files `COPYING` and `COPYING.LESSER` +included with the 0MQ distribution. + diff --git a/include/zmq.h b/include/zmq.h index 7de421b..2c77837 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,6 +227,12 @@ typedef struct ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); +/******************************************************************************/ +/* Simple in-process device. */ +/******************************************************************************/ + +ZMQ_EXPORT int zmq_device (void *s1, void *s2); + #undef ZMQ_EXPORT #ifdef __cplusplus diff --git a/src/zmq.cpp b/src/zmq.cpp index 2fa60a2..f2ee91e 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -685,6 +685,100 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #undef ZMQ_POLL_BASED_ON_POLL #endif +int zmq_device (void *s1_, void *s2_) +{ + // TODO: The function doesn't account for sends blocking because + // SNDHWM has been reached. 
+ + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + + if (rc != 0) { + return -1; + } + + int label; + size_t labelsz; + int more; + size_t moresz; + + zmq_pollitem_t items [2]; + items [0].socket = s1_; + items [0].fd = 0; + items [0].events = ZMQ_POLLIN; + items [0].revents = 0; + items [1].socket = s2_; + items [1].fd = 0; + items [1].events = ZMQ_POLLIN; + items [1].revents = 0; + + while (true) { + + // Wait while there are either requests or replies to process. + rc = zmq_poll (&items [0], 2, -1); + if (unlikely (rc < 0)) { + return -1; + } + + // Pass a message from s1 to s2. + if (items [0].revents & ZMQ_POLLIN) { + while (true) { + rc = zmq_recvmsg (s1_, &msg, 0); + if (unlikely (rc < 0)) { + return -1; + } + moresz = sizeof (more); + rc = zmq_getsockopt (s1_, ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0)) { + return -1; + } + labelsz = sizeof (label); + rc = zmq_getsockopt (s1_, ZMQ_RCVLABEL, &label, &labelsz); + if (unlikely (rc < 0)) { + return -1; + } + rc = zmq_sendmsg (s2_, &msg, (more ? ZMQ_SNDMORE : 0) | + (label ? ZMQ_SNDLABEL : 0)); + if (unlikely (rc < 0)) { + return -1; + } + if (!more) + break; + } + } + + // Pass a message from s2 to s1. + if (items [1].revents & ZMQ_POLLIN) { + while (true) { + rc = zmq_recvmsg (s2_, &msg, 0); + if (unlikely (rc < 0)) { + return -1; + } + moresz = sizeof (more); + rc = zmq_getsockopt (s2_, ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0)) { + return -1; + } + labelsz = sizeof (label); + rc = zmq_getsockopt (s2_, ZMQ_RCVLABEL, &label, &labelsz); + if (unlikely (rc < 0)) { + return -1; + } + rc = zmq_sendmsg (s1_, &msg, (more ? ZMQ_SNDMORE : 0) | + (label ? ZMQ_SNDLABEL : 0)); + if (unlikely (rc < 0)) { + return -1; + } + if (!more) + break; + } + } + + } + + return 0; +} + int zmq_errno () { return errno; -- 1.7.0.4
_______________________________________________ zeromq-dev mailing list [email protected] http://lists.zeromq.org/mailman/listinfo/zeromq-dev
