On Friday 23, Andrzej K. Haczewski wrote:
> 2012/3/22 Robert G. Jakabosky <[email protected]>:
> > zmq sockets are always level-triggered.  Your code should call
> > zmq_send/zmq_recv until it returns EAGAIN, then register the FD for read
> > events with your event loop.  Your code also needs to always pass the
> > ZMQ_NOBLOCK flag to zmq_send/zmq_recv.
> >
> > For a lib-ev event loop use an idle watcher to call zmq_recv when it is
> > not blocked.  Once zmq_recv returns EAGAIN, stop the idle watcher and
> > start an io watcher for read events.  The ev_io callback needs to get
> > the value of ZMQ_EVENTS with zmq_getsockopt(), then check the events
> > value for ZMQ_POLLIN.  As soon as ZMQ_EVENTS has ZMQ_POLLIN stop the io
> > watcher and start the idle watcher again.
> >
> > Basically if the zmq socket is readable, your code must keep calling
> > zmq_recv() until it returns EAGAIN without waiting for another IO read
> > event from the event loop.
> >
> > I have written an example [1] in Lua that reads from a zmq SUB socket
> > using the lib-ev event loop.
> >
> > Also there are client [2] and server [3] examples (using REQ/REP sockets)
> > that can use either lib-ev [4] or epoll [5] event loops.
> >
> > I hope that helps.
> >
> > 1. https://github.com/Neopallium/lua-zmq/blob/master/examples/subscriber_ev.lua
> > 2. https://github.com/Neopallium/lua-zmq/blob/master/examples/client_poll.lua
> > 3. https://github.com/Neopallium/lua-zmq/blob/master/examples/server_poll.lua
> > 4. https://github.com/Neopallium/lua-zmq/blob/master/examples/poller/ev.lua
> > 5. https://github.com/Neopallium/lua-zmq/blob/master/examples/poller/epoll.lua
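
The read loop described in the quoted message can be sketched roughly as
follows.  This is only a toy model in Python, not ZeroMQ code: FakeSocket,
recv_noblock() and drain() are hypothetical names made up for illustration,
and the fake socket simply raises EAGAIN when its queue is empty, the way
zmq_recv() with ZMQ_NOBLOCK would.

```python
import errno
from collections import deque


class FakeSocket:
    """Hypothetical stand-in for a zmq socket; not the real ZeroMQ API."""

    def __init__(self, messages):
        self.queue = deque(messages)

    def recv_noblock(self):
        # Mimics zmq_recv() with ZMQ_NOBLOCK: fails with EAGAIN when empty.
        if not self.queue:
            raise OSError(errno.EAGAIN, "no message available")
        return self.queue.popleft()


def drain(sock, handle):
    """Read until EAGAIN; return True once it is time to watch the FD."""
    while True:
        try:
            msg = sock.recv_noblock()
        except OSError as exc:
            if exc.errno == errno.EAGAIN:
                # Here the idle watcher would be stopped and the io
                # watcher started for read events.
                return True
            raise
        handle(msg)


received = []
sock = FakeSocket([b"a", b"b", b"c"])
should_watch_fd = drain(sock, received.append)
print(received, should_watch_fd)
```

The point of the sketch is only the ordering: the FD is handed to the event
loop after EAGAIN has been seen, never before.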

A correction to my last email: I should have said "zmq sockets are always
edge-triggered".

> Thank you so much for your assistance. I'm refactoring my code to
> include the scheme you've proposed.
>
> There is one thing that bothers me though: why does the scheme I used
> work for ZeroMQ 3.1.0 and CrossroadsIO, as I tried both and they work
> with registering FD right away with no recv() calls in between
> connect() and epoll(), and it doesn't work for ZeroMQ 2.1.

Version 3.1 might be writing something to the socket's pipe when new sockets
are created, unlike 2.1.  I haven't used 3.1 much, only did a little bit of
testing when adding support for 3.1 to my Lua bindings.

Try using your code on a SUB socket with 3.1 and send a burst of messages.  I
think you will still run into problems with 3.1 when many messages are ready
to be received from the socket.

With 'edge-triggered' sockets your code must always keep reading from the
socket until the read queue is empty.  This doesn't mean that you can't call
poll/epoll to check for other events while a socket still has messages
queued; just make sure you don't ask epoll to block for events in that case
(use timeout=0, i.e. don't block).

What I recommend is to use some type of work queue (or idle watchers with
lib-ev) to process 'edge-triggered' sockets that are not currently blocked.
The worker for each socket should only be allowed to call recv/send X times
each time the worker is called.  Higher values of X give better throughput on
sockets transferring lots of data, but can increase latency for other sockets
that only need to send/recv a small amount of data.  If the socket's worker
hits the limit before getting EAGAIN, then it should "yield" back to the event
loop.

> Also, I wonder if there might be a race in the proposed approach, between
> getting EAGAIN and starting to watch FD, since it might be quite a lot
> of time between processing all the pending tasks I have on my queue
> (which that idle recv() task will belong to) and actually entering
> epoll.
> Or does ZeroMQ 2.1 guarantee that after throwing EAGAIN at user
> it will never clear pending event from socket before user has a chance
> of epolling on that socket's FD?

I think the only things that can clear a socket's pending events are these
calls:

zmq_recv()
zmq_send()
zmq_getsockopt(sock, ZMQ_EVENTS, ...)

If ZMQ_EVENTS is 0 then it is safe to block for a read event.

Hmm, I wonder if there could be an issue with this case:

1. APP: zmq_recv() returns EAGAIN.
2. APP: registers zmq socket's FD (i.e. its pipe FD) with event loop for read
event.
3. ZMQ_IO_THREAD: puts new message on read queue (one byte will be written to
the socket's pipe).
4. APP: zmq_send() is called to send a message (this will consume the byte
from the socket's pipe).
--------> at this point the APP should resume calling zmq_recv() until EAGAIN.
5. APP: event loop will block, even though a message can be read with
zmq_recv().

But this would only be a problem for bi-directional (XREQ/XREP, DEALER/ROUTER)
sockets that can send & recv at any time.

One way to handle this would be to mark the socket as recv blocked, then if
zmq_send is called on a recv blocked socket, check ZMQ_EVENTS to see if it
should be unblocked.  The reverse should be done for a send blocked socket
when zmq_recv() is called, or just don't call zmq_recv on a socket that is
send blocked.

-- 
Robert G. Jakabosky
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
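
To make the per-call limit "X" and the "recv blocked" flag above more
concrete, here is a rough sketch.  It is a toy model in Python, not ZeroMQ
code: FakeSocket, Worker, budget and recv_blocked are hypothetical names,
events() stands in for zmq_getsockopt(sock, ZMQ_EVENTS, ...), and recv()
returning None stands in for zmq_recv() failing with EAGAIN.

```python
from collections import deque

POLLIN = 1  # stand-in for ZMQ_POLLIN


class FakeSocket:
    """Hypothetical model of an edge-triggered socket; not the ZeroMQ API."""

    def __init__(self):
        self.inbox = deque()
        self.sent = []

    def events(self):
        # Plays the role of zmq_getsockopt(sock, ZMQ_EVENTS, ...).
        return POLLIN if self.inbox else 0

    def recv(self):
        # Returns None where zmq_recv() would fail with EAGAIN.
        return self.inbox.popleft() if self.inbox else None

    def send(self, msg):
        self.sent.append(msg)


class Worker:
    def __init__(self, sock, budget):
        self.sock = sock
        self.budget = budget        # the "X" limit per worker call
        self.recv_blocked = False   # True while we rely on the FD event
        self.received = []

    def run_once(self):
        """Receive at most `budget` messages; True means run me again."""
        for _ in range(self.budget):
            msg = self.sock.recv()
            if msg is None:
                self.recv_blocked = True  # now safe to watch the FD
                return False
            self.received.append(msg)
        return True  # limit hit before EAGAIN: yield to the event loop

    def send(self, msg):
        self.sock.send(msg)
        # The send may have consumed the wake-up, so re-check the events.
        if self.recv_blocked and self.sock.events() & POLLIN:
            self.recv_blocked = False
            return True  # caller should schedule run_once() again
        return False


sock = FakeSocket()
sock.inbox.extend([b"m1", b"m2", b"m3"])
worker = Worker(sock, budget=2)
first = worker.run_once()          # stops after 2 messages
second = worker.run_once()         # reads the third, then sees "EAGAIN"
sock.inbox.append(b"m4")           # step 3: a message arrives meanwhile
resumed = worker.send(b"request")  # step 4: the send re-checks the events
third = worker.run_once()
```

In this model the check inside send() is what prevents step 5: the worker is
put back on the work queue instead of waiting for a read event that may never
fire.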
