I'm currently working on an embedded broker that runs inside each of
our services so that the different parts of our system can share
messages. I'm using a producer socket and a consumer socket that
connect to two different ports on a forwarder with the following
configuration:
<forwarder>
    <in>
        <bind addr = "tcp://127.0.0.1:5555"/>
    </in>
    <out>
        <bind addr = "tcp://127.0.0.1:5556"/>
    </out>
</forwarder>
The forwarder is only meant for some of our developer systems
(Windows, MacOS). In all other environments the producers and
consumers should send and receive messages via multicast (OpenPGM).
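The environment split described above could be captured in a small
helper. This is only a sketch under assumptions: the class name
BrokerEndpoints, the method forOs, and the multicast endpoint
(interface eth0, group 239.192.1.1) are hypothetical placeholders and
not part of the actual setup. Only the two tcp endpoints come from the
forwarder configuration.

```java
import java.util.Locale;

/** Chooses the endpoints a service connects to, based on the host OS. */
final class BrokerEndpoints {
    // Hypothetical epgm endpoint: interface name and multicast group are placeholders.
    static final String MULTICAST_ENDPOINT = "epgm://eth0;239.192.1.1:5555";

    final String producerEndpoint;
    final String consumerEndpoint;

    private BrokerEndpoints(String producerEndpoint, String consumerEndpoint) {
        this.producerEndpoint = producerEndpoint;
        this.consumerEndpoint = consumerEndpoint;
    }

    /** Developer systems (Windows, MacOS) use the local forwarder, all others use multicast. */
    static BrokerEndpoints forOs(String osName) {
        String os = osName.toLowerCase(Locale.ROOT);
        boolean developerSystem = os.contains("windows") || os.contains("mac");
        if (developerSystem) {
            // Producer connects to the forwarder's <in> port, consumer to its <out> port.
            return new BrokerEndpoints("tcp://127.0.0.1:5555", "tcp://127.0.0.1:5556");
        }
        // Assumption: with multicast, both sides use the same endpoint.
        return new BrokerEndpoints(MULTICAST_ENDPOINT, MULTICAST_ENDPOINT);
    }

    public static void main(String[] args) {
        BrokerEndpoints endpoints = forOs(System.getProperty("os.name"));
        System.out.println("producer connects to " + endpoints.producerEndpoint);
        System.out.println("consumer connects to " + endpoints.consumerEndpoint);
    }
}
```

The idea is that the broker code would pass these strings to connect()
instead of hard-coding the tcp addresses, so switching a machine from
the forwarder to multicast would not require touching the socket code.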
The following code shows that I'm using two contexts. I need to do it
that way because otherwise the send thread blocks. This leads to my
first question: why do I need two contexts here, or rather, why does
the code block if I use only one context? Here is the shortened code
of a first prototype:
import java.util.concurrent.ExecutorService;

import org.zeromq.ZMQ;

public class ZmqBroker {
    private static final FmtLogger LOG = FmtLogger.getLogger(ZmqBroker.class);

    // Single worker thread that runs the receiving Dispatcher.
    private final ExecutorService _executorService =
            ExecutorServiceFactory.newFixedThreadPool("ZmqDispatcher", 1);
    private Marshaller _marshaller = new MarshallerJavaSerialization();

    // One context per direction: producer (PUB) and consumer (SUB).
    private static final ZMQ.Context PRODUCER_CONTEXT = ZMQ.context(1);
    private static final ZMQ.Context CONSUMER_CONTEXT = ZMQ.context(1);
    private static final ZMQ.Socket PRODUCER_SOCKET = PRODUCER_CONTEXT.socket(ZMQ.PUB);
    private static final ZMQ.Socket CONSUMER_SOCKET = CONSUMER_CONTEXT.socket(ZMQ.SUB);

    public void init() {
        PRODUCER_SOCKET.connect("tcp://127.0.0.1:5555");
        // An empty prefix subscribes to every message.
        CONSUMER_SOCKET.setsockopt(ZMQ.SUBSCRIBE, "");
        CONSUMER_SOCKET.connect("tcp://127.0.0.1:5556");
        _executorService.execute(new Dispatcher(new ZmqMessageHandler()));
    }

    public synchronized void send(Message message) {
        byte[] data = _marshaller.marshal(ZmqMessageFactory.getZmqTransport(message));
        PRODUCER_SOCKET.send(data, 0);
    }

    private static class Dispatcher implements Runnable {
        private final Marshaller _marshaller = new MarshallerJavaSerialization();
        private final MessageHandler<Message> _handler;

        private Dispatcher(MessageHandler<Message> handler) {
            _handler = handler;
        }

        public void run() {
            // Receive, unmarshal, and dispatch until the thread is interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    byte[] rawMessage = CONSUMER_SOCKET.recv(0);
                    Message message = _marshaller.unmarshal(rawMessage);
                    _handler.handleMessage(message);
                } catch (Exception e) {
                    LOG.error("An error occurred handling zmq message ...");
                }
            }
        }
    }
}
The second problem is that when I use two contexts, I run into trouble
as soon as I change the protocol to epgm. In that case I get the
following error:
Assertion failed: !pgm_supported () (zmq.cpp:239)
Best regards, Jens