Hi
Attached are a couple of patches to make the zmq_queue device usable.
I also had to fix the include paths in the forwarder and streamer since
the repo re-org.
Note that at the moment I've only tested a one-hop zmq_queue. Currently, in the
REP process, which is the end of the chain, you have to write some manual code
to get the response routed back to the correct client.
This is the setup:
REQ* <--> ZMQ_QUEUE <--> REP*
(* = there can be multiple of these.)
The config for the zmq_queue looks something like this:
<queue>
<in>
<bind addr = "tcp://lo:43210"/>
</in>
<out>
<bind addr = "tcp://lo:43211"/>
</out>
</queue>
Note about REP sockets:
The snippet below simply sends the request back as the response by copying it;
the inbound message in my test was one character long.
However, currently the first octet of the message is a length indicator giving
the length of the addressing info. The length octet and the addressing info
need to be copied into the response message.
This is only a temporary measure. The new extensions field of the wire format
will make this requirement go away.
zmq::message_t request;
rep.recv(&request);
unsigned char l = *((char*)request.data());
zmq::message_t response(l+2); // l+2 = the octet, the address, and
my 1 byte test message
::memcpy(response.data(), request.data(), l+2);
rep.send(response);
Jon
>From 7ebd65803082b1f8eee510ef348935b0eb168813 Mon Sep 17 00:00:00 2001
From: jon <j...@ubiq.(none)>
Date: Thu, 11 Mar 2010 23:24:38 +0000
Subject: [PATCH 1/2] added queue class to make zmq_queue device actually useful
---
devices/zmq_queue/zmq_queue.cpp | 116 +++++++++++++++++++++++++++++++++++++--
1 files changed, 111 insertions(+), 5 deletions(-)
diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp
index a90aac7..9c45efe 100644
--- a/devices/zmq_queue/zmq_queue.cpp
+++ b/devices/zmq_queue/zmq_queue.cpp
@@ -20,6 +20,114 @@
#include "../../bindings/cpp/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
+class queue
+{
+public:
+ queue (zmq::socket_t& reply, zmq::socket_t& request)
+ : xrep(reply), xreq(request)
+ {
+ items[0].socket = reply; // socket_t has operator void*
+ items[0].fd = 0;
+ items[0].events = ZMQ_POLLIN;
+ items[0].revents = 0;
+
+ items[1].socket = request; // socket_t has operator void*
+ items[1].fd = 0;
+ items[1].events = ZMQ_POLLIN;
+ items[1].revents = 0;
+
+ m_next_request_method = &queue::get_request;
+ m_next_response_method = &queue::get_response;
+
+ }
+
+ void run()
+ {
+ while (true) {
+ int rc = zmq::poll(&items[0],2,-1);
+ if (rc < 0) break;
+ next_request();
+ next_response();
+ }
+ }
+
+
+private:
+ void next_request()
+ {
+ (this->*m_next_request_method)();
+ }
+
+ void next_response()
+ {
+ (this->*m_next_response_method)();
+ }
+
+
+ void get_request()
+ {
+ if ( items[0].revents & ZMQ_POLLIN ) {
+ int rc = xrep.recv(&request_msg, ZMQ_NOBLOCK);
+ if (!rc) return;
+ items[0].events &= ~ZMQ_POLLIN;
+ items[1].events |= ZMQ_POLLOUT;
+ m_next_request_method = &queue::send_request;
+ }
+ }
+
+ void send_request()
+ {
+ if ( items[1].revents & ZMQ_POLLOUT) {
+ int rc = xreq.send(request_msg, ZMQ_NOBLOCK);
+ if (!rc) return;
+ items[1].events &= ~ZMQ_POLLOUT;
+ items[0].events |= ZMQ_POLLIN;
+ m_next_request_method = &queue::get_request;
+ }
+ }
+
+ void get_response()
+ {
+ if ( items[1].revents & ZMQ_POLLIN ) {
+ int rc = xreq.recv(&response_msg, ZMQ_NOBLOCK);
+ if (!rc) return;
+ items[1].events &= ~ZMQ_POLLIN;
+ items[0].events |= ZMQ_POLLOUT;
+ m_next_response_method = &queue::send_response;
+ }
+ }
+
+ void send_response()
+ {
+ if ( items[0].revents & ZMQ_POLLOUT) {
+ int rc = xrep.send(response_msg, ZMQ_NOBLOCK);
+ if (!rc) return;
+ items[0].events &= ~ZMQ_POLLOUT;
+ items[1].events |= ZMQ_POLLIN;
+ m_next_response_method = &queue::get_response;
+ }
+ }
+
+
+
+private:
+ zmq::socket_t & xrep;
+ zmq::socket_t & xreq;
+ zmq_pollitem_t items[2];
+ zmq::message_t request_msg;
+ zmq::message_t response_msg;
+
+ typedef void (queue::*next_method)();
+
+ next_method m_next_request_method;
+ next_method m_next_response_method;
+
+ queue (queue const &);
+ void operator = (queue const &);
+
+};
+
+
int main (int argc, char *argv [])
{
if (argc != 2) {
@@ -112,11 +220,9 @@ int main (int argc, char *argv [])
n++;
}
- zmq::message_t msg;
- while (true) {
- in_socket.recv (&msg);
- out_socket.send (msg);
- }
+ queue q(in_socket, out_socket);
+
+ q.run();
return 0;
}
--
1.6.3.3
>From eb3bbaaf9a9dffeb7d0bfb990a25c23db5f03b1a Mon Sep 17 00:00:00 2001
From: Jon Dyte <[email protected]>
Date: Fri, 12 Mar 2010 00:45:33 +0000
Subject: [PATCH 2/2] tidy up zmq_queue and adjust include path since the bindings area was removed, for forwarder and streamer as well
---
devices/zmq_forwarder/zmq_forwarder.cpp | 2 +-
devices/zmq_queue/zmq_queue.cpp | 16 ++++++++++++----
devices/zmq_streamer/zmq_streamer.cpp | 2 +-
3 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp
index b900484..092bc47 100644
--- a/devices/zmq_forwarder/zmq_forwarder.cpp
+++ b/devices/zmq_forwarder/zmq_forwarder.cpp
@@ -17,7 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../../bindings/cpp/zmq.hpp"
+#include "../../include/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
int main (int argc, char *argv [])
diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp
index 9c45efe..25d3085 100644
--- a/devices/zmq_queue/zmq_queue.cpp
+++ b/devices/zmq_queue/zmq_queue.cpp
@@ -3,7 +3,15 @@
This file is part of 0MQ.
- 0MQ is free software; you can redistribute it and/or modify it under
+ 0MQ is free software; you can redistribute it and/or modify it under<forwarder>
+ <in>
+ <connect addr = "tcp://192.168.0.1:5555"/>
+ </in>
+ <out>
+ <bind addr = "tcp://eth0:5556"/>
+ </out>
+</forwarder>
+
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
@@ -12,12 +20,12 @@
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
-
+ /sw/release/zmq2/
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../../bindings/cpp/zmq.hpp"
+#include "../../include/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
class queue
@@ -160,7 +168,7 @@ int main (int argc, char *argv [])
}
// TODO: make the number of I/O threads configurable.
- zmq::context_t ctx (1, 1);
+ zmq::context_t ctx (1, 1, ZMQ_POLL);
zmq::socket_t in_socket (ctx, ZMQ_XREP);
zmq::socket_t out_socket (ctx, ZMQ_XREQ);
diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp
index a2906d9..6eccedf 100644
--- a/devices/zmq_streamer/zmq_streamer.cpp
+++ b/devices/zmq_streamer/zmq_streamer.cpp
@@ -17,7 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../../bindings/cpp/zmq.hpp"
+#include "../../include/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
int main (int argc, char *argv [])
--
1.6.3.3
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev