I would check to make sure that you are not actually wrapping messages between 
the loop. try using a socket.hasReceiveMore() before pulling the next part and 
then using draining the rest of the message parts if it doesn't match your 
expectation.

A typical drain call that you can use could be like this: 
while (socket.hasRecvMore()) {
  socket.recv(ZMQ.NOBLOCK)
}

You could also add logging to see when this occurs. I have noticed on 3.1 in 
testing that I sometimes don't get the first frame which could throw the 
process off. https://zeromq.jira.com/browse/LIBZMQ-283 has something like this 
(note: 3.0 so it may not be in 2.1.11).

Joshua

On Feb 17, 2012, at 11:41 AM, Sergey Malov wrote:

> Hi,
> I am working on a project where two applications talk to each other 
> exchanging protocol buffer messages through ZeroMQ sockets. Applications are 
> written in Scala (2.9.1), Java binding is used to manipulate ZeroMQ sockets, 
> messages are being serialized using protocol buffer's Java binding as well. 
> Underlying ZeroMQ library is 2.1.11, Java binding (for ZeroMQ) compiled from 
> the latest source.
> The flow of messages is the following.
> Application A pushes message to application B (push/pull socket's pair), B 
> de-serializes message, processes it, then serializes result and publishes it 
> for A to pick up (publish/subscribe socket pair). Message A->B is multipart, 
> first part being a key for message B->A subscription (which is obviously, 
> multipart too), second part - being an actual message body .
> Most (if not all) of the B->A messages arrive (i.e. polling for appropriate 
> socket succeeds), however occasionally I see for two socket.recv(0) calls on 
> A side: first call picks up a *body* of the previously sent A->B message, 
> second call picks up a *key* for the current message. 
> Obviously it causes major problems processing response in A. I should also 
> note that it happens only when rate of A->B messages is sufficiently high 
> (like 500 messages per second).
> 
> It is very possible that I did something wrong designing a message flow at 
> the first place, so below are the relevant (I think) parts of A and B.
> Any advice would be very welcome.
> 
> Sergey Malov
> 
> 
> Application A :
> --------------------
>                 pushSocket.setLinger(...);
>                 subSocket.setLinger(...); subSocket.setReceiveTimeOut(...)
> 
>               poller.setTimeout(...);
>               pushSocket.connect(pushEndPoint);
> 
>               subSocket.connect(subEndPoint);
>               val status = poller.register(subSocket) ; // then check status
>               val address = new Random().nextInt().toString.getBytes;
> 
>               while(true)
>               {
>                       pushSocket.send( address, ZMQ.SNDMORE);
>                       val pushMsg = ..... // create protocol buffer message
>                       pushSocket.send(pushMsg.toByteArray,0);
> 
>                       subSocket.subscribe(address);
>                       poller.poll;
>                       if (poller.pollin(0))
>                       {
>                               val rspAddress = subSocket.recv(0); <- this 
> should be subscription address, but occasionally it is message body !
>                               val rspBytes = subSocket.recv(0);   <- this 
> should be message body, but occasionally it is address 
>                       }
>                         // sleep pre-defined time to get a steady message 
> rate 
>                       Thread.sleep(SLEEP_TIME);
>               }
>       }
> 
> Application B:
> -------------------
> 
>                   poller.setTimeout( ....) ); 
>                   pullSocket.bind(pullEndPoint);
>                   val status1 = poller.register(pullSocket); // then check 
> status
>   
>                   pubSocket.bind(pubEndPoint);
>                   val status2 = poller.register(pubSocket) // then check 
> status 
>                 while (true)
>                 {
>                         poller.poll;
>                         if (poller.pollin(0)) {
>                                 val address = pullSocket.recv(0);
>                                 val inMessage = pullSocket.recv(0);
>                                 // process incoming message 
>                                 val result = ... // receive result
>                                 pubSocket.send(address, ZMQ.SNDMORE);
>                                 pubSocket.send(result.toByteArray, 0);
>                         };
>                 };
> 
> 
> 
>  
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to