>From c9ee04f04da42e2f7149c35aff78c932918b1e11 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Sun, 10 Jul 2011 08:03:32 +0200
Subject: [PATCH] Implementation of zmq_device() re-introduced

Signed-off-by: Martin Sustrik <[email protected]>
---
 doc/Makefile.am    |    3 +-
 doc/zmq_device.txt |   83 +++++++++++++++++++++++++++++++++++++++++++++
 include/zmq.h      |    6 +++
 src/zmq.cpp        |   94 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 185 insertions(+), 1 deletions(-)
 create mode 100644 doc/zmq_device.txt

diff --git a/doc/Makefile.am b/doc/Makefile.am
index eec7209..4349464 100644
--- a/doc/Makefile.am
+++ b/doc/Makefile.am
@@ -2,7 +2,8 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \
     zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
     zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
     zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \
-    zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3
+    zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \
+    zmq_device.3
 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7
 
 MAN_DOC = $(MAN1) $(MAN3) $(MAN7)
diff --git a/doc/zmq_device.txt b/doc/zmq_device.txt
new file mode 100644
index 0000000..f20c0df
--- /dev/null
+++ b/doc/zmq_device.txt
@@ -0,0 +1,83 @@
+zmq_device(3)
+=============
+
+NAME
+----
+zmq_device - start a simple built-in 0MQ device
+
+
+SYNOPSIS
+--------
+*int zmq_device (void *s1, void *s2);*
+
+
+DESCRIPTION
+-----------
+The _zmq_device()_ function starts a simple built-in 0MQ device.
+
+The device connects the two sockets supplied. Message from one are passed to
+the another and vice versa.
+
+Before calling _zmq_device()_ you must set any socket options, and connect or
+bind both sockets.
+
+_zmq_device()_ runs in the current thread and returns only if/when the current
+context is closed or an error occurs.
+
+
+RETURN VALUE
+------------
+The _zmq_device()_ function is an infinite loop and returns only in the case
+of error. It returns `-1` and set 'errno' to one of the values defined below.
+
+ERRORS
+------
+*ETERM*::
+The 0MQ 'context' associated with the sockets was terminated.
+*EINTR*::
+The device was interrupted by delivery of a signal.
+
+
+EXAMPLE
+-------
+.Creating a request/reply broker
+----
+//  Create frontend and backend sockets
+void *frontend = zmq_socket (context, ZMQ_XREP);
+assert (backend);
+void *backend = zmq_socket (context, ZMQ_XREQ);
+assert (frontend);
+//  Bind both sockets to TCP ports
+assert (zmq_bind (frontend, "tcp://*:5555") == 0);
+assert (zmq_bind (backend, "tcp://*:5556") == 0);
+//  Start a queue device
+zmq_device (frontend, backend);
+----
+
+
+SEE ALSO
+--------
+linkzmq:zmq_bind[3]
+linkzmq:zmq_connect[3]
+linkzmq:zmq_socket[3]
+linkzmq:zmq[7]
+
+
+AUTHORS
+-------
+This 0MQ manual page was written by Pieter Hintjens <[email protected]>
+
+
+RESOURCES
+---------
+Main web site: <http://www.zeromq.org/>
+
+Report bugs to the 0MQ development mailing list: <[email protected]>
+
+
+COPYING
+-------
+Free use of this software is granted under the terms of the GNU Lesser General
+Public License (LGPL). For details see the files `COPYING` and `COPYING.LESSER`
+included with the 0MQ distribution.
+
diff --git a/include/zmq.h b/include/zmq.h
index 7de421b..2c77837 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -227,6 +227,12 @@ typedef struct
 
 ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
 
+/******************************************************************************/
+/*  Simple in-process device.                                                 */
+/******************************************************************************/
+
+ZMQ_EXPORT int zmq_device (void *s1, void *s2);
+
 #undef ZMQ_EXPORT
 
 #ifdef __cplusplus
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 2fa60a2..f2ee91e 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -685,6 +685,100 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
 #undef ZMQ_POLL_BASED_ON_POLL
 #endif
 
+int zmq_device (void *s1_, void *s2_)
+{
+    //  TODO: The function doesn't account for send blocked because of
+    //  SNDHWM have been reached.
+
+    zmq_msg_t msg;
+    int rc = zmq_msg_init (&msg);
+
+    if (rc != 0) {
+        return -1;
+    }
+
+    int label;
+    size_t labelsz;
+    int more;
+    size_t moresz;
+
+    zmq_pollitem_t items [2];
+    items [0].socket = s1_;
+    items [0].fd = 0;
+    items [0].events = ZMQ_POLLIN;
+    items [0].revents = 0;
+    items [1].socket = s2_;
+    items [1].fd = 0;
+    items [1].events = ZMQ_POLLIN;
+    items [1].revents = 0;
+
+    while (true) {
+
+        //  Wait while there are either requests or replies to process.
+        rc = zmq_poll (&items [0], 2, -1);
+        if (unlikely (rc < 0)) {
+            return -1;
+        }
+
+        //  Pass a message from s1 to s2.
+        if (items [0].revents & ZMQ_POLLIN) {
+            while (true) {
+                rc = zmq_recvmsg (s1_, &msg, 0);
+                if (unlikely (rc < 0)) {
+                    return -1;
+                }
+                moresz = sizeof (more);
+                rc = zmq_getsockopt (s1_, ZMQ_RCVMORE, &more, &moresz);
+                if (unlikely (rc < 0)) {
+                    return -1;
+                }
+                labelsz = sizeof (label);
+                rc = zmq_getsockopt (s1_, ZMQ_RCVLABEL, &label, &labelsz);
+                if (unlikely (rc < 0)) {
+                    return -1;
+                }
+                rc = zmq_sendmsg (s2_, &msg, (more ? ZMQ_SNDMORE : 0) |
+                    (label ? ZMQ_SNDLABEL : 0));
+                if (unlikely (rc < 0)) {
+                    return -1;
+                }
+                if (!more)
+                    break;
+            }
+        }
+
+        //  Pass a message from s2 to s1.
+        if (items [1].revents & ZMQ_POLLIN) {
+            while (true) {
+                rc = zmq_recvmsg (s2_, &msg, 0);
+                if (unlikely (rc < 0)) {
+                    return -1;
+                }
+                moresz = sizeof (more);
+                rc = zmq_getsockopt (s2_, ZMQ_RCVMORE, &more, &moresz);
+                if (unlikely (rc < 0)) {
+                    return -1;
+                }
+                labelsz = sizeof (label);
+                rc = zmq_getsockopt (s2_, ZMQ_RCVLABEL, &label, &labelsz);
+                if (unlikely (rc < 0)) {
+                    return -1;
+                }
+                rc = zmq_sendmsg (s1_, &msg, (more ? ZMQ_SNDMORE : 0) |
+                    (label ? ZMQ_SNDLABEL : 0));
+                if (unlikely (rc < 0)) {
+                    return -1;
+                }
+                if (!more)
+                    break;
+            }
+        }
+
+    }
+
+    return 0;
+}
+
 int zmq_errno ()
 {
     return errno;
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to