Hi there,

Thank you in advance for your time. I am building a high-performance system in Java based on ZeroMQ and jzmq. While trying to use a direct buffer I ran into a problem, which I have reduced to the following test code on Windows.

My unit test environment is:

  Windows 7 64-bit
  ZeroMQ 3.2.3
  jzmq master from Git, checked out in July 2013
  JDK 1.7.0_25

The test code is below. As you can see in the subscriber thread, if I use recv(ZMQ.DONTWAIT) it works perfectly; but if I use the direct buffer, I receive nothing while the test runs and get the following error on exit:

Exception in thread "Thread-0" org.zeromq.ZMQException: Resource temporarily unavailable(0xb)
    at org.zeromq.ZMQ$Socket.recvZeroCopy(Native Method)
    at org.as.algo.messaging.bus.zmq.ZMQReadynessTest$1.run(ZMQReadynessTest.java:48)

If I use a non-direct ByteBuffer instead, there is no exception on exit, but I still receive nothing.

Please kindly assist. Many thanks.

Alex

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
import org.zeromq.ZMQ;

/**
 * Test if ZMQ is ready on this box.
 *
 * @author Alex Suo
 */
public class ZMQReadynessTest {

    private ZMQ.Context context;

    @Before
    public void setUp() {
        context = ZMQ.context(1);
    }

    @Test
    public void testSimpleMessage() {
        String topic = "tcp://127.0.0.1:31216";

        final AtomicInteger counter = new AtomicInteger();

        // create a simple subscriber
        final ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);
        subscribeSocket.connect(topic);
        subscribeSocket.subscribe("TestTopic".getBytes());

        Thread subThread = new Thread() {

            @Override
            public void run() {
                while (true) {
                    String value = null;

                    // This would result in trouble
                    {
                        ByteBuffer buffer = ByteBuffer.allocateDirect(100);
                        if (subscribeSocket.recvZeroCopy(buffer, buffer.remaining(), ZMQ.DONTWAIT) > 0) {
                            buffer.flip();
                            value = buffer.asCharBuffer().toString();
                            System.out.println(buffer.asCharBuffer().toString());
                        }
                    }

                    // This works perfectly
                    /*
                    {
                        byte[] bytes = subscribeSocket.recv(ZMQ.DONTWAIT);
                        if (bytes == null || bytes.length == 0) {
                            continue;
                        }
                        value = new String(bytes);
                    }
                    */

                    if (value != null && value.length() > 0) {
                        counter.incrementAndGet();
                        System.out.println(value);
                        break;
                    }
                }
            }
        };
        subThread.start();

        // create a simple publisher - wait 3 sec to make sure its ready
        ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);
        publishSocket.bind("tcp://*:31216");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            fail();
        }

        // publish a sample message
        try {
            publishSocket.send("TestTopic".getBytes(), ZMQ.SNDMORE);
            publishSocket.send("This is test string".getBytes(), 0);
            subThread.join(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
            fail();
        }

        assertTrue(counter.get() > 0);
        System.out.println(counter.get());
    }
}

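A side note on the decoding step in the test above, separate from the receive problem itself: ByteBuffer.asCharBuffer() views the bytes as 16-bit chars, so bytes produced by String.getBytes() would probably not come back as the original text even after a successful receive. Below is a minimal sketch using only java.nio, with no ZeroMQ calls. The class name DirectBufferDecode and the helper decode() are hypothetical, and the choice of UTF-8 is an assumption (the test uses the platform default charset). The put()/flip() pair only stands in for a received frame; I have not verified whether recvZeroCopy advances the buffer position, so that part may need adjusting.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DirectBufferDecode {

    // Hypothetical helper: decodes the readable bytes (position..limit) as UTF-8.
    static String decode(ByteBuffer buffer) {
        // duplicate() shares the content but leaves the caller's position and limit untouched
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(100);

        // Stand-in for a received frame: write the bytes, then flip for reading.
        buffer.put("This is test string".getBytes(StandardCharsets.UTF_8));
        buffer.flip();

        // Decoding through a charset recovers the text.
        System.out.println(decode(buffer));

        // asCharBuffer() pairs the bytes up as 16-bit chars instead,
        // so the result is shorter than the text and does not match it.
        String viaCharView = buffer.asCharBuffer().toString();
        System.out.println(viaCharView.equals("This is test string"));
    }
}
```

If the sender and receiver agree on an explicit charset on both sides (getBytes(charset) and decode with the same charset), the result no longer depends on the platform default.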
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev