---
include/openvswitch/poll-loop.h | 56 +++++-
lib/dpif-netlink.c | 6 +-
lib/fatal-signal.c | 7 +-
lib/latch-unix.c | 3 +-
lib/netdev-afxdp.c | 2 +-
lib/poll-loop.c | 320 ++++++++++++++++++++++++--------
lib/route-table-bsd.c | 1 +
lib/stream-fd.c | 62 ++++++-
lib/stream-ssl.c | 50 ++++-
lib/timeval.c | 83 +++++++++
lib/timeval.h | 7 +
11 files changed, 508 insertions(+), 89 deletions(-)
diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h
index 532d9caa6..6d0331f6d 100644
--- a/include/openvswitch/poll-loop.h
+++ b/include/openvswitch/poll-loop.h
@@ -41,11 +41,30 @@
#include <windows.h>
#endif
+#ifdef __linux__
+#define OVS_USE_EPOLL
+#endif
+
+#ifdef OVS_USE_EPOLL
+#include <sys/epoll.h>
+
+#define OVS_POLLIN EPOLLIN
+#define OVS_POLLOUT EPOLLOUT
+#define OVS_POLLERR EPOLLERR
+#define OVS_POLLHUP EPOLLHUP
+#define OVS_ONESHOT EPOLLONESHOT
+#define OVS_POLLNVAL 0
+
+#else
+
#define OVS_POLLIN POLLIN
#define OVS_POLLOUT POLLOUT
#define OVS_POLLERR POLLERR
#define OVS_POLLNVAL POLLNVAL
#define OVS_POLLHUP POLLHUP
+#define OVS_ONESHOT (1U << 30)
+
+#endif
#ifdef __cplusplus
extern "C" {
@@ -60,10 +79,43 @@ extern "C" {
* the source code location of the caller. The function version allows the
* caller to supply a location explicitly, which is useful if the caller's own
* caller would be more useful in log output. See timer_wait_at() for an
- * example. */
-void poll_fd_wait_at(int fd, short int events, const char *where);
+ * example.
+ * Note - using this on fds registered via poll_fd_register() will generate a
+ * warning, as this is not an intended use.
+ */
+void poll_fd_wait_at(int fd, int events, const char *where);
#define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
+/* Register a fd with a persistence framework if available so it can be served
+ * "faster" and the caller can be provided with "hints" on what caused the IO
+ * event.
+ * If the "hint" argument is supplied, it is set to point to the pollfd structure
+ * containing the events passed by the OS in .revents.
+ * Note - as the frameworks are OS dependent, the events are limited to what
+ * can be passed in a .revents which is a short int.
+ * Limitation - the fd MUST be registered from the same thread as the one where
+ * it will be waited upon.
+ */
+
+void poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
+#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
+
+/* De-register a fd which was registered as "private" with the persistence
+ * framework
+ */
+
+void poll_fd_deregister_at(int fd, const char *where);
+#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
+
+/* Schedule events to wake up the following poll_block() - "private" fds.
+ * Same as poll_fd_wait(), but for fds which have been registered and are
+ * expected to persist. If a "fast" OS fd notification framework is used,
+ * this version of wait may be a NOOP (e.g. for (E)POLLIN events).
+ */
+void private_poll_fd_wait_at(int fd, int events, const char *where);
+#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
+
+
#ifdef _WIN32
void poll_wevent_wait_at(HANDLE wevent, const char *where);
#define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR)
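
For reference, the intended calling pattern for the new API is roughly the following. This is an illustrative sketch only, not part of the patch; source_open(), source_wait() and source_close() are hypothetical callers, assuming the poll-loop.h declarations above.

    /* Illustration only.  Assumes the poll-loop.h API introduced above. */
    #include <unistd.h>
    #include <openvswitch/poll-loop.h>

    static struct pollfd *hint;     /* Filled in by the poll loop with .revents hints. */

    static void
    source_open(int fd)
    {
        /* Register once, from the thread that will call poll_block().
         * On Linux this becomes a persistent epoll registration with
         * OVS_POLLIN armed permanently; elsewhere it is emulated. */
        poll_fd_register(fd, OVS_POLLIN, &hint);
    }

    static void
    source_wait(int fd)
    {
        /* For a registered fd, waiting for OVS_POLLIN is a no-op on the
         * persistent framework; OVS_POLLOUT still has to be scheduled. */
        private_poll_fd_wait(fd, OVS_POLLOUT);
    }

    static void
    source_close(int fd)
    {
        poll_fd_deregister(fd);     /* Drop the persistent registration. */
        close(fd);
    }
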
diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
index 5b5c96d72..ad5db9452 100644
--- a/lib/dpif-netlink.c
+++ b/lib/dpif-netlink.c
@@ -1289,7 +1289,7 @@ dpif_netlink_port_poll_wait(const struct dpif *dpif_)
const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
if (dpif->port_notifier) {
- nl_sock_wait(dpif->port_notifier, POLLIN);
+ nl_sock_wait(dpif->port_notifier, OVS_POLLIN);
} else {
poll_immediate_wake();
}
@@ -2756,13 +2756,13 @@ dpif_netlink_recv_wait__(struct dpif_netlink *dpif, uint32_t handler_id)
}
for (i = 0; i < VPORT_SOCK_POOL_SIZE; i++) {
- nl_sock_wait(sock_pool[i].nl_sock, POLLIN);
+ nl_sock_wait(sock_pool[i].nl_sock, OVS_POLLIN);
}
#else
if (dpif->handlers && handler_id < dpif->n_handlers) {
struct dpif_handler *handler = &dpif->handlers[handler_id];
- poll_fd_wait(handler->epoll_fd, POLLIN);
+ poll_fd_wait(handler->epoll_fd, OVS_POLLIN);
}
#endif
}
diff --git a/lib/fatal-signal.c b/lib/fatal-signal.c
index 97d8d1dab..424636e07 100644
--- a/lib/fatal-signal.c
+++ b/lib/fatal-signal.c
@@ -96,6 +96,7 @@ fatal_signal_init(void)
ovs_mutex_init_recursive(&mutex);
#ifndef _WIN32
xpipe_nonblocking(signal_fds);
+ poll_fd_register(signal_fds[0], OVS_POLLIN, NULL);
#else
wevent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!wevent) {
@@ -236,9 +237,12 @@ void
fatal_signal_run(void)
{
sig_atomic_t sig_nr;
+ char sigbuffer[_POSIX_PIPE_BUF];
fatal_signal_init();
+ read(signal_fds[0], sigbuffer, sizeof(sigbuffer));
+
sig_nr = stored_sig_nr;
if (sig_nr != SIG_ATOMIC_MAX) {
char namebuf[SIGNAL_NAME_BUFSIZE];
@@ -271,7 +275,8 @@ fatal_signal_wait(void)
#ifdef _WIN32
poll_wevent_wait(wevent);
#else
- poll_fd_wait(signal_fds[0], OVS_POLLIN);
+ /* a noop - schedule for removal */
+ private_poll_fd_wait(signal_fds[0], OVS_POLLIN);
#endif
}
diff --git a/lib/latch-unix.c b/lib/latch-unix.c
index fea61ab28..5f15b59fe 100644
--- a/lib/latch-unix.c
+++ b/lib/latch-unix.c
@@ -83,5 +83,6 @@ latch_is_set(const struct latch *latch)
void
latch_wait_at(const struct latch *latch, const char *where)
{
- poll_fd_wait_at(latch->fds[0], OVS_POLLIN, where);
+ /* Ask for wait and make it one-shot if persistence is in play */
+ poll_fd_wait_at(latch->fds[0], OVS_POLLIN | OVS_ONESHOT, where);
}
diff --git a/lib/netdev-afxdp.c b/lib/netdev-afxdp.c
index ef367e5ea..482400d8d 100644
--- a/lib/netdev-afxdp.c
+++ b/lib/netdev-afxdp.c
@@ -184,7 +184,7 @@ xsk_rx_wakeup_if_needed(struct xsk_umem_info *umem,
if (xsk_ring_prod__needs_wakeup(&umem->fq)) {
pfd.fd = fd;
- pfd.events = OVS_POLLIN;
+ pfd.events = POLLIN;
ret = poll(&pfd, 1, 0);
if (OVS_UNLIKELY(ret < 0)) {
diff --git a/lib/poll-loop.c b/lib/poll-loop.c
index 3902d6c1f..10a5b0c01 100644
--- a/lib/poll-loop.c
+++ b/lib/poll-loop.c
@@ -18,6 +18,12 @@
#include "openvswitch/poll-loop.h"
#include <errno.h>
#include <inttypes.h>
+#ifdef OVS_USE_EPOLL
+#include <sys/epoll.h>
+#endif
+#ifndef _WIN32
+#include <unistd.h>
+#endif
#include <poll.h>
#include <stdlib.h>
#include <string.h>
@@ -31,7 +37,9 @@
#include "timeval.h"
#include "openvswitch/vlog.h"
#include "openvswitch/hmap.h"
+#include "openvswitch/list.h"
#include "hash.h"
+#include "ovs-atomic.h"
VLOG_DEFINE_THIS_MODULE(poll_loop);
@@ -43,21 +51,32 @@ struct poll_node {
struct pollfd pollfd; /* Events to pass to time_poll(). */
HANDLE wevent; /* Events for WaitForMultipleObjects(). */
const char *where; /* Where poll_node was created. */
+ bool valid; /* Can it be used? */
+ bool private; /* Can we assume that it is only in this thread's poll loop? */
};
+#define MAX_EPOLL_EVENTS 64
+
struct poll_loop {
- /* All active poll waiters. */
+ /* Mutex protecting this poll loop's state. */
+ struct ovs_mutex loop_mutex;
+ /* All poll waiters for this poll loop */
struct hmap poll_nodes;
/* Time at which to wake up the next call to poll_block(), LLONG_MIN to
* wake up immediately, or LLONG_MAX to wait forever. */
long long int timeout_when; /* In msecs as returned by time_msec(). */
const char *timeout_where; /* Where 'timeout_when' was set. */
+#ifdef OVS_USE_EPOLL
+ int epoll_fd;
+ struct epoll_event epoll_events[MAX_EPOLL_EVENTS];
+#endif
};
+
static struct poll_loop *poll_loop(void);
-/* Look up the node with same fd or wevent. */
+/* Look up the node with the same fd or wevent - should be accessed under &loop->loop_mutex. */
static struct poll_node *
find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
{
@@ -76,79 +95,142 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
}
return NULL;
}
-
-/* On Unix based systems:
- *
- * Registers 'fd' as waiting for the specified 'events' (which should be
- * OVS_POLLIN or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to
- * poll_block() will wake up when 'fd' becomes ready for one or more of the
- * requested events. The 'fd's are given to poll() function later.
- *
- * On Windows system:
+/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
+ * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to poll_block() will
+ * wake up when 'fd' becomes ready for one or more of the requested events.
*
- * If 'fd' is specified, create a new 'wevent'. Association of 'fd' and
- * 'wevent' for 'events' happens in poll_block(). If 'wevent' is specified,
- * it is assumed that it is unrelated to any sockets and poll_block()
- * will wake up on any event on that 'wevent'. It is an error to pass
- * both 'wevent' and 'fd'.
+ * The event registration is PERSISTENT. This is intended for OSes which have a persistent
+ * event framework. For now it is implemented only for epoll and Linux, other
+ * implementations such as BSD kqueue and Solaris /dev/poll may follow.
*
- * The event registration is one-shot: only the following call to
- * poll_block() is affected. The event will need to be re-registered after
- * poll_block() is called if it is to persist.
+ * If the OS has no persistent event framework, this function does nothing.
*
* ('where' is used in debug logging. Commonly one would use poll_fd_wait() to
* automatically provide the caller's source file and line number for
* 'where'.) */
+
static void
-poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
+poll_fd_subscribe_at(int fd, HANDLE wevent, int events, struct pollfd **hint,
+ const char *where, bool private)
{
struct poll_loop *loop = poll_loop();
struct poll_node *node;
+#ifdef OVS_USE_EPOLL
+ struct epoll_event event;
+#endif
- COVERAGE_INC(poll_create_node);
-
- /* Both 'fd' and 'wevent' cannot be set. */
ovs_assert(!fd != !wevent);
+ /* This is mostly uncontended, so the thread should grab it straight away.
+ * We will reuse it later to introduce threading for IO and SSL
+ */
+ ovs_mutex_lock(&loop->loop_mutex);
+
/* Check for duplicate. If found, "or" the events. */
node = find_poll_node(loop, fd, wevent);
- if (node) {
- node->pollfd.events |= events;
- } else {
- node = xzalloc(sizeof *node);
- hmap_insert(&loop->poll_nodes, &node->hmap_node,
- hash_2words(fd, (uint32_t)wevent));
- node->pollfd.fd = fd;
- node->pollfd.events = events;
-#ifdef _WIN32
- if (!wevent) {
- wevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+
+ if (node && node->valid) {
+#ifdef OVS_USE_EPOLL
+ int old_event_mask = node->pollfd.events;
+#endif
+ /* If there is an existing event mask we do not need to inc - this will be waited upon */
+ node->pollfd.events |= (events & 0x0000FFFF); /* or without epoll specific bits */
+
+#ifdef OVS_USE_EPOLL
+ /* modify existing epoll entry if there is an epoll specific ask or if the
+ * mask has changed
+ */
+ if ((events & 0xFFFF0000) || (old_event_mask != node->pollfd.events)) {
+ event.events = node->pollfd.events | events | EPOLLHUP | EPOLLRDHUP;
+ event.data.ptr = node;
+ epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, fd, &event);
}
#endif
+ } else {
+ if (!node) {
+ node = xzalloc(sizeof *node);
+ hmap_insert(&loop->poll_nodes, &node->hmap_node,
+ hash_2words(fd, 0));
+ } else {
+ /* node marked for reaping, OS has reused the fd number, valid is set to false */
+#ifdef OVS_USE_EPOLL
+ epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
+#endif
+ hmap_remove(&loop->poll_nodes, &node->hmap_node);
+ } else {
+ VLOG(VLL_WARN, "Trying to deregister a non-private %d from %s", fd, where);
+ node->valid = false;
+ }
+ }
+ ovs_mutex_unlock(&loop->loop_mutex);
+}
+
+void
+poll_fd_wait_at(int fd, int events, const char *where)
+{
+ poll_fd_subscribe_at(fd, 0, events, NULL, where, false);
}
-/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
- * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to poll_block() will
- * wake up when 'fd' becomes ready for one or more of the requested events.
- *
- * On Windows, 'fd' must be a socket.
- *
- * The event registration is one-shot: only the following call to poll_block()
- * is affected. The event will need to be re-registered after poll_block() is
- * called if it is to persist.
- *
- * ('where' is used in debug logging. Commonly one would use poll_fd_wait() to
- * automatically provide the caller's source file and line number for
- * 'where'.) */
void
-poll_fd_wait_at(int fd, short int events, const char *where)
+private_poll_fd_wait_at(int fd, int events, const char *where)
{
- poll_create_node(fd, 0, events, where);
+ /* POLLIN persists on "private" fds - either emulated or at epoll
+ * or other persistence framework level
+ */
+ if (events & (~OVS_POLLIN)) {
+ poll_fd_subscribe_at(fd, 0, events, NULL, where, true);
+ }
}
+
#ifdef _WIN32
/* Registers for the next call to poll_block() to wake up when 'wevent' is
* signaled.
@@ -163,7 +245,7 @@ poll_fd_wait_at(int fd, short int events, const char *where)
void
poll_wevent_wait_at(HANDLE wevent, const char *where)
{
- poll_create_node(0, wevent, 0, where);
+ poll_fd_subscribe_at(0, wevent, 0, NULL, where, false);
}
#endif /* _WIN32 */
@@ -277,9 +359,12 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
if (pollfd->revents & OVS_POLLHUP) {
ds_put_cstr(&s, "[OVS_POLLHUP]");
}
+#ifndef OVS_USE_EPOLL
+ /* epoll does not have NVAL - it uses RDHUP and HUP which we cannot actually get to here */
if (pollfd->revents & OVS_POLLNVAL) {
ds_put_cstr(&s, "[OVS_POLLNVAL]");
}
+#endif
ds_put_format(&s, " on fd %d (%s)", pollfd->fd, description);
free(description);
} else {
@@ -295,12 +380,17 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
ds_destroy(&s);
}
+
static void
free_poll_nodes(struct poll_loop *loop)
{
struct poll_node *node, *next;
+ ovs_mutex_lock(&loop->loop_mutex);
HMAP_FOR_EACH_SAFE (node, next, hmap_node, &loop->poll_nodes) {
+#ifdef OVS_USE_EPOLL
+ epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
+#endif
hmap_remove(&loop->poll_nodes, &node->hmap_node);
#ifdef _WIN32
if (node->wevent && node->pollfd.fd) {
@@ -310,6 +400,7 @@ free_poll_nodes(struct poll_loop *loop)
#endif
free(node);
}
+ ovs_mutex_unlock(&loop->loop_mutex);
}
/* Blocks until one or more of the events registered with poll_fd_wait()
@@ -320,8 +411,13 @@ poll_block(void)
{
struct poll_loop *loop = poll_loop();
struct poll_node *node;
+#ifndef OVS_USE_EPOLL
struct pollfd *pollfds;
+#endif
+#ifndef OVS_USE_EPOLL
HANDLE *wevents = NULL;
+ int counter;
+#endif
int elapsed;
int retval;
int i;
@@ -335,54 +431,126 @@ poll_block(void)
}
timewarp_run();
- pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
+#ifdef OVS_USE_EPOLL
+ retval = time_epoll_wait(loop->epoll_fd,
+ (struct epoll_event *) &loop->epoll_events, MAX_EPOLL_EVENTS,
+ loop->timeout_when, &elapsed);
+ if (retval < 0) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+ VLOG_ERR_RL(&rl, "epoll: %s", ovs_strerror(-retval));
+ } else if (!retval) {
+ log_wakeup(loop->timeout_where, NULL, elapsed);
+ } else {
+ ovs_mutex_lock(&loop->loop_mutex);
+ if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+ for (i = 0; i < retval; i++) {
+ node = (struct poll_node *) loop->epoll_events[i].data.ptr;
+ if (loop->epoll_events[i].events) {
+ node->pollfd.revents = loop->epoll_events[i].events;
+ log_wakeup(node->where, &node->pollfd, 0);
+ }
+ }
+ }
+ for (i = 0; i < retval; i++) {
+ node = (struct poll_node *) loop->epoll_events[i].data.ptr;
+ if (loop->epoll_events[i].events & EPOLLHUP) {
+ /* File descriptor closed already elsewhere
+ * We have to make the assumption that whoever closed it has
+ * ensured that anything which refers to IO event hints will not run
+ * on this fd after we free it.
+ */
+ node->valid = false;
+ }
+ if (loop->epoll_events[i].events) {
+ node->pollfd.revents |= (loop->epoll_events[i].events & 0x0000FFFF);
+ }
+ if (loop->epoll_events[i].events & OVS_POLLOUT) {
+ struct epoll_event event;
+ node->pollfd.events = OVS_POLLIN; /* reset back to defaults - write needs one shot */
+ event.events = node->pollfd.events;
+ event.data.ptr = node;
+ epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, node->pollfd.fd, &event);
+ }
+ }
+ ovs_mutex_unlock(&loop->loop_mutex);
+ }
+#else
+ pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
#ifdef _WIN32
wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents);
#endif
+
/* Populate with all the fds and events. */
- i = 0;
+ counter = 0;
HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
- pollfds[i] = node->pollfd;
+ if ((node->valid) && (node->pollfd.events)) {
+ pollfds[counter] = node->pollfd;
#ifdef _WIN32
- wevents[i] = node->wevent;
- if (node->pollfd.fd && node->wevent) {
- short int wsa_events = 0;
- if (node->pollfd.events & OVS_POLLIN) {
- wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
+ wevents[counter] = node->wevent;
+ if (node->pollfd.fd && node->wevent) {
+ short int wsa_events = 0;
+ if (node->pollfd.events & OVS_POLLIN) {
+ wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
+ }
+ if (node->pollfd.events & OVS_POLLOUT) {
+ wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
+ }
+ WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
}
- if (node->pollfd.events & OVS_POLLOUT) {
- wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
- }
- WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
- }
#endif
- i++;
+ counter++;
+ }
}
- retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents,
+ retval = time_poll(pollfds, counter, wevents,
loop->timeout_when, &elapsed);
if (retval < 0) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
- } else if (!retval) {
+ } else if (retval == 0) {
log_wakeup(loop->timeout_where, NULL, elapsed);
- } else if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
- i = 0;
- HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
+ } else {
+ for (i = 0; i < counter; i++) {
if (pollfds[i].revents) {
- log_wakeup(node->where, &pollfds[i], 0);
+
+ node = find_poll_node(loop, pollfds[i].fd, 0);
+
+ if (!node) {
+ VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);
+ }
+ if (pollfds[i].revents & (OVS_POLLHUP | OVS_POLLNVAL)) {
+ node->valid = false;
+ }
+
+ if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+ log_wakeup(node->where, &pollfds[i], 0);
+ }
+ /* update "requested" events.
+ * Note - "private" fds always want POLLIN - that emulates EPOLL, /dev/poll, etc
+ * behaviour which they should be using in real life instead of using poll()
+ */
+ if (node->private) {
+ node->pollfd.events &= ~(pollfds[i].revents & (~OVS_POLLIN));
+ } else {
+ node->pollfd.events &= ~pollfds[i].revents;
+ }
+ /* update "occurred" events for use by streams and handlers. In case there
+ * is an existing (but not consumed yet) event, we OR the events in the
+ * stored record with the new ones - it is the job of the stream to clear
+ * that.
+ */
+ node->pollfd.revents |= pollfds[i].revents;
}
- i++;
}
}
- free_poll_nodes(loop);
+ free(pollfds);
+ if (wevents) {
+ free(wevents);
+ }
+#endif
loop->timeout_when = LLONG_MAX;
loop->timeout_where = NULL;
- free(pollfds);
- free(wevents);
/* Handle any pending signals before doing anything else. */
fatal_signal_run();
@@ -416,8 +584,12 @@ poll_loop(void)
if (!loop) {
loop = xzalloc(sizeof *loop);
loop->timeout_when = LLONG_MAX;
+ ovs_mutex_init(&loop->loop_mutex);
hmap_init(&loop->poll_nodes);
xpthread_setspecific(key, loop);
+#ifdef OVS_USE_EPOLL
+ loop->epoll_fd = epoll_create(MAX_EPOLL_EVENTS);
+#endif
}
return loop;
}
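
The epoll path above treats OVS_POLLOUT as effectively one-shot: once a write-readiness event has been delivered, poll_block() resets the interest mask back to OVS_POLLIN with EPOLL_CTL_MOD so that a permanently writable socket does not wake the loop on every iteration. A standalone sketch of that re-arm pattern follows; the names are hypothetical and error handling is omitted.

    /* Standalone sketch of the write re-arm pattern used in poll_block():
     * EPOLLIN stays armed, EPOLLOUT is dropped again once it has fired. */
    #include <stdint.h>
    #include <sys/epoll.h>

    struct node { int fd; };            /* Stand-in for struct poll_node. */

    static void
    arm_write(int epoll_fd, struct node *n)
    {
        /* Temporarily add write interest on top of the persistent read interest. */
        struct epoll_event ev = { .events = EPOLLIN | EPOLLOUT, .data.ptr = n };
        epoll_ctl(epoll_fd, EPOLL_CTL_MOD, n->fd, &ev);
    }

    static void
    on_wakeup(int epoll_fd, struct node *n, uint32_t revents)
    {
        if (revents & EPOLLOUT) {
            /* Write readiness consumed: fall back to read-only interest. */
            struct epoll_event ev = { .events = EPOLLIN, .data.ptr = n };
            epoll_ctl(epoll_fd, EPOLL_CTL_MOD, n->fd, &ev);
        }
    }
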
diff --git a/lib/route-table-bsd.c b/lib/route-table-bsd.c
index 3dfa80c7f..16d155989 100644
--- a/lib/route-table-bsd.c
+++ b/lib/route-table-bsd.c
@@ -34,6 +34,7 @@
#include "ovs-router.h"
#include "packets.h"
#include "openvswitch/vlog.h"
+#include "openvswitch/poll-loop.h"
#include "util.h"
VLOG_DEFINE_THIS_MODULE(route_table_bsd);
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 62f768d45..6a80d6e05 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -40,6 +40,8 @@ struct stream_fd
struct stream stream;
int fd;
int fd_type;
+ bool rx_ready, tx_ready;
+ struct pollfd *hint;
};
static const struct stream_class stream_fd_class;
@@ -67,7 +69,14 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
stream_init(&s->stream, &stream_fd_class, connect_status, name);
s->fd = fd;
s->fd_type = fd_type;
+ s->rx_ready = true;
+ s->tx_ready = true;
+ s->hint = NULL;
*streamp = &s->stream;
+ /* Persistent registration - we always get POLLINs from now on,
+ * POLLOUTs when we ask for them
+ */
+ poll_fd_register(s->fd, OVS_POLLIN, &s->hint);
return 0;
}
@@ -82,6 +91,8 @@ static void
fd_close(struct stream *stream)
{
struct stream_fd *s = stream_fd_cast(stream);
+ /* Deregister the FD from any persistent registrations if supported */
+ poll_fd_deregister(s->fd);
closesocket(s->fd);
free(s);
}
@@ -104,6 +115,24 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
ssize_t retval;
int error;
+ if (s->hint) {
+ /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip
+ * straight to the read, which should return 0 if the HUP is a real one; if
+ * not, we clear it. In all other cases we believe what (e)poll has fed us.
+ */
+ if ((!(s->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (!s->rx_ready)) {
+ if (!(s->hint->revents & OVS_POLLIN)) {
+ return -EAGAIN;
+ } else {
+ /* POLLIN event from poll loop, mark us as ready */
+ s->rx_ready = true;
+ s->hint->revents &= ~OVS_POLLIN;
+ }
+ } else {
+ s->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
+ }
+ }
+
retval = recv(s->fd, buffer, n, 0);
if (retval < 0) {
error = sock_errno();
@@ -114,6 +143,8 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
#endif
if (error != EAGAIN) {
VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
+ } else {
+ s->rx_ready = false;
}
return -error;
}
@@ -127,9 +158,29 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
ssize_t retval;
int error;
+ if (s->hint) {
+ /* poll-loop is providing us with hints for IO */
+ if (!s->tx_ready) {
+ if (!(s->hint->revents & OVS_POLLOUT)) {
+ return -EAGAIN;
+ } else {
+ /* POLLOUT event from poll loop, mark us as ready */
+ s->tx_ready = true;
+ s->hint->revents &= ~OVS_POLLOUT;
+ }
+ }
+ }
retval = send(s->fd, buffer, n, 0);
if (retval < 0) {
error = sock_errno();
+#ifdef __linux__
+ /* Linux will sometimes return ENOBUFS on sockets instead of EAGAIN. Usually seen
+ * on unix domain sockets
+ */
+ if (error == ENOBUFS) {
+ error = EAGAIN;
+ }
+#endif
#ifdef _WIN32
if (error == WSAEWOULDBLOCK) {
error = EAGAIN;
@@ -137,6 +188,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
#endif
if (error != EAGAIN) {
VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
+ } else {
+ s->tx_ready = false;
}
return -error;
}
@@ -150,11 +203,11 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
switch (wait) {
case STREAM_CONNECT:
case STREAM_SEND:
- poll_fd_wait(s->fd, OVS_POLLOUT);
+ private_poll_fd_wait(s->fd, OVS_POLLOUT);
break;
case STREAM_RECV:
- poll_fd_wait(s->fd, OVS_POLLIN);
+ private_poll_fd_wait(s->fd, OVS_POLLIN);
break;
default:
@@ -223,6 +276,8 @@ new_fd_pstream(char *name, int fd,
ps->accept_cb = accept_cb;
ps->unlink_path = unlink_path;
*pstreamp = &ps->pstream;
+ /* persistent registration */
+ poll_fd_register(ps->fd, OVS_POLLIN, NULL);
return 0;
}
@@ -230,6 +285,7 @@ static void
pfd_close(struct pstream *pstream)
{
struct fd_pstream *ps = fd_pstream_cast(pstream);
+ poll_fd_deregister(ps->fd);
closesocket(ps->fd);
maybe_unlink_and_free(ps->unlink_path);
free(ps);
@@ -271,7 +327,7 @@ static void
pfd_wait(struct pstream *pstream)
{
struct fd_pstream *ps = fd_pstream_cast(pstream);
- poll_fd_wait(ps->fd, OVS_POLLIN);
+ private_poll_fd_wait(ps->fd, OVS_POLLIN);
}
static const struct pstream_class fd_pstream_class = {
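
The hint checks added to fd_recv() and fd_send() above boil down to the following fast path, shown here as a standalone sketch; hint_allows_recv() is a hypothetical helper, not part of the patch.

    /* Distilled form of the hint fast path used by fd_recv().  Returns false
     * when the cached hints say the fd is not readable, so the caller can
     * return -EAGAIN without a syscall; on HUP/NVAL the syscall itself is
     * left to report the truth. */
    #include <poll.h>
    #include <stdbool.h>
    #include <openvswitch/poll-loop.h>

    static bool
    hint_allows_recv(struct pollfd *hint, bool *rx_ready)
    {
        if (!hint) {
            return true;                    /* No persistence framework in play. */
        }
        if (hint->revents & (OVS_POLLHUP | OVS_POLLNVAL)) {
            hint->revents &= ~(OVS_POLLHUP | OVS_POLLNVAL);
            return true;                    /* Let recv() report 0 or an error. */
        }
        if (!*rx_ready) {
            if (!(hint->revents & OVS_POLLIN)) {
                return false;               /* Not ready: caller returns -EAGAIN. */
            }
            hint->revents &= ~OVS_POLLIN;
            *rx_ready = true;
        }
        return true;
    }
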
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 3b7f9865e..53ae51c1b 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -147,6 +147,7 @@ struct ssl_stream
/* A few bytes of header data in case SSL negotiation fails. */
uint8_t head[2];
short int n_head;
+ struct pollfd *hint;
};
/* SSL context created by ssl_init(). */
@@ -310,6 +311,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
SSL_set_msg_callback_arg(ssl, sslv);
}
+
+ poll_fd_register(sslv->fd, OVS_POLLIN, &sslv->hint);
*streamp = &sslv->stream;
free(server_name);
return 0;
@@ -604,6 +607,7 @@ ssl_close(struct stream *stream)
ERR_clear_error();
SSL_free(sslv->ssl);
+ poll_fd_deregister(sslv->fd);
closesocket(sslv->fd);
free(sslv);
}
@@ -697,6 +701,27 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
/* Behavior of zero-byte SSL_read is poorly defined. */
ovs_assert(n > 0);
+ if (sslv->hint) {
+ /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip
+ * straight to the read, which should return 0 if the HUP is a real one; if
+ * not, we clear it. In all other cases we believe what (e)poll has fed us.
+ */
+ if ((!(sslv->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
+ if (!(sslv->hint->revents & OVS_POLLIN)) {
+ return -EAGAIN;
+ } else {
+ /* POLLIN event from poll loop, mark us as ready
+ * rx_want is cleared further down by reading ssl fsm
+ */
+ sslv->hint->revents &= ~OVS_POLLIN;
+ }
+ } else {
+ sslv->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
+ }
+ }
+
old_state = SSL_get_state(sslv->ssl);
ret = SSL_read(sslv->ssl, buffer, n);
if (old_state != SSL_get_state(sslv->ssl)) {
@@ -729,6 +754,19 @@ ssl_do_tx(struct stream *stream)
{
struct ssl_stream *sslv = ssl_stream_cast(stream);
+ if (sslv->hint) {
+ /* poll-loop is providing us with hints for IO */
+ if (sslv->tx_want == SSL_WRITING) {
+ if (!(sslv->hint->revents & OVS_POLLOUT)) {
+ return EAGAIN;
+ } else {
+ /* POLLOUT event from poll loop, mark us as ready.
+ * tx_want is updated further down by the SSL write fsm.
+ */
+ sslv->hint->revents &= ~OVS_POLLOUT;
+ }
+ }
+ }
for (;;) {
int old_state = SSL_get_state(sslv->ssl);
int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
@@ -771,6 +809,8 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
ssl_clear_txbuf(sslv);
return n;
case EAGAIN:
+ /* we want to know when this fd will become available again */
+ stream_send_wait(stream);
return n;
default:
ssl_clear_txbuf(sslv);
@@ -795,7 +835,7 @@ ssl_run_wait(struct stream *stream)
struct ssl_stream *sslv = ssl_stream_cast(stream);
if (sslv->tx_want != SSL_NOTHING) {
- poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
+ private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
}
}
@@ -811,13 +851,13 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
} else {
switch (sslv->state) {
case STATE_TCP_CONNECTING:
- poll_fd_wait(sslv->fd, OVS_POLLOUT);
+ private_poll_fd_wait(sslv->fd, OVS_POLLOUT);
break;
case STATE_SSL_CONNECTING:
/* ssl_connect() called SSL_accept() or SSL_connect(), which
* set up the status that we test here. */
- poll_fd_wait(sslv->fd,
+ private_poll_fd_wait(sslv->fd,
want_to_poll_events(SSL_want(sslv->ssl)));
break;
@@ -829,7 +869,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
case STREAM_RECV:
if (sslv->rx_want != SSL_NOTHING) {
- poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
+ private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
} else {
poll_immediate_wake();
}
@@ -911,6 +951,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
ds_steal_cstr(&bound_name));
pstream_set_bound_port(&pssl->pstream, htons(port));
pssl->fd = fd;
+ poll_fd_register(fd, OVS_POLLIN, NULL);
*pstreamp = &pssl->pstream;
return 0;
@@ -920,6 +961,7 @@ static void
pssl_close(struct pstream *pstream)
{
struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
+ poll_fd_deregister(pssl->fd);
closesocket(pssl->fd);
free(pssl);
}
diff --git a/lib/timeval.c b/lib/timeval.c
index 193c7bab1..59a12414f 100644
--- a/lib/timeval.c
+++ b/lib/timeval.c
@@ -38,6 +38,7 @@
#include "unixctl.h"
#include "util.h"
#include "openvswitch/vlog.h"
+#include "openvswitch/poll-loop.h"
VLOG_DEFINE_THIS_MODULE(timeval);
@@ -369,6 +370,88 @@ time_poll(struct pollfd *pollfds, int n_pollfds, HANDLE *handles OVS_UNUSED,
return retval;
}
+#ifdef OVS_USE_EPOLL
+
+/* Like epoll_wait(), except:
+ *
+ * - The timeout is specified as an absolute time, as defined by
+ * time_msec(), instead of a duration.
+ *
+ * - On error, returns a negative error code (instead of setting errno).
+ *
+ * - If interrupted by a signal, retries automatically until the original
+ * timeout is reached. (Because of this property, this function will
+ * never return -EINTR.)
+ *
+ * Stores the number of milliseconds elapsed during poll in '*elapsed'. */
+int
+time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
+ long long int timeout_when, int *elapsed)
+{
+ long long int *last_wakeup = last_wakeup_get();
+ long long int start;
+ bool quiescent;
+ int retval = 0;
+
+ time_init();
+ coverage_clear();
+ coverage_run();
+ if (*last_wakeup && !thread_is_pmd()) {
+ log_poll_interval(*last_wakeup);
+ }
+ start = time_msec();
+
+ timeout_when = MIN(timeout_when, deadline);
+ quiescent = ovsrcu_is_quiescent();
+
+ for (;;) {
+ long long int now = time_msec();
+ int time_left;
+
+ if (now >= timeout_when) {
+ time_left = 0;
+ } else if ((unsigned long long int) timeout_when - now > INT_MAX) {
+ time_left = INT_MAX;
+ } else {
+ time_left = timeout_when - now;
+ }
+
+ if (!quiescent) {
+ if (!time_left) {
+ ovsrcu_quiesce();
+ } else {
+ ovsrcu_quiesce_start();
+ }
+ }
+
+ retval = epoll_wait(epoll_fd, events, max, time_left);
+ if (retval < 0) {
+ retval = -errno;
+ }
+
+ if (!quiescent && time_left) {
+ ovsrcu_quiesce_end();
+ }
+
+ if (deadline <= time_msec()) {
+ fatal_signal_handler(SIGALRM);
+ if (retval < 0) {
+ retval = 0;
+ }
+ break;
+ }
+
+ if (retval != -EINTR) {
+ break;
+ }
+ }
+ *last_wakeup = time_msec();
+ refresh_rusage();
+ *elapsed = *last_wakeup - start;
+ return retval;
+}
+#endif
+
long long int
timespec_to_msec(const struct timespec *ts)
{
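
As with time_poll(), the timeout argument of time_epoll_wait() is an absolute time_msec() value rather than a duration, with LLONG_MAX meaning "wait forever". A small Linux-only sketch of a caller; names other than the patched API are hypothetical.

    /* Illustration only.  Mirrors how poll_block() drives time_epoll_wait(). */
    #include <sys/epoll.h>
    #include "timeval.h"

    #define N_EVENTS 64

    static int
    wait_for_io(int epoll_fd, struct epoll_event events[N_EVENTS])
    {
        int elapsed;
        long long int timeout_when = time_msec() + 500;   /* Absolute deadline. */

        /* Returns the epoll_wait() count, 0 on timeout, or -errno on error;
         * milliseconds actually spent waiting are stored in 'elapsed'. */
        return time_epoll_wait(epoll_fd, events, N_EVENTS, timeout_when, &elapsed);
    }
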
diff --git a/lib/timeval.h b/lib/timeval.h
index 502f703d4..347a09d63 100644
--- a/lib/timeval.h
+++ b/lib/timeval.h
@@ -20,6 +20,9 @@
#include <time.h>
#include "openvswitch/type-props.h"
#include "util.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
#ifdef __cplusplus
extern "C" {
@@ -61,6 +64,10 @@ void time_wall_timespec(struct timespec *);
void time_alarm(unsigned int secs);
int time_poll(struct pollfd *, int n_pollfds, HANDLE *handles,
long long int timeout_when, int *elapsed);
+#ifdef __linux__
+int time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
+ long long int timeout_when, int *elapsed);
+#endif
long long int timespec_to_msec(const struct timespec *);
long long int timespec_to_usec(const struct timespec *);