After some investigation, here are the results:
 - Setting sockets to NOBLOCK partially rectified this problem, but didn't 
completely solved - frame shift occurred much less frequent and at higher 
message rate. I assumed that only first part of the message is missed to 
simplify further message analysis, it proved to be a correct assumption.
 - Separating cycles for pushing messages and receiving them to two separate 
threads fixed this problem completely.

Joshua, thanks for the tips,

Sergey
 
On 2012-02-17, at 5:13 PM, Joshua Foster wrote:

> I would check to make sure that you are not actually wrapping messages 
> between the loop. try using a socket.hasReceiveMore() before pulling the next 
> part and then using draining the rest of the message parts if it doesn't 
> match your expectation.
> 
> A typical drain call that you can use could be like this: 
> while (socket.hasRecvMore()) {
>  socket.recv(ZMQ.NOBLOCK)
> }
> 
> You could also add logging to see when this occurs. I have noticed on 3.1 in 
> testing that I sometimes don't get the first frame which could throw the 
> process off. https://zeromq.jira.com/browse/LIBZMQ-283 has something like 
> this (note: 3.0 so it may not be in 2.1.11).
> 
> Joshua
> 
> On Feb 17, 2012, at 11:41 AM, Sergey Malov wrote:
> 
>> Hi,
>> I am working on a project where two applications talk to each other 
>> exchanging protocol buffer messages through ZeroMQ sockets. Applications are 
>> written in Scala (2.9.1), Java binding is used to manipulate ZeroMQ sockets, 
>> messages are being serialized using protocol buffer's Java binding as well. 
>> Underlying ZeroMQ library is 2.1.11, Java binding (for ZeroMQ) compiled from 
>> the latest source.
>> The flow of messages is the following.
>> Application A pushes message to application B (push/pull socket's pair), B 
>> de-serializes message, processes it, then serializes result and publishes it 
>> for A to pick up (publish/subscribe socket pair). Message A->B is multipart, 
>> first part being a key for message B->A subscription (which is obviously, 
>> multipart too), second part - being an actual message body .
>> Most (if not all) of the B->A messages arrive (i.e. polling for appropriate 
>> socket succeeds), however occasionally I see for two socket.recv(0) calls on 
>> A side: first call picks up a *body* of the previously sent A->B message, 
>> second call picks up a *key* for the current message. 
>> Obviously it causes major problems processing response in A. I should also 
>> note that it happens only when rate of A->B messages is sufficiently high 
>> (like 500 messages per second).
>> 
>> It is very possible that I did something wrong designing a message flow at 
>> the first place, so below are the relevant (I think) parts of A and B.
>> Any advice would be very welcome.
>> 
>> Sergey Malov
>> 
>> 
>> Application A :
>> --------------------
>>                pushSocket.setLinger(...);
>>                subSocket.setLinger(...); subSocket.setReceiveTimeOut(...)
>> 
>>              poller.setTimeout(...);
>>              pushSocket.connect(pushEndPoint);
>> 
>>              subSocket.connect(subEndPoint);
>>              val status = poller.register(subSocket) ; // then check status
>>              val address = new Random().nextInt().toString.getBytes;
>> 
>>              while(true)
>>              {
>>                      pushSocket.send( address, ZMQ.SNDMORE);
>>                      val pushMsg = ..... // create protocol buffer message
>>                      pushSocket.send(pushMsg.toByteArray,0);
>> 
>>                      subSocket.subscribe(address);
>>                      poller.poll;
>>                      if (poller.pollin(0))
>>                      {
>>                              val rspAddress = subSocket.recv(0); <- this 
>> should be subscription address, but occasionally it is message body !
>>                              val rspBytes = subSocket.recv(0);   <- this 
>> should be message body, but occasionally it is address 
>>                      }
>>                        // sleep pre-defined time to get a steady message 
>> rate 
>>                      Thread.sleep(SLEEP_TIME);
>>              }
>>      }
>> 
>> Application B:
>> -------------------
>> 
>>                  poller.setTimeout( ....) ); 
>>                  pullSocket.bind(pullEndPoint);
>>                  val status1 = poller.register(pullSocket); // then check 
>> status
>> 
>>                  pubSocket.bind(pubEndPoint);
>>                  val status2 = poller.register(pubSocket) // then check 
>> status 
>>                while (true)
>>                {
>>                        poller.poll;
>>                        if (poller.pollin(0)) {
>>                                val address = pullSocket.recv(0);
>>                                val inMessage = pullSocket.recv(0);
>>                                // process incoming message 
>>                                val result = ... // receive result
>>                                pubSocket.send(address, ZMQ.SNDMORE);
>>                                pubSocket.send(result.toByteArray, 0);
>>                        };
>>                };
>> 
>> 
>> 
>> 
>> _______________________________________________
>> zeromq-dev mailing list
>> [email protected]
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> 
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to