I would check to make sure that you are not actually wrapping messages between
the loop. try using a socket.hasReceiveMore() before pulling the next part and
then using draining the rest of the message parts if it doesn't match your
expectation.
A typical drain call that you can use could be like this:
while (socket.hasRecvMore()) {
socket.recv(ZMQ.NOBLOCK)
}
You could also add logging to see when this occurs. I have noticed on 3.1 in
testing that I sometimes don't get the first frame which could throw the
process off. https://zeromq.jira.com/browse/LIBZMQ-283 has something like this
(note: 3.0 so it may not be in 2.1.11).
Joshua
On Feb 17, 2012, at 11:41 AM, Sergey Malov wrote:
> Hi,
> I am working on a project where two applications talk to each other
> exchanging protocol buffer messages through ZeroMQ sockets. Applications are
> written in Scala (2.9.1), Java binding is used to manipulate ZeroMQ sockets,
> messages are being serialized using protocol buffer's Java binding as well.
> Underlying ZeroMQ library is 2.1.11, Java binding (for ZeroMQ) compiled from
> the latest source.
> The flow of messages is the following.
> Application A pushes message to application B (push/pull socket's pair), B
> de-serializes message, processes it, then serializes result and publishes it
> for A to pick up (publish/subscribe socket pair). Message A->B is multipart,
> first part being a key for message B->A subscription (which is obviously,
> multipart too), second part - being an actual message body .
> Most (if not all) of the B->A messages arrive (i.e. polling for appropriate
> socket succeeds), however occasionally I see for two socket.recv(0) calls on
> A side: first call picks up a *body* of the previously sent A->B message,
> second call picks up a *key* for the current message.
> Obviously it causes major problems processing response in A. I should also
> note that it happens only when rate of A->B messages is sufficiently high
> (like 500 messages per second).
>
> It is very possible that I did something wrong designing a message flow at
> the first place, so below are the relevant (I think) parts of A and B.
> Any advice would be very welcome.
>
> Sergey Malov
>
>
> Application A :
> --------------------
> pushSocket.setLinger(...);
> subSocket.setLinger(...); subSocket.setReceiveTimeOut(...)
>
> poller.setTimeout(...);
> pushSocket.connect(pushEndPoint);
>
> subSocket.connect(subEndPoint);
> val status = poller.register(subSocket) ; // then check status
> val address = new Random().nextInt().toString.getBytes;
>
> while(true)
> {
> pushSocket.send( address, ZMQ.SNDMORE);
> val pushMsg = ..... // create protocol buffer message
> pushSocket.send(pushMsg.toByteArray,0);
>
> subSocket.subscribe(address);
> poller.poll;
> if (poller.pollin(0))
> {
> val rspAddress = subSocket.recv(0); <- this
> should be subscription address, but occasionally it is message body !
> val rspBytes = subSocket.recv(0); <- this
> should be message body, but occasionally it is address
> }
> // sleep pre-defined time to get a steady message
> rate
> Thread.sleep(SLEEP_TIME);
> }
> }
>
> Application B:
> -------------------
>
> poller.setTimeout( ....) );
> pullSocket.bind(pullEndPoint);
> val status1 = poller.register(pullSocket); // then check
> status
>
> pubSocket.bind(pubEndPoint);
> val status2 = poller.register(pubSocket) // then check
> status
> while (true)
> {
> poller.poll;
> if (poller.pollin(0)) {
> val address = pullSocket.recv(0);
> val inMessage = pullSocket.recv(0);
> // process incoming message
> val result = ... // receive result
> pubSocket.send(address, ZMQ.SNDMORE);
> pubSocket.send(result.toByteArray, 0);
> };
> };
>
>
>
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev