On 2/14/20 6:54 PM, anton.iva...@cambridgegreys.com wrote:
> From: Anton Ivanov <anton.iva...@cambridgegreys.com>
>
> 1. Adds "persistent" behaviour where feasible (streams and signals).
> These are waited upon in the same thread where they are created. This
> allows them to be registered persistently with the OS (if possible),
> as well as allowing the OS to provide hints - is the FD ready, is it
> closed, etc.
>
> 2. Removes unnecessary attempts to perform a read vs EAGAIN on an fd
> which is not ready, if that fd has been registered as "private" to the
> thread which waits upon it.
>
> 3. No longer breaks other parts of OVS which create the fd in one
> thread and wait upon it in others.
>
> 4. Adds support for EPOLL on Linux; this can be expanded to cover
> similar poll++ frameworks in other OSes.
>
> 5. Sets up the necessary infrastructure to make IO/SSL multi-threaded
> using a "central (e)poll dispatcher + IO threads" pattern.
>
> Signed-off-by: Anton Ivanov <anton.iva...@cambridgegreys.com>
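For readers following along, the "persistent registration" model in points 1 and 2 can be sketched as below. This is a minimal standalone illustration of the epoll pattern the series relies on (register interest once with EPOLL_CTL_ADD, then wait repeatedly without re-registering, unlike poll()); the names here are not the OVS poll-loop API and it is Linux-specific:

```c
/* Minimal sketch of the "register once, wait many times" epoll pattern.
 * Standalone illustration only -- not the OVS poll-loop API. */
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Register a pipe's read end once, then wait for readiness twice without
 * re-adding it; returns the number of wakeups that reported it ready. */
int
demo_persistent_epoll(void)
{
    int pipefd[2];
    if (pipe(pipefd)) {
        return -1;
    }

    int epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = pipefd[0] };
    /* One-time registration: the kernel keeps this interest across waits. */
    epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev);

    int ready = 0;
    for (int i = 0; i < 2; i++) {
        assert(write(pipefd[1], "x", 1) == 1);

        struct epoll_event out;
        /* Note: no re-registration between iterations, unlike poll(). */
        if (epoll_wait(epfd, &out, 1, 100) == 1 && (out.events & EPOLLIN)) {
            char c;
            assert(read(pipefd[0], &c, 1) == 1);
            ready++;
        }
    }
    close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return ready;
}
```

With poll(), the equivalent loop would have to rebuild and resubmit the full pollfd array on every iteration, which is the per-call cost the series is trying to avoid.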
Hi Anton,

A couple of issues inline. Apart from those:

1. The "STP - flush the fdb and mdb when topology changed" OVS test is
failing with your patches applied:

    make check TESTSUITEFLAGS='-k "flush the fdb"'

2. The Travis CI build fails:

    lib/fatal-signal.c:244:5: error: ignoring return value of ‘read’,
    declared with attribute warn_unused_result [-Werror=unused-result]
      read(signal_fds[0], sigbuffer, sizeof(sigbuffer));

3. The Travis CI OSX build fails:

    lib/poll-loop.c:46:1: error: unused function 'poll_create_node_add'
    [-Werror,-Wunused-function]
    COVERAGE_DEFINE(poll_create_node);

4. While OVS might benefit from these changes, I'm wondering about OVN,
and ovsdb-server specifically. ovsdb-server is single threaded, and on
large scale deployments we don't usually see "poll" as the bottleneck,
or even the fact that the code tries to read/write FDs when they are not
available for read/write.

For example, here are the results of running a scale test scenario which
repeats the following iteration 300 times:
- bring up a node (ovn-fake-multinode container) and connect it to the
  OVN Southbound DB.
- configure an OVN logical switch to be bound to the new node.
- configure an OVN logical switch port on the new logical switch.
- configure an OVS internal interface on the new node and bind it to the
  OVN logical switch port.
- wait until the new internal interface can ping its default gateway
  through OVN (i.e., until ovn-controller on the node received all
  updates from the SB DB and installed all OVS flows), highlighted in
  the output.
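On issue 2: one way to drain the signal pipe without tripping -Werror=unused-result is to actually consume read()'s return value and loop until the pipe is empty. The sketch below is a hypothetical helper for illustration, not the code that belongs in lib/fatal-signal.c:

```c
/* Sketch: drain a non-blocking self-pipe while consuming read()'s
 * return value, avoiding the warn_unused_result build failure.
 * Hypothetical helper, not the actual lib/fatal-signal.c fix. */
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Read and discard everything pending on 'fd'; returns bytes drained. */
ssize_t
drain_fd(int fd)
{
    char buf[512];
    ssize_t total = 0;

    for (;;) {
        ssize_t n = read(fd, buf, sizeof buf);
        if (n > 0) {
            total += n;     /* Return value checked, not ignored. */
        } else if (n == 0 || (n < 0 && errno != EINTR)) {
            break;          /* Empty (EAGAIN), closed, or a real error. */
        }
        /* EINTR: retry. */
    }
    return total;
}

int
demo_drain(void)
{
    int fds[2];
    if (pipe(fds)) {
        return -1;
    }
    /* Non-blocking read end, as with a signal self-pipe. */
    fcntl(fds[0], F_SETFL, O_NONBLOCK);
    assert(write(fds[1], "hi", 2) == 2);

    int drained = (int) drain_fd(fds[0]);
    close(fds[0]);
    close(fds[1]);
    return drained;
}
```

Even when the data itself is discarded, inspecting the return value also distinguishes "pipe empty" from "pipe closed", which matters once the fd is registered persistently and HUP events are delivered by the framework.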
The tests use rally-ovs (ovn-scale-test) on a 9 server setup (1 machine
running OVN ovsdb-servers and ovn-northd, and 8 machines simulating
chassis using ovn-fake-multinode), in particular this modified scenario:

https://github.com/dceara/ovn-scale-test/blob/ovn-switch-per-node/samples/tasks/scenarios/ovn-network/osh_workload_incremental.json

With OVS master and OVN master:
http://pastebin.test.redhat.com/836568

With OVS master + your patches and OVN master:
http://pastebin.test.redhat.com/836571

Here are some of the logs we get on the OVN Southbound DB ovsdb-server,
showing that ovsdb-server spends up to 2 seconds in a single loop
iteration sending/receiving updates to/from ovn-controllers:

2020-02-17T10:43:41.175Z|01991|poll_loop|INFO|wakeup due to [OVS_POLLIN] on fd 140 (192.16.0.1:6642<->192.16.0.120:52018) at lib/stream-fd.c:79 (84% CPU usage)
2020-02-17T10:43:43.338Z|01992|timeval|WARN|Unreasonably long 2163ms poll interval (2144ms user, 9ms system)
2020-02-17T10:43:43.339Z|01993|timeval|WARN|faults: 590 minor, 0 major
2020-02-17T10:43:43.339Z|01994|timeval|WARN|disk: 0 reads, 8 writes
2020-02-17T10:43:43.339Z|01995|timeval|WARN|context switches: 0 voluntary, 4 involuntary
2020-02-17T10:43:43.339Z|01996|poll_loop|INFO|Dropped 63 log messages in last 2 seconds (most recently, 2 seconds ago) due to excessive rate
2020-02-17T10:43:43.339Z|01997|poll_loop|INFO|wakeup due to [OVS_POLLIN] on fd 76 (192.16.0.1:6642<->192.16.0.56:33538) at lib/stream-fd.c:79 (84% CPU usage)
2020-02-17T10:43:45.495Z|01998|timeval|WARN|Unreasonably long 2156ms poll interval (2129ms user, 17ms system)
2020-02-17T10:43:45.495Z|01999|timeval|WARN|faults: 738 minor, 0 major
2020-02-17T10:43:45.495Z|02000|timeval|WARN|context switches: 0 voluntary, 7 involuntary
2020-02-17T10:43:47.651Z|02001|timeval|WARN|Unreasonably long 2157ms poll interval (2136ms user, 10ms system)

In this case, and I think in most OVN use cases, ovsdb-server is busy
because it actually has to send updates to large
numbers of ovn-controllers connected to it. Unless I'm missing something the epoll change seems to improve performance only in cases where the Southbound DB doesn't do much sending/receiving. How do you test performance/scalability improvements? Regards, Dumitru > --- > include/openvswitch/poll-loop.h | 56 +++++- > lib/dpif-netlink.c | 6 +- > lib/fatal-signal.c | 7 +- > lib/latch-unix.c | 3 +- > lib/netdev-afxdp.c | 2 +- > lib/poll-loop.c | 320 ++++++++++++++++++++++++-------- > lib/route-table-bsd.c | 1 + > lib/stream-fd.c | 62 ++++++- > lib/stream-ssl.c | 50 ++++- > lib/timeval.c | 83 +++++++++ > lib/timeval.h | 7 + > 11 files changed, 508 insertions(+), 89 deletions(-) > > diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h > index 532d9caa6..6d0331f6d 100644 > --- a/include/openvswitch/poll-loop.h > +++ b/include/openvswitch/poll-loop.h > @@ -41,11 +41,30 @@ > #include <windows.h> > #endif > > +#ifdef __linux__ > +#define OVS_USE_EPOLL > +#endif > + > +#ifdef OVS_USE_EPOLL > +#include <sys/epoll.h> > + > +#define OVS_POLLIN EPOLLIN > +#define OVS_POLLOUT EPOLLOUT > +#define OVS_POLLERR EPOLLERR > +#define OVS_POLLHUP EPOLLHUP > +#define OVS_ONESHOT EPOLLONESHOT > +#define OVS_POLLNVAL 0 > + > +#else > + > #define OVS_POLLIN POLLIN > #define OVS_POLLOUT POLLOUT > #define OVS_POLLERR POLLERR > #define OVS_POLLNVAL POLLNVAL > #define OVS_POLLHUP POLLHUP > +#define OVS_ONESHOT (1U << 30) > + > +#endif > > #ifdef __cplusplus > extern "C" { > @@ -60,10 +79,43 @@ extern "C" { > * the source code location of the caller. The function version allows the > * caller to supply a location explicitly, which is useful if the caller's > own > * caller would be more useful in log output. See timer_wait_at() for an > - * example. */ > -void poll_fd_wait_at(int fd, short int events, const char *where); > + * example. > + * Note - using on fds registered using poll_fd_register() will generate a > + * warning as this is not an intended use. 
> + */ > +void poll_fd_wait_at(int fd, int events, const char *where); > #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, > OVS_SOURCE_LOCATOR) > > +/* Register a fd with a persistence framework if available so it can be > served > + * "faster" and the caller can be provided with "hints" on what caused the IO > + * event. > + * If the "hint" argument is supplied it set to point to the pollfd structure > + * containing the events passed by the OS in .revents. > + * Note - as the frameworks are OS dependent, the events are limited to what > + * can be passed in a .revents which is a short int. > + * Limitations - MUST BE registered from the same thread as the one where > + * it will be waited upon. > + */ > + > +void poll_fd_register_at(int fd, int events, struct pollfd **hint, const > char *where); > +#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, > hint, OVS_SOURCE_LOCATOR) > + > +/* De-register a fd which was registered as "private" with the persistence > + * framework > + */ > + > +void poll_fd_deregister_at(int fd, const char *where); > +#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR) > + > +/* Schedule events to wake up the following poll_block() - "private fds" > + * Same as poll_fd_wait, but for fds which have been registered and are > + * expected to persist. If a "fast" OS fd notification framework is used > + * this version of wait may be a NOOP (f.e. for (E)POLLIN events. 
> + */ > +void private_poll_fd_wait_at(int fd, int events, const char *where); > +#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, > OVS_SOURCE_LOCATOR) > + > + > #ifdef _WIN32 > void poll_wevent_wait_at(HANDLE wevent, const char *where); > #define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, > OVS_SOURCE_LOCATOR) > diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c > index 5b5c96d72..ad5db9452 100644 > --- a/lib/dpif-netlink.c > +++ b/lib/dpif-netlink.c > @@ -1289,7 +1289,7 @@ dpif_netlink_port_poll_wait(const struct dpif *dpif_) > const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_); > > if (dpif->port_notifier) { > - nl_sock_wait(dpif->port_notifier, POLLIN); > + nl_sock_wait(dpif->port_notifier, OVS_POLLIN); > } else { > poll_immediate_wake(); > } > @@ -2756,13 +2756,13 @@ dpif_netlink_recv_wait__(struct dpif_netlink *dpif, > uint32_t handler_id) > } > > for (i = 0; i < VPORT_SOCK_POOL_SIZE; i++) { > - nl_sock_wait(sock_pool[i].nl_sock, POLLIN); > + nl_sock_wait(sock_pool[i].nl_sock, OVS_POLLIN); > } > #else > if (dpif->handlers && handler_id < dpif->n_handlers) { > struct dpif_handler *handler = &dpif->handlers[handler_id]; > > - poll_fd_wait(handler->epoll_fd, POLLIN); > + poll_fd_wait(handler->epoll_fd, OVS_POLLIN); > } > #endif > } > diff --git a/lib/fatal-signal.c b/lib/fatal-signal.c > index 97d8d1dab..424636e07 100644 > --- a/lib/fatal-signal.c > +++ b/lib/fatal-signal.c > @@ -96,6 +96,7 @@ fatal_signal_init(void) > ovs_mutex_init_recursive(&mutex); > #ifndef _WIN32 > xpipe_nonblocking(signal_fds); > + poll_fd_register(signal_fds[0], OVS_POLLIN, NULL); > #else > wevent = CreateEvent(NULL, TRUE, FALSE, NULL); > if (!wevent) { > @@ -236,9 +237,12 @@ void > fatal_signal_run(void) > { > sig_atomic_t sig_nr; > + char sigbuffer[_POSIX_PIPE_BUF]; > > fatal_signal_init(); > > + read(signal_fds[0], sigbuffer, sizeof(sigbuffer)); > + > sig_nr = stored_sig_nr; > if (sig_nr != SIG_ATOMIC_MAX) { > char 
namebuf[SIGNAL_NAME_BUFSIZE]; > @@ -271,7 +275,8 @@ fatal_signal_wait(void) > #ifdef _WIN32 > poll_wevent_wait(wevent); > #else > - poll_fd_wait(signal_fds[0], OVS_POLLIN); > + /* a noop - schedule for removal */ > + private_poll_fd_wait(signal_fds[0], OVS_POLLIN); > #endif > } > > diff --git a/lib/latch-unix.c b/lib/latch-unix.c > index fea61ab28..5f15b59fe 100644 > --- a/lib/latch-unix.c > +++ b/lib/latch-unix.c > @@ -83,5 +83,6 @@ latch_is_set(const struct latch *latch) > void > latch_wait_at(const struct latch *latch, const char *where) > { > - poll_fd_wait_at(latch->fds[0], OVS_POLLIN, where); > + /* Ask for wait and make it one-shot if persistence is in play */ > + poll_fd_wait_at(latch->fds[0], OVS_POLLIN | OVS_ONESHOT, where); > } > diff --git a/lib/netdev-afxdp.c b/lib/netdev-afxdp.c > index ef367e5ea..482400d8d 100644 > --- a/lib/netdev-afxdp.c > +++ b/lib/netdev-afxdp.c > @@ -184,7 +184,7 @@ xsk_rx_wakeup_if_needed(struct xsk_umem_info *umem, > > if (xsk_ring_prod__needs_wakeup(&umem->fq)) { > pfd.fd = fd; > - pfd.events = OVS_POLLIN; > + pfd.events = POLLIN; > > ret = poll(&pfd, 1, 0); > if (OVS_UNLIKELY(ret < 0)) { > diff --git a/lib/poll-loop.c b/lib/poll-loop.c > index 3902d6c1f..10a5b0c01 100644 > --- a/lib/poll-loop.c > +++ b/lib/poll-loop.c > @@ -18,6 +18,12 @@ > #include "openvswitch/poll-loop.h" > #include <errno.h> > #include <inttypes.h> > +#ifdef OVS_USE_EPOLL > +#include <sys/epoll.h> > +#endif > +#ifndef _WIN32 > +#include <unistd.h> > +#endif > #include <poll.h> > #include <stdlib.h> > #include <string.h> > @@ -31,7 +37,9 @@ > #include "timeval.h" > #include "openvswitch/vlog.h" > #include "openvswitch/hmap.h" > +#include "openvswitch/list.h" > #include "hash.h" > +#include "ovs-atomic.h" > > VLOG_DEFINE_THIS_MODULE(poll_loop); > > @@ -43,21 +51,32 @@ struct poll_node { > struct pollfd pollfd; /* Events to pass to time_poll(). */ > HANDLE wevent; /* Events for WaitForMultipleObjects(). 
*/ > const char *where; /* Where poll_node was created. */ > + bool valid; /* Can it be used? */ > + bool private; /* Can we assume that it is only in this > thread poll loop? */ > }; > > +#define MAX_EPOLL_EVENTS 64 > + > struct poll_loop { > - /* All active poll waiters. */ > + /* List of all poll loops in the system */ > + struct ovs_mutex loop_mutex; > + /* All poll waiters for this poll loop */ > struct hmap poll_nodes; > > /* Time at which to wake up the next call to poll_block(), LLONG_MIN to > * wake up immediately, or LLONG_MAX to wait forever. */ > long long int timeout_when; /* In msecs as returned by time_msec(). */ > const char *timeout_where; /* Where 'timeout_when' was set. */ > +#ifdef OVS_USE_EPOLL > + int epoll_fd; > + struct epoll_event epoll_events[MAX_EPOLL_EVENTS]; > +#endif > }; > > + > static struct poll_loop *poll_loop(void); > > -/* Look up the node with same fd or wevent. */ > +/* Look up the node with same fd or wevent - should be accessed under > &loop->mutex. */ > static struct poll_node * > find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent) > { > @@ -76,79 +95,142 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE > wevent) > } > return NULL; > } > - > -/* On Unix based systems: > - * > - * Registers 'fd' as waiting for the specified 'events' (which should be > - * OVS_POLLIN or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The > following call to > - * poll_block() will wake up when 'fd' becomes ready for one or more of > the > - * requested events. The 'fd's are given to poll() function later. > - * > - * On Windows system: > +/* Registers 'fd' as waiting for the specified 'events' (which should be > OVS_POLLIN > + * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to > poll_block() will > + * wake up when 'fd' becomes ready for one or more of the requested events. > * > - * If 'fd' is specified, create a new 'wevent'. Association of 'fd' and > - * 'wevent' for 'events' happens in poll_block(). 
If 'wevent' is > specified, > - * it is assumed that it is unrelated to any sockets and poll_block() > - * will wake up on any event on that 'wevent'. It is an error to pass > - * both 'wevent' and 'fd'. > + * The event registration is PERSISTENT. This is intended for OSes which > have a persistent > + * event framework. For now it is implemented only for epoll and Linux, other > + * implementations such as BSD kqueue and Solaris /dev/poll may follow. > * > - * The event registration is one-shot: only the following call to > - * poll_block() is affected. The event will need to be re-registered after > - * poll_block() is called if it is to persist. > + * If the OS has no persistent even framework does nothing > * > * ('where' is used in debug logging. Commonly one would use poll_fd_wait() > to > * automatically provide the caller's source file and line number for > * 'where'.) */ > + > static void > -poll_create_node(int fd, HANDLE wevent, short int events, const char *where) > +poll_fd_subscribe_at(int fd, HANDLE wevent, int events, struct pollfd > **hint, const char *where, bool private) > { > struct poll_loop *loop = poll_loop(); > struct poll_node *node; > +#ifdef OVS_USE_EPOLL > + struct epoll_event event; > +#endif > > - COVERAGE_INC(poll_create_node); > - > - /* Both 'fd' and 'wevent' cannot be set. */ > ovs_assert(!fd != !wevent); > > + /* This is mostly uncontended, so the thread should grab it straight > away. > + * We will reuse it later to introduce threading for IO and SSL > + */ > + ovs_mutex_lock(&loop->loop_mutex); > + > /* Check for duplicate. If found, "or" the events. 
*/ > node = find_poll_node(loop, fd, wevent); > - if (node) { > - node->pollfd.events |= events; > - } else { > - node = xzalloc(sizeof *node); > - hmap_insert(&loop->poll_nodes, &node->hmap_node, > - hash_2words(fd, (uint32_t)wevent)); > - node->pollfd.fd = fd; > - node->pollfd.events = events; > -#ifdef _WIN32 > - if (!wevent) { > - wevent = CreateEvent(NULL, FALSE, FALSE, NULL); > + > + if (node && node->valid) { > +#ifdef OVS_USE_EPOLL > + int old_event_mask = node->pollfd.events; > +#endif > + /* If there is an existing event mask we do not need to inc - this > will be waited upon */ > + node->pollfd.events |= (events & 0x0000FFFF); /* or without epoll > specific bits */ > + > +#ifdef OVS_USE_EPOLL > + /* modify existing epoll entry if there is an epoll specific ask or > if the > + * mask has changed > + */ > + if ((events & 0xFFFF0000) || (old_event_mask != > node->pollfd.events)) { > + event.events = node->pollfd.events | events | EPOLLHUP | > EPOLLRDHUP; > + event.data.ptr = node; > + epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, fd, &event); > } > #endif > + } else { > + if (!node) { > + node = xzalloc(sizeof *node); > + hmap_insert(&loop->poll_nodes, &node->hmap_node, > + hash_2words(fd, 0)); > + } else { > + /* node marked for reaping, OS has reused the fd number, valid > is set to false */ > +#ifdef OVS_USE_EPOLl This should be "#ifdef OVS_USE_EPOLL" > + epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, fd, NULL); > +#endif > + } > + node->pollfd.fd = fd; > + node->pollfd.events = (events & 0x0000FFFF); > node->wevent = wevent; > node->where = where; > + node->valid = true; > + node->private = private; > +#ifdef OVS_USE_EPOLL > + event.events = node->pollfd.events | EPOLLHUP | EPOLLRDHUP; /* we > always listen for fd close */ > + event.data.ptr = node; > + epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &event); > +#endif > + } > + if (hint) { > + *hint = &node->pollfd; > } > + ovs_mutex_unlock(&loop->loop_mutex); > +} > + > +void > +poll_fd_register_at(int fd, int 
events, struct pollfd **hint, const char > *where) { > + poll_fd_subscribe_at(fd, 0, events, hint, where , true); > +} > + > +/* Deregisters a fd. Note - this looks like a memory leak (deallocating only > private fds) > + * but it is not. > + * In order to be compatible with existing calling conventions while using > fd persistence > + * where supported we have to keep "legacy" fds around for the duration of > the life of > + * the thread because we have no idea if they have been reaped properly or > not. > + * The reason for this is that for some of them the close() is in a thread > different from the > + * poll loop. > + * Thus, the only thing we can do in this case is mark them "invalid". Once > the OS reuses the > + * same fd number, we will reuse the existing has entry. > + */ > + > +void > +poll_fd_deregister_at(int fd, const char *where) { > + struct poll_loop *loop = poll_loop(); > + > + VLOG(VLL_DBG, "Deregister %d from %s", fd, where); > + struct poll_node *node; > + > + ovs_mutex_lock(&loop->loop_mutex); > + node = find_poll_node(loop, fd, 0); > + if (node) { > + if (node->private) { > +#ifdef OVN_USE_EPOLL This should be "#ifdef OVS_USE_EPOLL". > + epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL); > +#endif > + hmap_remove(&loop->poll_nodes, &node->hmap_node); > + } else { > + VLOG(VLL_WARN, "Trying to deregister a non-private %d from %s", > fd, where); > + node->valid = false; > + } > + } > + ovs_mutex_unlock(&loop->loop_mutex); > +} > + > +void > +poll_fd_wait_at(int fd, int events, const char *where) > +{ > + poll_fd_subscribe_at(fd, 0, events, NULL, where, false); > } > > -/* Registers 'fd' as waiting for the specified 'events' (which should be > OVS_POLLIN > - * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to > poll_block() will > - * wake up when 'fd' becomes ready for one or more of the requested events. > - * > - * On Windows, 'fd' must be a socket. 
> - * > - * The event registration is one-shot: only the following call to > poll_block() > - * is affected. The event will need to be re-registered after poll_block() > is > - * called if it is to persist. > - * > - * ('where' is used in debug logging. Commonly one would use poll_fd_wait() > to > - * automatically provide the caller's source file and line number for > - * 'where'.) */ > void > -poll_fd_wait_at(int fd, short int events, const char *where) > +private_poll_fd_wait_at(int fd, int events, const char *where) > { > - poll_create_node(fd, 0, events, where); > + /* POLLIN persists on "private" fds - either emulated or at epoll > + * or other persistence framework level > + */ > + if (events & (~OVS_POLLIN)) { > + poll_fd_subscribe_at(fd, 0, events, NULL, where, true); > + } > } > > + > #ifdef _WIN32 > /* Registers for the next call to poll_block() to wake up when 'wevent' is > * signaled. > @@ -163,7 +245,7 @@ poll_fd_wait_at(int fd, short int events, const char > *where) > void > poll_wevent_wait_at(HANDLE wevent, const char *where) > { > - poll_create_node(0, wevent, 0, where); > + poll_fd_subscribe_at(0, wevent, 0, NULL, where); > } > #endif /* _WIN32 */ > > @@ -277,9 +359,12 @@ log_wakeup(const char *where, const struct pollfd > *pollfd, int timeout) > if (pollfd->revents & OVS_POLLHUP) { > ds_put_cstr(&s, "[OVS_POLLHUP]"); > } > +#ifndef OVS_USE_EPOLL > + /* epoll does not have NVAL - it uses RDHUP and HUP which we cannot > actually get to here*/ > if (pollfd->revents & OVS_POLLNVAL) { > ds_put_cstr(&s, "[OVS_POLLNVAL]"); > } > +#endif > ds_put_format(&s, " on fd %d (%s)", pollfd->fd, description); > free(description); > } else { > @@ -295,12 +380,17 @@ log_wakeup(const char *where, const struct pollfd > *pollfd, int timeout) > ds_destroy(&s); > } > > + > static void > free_poll_nodes(struct poll_loop *loop) > { > struct poll_node *node, *next; > > + ovs_mutex_lock(&loop->loop_mutex); > HMAP_FOR_EACH_SAFE (node, next, hmap_node, &loop->poll_nodes) { > 
+#ifdef OVS_USE_EPOLL > + epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL); > +#endif > hmap_remove(&loop->poll_nodes, &node->hmap_node); > #ifdef _WIN32 > if (node->wevent && node->pollfd.fd) { > @@ -310,6 +400,7 @@ free_poll_nodes(struct poll_loop *loop) > #endif > free(node); > } > + ovs_mutex_unlock(&loop->loop_mutex); > } > > /* Blocks until one or more of the events registered with poll_fd_wait() > @@ -320,8 +411,13 @@ poll_block(void) > { > struct poll_loop *loop = poll_loop(); > struct poll_node *node; > +#ifndef OVS_USE_EPOLL > struct pollfd *pollfds; > +#endif > +#ifndef OVS_USE_EPOLL > HANDLE *wevents = NULL; > + int counter; > +#endif > int elapsed; > int retval; > int i; > @@ -335,54 +431,126 @@ poll_block(void) > } > > timewarp_run(); > - pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds); > > +#ifdef OVS_USE_EPOLL > + retval = time_epoll_wait(loop->epoll_fd, > + (struct epoll_event *) &loop->epoll_events, MAX_EPOLL_EVENTS, > loop->timeout_when, &elapsed); > + if (retval < 0) { > + static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); > + VLOG_ERR_RL(&rl, "epoll: %s", ovs_strerror(retval)); > + } else if (!retval) { > + log_wakeup(loop->timeout_where, NULL, elapsed); > + } else { > + ovs_mutex_lock(&loop->loop_mutex); > + if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) { > + for (i = 0; i < retval; i++) { > + node = (struct poll_node *) loop->epoll_events[i].data.ptr; > + if (loop->epoll_events[i].events) { > + node->pollfd.revents = loop->epoll_events[i].events; > + log_wakeup(node->where, &node->pollfd, 0); > + } > + } > + } > + for (i = 0; i < retval; i++) { > + node = (struct poll_node *) loop->epoll_events[i].data.ptr; > + if (loop->epoll_events[i].events & EPOLLHUP) { > + /* File descriptor closed already elsewhere > + * We have to make the assumption that whoever closed it has > + * ensured that anything which refers to IO event hints will > not run > + * on this fd after we free it. 
> + */ > + node->valid = false; > + } > + if (loop->epoll_events[i].events) { > + node->pollfd.revents |= (loop->epoll_events[i].events & > 0x0000FFFF); > + } > + if (loop->epoll_events[i].events & OVS_POLLOUT) { > + struct epoll_event event; > + node->pollfd.events = OVS_POLLIN; /* reset back to defaults > - write needs one shot */ > + event.events = node->pollfd.events; > + event.data.ptr = node; > + epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, node->pollfd.fd, > &event); > + } > + } > + ovs_mutex_unlock(&loop->loop_mutex); > + } > +#else > + pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds); > #ifdef _WIN32 > wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents); > #endif > > + > /* Populate with all the fds and events. */ > - i = 0; > + counter = 0; > HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) { > - pollfds[i] = node->pollfd; > + if ((node->valid) && (node->pollfd.events)) { > + pollfds[counter] = node->pollfd; > #ifdef _WIN32 > - wevents[i] = node->wevent; > - if (node->pollfd.fd && node->wevent) { > - short int wsa_events = 0; > - if (node->pollfd.events & OVS_POLLIN) { > - wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE; > + wevents[counter] = node->wevent; > + if (node->pollfd.fd && node->wevent) { > + short int wsa_events = 0; > + if (node->pollfd.events & OVS_POLLIN) { > + wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE; > + } > + if (node->pollfd.events & OVS_POLLOUT) { > + wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE; > + } > + WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events); > } > - if (node->pollfd.events & OVS_POLLOUT) { > - wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE; > - } > - WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events); > - } > #endif > - i++; > + counter++; > + } > } > > - retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents, > + retval = time_poll(pollfds, counter, wevents, > loop->timeout_when, &elapsed); > if (retval < 0) { > static struct vlog_rate_limit rl = 
VLOG_RATE_LIMIT_INIT(1, 5); > VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval)); > - } else if (!retval) { > + } else if (retval == 0) { > log_wakeup(loop->timeout_where, NULL, elapsed); > - } else if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) { > - i = 0; > - HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) { > + } else { > + for (i = 0; i < counter; i++) { > if (pollfds[i].revents) { > - log_wakeup(node->where, &pollfds[i], 0); > + > + node = find_poll_node(loop, pollfds[i].fd, 0); > + > + if (!node) { > + VLOG_FATAL("poll: persistence state corrupted, no hash > entry for %d", pollfds[i].fd); > + } > + if (pollfds[i].revents & (OVS_POLLHUP | OVS_POLLNVAL)) { > + node->valid = false; > + } > + > + if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) { > + log_wakeup(node->where, &pollfds[i], 0); > + } > + /* update "requested" events. > + * Note - "private" fds always want POLLIN - that emulates > EPOLL, /dev/poll, etc > + * behaviour which they should be using in real life instead > of using poll() > + */ > + if (node->private) { > + node->pollfd.events &= ~(pollfds[i].revents & > (~OVS_POLLIN)); > + } else { > + node->pollfd.events &= ~pollfds[i].revents; > + } > + /* update "occured" events for use by streams and handlers. > In case there > + * is an existing (but not consumed yet) event, we OR the > events in the > + * stored record with the new ones - it is the job of the > stream to clear > + * that. > + */ > + node->pollfd.revents |= pollfds[i].revents; > } > - i++; > } > } > > - free_poll_nodes(loop); > + free(pollfds); > + if (wevents) > + free(wevents); > +#endif > loop->timeout_when = LLONG_MAX; > loop->timeout_where = NULL; > - free(pollfds); > - free(wevents); > > /* Handle any pending signals before doing anything else. 
*/ > fatal_signal_run(); > @@ -416,8 +584,12 @@ poll_loop(void) > if (!loop) { > loop = xzalloc(sizeof *loop); > loop->timeout_when = LLONG_MAX; > + ovs_mutex_init(&loop->loop_mutex); > hmap_init(&loop->poll_nodes); > xpthread_setspecific(key, loop); > +#ifdef OVS_USE_EPOLL > + loop->epoll_fd = epoll_create(MAX_EPOLL_EVENTS); > +#endif > } > return loop; > } > diff --git a/lib/route-table-bsd.c b/lib/route-table-bsd.c > index 3dfa80c7f..16d155989 100644 > --- a/lib/route-table-bsd.c > +++ b/lib/route-table-bsd.c > @@ -34,6 +34,7 @@ > #include "ovs-router.h" > #include "packets.h" > #include "openvswitch/vlog.h" > +#include "openvswitch/poll-loop.h" > #include "util.h" > > VLOG_DEFINE_THIS_MODULE(route_table_bsd); > diff --git a/lib/stream-fd.c b/lib/stream-fd.c > index 62f768d45..6a80d6e05 100644 > --- a/lib/stream-fd.c > +++ b/lib/stream-fd.c > @@ -40,6 +40,8 @@ struct stream_fd > struct stream stream; > int fd; > int fd_type; > + bool rx_ready, tx_ready; > + struct pollfd *hint; > }; > > static const struct stream_class stream_fd_class; > @@ -67,7 +69,14 @@ new_fd_stream(char *name, int fd, int connect_status, int > fd_type, > stream_init(&s->stream, &stream_fd_class, connect_status, name); > s->fd = fd; > s->fd_type = fd_type; > + s->rx_ready = true; > + s->tx_ready = true; > + s->hint = NULL; > *streamp = &s->stream; > + /* Persistent registration - we always get POLLINs from now on, > + * POLLOUTs when we ask for them > + */ > + poll_fd_register(s->fd, OVS_POLLIN, &s->hint); > return 0; > } > > @@ -82,6 +91,8 @@ static void > fd_close(struct stream *stream) > { > struct stream_fd *s = stream_fd_cast(stream); > + /* Deregister the FD from any persistent registrations if supported */ > + poll_fd_deregister(s->fd); > closesocket(s->fd); > free(s); > } > @@ -104,6 +115,24 @@ fd_recv(struct stream *stream, void *buffer, size_t n) > ssize_t retval; > int error; > > + if (s->hint) { > + /* poll-loop is providing us with hints for IO. 
If we got a HUP/NVAL we skip straight
> +         * to the read which should return 0 if the HUP is a real one; if not we clear it.
> +         * For all other cases we believe what (e)poll has fed us.
> +         */
> +        if ((!(s->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (!s->rx_ready)) {
> +            if (!(s->hint->revents & OVS_POLLIN)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready. */
> +                s->rx_ready = true;
> +                s->hint->revents &= ~OVS_POLLIN;
> +            }
> +        } else {
> +            s->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
> +        }
> +    }
> +
>      retval = recv(s->fd, buffer, n, 0);
>      if (retval < 0) {
>          error = sock_errno();
> @@ -114,6 +143,8 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
>  #endif
>      if (error != EAGAIN) {
>          VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
> +    } else {
> +        s->rx_ready = false;
>      }
>      return -error;
>  }
> @@ -127,9 +158,29 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>      ssize_t retval;
>      int error;
>  
> +    if (s->hint) {
> +        /* poll-loop is providing us with hints for IO. */
> +        if (!s->tx_ready) {
> +            if (!(s->hint->revents & OVS_POLLOUT)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLOUT event from poll loop, mark us as ready. */
> +                s->tx_ready = true;
> +                s->hint->revents &= ~OVS_POLLOUT;
> +            }
> +        }
> +    }
>      retval = send(s->fd, buffer, n, 0);
>      if (retval < 0) {
>          error = sock_errno();
> +#ifdef __linux__
> +        /* Linux will sometimes return ENOBUFS on sockets instead of EAGAIN.
> +         * Usually seen on unix domain sockets. */
> +        if (error == ENOBUFS) {
> +            error = EAGAIN;
> +        }
> +#endif
>  #ifdef _WIN32
>      if (error == WSAEWOULDBLOCK) {
>          error = EAGAIN;
> @@ -137,6 +188,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>  #endif
>      if (error != EAGAIN) {
>          VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
> +    } else {
> +        s->tx_ready = false;
>      }
>      return -error;
>  }
> @@ -150,11 +203,11 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
>      switch (wait) {
>      case STREAM_CONNECT:
>      case STREAM_SEND:
> -        poll_fd_wait(s->fd, OVS_POLLOUT);
> +        private_poll_fd_wait(s->fd, OVS_POLLOUT);
>          break;
>  
>      case STREAM_RECV:
> -        poll_fd_wait(s->fd, OVS_POLLIN);
> +        private_poll_fd_wait(s->fd, OVS_POLLIN);
>          break;
>  
>      default:
> @@ -223,6 +276,8 @@ new_fd_pstream(char *name, int fd,
>      ps->accept_cb = accept_cb;
>      ps->unlink_path = unlink_path;
>      *pstreamp = &ps->pstream;
> +    /* Persistent registration. */
> +    poll_fd_register(ps->fd, OVS_POLLIN, NULL);
>      return 0;
>  }
>  
> @@ -230,6 +285,7 @@ static void
>  pfd_close(struct pstream *pstream)
>  {
>      struct fd_pstream *ps = fd_pstream_cast(pstream);
> +    poll_fd_deregister(ps->fd);
>      closesocket(ps->fd);
>      maybe_unlink_and_free(ps->unlink_path);
>      free(ps);
> @@ -271,7 +327,7 @@ static void
>  pfd_wait(struct pstream *pstream)
>  {
>      struct fd_pstream *ps = fd_pstream_cast(pstream);
> -    poll_fd_wait(ps->fd, OVS_POLLIN);
> +    private_poll_fd_wait(ps->fd, OVS_POLLIN);
>  }
>  
>  static const struct pstream_class fd_pstream_class = {
> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
> index 3b7f9865e..53ae51c1b 100644
> --- a/lib/stream-ssl.c
> +++ b/lib/stream-ssl.c
> @@ -147,6 +147,7 @@ struct ssl_stream
>      /* A few bytes of header data in case SSL negotiation fails. */
>      uint8_t head[2];
>      short int n_head;
> +    struct pollfd *hint;
>  };
>  
>  /* SSL context created by ssl_init(). */
> @@ -310,6 +311,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
>          SSL_set_msg_callback_arg(ssl, sslv);
>      }
>  
> +
> +    poll_fd_register(sslv->fd, OVS_POLLIN, &sslv->hint);
>      *streamp = &sslv->stream;
>      free(server_name);
>      return 0;
> @@ -604,6 +607,7 @@ ssl_close(struct stream *stream)
>      ERR_clear_error();
>  
>      SSL_free(sslv->ssl);
> +    poll_fd_deregister(sslv->fd);
>      closesocket(sslv->fd);
>      free(sslv);
>  }
> @@ -697,6 +701,27 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
>      /* Behavior of zero-byte SSL_read is poorly defined. */
>      ovs_assert(n > 0);
>  
> +    if (sslv->hint) {
> +        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
> +         * to the read which should return 0 if the HUP is a real one; if not we clear it.
> +         * For all other cases we believe what (e)poll has fed us.
> +         */
> +        if ((!(sslv->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
> +            if (!(sslv->hint->revents & OVS_POLLIN)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready;
> +                 * rx_want is cleared further down by the ssl read fsm. */
> +                sslv->hint->revents &= ~OVS_POLLIN;
> +            }
> +        } else {
> +            sslv->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
> +        }
> +    }
> +
> +
> +
>      old_state = SSL_get_state(sslv->ssl);
>      ret = SSL_read(sslv->ssl, buffer, n);
>      if (old_state != SSL_get_state(sslv->ssl)) {
> @@ -729,6 +754,19 @@ ssl_do_tx(struct stream *stream)
>  {
>      struct ssl_stream *sslv = ssl_stream_cast(stream);
>  
> +    if (sslv->hint) {
> +        /* poll-loop is providing us with hints for IO. */
> +        if (sslv->tx_want == SSL_WRITING) {
> +            if (!(sslv->hint->revents & OVS_POLLOUT)) {
> +                return EAGAIN;
> +            } else {
> +                /* POLLOUT event from poll loop, mark us as ready;
> +                 * tx_want is cleared further down by the ssl write fsm. */
> +                sslv->hint->revents &= ~OVS_POLLOUT;
> +            }
> +        }
> +    }
>      for (;;) {
>          int old_state = SSL_get_state(sslv->ssl);
>          int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
> @@ -771,6 +809,8 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
>          ssl_clear_txbuf(sslv);
>          return n;
>      case EAGAIN:
> +        /* We want to know when this fd will become available again. */
> +        stream_send_wait(stream);
>          return n;
>      default:
>          ssl_clear_txbuf(sslv);
> @@ -795,7 +835,7 @@ ssl_run_wait(struct stream *stream)
>      struct ssl_stream *sslv = ssl_stream_cast(stream);
>  
>      if (sslv->tx_want != SSL_NOTHING) {
> -        poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> +        private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
>      }
>  }
>  
> @@ -811,13 +851,13 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>      } else {
>          switch (sslv->state) {
>          case STATE_TCP_CONNECTING:
> -            poll_fd_wait(sslv->fd, OVS_POLLOUT);
> +            private_poll_fd_wait(sslv->fd, OVS_POLLOUT);
>              break;
>  
>          case STATE_SSL_CONNECTING:
>              /* ssl_connect() called SSL_accept() or SSL_connect(), which
>               * set up the status that we test here. */
> -            poll_fd_wait(sslv->fd,
> +            private_poll_fd_wait(sslv->fd,
>                           want_to_poll_events(SSL_want(sslv->ssl)));
>              break;
>  
> @@ -829,7 +869,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>  
>      case STREAM_RECV:
>          if (sslv->rx_want != SSL_NOTHING) {
> -            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> +            private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
>          } else {
>              poll_immediate_wake();
>          }
> @@ -911,6 +951,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
>                            ds_steal_cstr(&bound_name));
>      pstream_set_bound_port(&pssl->pstream, htons(port));
>      pssl->fd = fd;
> +    poll_fd_register(fd, OVS_POLLIN, NULL);
>      *pstreamp = &pssl->pstream;
>  
>      return 0;
> @@ -920,6 +961,7 @@ static void
>  pssl_close(struct pstream *pstream)
>  {
>      struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
> +    poll_fd_deregister(pssl->fd);
>      closesocket(pssl->fd);
>      free(pssl);
>  }
> diff --git a/lib/timeval.c b/lib/timeval.c
> index 193c7bab1..59a12414f 100644
> --- a/lib/timeval.c
> +++ b/lib/timeval.c
> @@ -38,6 +38,7 @@
>  #include "unixctl.h"
>  #include "util.h"
>  #include "openvswitch/vlog.h"
> +#include "openvswitch/poll-loop.h"
>  
>  VLOG_DEFINE_THIS_MODULE(timeval);
>  
> @@ -369,6 +370,88 @@ time_poll(struct pollfd *pollfds, int n_pollfds, HANDLE *handles OVS_UNUSED,
>      return retval;
>  }
>  
> +#ifdef OVS_USE_EPOLL
> +
> +/* Like epoll_wait(), except:
> + *
> + *      - The timeout is specified as an absolute time, as defined by
> + *        time_msec(), instead of a duration.
> + *
> + *      - On error, returns a negative error code (instead of setting errno).
> + *
> + *      - If interrupted by a signal, retries automatically until the original
> + *        timeout is reached.  (Because of this property, this function will
> + *        never return -EINTR.)
> + *
> + * Stores the number of milliseconds elapsed during poll in '*elapsed'. */
> +int
> +time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
> +                long long int timeout_when, int *elapsed)
> +{
> +    long long int *last_wakeup = last_wakeup_get();
> +    long long int start;
> +    bool quiescent;
> +    int retval = 0;
> +
> +    time_init();
> +    coverage_clear();
> +    coverage_run();
> +    if (*last_wakeup && !thread_is_pmd()) {
> +        log_poll_interval(*last_wakeup);
> +    }
> +    start = time_msec();
> +
> +    timeout_when = MIN(timeout_when, deadline);
> +    quiescent = ovsrcu_is_quiescent();
> +
> +    for (;;) {
> +        long long int now = time_msec();
> +        int time_left;
> +
> +        if (now >= timeout_when) {
> +            time_left = 0;
> +        } else if ((unsigned long long int) timeout_when - now > INT_MAX) {
> +            time_left = INT_MAX;
> +        } else {
> +            time_left = timeout_when - now;
> +        }
> +
> +        if (!quiescent) {
> +            if (!time_left) {
> +                ovsrcu_quiesce();
> +            } else {
> +                ovsrcu_quiesce_start();
> +            }
> +        }
> +
> +        retval = epoll_wait(epoll_fd, events, max, time_left);
> +        if (retval < 0) {
> +            retval = -errno;
> +        }
> +
> +        if (!quiescent && time_left) {
> +            ovsrcu_quiesce_end();
> +        }
> +
> +        if (deadline <= time_msec()) {
> +            fatal_signal_handler(SIGALRM);
> +            if (retval < 0) {
> +                retval = 0;
> +            }
> +            break;
> +        }
> +
> +        if (retval != -EINTR) {
> +            break;
> +        }
> +    }
> +    *last_wakeup = time_msec();
> +    refresh_rusage();
> +    *elapsed = *last_wakeup - start;
> +    return retval;
> +}
> +#endif
> +
>  long long int
>  timespec_to_msec(const struct timespec *ts)
>  {
> diff --git a/lib/timeval.h b/lib/timeval.h
> index 502f703d4..347a09d63 100644
> --- a/lib/timeval.h
> +++ b/lib/timeval.h
> @@ -20,6 +20,9 @@
>  #include <time.h>
>  #include "openvswitch/type-props.h"
>  #include "util.h"
> +#ifdef __linux__
> +#include <sys/epoll.h>
> +#endif
>  
>  #ifdef __cplusplus
>  extern "C" {
> @@ -61,6 +64,10 @@ void time_wall_timespec(struct timespec *);
>  void time_alarm(unsigned int secs);
>  int time_poll(struct pollfd *, int n_pollfds, HANDLE *handles,
>                long long int timeout_when, int *elapsed);
> +#ifdef __linux__
> +int time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
> +                    long long int timeout_when, int *elapsed);
> +#endif
>  
>  long long int timespec_to_msec(const struct timespec *);
>  long long int timespec_to_usec(const struct timespec *);

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev