Hi there,

Thank you in advance for your time. I am building a high-performance system
based on ZeroMQ and jzmq in Java. While trying to use a direct buffer, I
ran into a problem, which I have reduced to the following test code
on Windows.

My unit test environment is:

Windows 7 64bit
ZeroMQ 3.2.3
jzmq master from Git, checked out in July 2013
JDK 1.7.0_25

The test code follows. As you can see in the subscriber thread, receiving
with recv(ZMQ.DONTWAIT) works perfectly, but receiving into a direct buffer
with recvZeroCopy never delivers a message while the test runs and produces
the following error on exit:

Exception in thread "Thread-0" org.zeromq.ZMQException: Resource temporarily unavailable(0xb)
    at org.zeromq.ZMQ$Socket.recvZeroCopy(Native Method)
    at org.as.algo.messaging.bus.zmq.ZMQReadynessTest$1.run(ZMQReadynessTest.java:48)


If I use a non-direct (heap) ByteBuffer instead, there is no exception on
exit, but I still receive nothing.

Any help would be much appreciated. Many thanks.

Alex

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
import org.zeromq.ZMQ;

/**
 * Test if ZMQ is ready on this box.
 *
 * @author Alex Suo
 *
 */
public class ZMQReadynessTest {

    private ZMQ.Context context;

    @Before
    public void setUp() {
        context = ZMQ.context(1);
    }

    @Test
    public void testSimpleMessage() {
        String topic = "tcp://127.0.0.1:31216";
        final AtomicInteger counter = new AtomicInteger();


        // create a simple subscriber
        final ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);
        subscribeSocket.connect(topic);
        subscribeSocket.subscribe("TestTopic".getBytes());

        Thread subThread = new Thread() {

            @Override
            public void run() {
                while (true) {
                    String value = null;

                    // This variant causes the problem
                    {
                        ByteBuffer buffer = ByteBuffer.allocateDirect(100);
                        if (subscribeSocket.recvZeroCopy(buffer,
                                buffer.remaining(), ZMQ.DONTWAIT) > 0) {

                            buffer.flip();

                            value = buffer.asCharBuffer().toString();

                            System.out.println(buffer.asCharBuffer().toString());
                        }
                    }

                    // This works perfectly
                    /*
                    {
                        byte[] bytes = subscribeSocket.recv(ZMQ.DONTWAIT);
                        if (bytes == null || bytes.length == 0) {
                            continue;
                        }

                        value = new String(bytes);
                    }
                     */

                    if (value != null && value.length() > 0) {
                        counter.incrementAndGet();
                        System.out.println(value);
                        break;
                    }
                }
            }
        };
        subThread.start();

        // create a simple publisher - wait 3 sec to make sure it's ready
        ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);
        publishSocket.bind("tcp://*:31216");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            fail();
        }

        // publish a sample message
        try {
            publishSocket.send("TestTopic".getBytes(), ZMQ.SNDMORE);
            publishSocket.send("This is test string".getBytes(), 0);
            subThread.join(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
            fail();
        }

        assertTrue(counter.get() > 0);
        System.out.println(counter.get());
    }

}
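
The buffer handling in the subscriber thread above can be examined on its
own, without ZeroMQ. The following is a minimal JDK-only sketch under an
assumption I have not verified against this jzmq build: that the native
receive writes the bytes at the start of the buffer and does not advance its
position. simulateNativeWrite is a hypothetical stand-in for that behaviour,
not a jzmq call, and UTF-8 is chosen only for illustration (the test code
uses the platform default charset). Under that assumption flip() sets the
limit to 0, so nothing is left to read. Separately, asCharBuffer() views the
bytes as 16-bit chars rather than decoding them with a charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

/** JDK-only sketch; no ZeroMQ or jzmq calls are involved. */
public class DirectBufferDecodeSketch {

    /**
     * Hypothetical stand-in for a native receive (an assumption, not jzmq):
     * copies data to the start of the buffer with absolute puts, so the
     * buffer's position and limit are left untouched.
     */
    public static int simulateNativeWrite(ByteBuffer buffer, byte[] data) {
        for (int i = 0; i < data.length; i++) {
            buffer.put(i, data[i]);
        }
        return data.length;
    }

    /** Mirrors the test code: flip(), then view the bytes as 16-bit chars. */
    public static String decodeAsInTest(ByteBuffer buffer) {
        buffer.flip();
        return buffer.asCharBuffer().toString();
    }

    /** Decodes using the returned byte count instead of the buffer position. */
    public static String decodeWithLength(ByteBuffer buffer, int length, Charset charset) {
        byte[] bytes = new byte[length];
        ByteBuffer view = buffer.duplicate(); // shares content, independent position/limit
        view.position(0);
        view.limit(length);
        view.get(bytes);
        return new String(bytes, charset);
    }

    public static void main(String[] args) {
        Charset charset = Charset.forName("UTF-8");
        byte[] payload = "TestTopic".getBytes(charset);

        ByteBuffer a = ByteBuffer.allocateDirect(100);
        simulateNativeWrite(a, payload);
        // Position was never advanced, so flip() leaves an empty buffer.
        System.out.println("flip + asCharBuffer length: " + decodeAsInTest(a).length());

        ByteBuffer b = ByteBuffer.allocateDirect(100);
        int n = simulateNativeWrite(b, payload);
        System.out.println("length-based decode: " + decodeWithLength(b, n, charset));
    }
}
```

If the assumption holds, decoding from the returned byte count avoids
depending on the buffer position at all. Whether this accounts for the
missing messages in the real test is unconfirmed.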