On 20 May 2011 15:27, Attila-Mihaly Balazs <[email protected]> wrote:
> Hello everyone,
>
> I'm playing around with ZMQ + Java and I've run into a couple of problems.
> The test program which I use is attached [1]. What I try to do:
> - start a published and a subscriber on a IPC transport (ipc://zeromq_test)
> - publish a message on this transport and ensure that I receive it back
> - stop the transport
>
> However I've run into several problems (which I suspect are caused by me,
> since I'm a ZMQ noob):
> - some times I get the message: "Assertion failed: nbytes == sizeof
> (command_t) (mailbox.cpp:245)" [4] - I found a discussion related to this
> from last year [3], however it's not clear to me what the problem is or how
> I should resolve it. Is this a bug or am I making a mistake?
> - some times the termination thread hangs. I've attached a gdb stack trace
> [2] with the termination thread. From it, if I read correctly, it hangs in
> mailbox.cpp:204, which is in recv (which has the blocking parameter set to
> true). Again, am I incorrectly terminating or is there a bug?
>
Hi,
Try initializing the sockets inside the thread that uses them and avoid
passing around sockets between threads.
See the attached modified version of your test.
Cheers,
Marko
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
public class TestJZMQ {
public static void main(String... args) throws Exception {
System.out.println("starting");
while (true) {
RunInfo runInfo = start();
System.out.println("sending");
runInfo.publishSocket.send("foobar".getBytes(), 0);
System.out.println("try acquire");
if (!runInfo.dispatcher.receivedMessages.tryAcquire(10, TimeUnit.SECONDS)) {
System.err.println("Message wasn't received!");
System.exit(1);
}
System.out.println("try acquired");
if (runInfo.dispatcher.error) {
System.err.println("Invalid message received!");
System.exit(2);
}
stop(runInfo);
}
}
private static RunInfo start() {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);
publishSocket.bind("ipc://zeromq_test");
ZeroMQDispatcher dispatcher = new ZeroMQDispatcher(context);
Thread dispatchThread = new Thread(null, dispatcher,
"ZeroMQDispatcher@" + System.currentTimeMillis());
dispatchThread.setDaemon(true);
dispatchThread.start();
return new RunInfo(dispatchThread, context, publishSocket, dispatcher);
}
private static void stop(final RunInfo runInfo) throws Exception {
System.out.println("stopping");
final CountDownLatch stopLatch = new CountDownLatch(1);
new Thread("StopThread@" + System.currentTimeMillis()) {
@Override
public void run() {
runInfo.dispatchThread.interrupt();
System.out.println("dispatcher interrupted");
// runInfo.subscribeSocket.close();
stopLatch.countDown();
System.out.println("stop thread finishing");
}
}.start();
if (!stopLatch.await(10, TimeUnit.SECONDS)) {
System.err.println("Exit timeouted!");
System.exit(3);
}
System.out.println("closing main thread sockets and context");
runInfo.publishSocket.close();
runInfo.context.term();
System.exit(0);
}
static class RunInfo {
final Thread dispatchThread;
final ZMQ.Context context;
final ZMQ.Socket publishSocket;
final ZeroMQDispatcher dispatcher;
RunInfo(Thread dispatchThread, ZMQ.Context context,
ZMQ.Socket publishSocket, ZeroMQDispatcher dispatcher) {
this.dispatchThread = dispatchThread;
this.context = context;
this.publishSocket = publishSocket;
this.dispatcher = dispatcher;
}
}
static class ZeroMQDispatcher implements Runnable {
final ZMQ.Socket subscribeSocket;
final Semaphore receivedMessages;
volatile boolean error;
ZeroMQDispatcher(ZMQ.Context context) {
ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);
subscribeSocket.connect("ipc://zeromq_test");
subscribeSocket.subscribe(new byte[0]);
this.subscribeSocket = subscribeSocket;
this.receivedMessages = new Semaphore(0);
this.error = false;
}
@Override
public void run() {
Thread currentThread = Thread.currentThread();
while (!currentThread.isInterrupted()) {
byte[] messageBytes;
try {
messageBytes = subscribeSocket.recv(0);
System.out.println("got some data");
} catch (ZMQException ex) {
if (ZMQ.Error.ETERM.getCode() == ex.getErrorCode()) {
// termination requested
break;
}
throw ex;
}
String message = new String(messageBytes);
if (!"foobar".equals(message)) { error = true; }
System.out.println("releasing semaphore");
receivedMessages.release();
System.out.println("released semaphore");
}
subscribeSocket.close();
}
}
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev