Hi,

On 2016-03-16 15:41:28 -0400, Robert Haas wrote:
> On Wed, Mar 16, 2016 at 3:25 PM, Andres Freund <and...@anarazel.de> wrote:
> >> > - Given current users we don't need a large amount of events, so having
> >> >   to iterate through the registered events doesn't seem bothersome. We
> >> >   could however change the api to be something like
> >> >
> >> >   int WaitLatchEventSet(LatchEventSet *set, OccurredEvents *, int nevents, long timeout);
> >> >
> >> >   which would return the number of events that happened, and would
> >> >   basically "fill" one of the (usually stack allocated) OccurredEvent
> >> >   structures with what happened.
> >>
> >> I definitely think something along these lines is useful.  I want to
> >> be able to have an Append node with 100 ForeignScans under it and kick
> >> off all the scans asynchronously and wait for all of the FDs at once.
> >
> > So you'd like to get only an event for the FD with data back? Or are you
> > ok with iterating through hundred elements in an array, to see which are
> > ready?
> 
> I'd like to get an event back for the FD with data.  Iterating sounds
> like it could be really slow.  Say you get lots of little packets back
> from the same connection, while the others are idle.  Now you've got
> to keep iterating through them all over and over again.  Blech.

I have a (working) WIP version that behaves the way I think you want.
It's implemented in patch 5 (the rework, with just poll() support) and
patch 6 (add epoll support). These build on the patches posted here
earlier, but those aren't interesting for this discussion.

The API is now:

typedef struct WaitEventSet WaitEventSet;

typedef struct WaitEvent
{
        int             pos;            /* position in the event data structure */
        uint32  events;         /* tripped events */
        int             fd;                     /* fd associated with event */
} WaitEvent;

/*
 * Create a WaitEventSet with space for nevents different events to wait for.
 */
extern WaitEventSet *CreateWaitEventSet(int nevents);

/* ---
 * Add an event to the set. Possible events are:
 * - WL_LATCH_SET: Wait for the latch to be set
 * - WL_POSTMASTER_DEATH: Wait for postmaster to die
 * - WL_SOCKET_READABLE: Wait for socket to become readable
 *   can be combined in one event with WL_SOCKET_WRITEABLE
 * - WL_SOCKET_WRITEABLE: Wait for socket to become writeable
 *   can be combined with WL_SOCKET_READABLE
 *
 * 'latch' may be NULL, except when WL_LATCH_SET is requested.
 *
 * Returns the offset in WaitEventSet->events (starting from 0), which can be
 * used to modify previously added wait events.
 */
extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, int fd, Latch *latch);

/*
 * Change the event mask and, if applicable, the associated latch of a
 * WaitEvent.
 */
extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch);

/*
 * Wait for events added to the set to happen, or until the timeout is
 * reached.  At most nevents occurred events are returned.
 *
 * Returns the number of events that occurred, or 0 if the timeout was reached.
 */
extern int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents);
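
To make the call pattern concrete, here's a minimal sketch of how a
caller might use this (not part of the patches; "sock" stands in for
some connected socket, and error handling is elided):

	WaitEventSet *set = CreateWaitEventSet(3);
	WaitEvent	occurred[3];
	int			sock_pos;
	int			n,
				i;

	sock_pos = AddWaitEventToSet(set, WL_SOCKET_READABLE, sock, NULL);
	AddWaitEventToSet(set, WL_LATCH_SET, -1, MyLatch);
	AddWaitEventToSet(set, WL_POSTMASTER_DEATH, -1, NULL);

	/* wait up to 1s; n == 0 means the timeout was reached */
	n = WaitEventSetWait(set, 1000, occurred, lengthof(occurred));

	for (i = 0; i < n; i++)
	{
		if (occurred[i].events & WL_POSTMASTER_DEATH)
			proc_exit(1);
		if (occurred[i].events & WL_LATCH_SET)
			ResetLatch(MyLatch);
		if (occurred[i].events & WL_SOCKET_READABLE)
		{
			/* read from occurred[i].fd ... */
		}
	}

	/* to wait for writability on the same socket in the next round */
	ModifyWaitEvent(set, sock_pos, WL_SOCKET_WRITEABLE, NULL);

Note that only events that actually fired are returned, so a caller
waiting on many sockets doesn't have to rescan the whole array.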


I've for now left the old latch API in place, and only converted
be-secure.c to the new style.  I'd appreciate some feedback before I go
around and convert and polish everything.

Questions:
* I'm kinda inclined to merge the win32 and unix latch
  implementations. There's already a fair bit in common, and this is
  just going to increase that amount.

* Right now the caller has to allocate the WaitEvent array that results
  are returned in locally (likely on the stack), but we could also
  allocate it as part of the WaitEventSet. Not sure if that'd be a
  benefit.

* I can do a blind rewrite of the windows implementation, but I'm
  obviously not going to get that entirely right. So I need some help
  from a windows person to test this.

* This approach, with a 'stateful' wait event data structure, will
  actually allow us to fix a couple of lingering bugs we have on the
  windows port. Cf. http://www.postgresql.org/message-id/4351.1336927...@sss.pgh.pa.us

- Andres
From a692a0bd6a8af7427d491adfecff10df3953a8ae Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 16 Mar 2016 10:44:04 -0700
Subject: [PATCH 1/6] Access the correct pollfd when checking for socket errors
 in the latch code.

Previously, if waiting for a latch but not a socket, we checked
pfds[0].revents for socket errors, even though pfds[0] wasn't actually
associated with the socket in that case.

This is currently harmless, because we check wakeEvents after the
aforementioned check. But it's a bug waiting to happen.
---
 src/backend/port/unix_latch.c | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index 2ad609c..2ffce60 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -365,13 +365,17 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 				/* socket is writable */
 				result |= WL_SOCKET_WRITEABLE;
 			}
-			if (pfds[0].revents & (POLLHUP | POLLERR | POLLNVAL))
+			if ((wakeEvents & WL_SOCKET_READABLE) &&
+				(pfds[0].revents & (POLLHUP | POLLERR | POLLNVAL)))
 			{
-				/* EOF/error condition */
-				if (wakeEvents & WL_SOCKET_READABLE)
-					result |= WL_SOCKET_READABLE;
-				if (wakeEvents & WL_SOCKET_WRITEABLE)
-					result |= WL_SOCKET_WRITEABLE;
+				/* EOF/error condition, while waiting for readable socket */
+				result |= WL_SOCKET_READABLE;
+			}
+			if ((wakeEvents & WL_SOCKET_WRITEABLE) &&
+				(pfds[0].revents & (POLLHUP | POLLERR | POLLNVAL)))
+			{
+				/* EOF/error condition, while waiting for writeable socket */
+				result |= WL_SOCKET_WRITEABLE;
 			}
 
 			/*
-- 
2.7.0.229.g701fa7f

From d56555391a67fe9b4f4808b6b9f97a35c3682460 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 14 Jan 2016 14:17:43 +0100
Subject: [PATCH 2/6] Make it easier to choose the used waiting primitive in
 unix_latch.c.

This allows easier testing of the different primitives, in preparation
for adding a new primitive.

Discussion: 20160114143931.gg10...@awork2.anarazel.de
Reviewed-By: Robert Haas
---
 src/backend/port/unix_latch.c | 50 +++++++++++++++++++++++++++++--------------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index 2ffce60..93fbc9e 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -56,6 +56,22 @@
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
 
+/*
+ * Select the fd readiness primitive to use. Normally the "most modern"
+ * primitive supported by the OS will be used, but for testing it can be
+ * useful to manually specify the used primitive.  If desired, just add a
+ * define somewhere before this block.
+ */
+#if defined(LATCH_USE_POLL) || defined(LATCH_USE_SELECT)
+/* don't overwrite manual choice */
+#elif defined(HAVE_POLL)
+#define LATCH_USE_POLL
+#elif HAVE_SYS_SELECT_H
+#define LATCH_USE_SELECT
+#else
+#error "no latch implementation available"
+#endif
+
 /* Are we currently in WaitLatch? The signal handler would like to know. */
 static volatile sig_atomic_t waiting = false;
 
@@ -215,10 +231,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 				cur_time;
 	long		cur_timeout;
 
-#ifdef HAVE_POLL
+#if defined(LATCH_USE_POLL)
 	struct pollfd pfds[3];
 	int			nfds;
-#else
+#elif defined(LATCH_USE_SELECT)
 	struct timeval tv,
 			   *tvp;
 	fd_set		input_mask;
@@ -247,7 +263,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		Assert(timeout >= 0 && timeout <= INT_MAX);
 		cur_timeout = timeout;
 
-#ifndef HAVE_POLL
+#ifdef LATCH_USE_SELECT
 		tv.tv_sec = cur_timeout / 1000L;
 		tv.tv_usec = (cur_timeout % 1000L) * 1000L;
 		tvp = &tv;
@@ -257,7 +273,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	{
 		cur_timeout = -1;
 
-#ifndef HAVE_POLL
+#ifdef LATCH_USE_SELECT
 		tvp = NULL;
 #endif
 	}
@@ -291,16 +307,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		}
 
 		/*
-		 * Must wait ... we use poll(2) if available, otherwise select(2).
-		 *
-		 * On at least older linux kernels select(), in violation of POSIX,
-		 * doesn't reliably return a socket as writable if closed - but we
-		 * rely on that. So far all the known cases of this problem are on
-		 * platforms that also provide a poll() implementation without that
-		 * bug.  If we find one where that's not the case, we'll need to add a
-		 * workaround.
+		 * Must wait ... we use the polling interface determined at the top of
+		 * this file to do so.
 		 */
-#ifdef HAVE_POLL
+#if defined(LATCH_USE_POLL)
 		nfds = 0;
 		if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
 		{
@@ -400,8 +410,16 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 					result |= WL_POSTMASTER_DEATH;
 			}
 		}
-#else							/* !HAVE_POLL */
+#elif defined(LATCH_USE_SELECT)
 
+		/*
+		 * On at least older linux kernels select(), in violation of POSIX,
+		 * doesn't reliably return a socket as writable if closed - but we
+		 * rely on that. So far all the known cases of this problem are on
+		 * platforms that also provide a poll() implementation without that
+		 * bug.  If we find one where that's not the case, we'll need to add a
+		 * workaround.
+		 */
 		FD_ZERO(&input_mask);
 		FD_ZERO(&output_mask);
 
@@ -481,7 +499,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 					result |= WL_POSTMASTER_DEATH;
 			}
 		}
-#endif   /* HAVE_POLL */
+#endif   /* LATCH_USE_SELECT */
 
 		/* If we're not done, update cur_timeout for next iteration */
 		if (result == 0 && (wakeEvents & WL_TIMEOUT))
@@ -494,7 +512,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 				/* Timeout has expired, no need to continue looping */
 				result |= WL_TIMEOUT;
 			}
-#ifndef HAVE_POLL
+#ifdef LATCH_USE_SELECT
 			else
 			{
 				tv.tv_sec = cur_timeout / 1000L;
-- 
2.7.0.229.g701fa7f

From 174dc618b8b9be46c6c12f247d7129dbfb1300f4 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 14 Jan 2016 14:24:09 +0100
Subject: [PATCH 3/6] Error out if waiting on socket readiness without a
 specified socket.

Previously we just ignored such an attempt, which seems to serve no
purpose except making things harder to debug.

Discussion: 20160114143931.gg10...@awork2.anarazel.de
    20151230173734.hx7jj2fnwyljf...@alap3.anarazel.de
Reviewed-By: Robert Haas
---
 src/backend/port/unix_latch.c  | 7 ++++---
 src/backend/port/win32_latch.c | 6 ++++--
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index 93fbc9e..b06798f 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -242,9 +242,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	int			hifd;
 #endif
 
-	/* Ignore WL_SOCKET_* events if no valid socket is given */
-	if (sock == PGINVALID_SOCKET)
-		wakeEvents &= ~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
+	/* waiting for socket readiness without a socket indicates a bug */
+	if (sock == PGINVALID_SOCKET &&
+		(wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+		elog(ERROR, "cannot wait on socket events without a socket");
 
 	Assert(wakeEvents != 0);	/* must have at least one wake event */
 
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index 80adc13..e101acf 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -119,8 +119,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 
 	Assert(wakeEvents != 0);	/* must have at least one wake event */
 
-	if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid)
-		elog(ERROR, "cannot wait on a latch owned by another process");
+	/* waiting for socket readiness without a socket indicates a bug */
+	if (sock == PGINVALID_SOCKET &&
+		(wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+		elog(ERROR, "cannot wait on socket events without a socket");
 
 	/*
 	 * Initialize timeout if requested.  We must record the current time so
-- 
2.7.0.229.g701fa7f

From a8c526070791c6db27983aed6e0f1f9b9ed2554c Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 14 Jan 2016 15:15:17 +0100
Subject: [PATCH 4/6] Only clear unix_latch.c's self-pipe if it actually
 contains data.

This avoids a good number of individually quite fast system calls in
scenarios with many quick queries. Besides the aesthetic benefit of
seeing fewer superfluous system calls with strace, it also improves
performance by ~2%, as measured by pgbench -M prepared -c 96 -j 8 -S
(scale 100).
---
 src/backend/port/unix_latch.c | 79 ++++++++++++++++++++++++++++---------------
 1 file changed, 52 insertions(+), 27 deletions(-)

diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index b06798f..f6cb15b 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -283,27 +283,29 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	do
 	{
 		/*
-		 * Clear the pipe, then check if the latch is set already. If someone
-		 * sets the latch between this and the poll()/select() below, the
-		 * setter will write a byte to the pipe (or signal us and the signal
-		 * handler will do that), and the poll()/select() will return
-		 * immediately.
+		 * Check if the latch is set already. If so, leave loop immediately,
+		 * avoid blocking again. We don't attempt to report any other events
+		 * that might also be satisfied.
+		 *
+		 * If someone sets the latch between this and the poll()/select()
+		 * below, the setter will write a byte to the pipe (or signal us and
+		 * the signal handler will do that), and the poll()/select() will
+		 * return immediately.
+		 *
+		 * If there's a pending byte in the self pipe, we'll notice whenever
+		 * blocking. Only clearing the pipe in that case avoids having to
+		 * drain it every time WaitLatchOrSocket() is used. Should the
+		 * pipe-buffer fill up in some scenarios - wildly unlikely - we're
+		 * still ok, because the pipe is in nonblocking mode.
 		 *
 		 * Note: we assume that the kernel calls involved in drainSelfPipe()
 		 * and SetLatch() will provide adequate synchronization on machines
 		 * with weak memory ordering, so that we cannot miss seeing is_set if
 		 * the signal byte is already in the pipe when we drain it.
 		 */
-		drainSelfPipe();
-
 		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
 		{
 			result |= WL_LATCH_SET;
-
-			/*
-			 * Leave loop immediately, avoid blocking again. We don't attempt
-			 * to report any other events that might also be satisfied.
-			 */
 			break;
 		}
 
@@ -313,24 +315,26 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		 */
 #if defined(LATCH_USE_POLL)
 		nfds = 0;
+
+		/* selfpipe is always in pfds[0] */
+		pfds[0].fd = selfpipe_readfd;
+		pfds[0].events = POLLIN;
+		pfds[0].revents = 0;
+		nfds++;
+
 		if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
 		{
-			/* socket, if used, is always in pfds[0] */
-			pfds[0].fd = sock;
-			pfds[0].events = 0;
+			/* socket, if used, is always in pfds[1] */
+			pfds[1].fd = sock;
+			pfds[1].events = 0;
 			if (wakeEvents & WL_SOCKET_READABLE)
-				pfds[0].events |= POLLIN;
+				pfds[1].events |= POLLIN;
 			if (wakeEvents & WL_SOCKET_WRITEABLE)
-				pfds[0].events |= POLLOUT;
-			pfds[0].revents = 0;
+				pfds[1].events |= POLLOUT;
+			pfds[1].revents = 0;
 			nfds++;
 		}
 
-		pfds[nfds].fd = selfpipe_readfd;
-		pfds[nfds].events = POLLIN;
-		pfds[nfds].revents = 0;
-		nfds++;
-
 		if (wakeEvents & WL_POSTMASTER_DEATH)
 		{
 			/* postmaster fd, if used, is always in pfds[nfds - 1] */
@@ -364,26 +368,33 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		else
 		{
 			/* at least one event occurred, so check revents values */
+
+			if (pfds[0].revents & POLLIN)
+			{
+				/* There's data in the self-pipe, clear it. */
+				drainSelfPipe();
+			}
+
 			if ((wakeEvents & WL_SOCKET_READABLE) &&
-				(pfds[0].revents & POLLIN))
+				(pfds[1].revents & POLLIN))
 			{
 				/* data available in socket, or EOF/error condition */
 				result |= WL_SOCKET_READABLE;
 			}
 			if ((wakeEvents & WL_SOCKET_WRITEABLE) &&
-				(pfds[0].revents & POLLOUT))
+				(pfds[1].revents & POLLOUT))
 			{
 				/* socket is writable */
 				result |= WL_SOCKET_WRITEABLE;
 			}
 			if ((wakeEvents & WL_SOCKET_READABLE) &&
-				(pfds[0].revents & (POLLHUP | POLLERR | POLLNVAL)))
+				(pfds[1].revents & (POLLHUP | POLLERR | POLLNVAL)))
 			{
 				/* EOF/error condition, while waiting for readable socket */
 				result |= WL_SOCKET_READABLE;
 			}
 			if ((wakeEvents & WL_SOCKET_WRITEABLE) &&
-				(pfds[0].revents & (POLLHUP | POLLERR | POLLNVAL)))
+				(pfds[1].revents & (POLLHUP | POLLERR | POLLNVAL)))
 			{
 				/* EOF/error condition, while waiting for writeable socket */
 				result |= WL_SOCKET_WRITEABLE;
@@ -472,6 +483,11 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		else
 		{
 			/* at least one event occurred, so check masks */
+			if (FD_ISSET(selfpipe_readfd, &input_mask))
+			{
+				/* There's data in the self-pipe, clear it. */
+				drainSelfPipe();
+			}
 			if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask))
 			{
 				/* data available in socket, or EOF */
@@ -502,6 +518,15 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		}
 #endif   /* LATCH_USE_SELECT */
 
+		/*
+		 * Check again whether the latch is set; the arrival of a signal/
+		 * self-byte might be what stopped our sleep. It's not required for
+		 * correctness to report the latch as set (we'd just loop if there's
+		 * no other event), but it seems good to report an arrived latch asap.
+		 */
+		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
+			result |= WL_LATCH_SET;
+
 		/* If we're not done, update cur_timeout for next iteration */
 		if (result == 0 && (wakeEvents & WL_TIMEOUT))
 		{
-- 
2.7.0.229.g701fa7f

From bdb1101bc5b2fd0dd3efa6484f08fa4a1bb93b0c Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 16 Mar 2016 17:28:05 -0700
Subject: [PATCH 5/6] WIP: WaitEvent API

---
 src/backend/libpq/be-secure.c     |  24 +--
 src/backend/libpq/pqcomm.c        |  12 ++
 src/backend/port/unix_latch.c     | 317 ++++++++++++++++++++++++++++++++++++++
 src/backend/utils/init/miscinit.c |   8 +
 src/include/libpq/libpq.h         |   3 +
 src/include/storage/latch.h       |  44 ++++++
 6 files changed, 396 insertions(+), 12 deletions(-)

diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index ac709d1..c396811 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -140,13 +140,13 @@ retry:
 	/* In blocking mode, wait until the socket is ready */
 	if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
 	{
-		int			w;
+		WaitEvent   event;
 
 		Assert(waitfor);
 
-		w = WaitLatchOrSocket(MyLatch,
-							  WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
-							  port->sock, 0);
+		ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL);
+
+		WaitEventSetWait(FeBeWaitSet, 0 /* no timeout */, &event, 1);
 
 		/*
 		 * If the postmaster has died, it's not safe to continue running,
@@ -165,13 +165,13 @@ retry:
 		 * cycles checking for this very rare condition, and this should cause
 		 * us to exit quickly in most cases.)
 		 */
-		if (w & WL_POSTMASTER_DEATH)
+		if (event.events & WL_POSTMASTER_DEATH)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					errmsg("terminating connection due to unexpected postmaster exit")));
 
 		/* Handle interrupt. */
-		if (w & WL_LATCH_SET)
+		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessClientReadInterrupt(true);
@@ -241,22 +241,22 @@ retry:
 
 	if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
 	{
-		int			w;
+		WaitEvent   event;
 
 		Assert(waitfor);
 
-		w = WaitLatchOrSocket(MyLatch,
-							  WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
-							  port->sock, 0);
+		ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL);
+
+		WaitEventSetWait(FeBeWaitSet, 0 /* no timeout */, &event, 1);
 
 		/* See comments in secure_read. */
-		if (w & WL_POSTMASTER_DEATH)
+		if (event.events & WL_POSTMASTER_DEATH)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					errmsg("terminating connection due to unexpected postmaster exit")));
 
 		/* Handle interrupt. */
-		if (w & WL_LATCH_SET)
+		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessClientWriteInterrupt(true);
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 71473db..31d646d 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -201,6 +201,18 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
+	{
+		MemoryContext oldcontext;
+
+		oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+		FeBeWaitSet = CreateWaitEventSet(3);
+		AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL);
+		AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch);
+		AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
 }
 
 /* --------------------------------
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index f6cb15b..9bcfe14 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -72,6 +72,28 @@
 #error "no latch implementation available"
 #endif
 
+#if defined(WAIT_USE_POLL) || defined(WAIT_USE_SELECT)
+/* don't overwrite manual choice */
+#elif defined(HAVE_POLL)
+#define WAIT_USE_POLL
+#elif HAVE_SYS_SELECT_H
+#define WAIT_USE_SELECT
+#else
+#error "no wait set implementation available"
+#endif
+
+typedef struct WaitEventSet
+{
+	int nevents;
+	int nevents_space;
+	Latch *latch;
+	int latch_pos;
+	WaitEvent *events;
+#if defined(WAIT_USE_POLL)
+	struct pollfd *pollfds;
+#endif
+} WaitEventSet;
+
 /* Are we currently in WaitLatch? The signal handler would like to know. */
 static volatile sig_atomic_t waiting = false;
 
@@ -83,6 +105,9 @@ static int	selfpipe_writefd = -1;
 static void sendSelfPipeByte(void);
 static void drainSelfPipe(void);
 
+#if defined(WAIT_USE_POLL)
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+#endif
 
 /*
  * Initialize the process-local latch infrastructure.
@@ -636,6 +661,298 @@ ResetLatch(volatile Latch *latch)
 	pg_memory_barrier();
 }
 
+WaitEventSet *
+CreateWaitEventSet(int nevents)
+{
+	WaitEventSet   *set;
+	char		   *data;
+	Size			sz = 0;
+
+	sz += sizeof(WaitEventSet);
+	sz += sizeof(WaitEvent) * nevents;
+
+#if defined(WAIT_USE_POLL)
+	sz += sizeof(struct pollfd) * nevents;
+#endif
+
+	data = (char *) palloc0(sz);
+
+	set = (WaitEventSet *) data;
+	data += sizeof(WaitEventSet);
+
+	set->events = (WaitEvent *) data;
+	data += sizeof(WaitEvent) * nevents;
+
+#if defined(WAIT_USE_POLL)
+	set->pollfds = (struct pollfd *) data;
+	data += sizeof(struct pollfd) * nevents;
+#endif
+
+	set->latch = NULL;
+	set->nevents_space = nevents;
+
+	return set;
+}
+
+int
+AddWaitEventToSet(WaitEventSet *set, uint32 events, int fd, Latch *latch)
+{
+	WaitEvent *event;
+
+	if (set->nevents_space <= set->nevents)
+		elog(ERROR, "no space for yet another event");
+
+	if (set->latch && latch)
+		elog(ERROR, "can only wait for one latch");
+	if (!latch && (events & WL_LATCH_SET))
+		elog(ERROR, "cannot wait on latch without latch");
+
+	/* FIXME: validate event mask */
+
+	event = &set->events[set->nevents];
+	event->pos = set->nevents++;
+	event->fd = fd;
+	event->events = events;
+
+	if (events == WL_LATCH_SET)
+	{
+		set->latch = latch;
+		set->latch_pos = event->pos;
+		event->fd = selfpipe_readfd;
+	}
+	else if (events == WL_POSTMASTER_DEATH)
+	{
+		event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
+	}
+
+#if defined(WAIT_USE_POLL)
+	WaitEventAdjustPoll(set, event);
+#endif
+
+	return event->pos;
+}
+
+void
+ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
+{
+	WaitEvent *event;
+
+	Assert(pos < set->nevents);
+
+	event = &set->events[pos];
+
+	/* no need to perform any checks/modifications */
+	if (events == event->events && !(event->events & WL_LATCH_SET))
+		return;
+
+	if (event->events & WL_LATCH_SET &&
+		events != event->events)
+	{
+		/* we could allow disabling latch events for a while */
+		elog(ERROR, "cannot modify latch event");
+	}
+	if (event->events & WL_POSTMASTER_DEATH &&
+		events != event->events)
+	{
+		elog(ERROR, "cannot modify postmaster death event");
+	}
+
+	/* FIXME: validate event mask */
+	event->events = events;
+
+	if (events == WL_LATCH_SET)
+	{
+		set->latch = latch;
+	}
+
+#if defined(WAIT_USE_POLL)
+	WaitEventAdjustPoll(set, event);
+#endif
+}
+
+#if defined(WAIT_USE_POLL)
+static void
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+{
+	struct pollfd *pollfd = &set->pollfds[event->pos];
+
+	pollfd->revents = 0;
+	pollfd->fd = event->fd;
+
+	/* prepare pollfd entry once */
+	if (event->events == WL_LATCH_SET)
+	{
+		Assert(set->latch != NULL);
+		pollfd->events = POLLIN;
+	}
+	else if (event->events == WL_POSTMASTER_DEATH)
+	{
+		pollfd->events = POLLIN;
+	}
+	else
+	{
+		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		pollfd->events = 0;
+		if (event->events & WL_SOCKET_READABLE)
+			pollfd->events |= POLLIN;
+		if (event->events & WL_SOCKET_WRITEABLE)
+			pollfd->events |= POLLOUT;
+	}
+
+	Assert(event->fd >= 0);
+}
+#endif
+
+#if defined(WAIT_USE_POLL)
+int
+WaitEventSetWait(WaitEventSet *set, long timeout,
+				 WaitEvent* occurred_events, int nevents)
+{
+	int returned_events = 0;
+	instr_time	start_time,
+				cur_time;
+	long		cur_timeout = -1;
+	WaitEvent *cur_event;
+
+	struct pollfd *pfds = set->pollfds;
+
+	Assert(nevents > 0);
+
+	if (timeout)
+	{
+		INSTR_TIME_SET_CURRENT(start_time);
+		Assert(timeout >= 0 && timeout <= INT_MAX);
+		cur_timeout = timeout;
+	}
+
+	waiting = true;
+	while (returned_events == 0)
+	{
+		int rc;
+		int pos;
+		struct pollfd *cur_pollfd;
+
+		/* return immediately if latch is set */
+		if (set->latch && set->latch->is_set)
+		{
+			occurred_events->fd = -1;
+			occurred_events->pos = set->latch_pos;
+			occurred_events->events = WL_LATCH_SET;
+			occurred_events++;
+			returned_events++;
+
+			continue;
+		}
+
+		/* Sleep */
+		rc = poll(pfds, set->nevents, (int) cur_timeout);
+
+		/* Check return code */
+		if (rc < 0)
+		{
+			/* EINTR is okay, otherwise complain */
+			if (errno != EINTR)
+			{
+				waiting = false;
+				ereport(ERROR,
+						(errcode_for_socket_access(),
+						 errmsg("poll() failed: %m")));
+			}
+			continue;
+		}
+		else if (rc == 0)
+		{
+			break;
+		}
+
+		for (pos = 0, cur_event = set->events, cur_pollfd = set->pollfds;
+			 pos < set->nevents && returned_events < nevents;
+			 pos++, cur_event++, cur_pollfd++)
+		{
+			if (cur_event->events == WL_LATCH_SET &&
+				(cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
+			{
+				/* There's data in the self-pipe, clear it. */
+				drainSelfPipe();
+
+				if (set->latch->is_set)
+				{
+					occurred_events->fd = -1;
+					occurred_events->pos = cur_event->pos;
+					occurred_events->events = WL_LATCH_SET;
+					occurred_events++;
+					returned_events++;
+				}
+			}
+			else if (cur_event->events == WL_POSTMASTER_DEATH &&
+					 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
+			{
+				/*
+				 * According to the select(2) man page on Linux, select(2) may
+				 * spuriously return and report a file descriptor as readable,
+				 * when it's not; and presumably so can poll(2).  It's not
+				 * clear that the relevant cases would ever apply to the
+				 * postmaster pipe, but since the consequences of falsely
+				 * returning WL_POSTMASTER_DEATH could be pretty unpleasant,
+				 * we take the trouble to positively verify EOF with
+				 * PostmasterIsAlive().
+				 */
+				if (!PostmasterIsAlive())
+				{
+					occurred_events->fd = -1;
+					occurred_events->pos = cur_event->pos;
+					occurred_events->events = WL_POSTMASTER_DEATH;
+					occurred_events++;
+					returned_events++;
+				}
+			}
+			else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+			{
+				Assert(cur_event->fd >= 0);
+
+				occurred_events->fd = cur_event->fd;
+				occurred_events->pos = cur_event->pos;
+				occurred_events->events = 0;
+
+				if ((cur_event->events & WL_SOCKET_READABLE) &&
+					(cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
+				{
+					occurred_events->events |= WL_SOCKET_READABLE;
+				}
+
+				if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
+					(cur_pollfd->revents & (POLLOUT | POLLHUP | POLLERR | POLLNVAL)))
+				{
+					/* writeable, or EOF/error condition */
+					occurred_events->events |= WL_SOCKET_WRITEABLE;
+				}
+
+				/* only count the event if a requested condition fired */
+				if (occurred_events->events != 0)
+				{
+					occurred_events++;
+					returned_events++;
+				}
+			}
+		}
+
+
+		if (returned_events == 0 && timeout != 0)
+		{
+			INSTR_TIME_SET_CURRENT(cur_time);
+			INSTR_TIME_SUBTRACT(cur_time, start_time);
+			cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
+			if (cur_timeout <= 0)
+				goto out;
+		}
+	}
+out:
+	waiting = false;
+
+	return returned_events;
+}
+#elif defined(WAIT_USE_SELECT)
+#endif
+
 /*
  * SetLatch uses SIGUSR1 to wake up the process waiting on the latch.
  *
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 18f5e6f..d13355b 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -33,6 +33,7 @@
 
 #include "access/htup_details.h"
 #include "catalog/pg_authid.h"
+#include "libpq/libpq.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "postmaster/autovacuum.h"
@@ -247,6 +248,9 @@ SwitchToSharedLatch(void)
 
 	MyLatch = &MyProc->procLatch;
 
+	if (FeBeWaitSet)
+		ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch);
+
 	/*
 	 * Set the shared latch as the local one might have been set. This
 	 * shouldn't normally be necessary as code is supposed to check the
@@ -262,6 +266,10 @@ SwitchBackToLocalLatch(void)
 	Assert(MyProc != NULL && MyLatch == &MyProc->procLatch);
 
 	MyLatch = &LocalLatchData;
+
+	if (FeBeWaitSet)
+		ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch);
+
 	SetLatch(MyLatch);
 }
 
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 0569994..109fdf7 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -19,6 +19,7 @@
 
 #include "lib/stringinfo.h"
 #include "libpq/libpq-be.h"
+#include "storage/latch.h"
 
 
 typedef struct
@@ -95,6 +96,8 @@ extern ssize_t secure_raw_write(Port *port, const void *ptr, size_t len);
 
 extern bool ssl_loaded_verify_locations;
 
+WaitEventSet *FeBeWaitSet;
+
 /* GUCs */
 extern char *SSLCipherSuites;
 extern char *SSLECDHCurve;
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index e77491e..941e2f0 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -102,6 +102,50 @@ typedef struct Latch
 #define WL_TIMEOUT			 (1 << 3)
 #define WL_POSTMASTER_DEATH  (1 << 4)
 
+typedef struct WaitEventSet WaitEventSet;
+
+typedef struct WaitEvent
+{
+	int		pos;		/* position in the event data structure */
+	uint32	events;		/* tripped events */
+	int		fd;			/* fd associated with event */
+} WaitEvent;
+
+/*
+ * Create a WaitEventSet with space for nevents different events to wait for.
+ */
+extern WaitEventSet *CreateWaitEventSet(int nevents);
+
+/* ---
+ * Add an event to the set. Possible events are:
+ * - WL_LATCH_SET: Wait for the latch to be set
+ * - WL_POSTMASTER_DEATH: Wait for postmaster to die
+ * - WL_SOCKET_READABLE: Wait for socket to become readable
+ *   can be combined in one event with WL_SOCKET_WRITEABLE
+ * - WL_SOCKET_WRITEABLE: Wait for socket to become writeable
+ *   can be combined with WL_SOCKET_READABLE
+ *
+ * 'latch' may be NULL, except when WL_LATCH_SET is requested.
+ *
+ * Returns the offset in WaitEventSet->events (starting from 0), which can be
+ * used to modify previously added wait events.
+ */
+extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, int fd, Latch *latch);
+
+/*
+ * Change the event mask and, if applicable, the associated latch of a
+ * WaitEvent.
+ */
+extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch);
+
+/*
+ * Wait for events added to the set to happen, or until the timeout is
+ * reached.  At most nevents occurred events are returned.
+ *
+ * Returns the number of events that occurred, or 0 if the timeout was reached.
+ */
+extern int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents);
+
 /*
  * prototypes for functions in latch.c
  */
-- 
2.7.0.229.g701fa7f

From 3feafa5ecd6666aacbaf3ceda466044476dda63a Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 16 Mar 2016 18:14:18 -0700
Subject: [PATCH 6/6] WIP: Use epoll for Wait Event API if available.

---
 configure                     |   2 +-
 configure.in                  |   2 +-
 src/backend/port/unix_latch.c | 238 ++++++++++++++++++++++++++++++++++++++++--
 src/include/pg_config.h.in    |   3 +
 4 files changed, 235 insertions(+), 10 deletions(-)

diff --git a/configure b/configure
index a45be67..da897ae 100755
--- a/configure
+++ b/configure
@@ -10193,7 +10193,7 @@ fi
 ## Header files
 ##
 
-for ac_header in atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
+for ac_header in atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/epoll.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
 do :
   as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh`
 ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default"
diff --git a/configure.in b/configure.in
index c298926..dee3c45 100644
--- a/configure.in
+++ b/configure.in
@@ -1183,7 +1183,7 @@ AC_SUBST(UUID_LIBS)
 ##
 
 dnl sys/socket.h is required by AC_FUNC_ACCEPT_ARGTYPES
-AC_CHECK_HEADERS([atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h])
+AC_CHECK_HEADERS([atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/epoll.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h])
 
 # On BSD, test for net/if.h will fail unless sys/socket.h
 # is included first.
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index 9bcfe14..c233d68 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -38,6 +38,9 @@
 #include <unistd.h>
 #include <sys/time.h>
 #include <sys/types.h>
+#ifdef HAVE_SYS_EPOLL_H
+#include <sys/epoll.h>
+#endif
 #ifdef HAVE_POLL_H
 #include <poll.h>
 #endif
@@ -72,8 +75,10 @@
 #error "no latch implementation available"
 #endif
 
-#if defined(WAIT_USE_POLL) || defined(WAIT_USE_SELECT)
+#if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || defined(WAIT_USE_SELECT)
 /* don't overwrite manual choice */
+#elif defined(HAVE_SYS_EPOLL_H)
+#define WAIT_USE_EPOLL
 #elif defined(HAVE_POLL)
 #define WAIT_USE_POLL
 #elif HAVE_SYS_SELECT_H
@@ -89,7 +94,10 @@ typedef struct WaitEventSet
 	Latch *latch;
 	int latch_pos;
 	WaitEvent *events;
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	struct epoll_event *epoll_ret_events;
+	int epoll_fd;
+#elif defined(WAIT_USE_POLL)
 	struct pollfd *pollfds;
 #endif
 } WaitEventSet;
@@ -105,7 +113,9 @@ static int	selfpipe_writefd = -1;
 static void sendSelfPipeByte(void);
 static void drainSelfPipe(void);
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
+#elif defined(WAIT_USE_POLL)
 static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
 #endif
 
@@ -671,7 +681,9 @@ CreateWaitEventSet(int nevents)
 	sz += sizeof(WaitEventSet);
 	sz += sizeof(WaitEvent) * nevents;
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	sz += sizeof(struct epoll_event) * nevents;
+#elif defined(WAIT_USE_POLL)
 	sz += sizeof(struct pollfd) * nevents;
 #endif
 
@@ -683,7 +695,10 @@ CreateWaitEventSet(int nevents)
 	set->events = (WaitEvent *) data;
 	data += sizeof(WaitEvent) * nevents;
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	set->epoll_ret_events = (struct epoll_event *) data;
+	data += sizeof(struct epoll_event) * nevents;
+#elif defined(WAIT_USE_POLL)
 	set->pollfds = (struct pollfd *) data;
 	data += sizeof(struct pollfd) * nevents;
 #endif
@@ -691,6 +706,12 @@ CreateWaitEventSet(int nevents)
 	set->latch = NULL;
 	set->nevents_space = nevents;
 
+#if defined(WAIT_USE_EPOLL)
+	set->epoll_fd = epoll_create(nevents);
+	if (set->epoll_fd < 0)
+		elog(ERROR, "epoll_create failed: %m");
+#endif
+
 	return set;
 }
 
@@ -725,7 +746,9 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, int fd, Latch *latch)
 		event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
 	}
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
+#elif defined(WAIT_USE_POLL)
 	WaitEventAdjustPoll(set, event);
 #endif
 
@@ -765,11 +788,59 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 		set->latch = latch;
 	}
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
+#elif defined(WAIT_USE_POLL)
 	WaitEventAdjustPoll(set, event);
 #endif
 }
 
+#if defined(WAIT_USE_EPOLL)
+static void
+WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
+{
+	struct epoll_event epoll_ev;
+	int rc;
+
+	epoll_ev.events = EPOLLERR | EPOLLHUP;
+	/* pointer to our event, returned by epoll_wait */
+	epoll_ev.data.ptr = event;
+
+	/* prepare epoll_event entry once */
+	if (event->events == WL_LATCH_SET)
+	{
+		Assert(set->latch != NULL);
+		epoll_ev.events |= EPOLLIN;
+	}
+	else if (event->events == WL_POSTMASTER_DEATH)
+	{
+		epoll_ev.events |= EPOLLIN;
+	}
+	else
+	{
+		Assert(event->fd >= 0);
+		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+
+		if (event->events & WL_SOCKET_READABLE)
+			epoll_ev.events |= EPOLLIN;
+		if (event->events & WL_SOCKET_WRITEABLE)
+			epoll_ev.events |= EPOLLOUT;
+	}
+
+	/*
+	 * Even though unused, we also pass epoll_ev as the data argument for
+	 * EPOLL_CTL_DEL.  There used to be an epoll bug requiring that, and it
+	 * makes the code simpler...
+	 */
+	rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev);
+
+	if (rc < 0)
+		ereport(ERROR,
+				(errcode_for_socket_access(),
+				 errmsg("epoll_ctl() failed: %m")));
+}
+#endif
+
 #if defined(WAIT_USE_POLL)
 static void
 WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
@@ -803,7 +874,158 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 }
 #endif
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+int
+WaitEventSetWait(WaitEventSet *set, long timeout,
+				 WaitEvent* occurred_events, int nevents)
+{
+	int returned_events = 0;
+	instr_time	start_time,
+				cur_time;
+	long		cur_timeout = -1;
+	WaitEvent *cur_event;
+
+	Assert(nevents > 0);
+
+	if (timeout)
+	{
+		INSTR_TIME_SET_CURRENT(start_time);
+		Assert(timeout >= 0 && timeout <= INT_MAX);
+		cur_timeout = timeout;
+	}
+
+	waiting = true;
+	while (returned_events == 0)
+	{
+		int rc;
+		int pos;
+		struct epoll_event *cur_epoll_event;
+
+		/* return immediately if latch is set */
+		if (set->latch && set->latch->is_set)
+		{
+			occurred_events->fd = -1;
+			occurred_events->pos = set->latch_pos;
+			occurred_events->events = WL_LATCH_SET;
+			occurred_events++;
+			returned_events++;
+
+			continue;
+		}
+
+		/* Sleep */
+		rc = epoll_wait(set->epoll_fd, set->epoll_ret_events,
+						Min(nevents, set->nevents), cur_timeout);
+
+		/* Check return code */
+		if (rc < 0)
+		{
+			/* EINTR is okay, otherwise complain */
+			if (errno != EINTR)
+			{
+				waiting = false;
+				ereport(ERROR,
+						(errcode_for_socket_access(),
+						 errmsg("poll() failed: %m")));
+			}
+			continue;
+		}
+		else if (rc == 0)
+		{
+			break;
+		}
+
+		/* iterate over the returned epoll events */
+		for (pos = 0, cur_epoll_event = set->epoll_ret_events;
+			 pos < rc && returned_events < nevents;
+			 pos++, cur_epoll_event++)
+		{
+			cur_event = (WaitEvent *) cur_epoll_event->data.ptr;
+
+			if (cur_event->events == WL_LATCH_SET &&
+				cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
+			{
+				/* There's data in the self-pipe, clear it. */
+				drainSelfPipe();
+
+				if (set->latch->is_set)
+				{
+					occurred_events->fd = -1;
+					occurred_events->pos = cur_event->pos;
+					occurred_events->events = WL_LATCH_SET;
+					occurred_events++;
+					returned_events++;
+				}
+			}
+			else if (cur_event->events == WL_POSTMASTER_DEATH &&
+					 cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
+			{
+				/*
+				 * FIXME:
+				 * According to the select(2) man page on Linux, select(2) may
+				 * spuriously return and report a file descriptor as readable,
+				 * when it's not; and presumably so can poll(2).  It's not
+				 * clear that the relevant cases would ever apply to the
+				 * postmaster pipe, but since the consequences of falsely
+				 * returning WL_POSTMASTER_DEATH could be pretty unpleasant,
+				 * we take the trouble to positively verify EOF with
+				 * PostmasterIsAlive().
+				 */
+				if (!PostmasterIsAlive())
+				{
+					occurred_events->fd = -1;
+					occurred_events->pos = cur_event->pos;
+					occurred_events->events = WL_POSTMASTER_DEATH;
+					occurred_events++;
+					returned_events++;
+				}
+			}
+			else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+			{
+				Assert(cur_event->fd >= 0);
+
+				occurred_events->fd = cur_event->fd;
+				occurred_events->pos = cur_event->pos;
+				occurred_events->events = 0;
+
+				if ((cur_event->events & WL_SOCKET_READABLE) &&
+					(cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)))
+				{
+					occurred_events->events |= WL_SOCKET_READABLE;
+				}
+
+				if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
+					(cur_epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP)))
+				{
+					/* writeable, or EOF/error condition */
+					occurred_events->events |= WL_SOCKET_WRITEABLE;
+				}
+
+				/* only count the event if a requested condition fired */
+				if (occurred_events->events != 0)
+				{
+					occurred_events++;
+					returned_events++;
+				}
+			}
+		}
+
+
+		if (returned_events == 0 && timeout != 0)
+		{
+			INSTR_TIME_SET_CURRENT(cur_time);
+			INSTR_TIME_SUBTRACT(cur_time, start_time);
+			cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
+			if (cur_timeout <= 0)
+				goto out;
+		}
+	}
+out:
+	waiting = false;
+
+	return returned_events;
+}
+
+#elif defined(WAIT_USE_POLL)
 int
 WaitEventSetWait(WaitEventSet *set, long timeout,
 				 WaitEvent* occurred_events, int nevents)
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 3813226..c72635c 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -530,6 +530,9 @@
 /* Define to 1 if you have the syslog interface. */
 #undef HAVE_SYSLOG
 
+/* Define to 1 if you have the <sys/epoll.h> header file. */
+#undef HAVE_SYS_EPOLL_H
+
 /* Define to 1 if you have the <sys/ioctl.h> header file. */
 #undef HAVE_SYS_IOCTL_H
 
-- 
2.7.0.229.g701fa7f
