On 11/24/2014 8:08 PM, Arnaud Kapp wrote:
Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq master
Oh okay. This is the commit that added the flag:
https://github.com/zeromq/libzmq/commit/779c37abc433cb6595ddeedaf86b280317656bdd
I have rewritten the patch to take the POLLPRI flag into account, and to apply cleanly to the current master (Wed 11/25).
Cheers,
Francis


libzmq was 4.1 at the time I believe.

I'll probably look at it this week-end then :)

On Mon, Nov 24, 2014 at 12:10 PM, Francis Le Bourse
<[email protected]> wrote:
Hi,
On 11/24/2014 11:35 AM, Arnaud Kapp wrote:
Hello,

I recently added support for POLLPRI flag.
It looks like it's not handled in your patch
No, it isn't handled. In which version did you add this flag?
Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq
master.
   and that it needs custom
support. Since there is no test related to this flag, you wouldn't
notice.

I can give it a look if you want.
That would be nice.

Cheers,
Francis


On Sat, Nov 22, 2014 at 2:16 PM, Pieter Hintjens <[email protected]> wrote:
I suggest you send the patch to libzmq master, and ensure all test
cases pass. Then we can get this into the next version.

On Fri, Nov 21, 2014 at 2:50 PM, Francis Le Bourse
<[email protected]> wrote:
Hi,

On 11/6/2014 3:18 PM, Pieter Hintjens wrote:
Oh, ok. Sounds like you have a good candidate for some before/after
measurement and optimization. Are you going to try to make a patch for
this?
I have a patch candidate for this optimization. The performance improvement
is very good, and it doesn't seem to introduce any new instability.
What is modified:
      - zmq_poll(), there is only one poll() now,
      - and epoll() from epoll.cpp
Other calls to poll() and select() are left unmodified.
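
For illustration, the epoll side of the change amounts to translating the
event mask reported by epoll into a pollfd that a handler can inspect
instead of polling again. A minimal sketch of that translation, assuming
Linux; the helper name epoll_to_pollfd is hypothetical and is not part of
libzmq:

```cpp
#include <cstdint>
#include <poll.h>
#include <sys/epoll.h>

//  Hypothetical helper, not part of libzmq: build a pollfd whose revents
//  mirror the readiness bits that epoll_wait() reported for fd.
static pollfd epoll_to_pollfd (int fd, uint32_t epoll_events)
{
    pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN | POLLOUT;
    pfd.revents = 0;
    if (epoll_events & EPOLLIN)
        pfd.revents |= POLLIN;
    if (epoll_events & EPOLLOUT)
        pfd.revents |= POLLOUT;
    if (epoll_events & EPOLLERR)
        pfd.revents |= POLLERR;
    if (epoll_events & EPOLLHUP)
        pfd.revents |= POLLHUP;
    return pfd;
}
```

A handler that receives such a pollfd can check revents directly and skip
its own poll() call when POLLIN is already set.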

I would be happy to have any feedback.


Cheers,
Francis

On Thu, Nov 6, 2014 at 2:09 PM, Francis Le Bourse
<[email protected]> wrote:
On 11/6/2014 11:47 AM, Pieter Hintjens wrote:
A simple optimization is, when you are polling sockets for input, to
continue reading from an active socket using a non-blocking read. So
you process all waiting messages on a socket and then only switch back
to poll when needed.
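
The pattern described above can be sketched with plain POSIX descriptors.
This is only an illustration under stated assumptions: it uses
non-blocking file descriptors instead of ZeroMQ sockets, and the names
drain_fd and poll_and_drain are hypothetical.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

//  Hypothetical helper: read everything currently available on a
//  non-blocking descriptor and return the number of bytes consumed.
//  Stops when read() reports EAGAIN, end-of-file or a real error.
static size_t drain_fd (int fd)
{
    char buf [256];
    size_t total = 0;
    while (true) {
        ssize_t n = read (fd, buf, sizeof buf);
        if (n > 0) {
            total += static_cast <size_t> (n);
            continue;
        }
        if (n < 0 && errno == EINTR)
            continue;
        break;
    }
    return total;
}

//  Poll once, then drain every descriptor that reported POLLIN before
//  going back to poll(). Returns the total number of bytes read.
static size_t poll_and_drain (pollfd *fds, nfds_t count, int timeout_ms)
{
    assert (fds != nullptr);
    size_t total = 0;
    int rc = poll (fds, count, timeout_ms);
    if (rc <= 0)
        return 0;
    for (nfds_t i = 0; i < count; i++)
        if (fds [i].revents & POLLIN)
            total += drain_fd (fds [i].fd);
    return total;
}
```

With ZeroMQ sockets the inner loop would presumably be a receive with the
non-blocking flag, repeated until it reports EAGAIN.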
Thank you for your quick reply.

Yes, but the question was more about the zmq_poll() internals.
For 600+ file descriptors, zmq_poll() calls poll() a huge number of times
for only a few that will trigger a POLLIN, and the relevant information is
already known / present in the pollfds array. The performance hit is there.

Cheers,
Francis



On Thu, Nov 6, 2014 at 11:28 AM, Francis Le Bourse
<[email protected]> wrote:
Hi,

I am looking at a performance issue in zmq, when the number of zsockets /
file descriptors becomes large.
The relevant calls are:
        poll+0x57
        zmq_poll+0x2e3
        zloop_start+0x1e8
        main+0xb40
        __libc_start_main+0xfd
immediately followed by a loop of
        poll+0x57
        zmq::signaler_t::wait(int)+0x33
        zmq::mailbox_t::recv(zmq::command_t*, int)+0x78
        zmq::socket_base_t::process_commands(int, bool)+0xbe
        zmq::socket_base_t::getsockopt(int, void*, unsigned long*)+0x135
        zmq_getsockopt+0x75
        zmq_poll+0x3da
        zloop_start+0x1e8
        main+0xb40
        __libc_start_main+0xfd

The code in the loop is executed once per file descriptor in the initial
pollarray, redoing a poll() system call each time.
Is there a reason to proceed that way?
Would it be possible to reuse the results of the first poll() in order to
bypass the second set of system calls?
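
One way to picture the reuse being asked about: a zero-timeout wait that
consults the revents filled in by the first poll() instead of issuing a new
system call. This is a sketch only; wait_cached is a hypothetical name, and
it assumes the caller passes the pollfd entry that the earlier poll() call
populated.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>

//  Hypothetical zero-timeout wait: instead of calling poll() again, look
//  at the revents that an earlier poll() stored in *pfd. POLLIN is cleared
//  once consumed so that the same readiness is never reported twice.
//  Returns 0 if input was pending, or -1 with errno set to EAGAIN.
static int wait_cached (pollfd *pfd)
{
    assert (pfd != nullptr);
    if ((pfd->revents & POLLIN) == 0) {
        errno = EAGAIN;
        return -1;
    }
    pfd->revents &= ~POLLIN;
    return 0;
}
```

Clearing POLLIN after the first successful call matters: a second call on
the same entry then reports EAGAIN, just as a fresh poll() would after the
pending signal has been consumed.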

Cheers,
Francis


_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

diff -urwB libzmq-original/src/ctx.cpp libzmq/src/ctx.cpp
--- libzmq-original/src/ctx.cpp 2014-11-25 11:01:35.704930597 +0100
+++ libzmq/src/ctx.cpp  2014-11-25 11:03:41.041081315 +0100
@@ -28,6 +28,8 @@
 #include <new>
 #include <string.h>
 
+#include <poll.h>
+
 #include "ctx.hpp"
 #include "socket_base.hpp"
 #include "io_thread.hpp"
diff -urwB libzmq-original/src/epoll.cpp libzmq/src/epoll.cpp
--- libzmq-original/src/epoll.cpp       2014-11-25 11:01:35.707930552 +0100
+++ libzmq/src/epoll.cpp        2014-11-25 12:31:57.856918921 +0100
@@ -27,6 +27,8 @@
 #include <algorithm>
 #include <new>
 
+#include <poll.h>
+
 #include "epoll.hpp"
 #include "err.hpp"
 #include "config.hpp"
@@ -152,18 +154,43 @@
         for (int i = 0; i < n; i ++) {
             poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);
 
+           uint32_t events = ev_buf[i].events;
+           ev_buf[i].events = 0;
+
+           // build a pollfd structure with the events returned by
+           // epoll and call in_events_ex in order to bypass poll()
+           // if EPOLLIN is present in events
+
+           struct pollfd pfd;
+           pfd.fd = pe->fd;
+           pfd.events = 0;
+           if(pe->ev.events & EPOLLIN)
+               pfd.events |= POLLIN;
+           if(pe->ev.events & EPOLLOUT)
+               pfd.events |= POLLOUT;
+           pfd.revents = 0;
+           
+           if(events & EPOLLIN)
+               pfd.revents |= POLLIN;
+           if(events & EPOLLOUT)
+               pfd.revents |= POLLOUT;
+           if(events & EPOLLERR)
+               pfd.revents |= POLLERR;
+           if(events & EPOLLHUP)
+               pfd.revents |= POLLHUP;
+           
             if (pe->fd == retired_fd)
                 continue;
-            if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
+            if (events & (EPOLLERR | EPOLLHUP))
                 pe->events->in_event ();
             if (pe->fd == retired_fd)
                continue;
-            if (ev_buf [i].events & EPOLLOUT)
+            if (events & EPOLLOUT)
                 pe->events->out_event ();
             if (pe->fd == retired_fd)
                 continue;
-            if (ev_buf [i].events & EPOLLIN)
-                pe->events->in_event ();
+            if (events & EPOLLIN)
+               pe->events->in_event_ex (&pfd);
         }
 
         //  Destroy retired event sources.
diff -urwB libzmq-original/src/i_poll_events.hpp libzmq/src/i_poll_events.hpp
--- libzmq-original/src/i_poll_events.hpp       2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/i_poll_events.hpp        2014-11-25 11:03:41.042081411 +0100
@@ -20,6 +20,8 @@
 #ifndef __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
 #define __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
  
+#include <poll.h>
+
 namespace zmq
 {
  
@@ -32,6 +34,7 @@
  
         // Called by I/O thread when file descriptor is ready for reading.
         virtual void in_event () = 0;
+        virtual void in_event_ex (pollfd *) = 0;
  
         // Called by I/O thread when file descriptor is ready for writing.
         virtual void out_event () = 0;
diff -urwB libzmq-original/src/io_object.cpp libzmq/src/io_object.cpp
--- libzmq-original/src/io_object.cpp   2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_object.cpp    2014-11-25 11:03:41.042081411 +0100
@@ -95,6 +95,11 @@
     zmq_assert (false);
 }
 
+void zmq::io_object_t::in_event_ex (pollfd *pfd)
+{
+    zmq_assert (false);
+}
+
 void zmq::io_object_t::out_event ()
 {
     zmq_assert (false);
diff -urwB libzmq-original/src/io_object.hpp libzmq/src/io_object.hpp
--- libzmq-original/src/io_object.hpp   2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_object.hpp    2014-11-25 11:03:41.043081508 +0100
@@ -63,6 +63,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/io_thread.cpp libzmq/src/io_thread.cpp
--- libzmq-original/src/io_thread.cpp   2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_thread.cpp    2014-11-25 11:03:41.043081508 +0100
@@ -77,6 +77,23 @@
     errno_assert (rc != 0 && errno == EAGAIN);
 }
 
+void zmq::io_thread_t::in_event_ex (pollfd *pfd)
+{
+    //  TODO: Do we want to limit number of commands I/O thread can
+    //  process in a single go?
+
+    command_t cmd;
+    int rc = mailbox.recv_ex (&cmd, 0, pfd); // will clear POLLIN
+    
+    while (rc == 0 || errno == EINTR) {
+        if (rc == 0)
+            cmd.destination->process_command (cmd);
+        rc = mailbox.recv_ex (&cmd, 0, pfd);
+    }
+
+    errno_assert (rc != 0 && errno == EAGAIN);
+}
+
 void zmq::io_thread_t::out_event ()
 {
     //  We are never polling for POLLOUT here. This function is never called.
diff -urwB libzmq-original/src/io_thread.hpp libzmq/src/io_thread.hpp
--- libzmq-original/src/io_thread.hpp   2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_thread.hpp    2014-11-25 11:03:41.043081508 +0100
@@ -57,6 +57,7 @@
 
         //  i_poll_events implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/ipc_connecter.cpp libzmq/src/ipc_connecter.cpp
--- libzmq-original/src/ipc_connecter.cpp       2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_connecter.cpp        2014-11-25 11:03:41.043081508 +0100
@@ -99,6 +99,14 @@
     out_event ();
 }
 
+void zmq::ipc_connecter_t::in_event_ex (pollfd *pfd)
+{
+    //  We are not polling for incoming data, so we are actually called
+    //  because of error here. However, we can get error on out event as well
+    //  on some platforms, so we'll simply handle both events in the same way.
+    out_event ();
+}
+
 void zmq::ipc_connecter_t::out_event ()
 {
     fd_t fd = connect ();
diff -urwB libzmq-original/src/ipc_connecter.hpp libzmq/src/ipc_connecter.hpp
--- libzmq-original/src/ipc_connecter.hpp       2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_connecter.hpp        2014-11-25 11:03:41.044081604 +0100
@@ -58,6 +58,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/ipc_listener.cpp libzmq/src/ipc_listener.cpp
--- libzmq-original/src/ipc_listener.cpp        2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_listener.cpp 2014-11-25 11:03:41.044081604 +0100
@@ -110,6 +110,11 @@
     socket->event_accepted (endpoint, fd);
 }
 
+void zmq::ipc_listener_t::in_event_ex (pollfd *pfd)
+{
+    zmq::ipc_listener_t::in_event();
+}
+
 int zmq::ipc_listener_t::get_address (std::string &addr_)
 {
     struct sockaddr_storage ss;
diff -urwB libzmq-original/src/ipc_listener.hpp libzmq/src/ipc_listener.hpp
--- libzmq-original/src/ipc_listener.hpp        2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_listener.hpp 2014-11-25 11:03:41.044081604 +0100
@@ -59,6 +59,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
 
         //  Close the listening socket.
         int close ();
diff -urwB libzmq-original/src/mailbox.cpp libzmq/src/mailbox.cpp
--- libzmq-original/src/mailbox.cpp     2014-11-25 11:01:35.711930493 +0100
+++ libzmq/src/mailbox.cpp      2014-11-25 11:03:41.045081698 +0100
@@ -17,6 +17,10 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
+#if defined ZMQ_POLL_BASED_ON_POLL
+#include <poll.h>
+#endif
+
 #include "mailbox.hpp"
 #include "err.hpp"
 
@@ -84,3 +88,32 @@
     zmq_assert (ok);
     return 0;
 }
+
+int zmq::mailbox_t::recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_)
+{
+    //  Try to get the command straight away.
+    if (active) {
+        bool ok = cpipe.read (cmd_);
+        if (ok)
+            return 0;
+
+        //  If there are no more commands available, switch into passive state.
+        active = false;
+        signaler.recv ();
+    }
+
+    //  Wait for signal from the command sender.
+    int rc = signaler.wait_ex (timeout_, pfd_);
+    if (rc != 0 && (errno == EAGAIN || errno == EINTR))
+        return -1;
+
+    //  We've got the signal. Now we can switch into active state.
+    active = true;
+
+    //  Get a command.
+    errno_assert (rc == 0);
+    bool ok = cpipe.read (cmd_);
+    zmq_assert (ok);
+    return 0;
+}
+
diff -urwB libzmq-original/src/mailbox.hpp libzmq/src/mailbox.hpp
--- libzmq-original/src/mailbox.hpp     2014-11-25 11:01:35.712930479 +0100
+++ libzmq/src/mailbox.hpp      2014-11-25 11:03:41.045081698 +0100
@@ -22,6 +22,8 @@
 
 #include <stddef.h>
 
+#include <poll.h>
+
 #include "platform.hpp"
 #include "signaler.hpp"
 #include "fd.hpp"
@@ -43,6 +45,7 @@
         fd_t get_fd () const;
         void send (const command_t &cmd_);
         int recv (command_t *cmd_, int timeout_);
+        int recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_);
 
 #ifdef HAVE_FORK
         // close the file descriptors in the signaller. This is used in a forked
diff -urwB libzmq-original/src/pgm_receiver.cpp libzmq/src/pgm_receiver.cpp
--- libzmq-original/src/pgm_receiver.cpp        2014-11-25 11:01:35.715930435 +0100
+++ libzmq/src/pgm_receiver.cpp 2014-11-25 11:03:41.045081698 +0100
@@ -275,6 +275,11 @@
 }
 
 
+void zmq::pgm_receiver_t::in_event_ex (pollfd *pfd)
+{
+    zmq::pgm_receiver_t::in_event();
+}
+
 void zmq::pgm_receiver_t::timer_event (int token)
 {
     zmq_assert (token == rx_timer_id);
diff -urwB libzmq-original/src/pgm_receiver.hpp libzmq/src/pgm_receiver.hpp
--- libzmq-original/src/pgm_receiver.hpp        2014-11-25 11:01:35.715930435 +0100
+++ libzmq/src/pgm_receiver.hpp 2014-11-25 11:03:41.045081698 +0100
@@ -63,6 +63,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void timer_event (int token);
 
     private:
diff -urwB libzmq-original/src/pgm_sender.cpp libzmq/src/pgm_sender.cpp
--- libzmq-original/src/pgm_sender.cpp  2014-11-25 11:01:35.715930435 +0100
+++ libzmq/src/pgm_sender.cpp   2014-11-25 11:03:41.046081791 +0100
@@ -157,6 +157,11 @@
     }
 }
 
+void zmq::pgm_sender_t::in_event_ex (pollfd *pfd)
+{
+    zmq::pgm_sender_t::in_event ();
+}
+
 void zmq::pgm_sender_t::out_event ()
 {
     //  POLLOUT event from send socket. If write buffer is empty, 
diff -urwB libzmq-original/src/pgm_sender.hpp libzmq/src/pgm_sender.hpp
--- libzmq-original/src/pgm_sender.hpp  2014-11-25 11:01:35.716930420 +0100
+++ libzmq/src/pgm_sender.hpp   2014-11-25 11:03:41.046081791 +0100
@@ -62,6 +62,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int token);
 
diff -urwB libzmq-original/src/reaper.cpp libzmq/src/reaper.cpp
--- libzmq-original/src/reaper.cpp      2014-11-25 11:01:35.720930360 +0100
+++ libzmq/src/reaper.cpp       2014-11-25 11:03:41.046081791 +0100
@@ -83,6 +83,24 @@
     }
 }
 
+void zmq::reaper_t::in_event_ex (pollfd *pfd)
+{
+    while (true) {
+
+        //  Get the next command. If there is none, exit.
+        command_t cmd;
+        int rc = mailbox.recv_ex (&cmd, 0, pfd);
+        if (rc != 0 && errno == EINTR)
+            continue;
+        if (rc != 0 && errno == EAGAIN)
+            break;
+        errno_assert (rc == 0);
+
+        //  Process the command.
+        cmd.destination->process_command (cmd);
+    }
+}
+
 void zmq::reaper_t::out_event ()
 {
     zmq_assert (false);
diff -urwB libzmq-original/src/reaper.hpp libzmq/src/reaper.hpp
--- libzmq-original/src/reaper.hpp      2014-11-25 11:01:35.720930360 +0100
+++ libzmq/src/reaper.hpp       2014-11-25 11:03:41.047081882 +0100
@@ -45,6 +45,7 @@
 
         //  i_poll_events implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/signaler.cpp libzmq/src/signaler.cpp
--- libzmq-original/src/signaler.cpp    2014-11-25 11:01:35.721930346 +0100
+++ libzmq/src/signaler.cpp     2014-11-25 12:19:39.843830861 +0100
@@ -260,6 +260,94 @@
 #endif
 }
 
+int zmq::signaler_t::wait_ex (int timeout_, pollfd *pfd_)
+{
+#ifdef HAVE_FORK
+    if (unlikely (pid != getpid ())) {
+        // we have forked and the file descriptor is closed. Emulate an interrupt
+        // response.
+        //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
+        errno = EINTR;
+        return -1;
+    }
+#endif
+
+#ifdef ZMQ_POLL_BASED_ON_POLL
+    struct pollfd pfd;
+    pfd.fd = r;
+    pfd.events = POLLIN;
+    if(timeout_)
+    {
+       int rc = poll (&pfd, 1, timeout_);
+       if (unlikely (rc < 0)) {
+           errno_assert (errno == EINTR);
+           return -1;
+       }
+       else if (unlikely (rc == 0)) {
+           errno = EAGAIN;
+           return -1;
+       }
+#ifdef HAVE_FORK
+       else
+           if (unlikely (pid != getpid ())) {
+               // we have forked and the file descriptor is closed. Emulate an interrupt
+               // response.
+               //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
+               errno = EINTR;
+               return -1;
+           }
+#endif
+       zmq_assert (rc == 1);
+       zmq_assert (pfd.revents & POLLIN);
+       return 0;
+    }
+    else
+    {
+       if((pfd_->revents & POLLIN) == 0)
+       {
+           // simulate EAGAIN
+           errno = EAGAIN;
+           return -1;
+       }
+       zmq_assert (pfd_->revents & POLLIN);
+       // MUST clear POLLIN
+       pfd_->revents &= ~POLLIN;
+       return 0;
+    }
+    
+#elif defined ZMQ_POLL_BASED_ON_SELECT
+    fd_set fds;
+    FD_ZERO (&fds);
+    FD_SET (r, &fds);
+    struct timeval timeout;
+    if (timeout_ >= 0) {
+        timeout.tv_sec = timeout_ / 1000;
+        timeout.tv_usec = timeout_ % 1000 * 1000;
+    }
+#ifdef ZMQ_HAVE_WINDOWS
+    int rc = select (0, &fds, NULL, NULL,
+        timeout_ >= 0 ? &timeout : NULL);
+    wsa_assert (rc != SOCKET_ERROR);
+#else
+    int rc = select (r + 1, &fds, NULL, NULL,
+        timeout_ >= 0 ? &timeout : NULL);
+    if (unlikely (rc < 0)) {
+        errno_assert (errno == EINTR);
+        return -1;
+    }
+#endif
+    if (unlikely (rc == 0)) {
+        errno = EAGAIN;
+        return -1;
+    }
+    zmq_assert (rc == 1);
+    return 0;
+
+#else
+#error
+#endif
+}
+
 void zmq::signaler_t::recv ()
 {
     //  Attempt to read a signal.
diff -urwB libzmq-original/src/signaler.hpp libzmq/src/signaler.hpp
--- libzmq-original/src/signaler.hpp    2014-11-25 11:01:35.722930332 +0100
+++ libzmq/src/signaler.hpp     2014-11-25 11:03:41.047081882 +0100
@@ -24,6 +24,8 @@
 #include <unistd.h>
 #endif
 
+#include <poll.h>
+
 #include "fd.hpp"
 
 namespace zmq
@@ -44,6 +46,7 @@
         fd_t get_fd () const;
         void send ();
         int wait (int timeout_);
+        int wait_ex (int timeout_, pollfd *pfd_);
         void recv ();
 
 #ifdef HAVE_FORK
diff -urwB libzmq-original/src/socket_base.cpp libzmq/src/socket_base.cpp
--- libzmq-original/src/socket_base.cpp 2014-11-25 11:01:35.722930332 +0100
+++ libzmq/src/socket_base.cpp  2014-11-25 11:03:41.048081973 +0100
@@ -36,6 +36,8 @@
 #include <unistd.h>
 #endif
 
+#include <poll.h>
+
 #include "socket_base.hpp"
 #include "tcp_listener.hpp"
 #include "ipc_listener.hpp"
@@ -346,6 +348,38 @@
     return options.getsockopt (option_, optval_, optvallen_);
 }
 
+// only for ZMQ_EVENTS
+int zmq::socket_base_t::getsockopt_ex (int option_, void *optval_,
+                                      size_t *optvallen_, pollfd *pfd_)
+{
+    if (unlikely (ctx_terminated)) {
+        errno = ETERM;
+        return -1;
+    }
+
+    assert (option_ == ZMQ_EVENTS);
+
+    if (option_ == ZMQ_EVENTS) {
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        int rc = process_commands_ex (0, false, pfd_);
+        if (rc != 0 && (errno == EINTR || errno == ETERM))
+            return -1;
+        errno_assert (rc == 0);
+        *((int*) optval_) = 0;
+        if (has_out ())
+            *((int*) optval_) |= ZMQ_POLLOUT;
+        if (has_in ())
+            *((int*) optval_) |= ZMQ_POLLIN;
+        *optvallen_ = sizeof (int);
+        return 0;
+    }
+
+    return 0;
+}
+
 int zmq::socket_base_t::bind (const char *addr_)
 {
     if (unlikely (ctx_terminated)) {
@@ -1026,6 +1060,62 @@
     return 0;
 }
 
+int zmq::socket_base_t::process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_)
+{
+    int rc;
+    command_t cmd;
+    if (timeout_ != 0) {
+
+        //  If we are asked to wait, simply ask mailbox to wait.
+        rc = mailbox.recv_ex (&cmd, timeout_, pfd_);
+    }
+    else {
+
+        //  If we are asked not to wait, check whether we haven't processed
+        //  commands recently, so that we can throttle the new commands.
+
+        //  Get the CPU's tick counter. If 0, the counter is not available.
+        uint64_t tsc = zmq::clock_t::rdtsc ();
+
+        //  Optimised version of command processing - it doesn't have to check
+        //  for incoming commands each time. It does so only if certain time
+        //  elapsed since last command processing. Command delay varies
+        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
+        //  etc. The optimisation makes sense only on platforms where getting
+        //  a timestamp is a very cheap operation (tens of nanoseconds).
+        if (tsc && throttle_) {
+
+            //  Check whether TSC haven't jumped backwards (in case of migration
+            //  between CPU cores) and whether certain time have elapsed since
+            //  last command processing. If it didn't do nothing.
+            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
+                return 0;
+            last_tsc = tsc;
+        }
+
+        //  Check whether there are any commands pending for this thread.
+        rc = mailbox.recv_ex (&cmd, 0, pfd_);
+    }
+
+    //  Process all available commands.
+    while (rc == 0) {
+        cmd.destination->process_command (cmd);
+        rc = mailbox.recv_ex (&cmd, 0, pfd_);
+    }
+
+    if (errno == EINTR)
+        return -1;
+
+    zmq_assert (errno == EAGAIN);
+
+    if (ctx_terminated) {
+        errno = ETERM;
+        return -1;
+    }
+
+    return 0;
+}
+
 void zmq::socket_base_t::process_stop ()
 {
     //  Here, someone have called zmq_term while the socket was still alive.
@@ -1124,6 +1214,16 @@
     check_destroy ();
 }
 
+void zmq::socket_base_t::in_event_ex (pollfd *pfd)
+{
+    //  This function is invoked only once the socket is running in the context
+    //  of the reaper thread. Process any commands from other threads/sockets
+    //  that may be available at the moment. Ultimately, the socket will
+    //  be destroyed.
+    process_commands (0, false);
+    check_destroy ();
+}
+
 void zmq::socket_base_t::out_event ()
 {
     zmq_assert (false);
diff -urwB libzmq-original/src/socket_base.hpp libzmq/src/socket_base.hpp
--- libzmq-original/src/socket_base.hpp 2014-11-25 11:01:35.722930332 +0100
+++ libzmq/src/socket_base.hpp  2014-11-25 11:03:41.048081973 +0100
@@ -75,6 +75,7 @@
         //  Interface for communication with the API layer.
         int setsockopt (int option_, const void *optval_, size_t optvallen_);
         int getsockopt (int option_, void *optval_, size_t *optvallen_);
+        int getsockopt_ex (int option_, void *optval_, size_t *optvallen_, pollfd *pfd_);
         int bind (const char *addr_);
         int connect (const char *addr_);
         int term_endpoint (const char *addr_);
@@ -94,6 +95,7 @@
         //  i_poll_events implementation. This interface is used when socket
         //  is handled by the poller in the reaper thread.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
@@ -219,6 +221,7 @@
         //  If throttle argument is true, commands are processed at most once
         //  in a predefined time period.
         int process_commands (int timeout_, bool throttle_);
+        int process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_);
 
         //  Handlers for incoming commands.
         void process_stop ();
diff -urwB libzmq-original/src/stream_engine.cpp libzmq/src/stream_engine.cpp
--- libzmq-original/src/stream_engine.cpp       2014-11-25 11:01:35.723930318 +0100
+++ libzmq/src/stream_engine.cpp        2014-11-25 11:03:41.049082060 +0100
@@ -320,6 +320,11 @@
     session->flush ();
 }
 
+void zmq::stream_engine_t::in_event_ex (pollfd *pfd)
+{
+    zmq::stream_engine_t::in_event();
+}
+
 void zmq::stream_engine_t::out_event ()
 {
     zmq_assert (!io_error);
diff -urwB libzmq-original/src/stream_engine.hpp libzmq/src/stream_engine.hpp
--- libzmq-original/src/stream_engine.hpp       2014-11-25 11:01:35.724930303 +0100
+++ libzmq/src/stream_engine.hpp        2014-11-25 11:03:41.049082060 +0100
@@ -73,6 +73,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/tcp_connecter.cpp libzmq/src/tcp_connecter.cpp
--- libzmq-original/src/tcp_connecter.cpp       2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_connecter.cpp        2014-11-25 11:03:41.049082060 +0100
@@ -108,6 +108,14 @@
     out_event ();
 }
 
+void zmq::tcp_connecter_t::in_event_ex (pollfd *pfd)
+{
+    //  We are not polling for incoming data, so we are actually called
+    //  because of error here. However, we can get error on out event as well
+    //  on some platforms, so we'll simply handle both events in the same way.
+    out_event ();
+}
+
 void zmq::tcp_connecter_t::out_event ()
 {
     rm_fd (handle);
diff -urwB libzmq-original/src/tcp_connecter.hpp libzmq/src/tcp_connecter.hpp
--- libzmq-original/src/tcp_connecter.hpp       2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_connecter.hpp        2014-11-25 11:03:41.050082146 +0100
@@ -55,6 +55,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/tcp_listener.cpp libzmq/src/tcp_listener.cpp
--- libzmq-original/src/tcp_listener.cpp        2014-11-25 11:01:35.725930288 
+0100
+++ libzmq/src/tcp_listener.cpp 2014-11-25 11:03:41.050082146 +0100
@@ -114,6 +114,11 @@
     socket->event_accepted (endpoint, fd);
 }
 
+void zmq::tcp_listener_t::in_event_ex (pollfd *pfd)
+{
+    zmq::tcp_listener_t::in_event ();
+}
+
 void zmq::tcp_listener_t::close ()
 {
     zmq_assert (s != retired_fd);
diff -urwB libzmq-original/src/tcp_listener.hpp libzmq/src/tcp_listener.hpp
--- libzmq-original/src/tcp_listener.hpp        2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_listener.hpp 2014-11-25 11:03:41.050082146 +0100
@@ -55,6 +55,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
 
         //  Close the listening socket.
         void close ();
diff -urwB libzmq-original/src/zmq.cpp libzmq/src/zmq.cpp
--- libzmq-original/src/zmq.cpp 2014-11-25 11:01:35.729930228 +0100
+++ libzmq/src/zmq.cpp  2014-11-25 13:41:42.364243253 +0100
@@ -263,6 +263,17 @@
     return result;
 }
 
+int zmq_getsockopt_ex (void *s_, int option_, void *optval_, size_t *optvallen_, pollfd *pfd_)
+{
+    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+        errno = ENOTSOCK;
+        return -1;
+    }
+    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
+    int result = s->getsockopt_ex (option_, optval_, optvallen_, pfd_);
+    return result;
+}
+
 int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
 {
     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
@@ -761,10 +772,12 @@
             //  using the ZMQ_EVENTS socket option.
             if (items_ [i].socket) {
                 size_t zmq_events_size = sizeof (uint32_t);
-                uint32_t zmq_events;
-                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
-                    &zmq_events_size) == -1) {
+                uint32_t zmq_events = 0;
+
+                if (zmq_getsockopt_ex (items_ [i].socket, ZMQ_EVENTS,
+                                       &zmq_events,
+                                       &zmq_events_size, &pollfds [i]) == -1) {
                     if (pollfds != spollfds)
                         free (pollfds);
                     return -1;
                 }
@@ -774,6 +787,9 @@
                 if ((items_ [i].events & ZMQ_POLLIN) &&
                       (zmq_events & ZMQ_POLLIN))
                     items_ [i].revents |= ZMQ_POLLIN;
+                if ((items_ [i].events & ZMQ_POLLPRI) &&
+                      (zmq_events & ZMQ_POLLPRI))
+                    items_ [i].revents |= ZMQ_POLLPRI;
             }
             //  Else, the poll item is a raw file descriptor, simply convert
             //  the events to zmq_pollitem_t-style format.