After some investigation, here are the results:
- Setting the sockets to NOBLOCK partially rectified the problem but did not
completely solve it: the frame shift occurred much less frequently, and only
at a higher message rate. To simplify further message analysis I assumed that
only the first part of the message goes missing, and that proved to be a
correct assumption.
- Moving the loops that push messages and receive them into two separate
threads fixed the problem completely.
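
The two-thread split from the second point can be sketched roughly as
follows. This is not the actual application code, only a minimal illustration
under stated assumptions: FrameSink, FrameSource, Loopback and TwoThreadClient
are hypothetical names, and the two traits stand in for the JZMQ calls used in
this thread (send with ZMQ.SNDMORE or 0, recv(0), hasReceiveMore()) so that
the example runs without the native library. The receiving side also reads
every frame of a message before moving on, as Joshua suggested.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for the socket calls used in this thread.
trait FrameSink   { def send(frame: Array[Byte], more: Boolean): Unit }
trait FrameSource { def recv(): Array[Byte]; def hasReceiveMore: Boolean }

// In-memory fake that plays the role of application B: every frame sent
// is handed back unchanged, with its "more" flag preserved.
class Loopback extends FrameSink with FrameSource {
  private val frames = new LinkedBlockingQueue[(Array[Byte], Boolean)]()
  private var more = false // read and written by the receiving thread only
  def send(frame: Array[Byte], m: Boolean): Unit = frames.put((frame, m))
  def recv(): Array[Byte] = { val (f, m) = frames.take(); more = m; f }
  def hasReceiveMore: Boolean = more
}

object TwoThreadClient {
  // Reads every frame of one message so nothing is left for the next call.
  def recvAll(in: FrameSource): Vector[Array[Byte]] = {
    val parts = ArrayBuffer(in.recv())
    while (in.hasReceiveMore) parts += in.recv()
    parts.toVector
  }

  // Sends `count` two-part messages on one thread and collects the replies
  // on another; the sending and receiving sides never share a thread.
  def exchange(out: FrameSink, in: FrameSource, key: String, count: Int): Vector[String] = {
    val bodies = new LinkedBlockingQueue[String]()
    val sender = new Thread(new Runnable {
      def run(): Unit = for (i <- 1 to count) {
        out.send(key.getBytes("UTF-8"), true)            // part 1: subscription key
        out.send(("body-" + i).getBytes("UTF-8"), false) // part 2: payload
      }
    })
    val receiver = new Thread(new Runnable {
      def run(): Unit = for (_ <- 1 to count) recvAll(in) match {
        case Vector(k, body) if new String(k, "UTF-8") == key =>
          bodies.put(new String(body, "UTF-8"))
        case other => // not the expected [key, body] shape: report it
          bodies.put("unexpected message with " + other.size + " frame(s)")
      }
    })
    receiver.start(); sender.start()
    sender.join(); receiver.join()
    Vector.fill(count)(bodies.take())
  }
}
```

With the fake, `TwoThreadClient.exchange(lb, lb, "key", 3)` on a fresh
`val lb = new Loopback` returns the three bodies in sending order. With real
sockets the sender would own pushSocket and the receiver would own subSocket,
so that each socket is only ever touched by one thread.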
Joshua, thanks for the tips,
Sergey
On 2012-02-17, at 5:13 PM, Joshua Foster wrote:
> I would check to make sure that you are not actually wrapping messages
> between loop iterations. Try calling socket.hasReceiveMore() before pulling
> the next part, and then drain the rest of the message parts if it doesn't
> match your expectation.
>
> A typical drain call that you can use could look like this:
> while (socket.hasReceiveMore()) {
>     socket.recv(ZMQ.NOBLOCK);
> }
>
> You could also add logging to see when this occurs. I have noticed in
> testing on 3.1 that I sometimes don't get the first frame, which could throw
> the process off. https://zeromq.jira.com/browse/LIBZMQ-283 describes
> something like this (note: that is for 3.0, so it may not apply to 2.1.11).
>
> Joshua
>
> On Feb 17, 2012, at 11:41 AM, Sergey Malov wrote:
>
>> Hi,
>> I am working on a project where two applications talk to each other by
>> exchanging protocol buffer messages through ZeroMQ sockets. The applications
>> are written in Scala (2.9.1); the Java binding is used to manipulate the
>> ZeroMQ sockets, and messages are serialized using the protocol buffers Java
>> binding as well. The underlying ZeroMQ library is 2.1.11, and the Java
>> binding (for ZeroMQ) is compiled from the latest source.
>> The flow of messages is the following.
>> Application A pushes a message to application B (push/pull socket pair); B
>> de-serializes the message, processes it, then serializes the result and
>> publishes it for A to pick up (publish/subscribe socket pair). The A->B
>> message is multipart: the first part is the key for the B->A message
>> subscription (which is, obviously, multipart too), and the second part is
>> the actual message body.
>> Most (if not all) of the B->A messages arrive (i.e. polling the appropriate
>> socket succeeds); however, occasionally I see the following for the two
>> socket.recv(0) calls on the A side: the first call picks up the *body* of the
>> previously sent A->B message, and the second call picks up the *key* for the
>> current message.
>> Obviously this causes major problems processing the response in A. I should
>> also note that it happens only when the rate of A->B messages is sufficiently
>> high (around 500 messages per second).
>>
>> It is very possible that I did something wrong designing the message flow in
>> the first place, so below are the relevant (I think) parts of A and B.
>> Any advice would be very welcome.
>>
>> Sergey Malov
>>
>>
>> Application A:
>> --------------------
>> pushSocket.setLinger(...);
>> subSocket.setLinger(...); subSocket.setReceiveTimeOut(...)
>>
>> poller.setTimeout(...);
>> pushSocket.connect(pushEndPoint);
>>
>> subSocket.connect(subEndPoint);
>> val status = poller.register(subSocket); // then check status
>> val address = new Random().nextInt().toString.getBytes;
>>
>> while (true)
>> {
>>     pushSocket.send(address, ZMQ.SNDMORE);
>>     val pushMsg = ..... // create protocol buffer message
>>     pushSocket.send(pushMsg.toByteArray, 0);
>>
>>     subSocket.subscribe(address);
>>     poller.poll;
>>     if (poller.pollin(0))
>>     {
>>         // this should be the subscription address, but occasionally it is the message body!
>>         val rspAddress = subSocket.recv(0);
>>         // this should be the message body, but occasionally it is the address
>>         val rspBytes = subSocket.recv(0);
>>     }
>>     // sleep a pre-defined time to get a steady message rate
>>     Thread.sleep(SLEEP_TIME);
>> }
>> }
>>
>> Application B:
>> -------------------
>>
>> poller.setTimeout(....);
>> pullSocket.bind(pullEndPoint);
>> val status1 = poller.register(pullSocket); // then check status
>>
>> pubSocket.bind(pubEndPoint);
>> val status2 = poller.register(pubSocket) // then check status
>> while (true)
>> {
>>     poller.poll;
>>     if (poller.pollin(0)) {
>>         val address = pullSocket.recv(0);
>>         val inMessage = pullSocket.recv(0);
>>         // process incoming message
>>         val result = ... // receive result
>>         pubSocket.send(address, ZMQ.SNDMORE);
>>         pubSocket.send(result.toByteArray, 0);
>>     };
>> };
>>
>>
>>
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> [email protected]
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>