On Sat, Dec 11, 2021 at 7:09 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> On Sat, Dec 11, 2021 at 6:11 PM Andres Freund <and...@anarazel.de> wrote:
> > Yuck. Is there really no better way to deal with this? What kind of errors 
> > is
> > this trying to handle transparently? Afaics this still changes when we'd
> > e.g. detect postmaster death.
>
> The problem is that WaitEventSetWait() only reports the latch if it's
> set, so I removed it from the set (setting it to NULL) and then undid
> that afterwards.  Perhaps we could fix that root problem instead.
> That is, we could make it so that latches aren't higher priority in
> that way, i.e. don't hide other events[1].  Then I wouldn't have to
> modify the WES here; I could just ignore it in the output event list
> (and make sure that's big enough for all possible events, as I had it
> in the last version).  I'll think about that.

I tried that.  It seems OK, and it gets rid of the PG_FINALLY(), which
is nice.  Latches still have higher priority, and still get the fast
return if the latch is already set and you asked for only one event,
but now if you ask for nevents > 1 we make the syscall too, so we can
see WL_SOCKET_CLOSED as well.
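
To make that concrete, here's a minimal caller-side sketch (my own
illustration, not part of the patches; it assumes a wait set that
contains a latch plus a socket registered for WL_SOCKET_CLOSED):

    WaitEvent	events[3];
    int			nevents;

    /* Ask for up to 3 events; a set latch no longer hides the rest. */
    nevents = WaitEventSetWait(set, -1 /* block */, events,
							   lengthof(events), 0);
    for (int i = 0; i < nevents; i++)
    {
    	if (events[i].events & WL_LATCH_SET)
    		ResetLatch(MyLatch);	/* then do latch-triggered work */
    	if (events[i].events & WL_SOCKET_CLOSED)
    	{
    		/* peer has gone away; abandon the connection */
    	}
    }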

It's possible that someone might want the old behaviour one day (fast
return even for nevents > 1, hiding other events), and then we'd have
to come up with some way to request that, which is the type of tricky
policy question that had put me off "fixing" this before.  But I guess
we could cross that bridge if we come to it.  Every existing caller
passes nevents == 1, so for now that's hypothetical.

Better?
From 6b994807c29157ac7053ec347a51268d60f2b3b4 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:38:40 +1200
Subject: [PATCH v4 1/3] Add WL_SOCKET_CLOSED for socket shutdown events.

Provide a way for WaitEventSet to report that the remote peer has shut
down its socket, independently of whether there is any buffered data
remaining to be read.  This works only on systems where the kernel
exposes that information, namely:

* WAIT_USE_POLL builds using POLLRDHUP, if available
* WAIT_USE_EPOLL builds using EPOLLRDHUP
* WAIT_USE_KQUEUE builds using EV_EOF

Reviewed-by: Zhihong Yu <z...@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyuti...@gmail.com>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 src/backend/storage/ipc/latch.c | 79 +++++++++++++++++++++++++++++----
 src/include/storage/latch.h     |  6 ++-
 2 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 1d893cf863..54e928c564 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -841,6 +841,7 @@ FreeWaitEventSet(WaitEventSet *set)
  * - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
  *	 can be combined with other WL_SOCKET_* events (on non-Windows
  *	 platforms, this is the same as WL_SOCKET_WRITEABLE)
+ * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
  * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
  *
  * Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1043,12 +1044,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 	else
 	{
 		Assert(event->fd != PGINVALID_SOCKET);
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 
 		if (event->events & WL_SOCKET_READABLE)
 			epoll_ev.events |= EPOLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			epoll_ev.events |= EPOLLOUT;
+		if (event->events & WL_SOCKET_CLOSED)
+			epoll_ev.events |= EPOLLRDHUP;
 	}
 
 	/*
@@ -1087,12 +1092,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 	}
 	else
 	{
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 		pollfd->events = 0;
 		if (event->events & WL_SOCKET_READABLE)
 			pollfd->events |= POLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			pollfd->events |= POLLOUT;
+#ifdef POLLRDHUP
+		if (event->events & WL_SOCKET_CLOSED)
+			pollfd->events |= POLLRDHUP;
+#endif
 	}
 
 	Assert(event->fd != PGINVALID_SOCKET);
@@ -1165,7 +1176,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
 	Assert(event->events == WL_LATCH_SET ||
 		   event->events == WL_POSTMASTER_DEATH ||
-		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+		   (event->events & (WL_SOCKET_READABLE |
+							 WL_SOCKET_WRITEABLE |
+							 WL_SOCKET_CLOSED)));
 
 	if (event->events == WL_POSTMASTER_DEATH)
 	{
@@ -1188,9 +1201,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 		 * old event mask to the new event mask, since kevent treats readable
 		 * and writable as separate events.
 		 */
-		if (old_events & WL_SOCKET_READABLE)
+		if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			old_filt_read = true;
-		if (event->events & WL_SOCKET_READABLE)
+		if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			new_filt_read = true;
 		if (old_events & WL_SOCKET_WRITEABLE)
 			old_filt_write = true;
@@ -1210,7 +1223,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 									 event);
 	}
 
-	Assert(count > 0);
+	/* For WL_SOCKET_READABLE -> WL_SOCKET_CLOSED, no change needed. */
+	if (count == 0)
+		return;
+
 	Assert(count <= 2);
 
 	rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
@@ -1525,7 +1541,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd != PGINVALID_SOCKET);
 
@@ -1543,6 +1561,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
+			{
+				/* remote peer shut down, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -1668,7 +1693,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			occurred_events++;
 			returned_events++;
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd >= 0);
 
@@ -1679,6 +1706,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_READABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_kqueue_event->filter == EVFILT_READ) &&
+				(cur_kqueue_event->flags & EV_EOF))
+			{
+				/* the remote peer has shut down */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
 				(cur_kqueue_event->filter == EVFILT_WRITE))
 			{
@@ -1789,7 +1824,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			int			errflags = POLLHUP | POLLERR | POLLNVAL;
 
@@ -1809,6 +1846,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+#ifdef POLLRDHUP
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_pollfd->revents & (POLLRDHUP | errflags)))
+			{
+				/* remote peer closed, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+#endif
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -2015,6 +2061,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 }
 #endif
 
+/*
+ * Return whether the current build options can report WL_SOCKET_CLOSED.
+ */
+bool
+WaitEventSetCanReportClosed(void)
+{
+#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
+	defined(WAIT_USE_EPOLL) || \
+	defined(WAIT_USE_KQUEUE)
+	return true;
+#else
+	return false;
+#endif
+}
+
 /*
  * Get the number of wait events registered in a given WaitEventSet.
  */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 44f9368c64..d78ff0bede 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -134,10 +134,11 @@ typedef struct Latch
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
 #endif
-
+#define WL_SOCKET_CLOSED 	 (1 << 7)
 #define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
 							 WL_SOCKET_WRITEABLE | \
-							 WL_SOCKET_CONNECTED)
+							 WL_SOCKET_CONNECTED | \
+							 WL_SOCKET_CLOSED)
 
 typedef struct WaitEvent
 {
@@ -180,5 +181,6 @@ extern int	WaitLatchOrSocket(Latch *latch, int wakeEvents,
 							  pgsocket sock, long timeout, uint32 wait_event_info);
 extern void InitializeLatchWaitSet(void);
 extern int	GetNumRegisteredWaitEvents(WaitEventSet *set);
+extern bool	WaitEventSetCanReportClosed(void);
 
 #endif							/* LATCH_H */
-- 
2.30.2
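
For anyone who wants to try patch 1 in isolation, here's a rough
standalone probe built on the new interface (the helper name is made up
and error handling is elided; on builds where
WaitEventSetCanReportClosed() returns false it simply reports "not
closed"):

/* Hypothetical helper: has the peer closed its end of "sock"? */
static bool
peer_socket_closed(pgsocket sock)
{
	WaitEventSet *set;
	WaitEvent	event;
	bool		closed = false;

	if (!WaitEventSetCanReportClosed())
		return false;			/* kernel can't tell us on this build */

	set = CreateWaitEventSet(CurrentMemoryContext, 1);
	AddWaitEventToSet(set, WL_SOCKET_CLOSED, sock, NULL, NULL);

	/* Timeout 0 means: check readiness, don't block. */
	if (WaitEventSetWait(set, 0, &event, 1, 0) == 1 &&
		(event.events & WL_SOCKET_CLOSED))
		closed = true;

	FreeWaitEventSet(set);
	return closed;
}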

From 4edb07dbe87911de2ef0c30e57b766fcad022dff Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Mon, 13 Dec 2021 14:31:18 +1300
Subject: [PATCH v4 2/3] Don't let latches hide other WaitEventSet events.

Previously, if WaitEventSetWait() saw that a latch had been set in
memory, it would return that event immediately, hiding any other events
that might also be pending in the kernel.

With this change, latches still have higher priority than other events,
but if you ask for 3 events, you'll get up to 3 events that are
currently pending, even if the latch is set.  This makes no difference
to existing callers, because they all ask for just one event, so they
never reach the new code path that enters the kernel anyway.

Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 src/backend/storage/ipc/latch.c | 43 +++++++++++++++++++++++----------
 1 file changed, 30 insertions(+), 13 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 54e928c564..30f7160c61 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -1354,9 +1354,9 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
 		int			rc;
 
 		/*
-		 * Check if the latch is set already. If so, leave the loop
-		 * immediately, avoid blocking again. We don't attempt to report any
-		 * other events that might also be satisfied.
+		 * First, check if the latch is already set.  If so, we don't need to
+		 * enter the kernel unless the caller wants more than one event to be
+		 * reported.
 		 *
 		 * If someone sets the latch between this and the
 		 * WaitEventSetWaitBlock() below, the setter will write a byte to the
@@ -1401,7 +1401,26 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
 			/* could have been set above */
 			set->latch->maybe_sleeping = false;
 
-			break;
+			if (returned_events == nevents)
+			{
+				/*
+				 * Already fully satisfied by this latch event, so there's no
+				 * need to enter the kernel.
+				 */
+				break;
+			}
+			else
+			{
+				/*
+				 * The caller wants to poll other events too.  Set timeout to
+				 * zero, because we already have an event to report to the
+				 * caller.  WaitEventSetWaitBlock() won't double-report the
+				 * latch, because we cleared latch->maybe_sleeping.
+				 */
+				Assert(returned_events == 1);
+				Assert(nevents > 1);
+				cur_timeout = 0;
+			}
 		}
 
 		/*
@@ -1410,18 +1429,16 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
 		 * to retry, everything >= 1 is the number of returned events.
 		 */
 		rc = WaitEventSetWaitBlock(set, cur_timeout,
-								   occurred_events, nevents);
+								   occurred_events,
+								   nevents - returned_events);
 
 		if (set->latch)
-		{
-			Assert(set->latch->maybe_sleeping);
 			set->latch->maybe_sleeping = false;
-		}
 
 		if (rc == -1)
 			break;				/* timeout occurred */
 		else
-			returned_events = rc;
+			returned_events += rc;
 
 		/* If we're not done, update cur_timeout for next iteration */
 		if (returned_events == 0 && timeout >= 0)
@@ -1509,7 +1526,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			/* Drain the signalfd. */
 			drain();
 
-			if (set->latch && set->latch->is_set)
+			if (set->latch && set->latch->maybe_sleeping && set->latch->is_set)
 			{
 				occurred_events->fd = PGINVALID_SOCKET;
 				occurred_events->events = WL_LATCH_SET;
@@ -1667,7 +1684,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 		if (cur_event->events == WL_LATCH_SET &&
 			cur_kqueue_event->filter == EVFILT_SIGNAL)
 		{
-			if (set->latch && set->latch->is_set)
+			if (set->latch && set->latch->maybe_sleeping && set->latch->is_set)
 			{
 				occurred_events->fd = PGINVALID_SOCKET;
 				occurred_events->events = WL_LATCH_SET;
@@ -1792,7 +1809,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			/* There's data in the self-pipe, clear it. */
 			drain();
 
-			if (set->latch && set->latch->is_set)
+			if (set->latch && set->latch->maybe_sleeping && set->latch->is_set)
 			{
 				occurred_events->fd = PGINVALID_SOCKET;
 				occurred_events->events = WL_LATCH_SET;
@@ -1974,7 +1991,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 		if (!ResetEvent(set->handles[cur_event->pos + 1]))
 			elog(ERROR, "ResetEvent failed: error code %lu", GetLastError());
 
-		if (set->latch && set->latch->is_set)
+		if (set->latch && set->latch->maybe_sleeping && set->latch->is_set)
 		{
 			occurred_events->fd = PGINVALID_SOCKET;
 			occurred_events->events = WL_LATCH_SET;
-- 
2.30.2

From 6920b741edc3e9bb5f0a573941fbc055b8d7a434 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:48:32 +1200
Subject: [PATCH v4 3/3] Use WL_SOCKET_CLOSED for
 client_connection_check_interval.

Previously, we used poll() directly to check for a POLLRDHUP event.
Instead, use the WaitEventSet API to poll the socket for
WL_SOCKET_CLOSED, which knows how to detect this condition on many more
operating systems.

Reviewed-by: Zhihong Yu <z...@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyuti...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 doc/src/sgml/config.sgml     |  6 +++---
 src/backend/libpq/pqcomm.c   | 33 +++++++++++++++------------------
 src/backend/utils/misc/guc.c |  7 ++-----
 src/include/libpq/libpq.h    |  1 +
 4 files changed, 21 insertions(+), 26 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 4ac617615c..ab52d4bd2c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1012,9 +1012,9 @@ include_dir 'conf.d'
         the kernel reports that the connection is closed.
        </para>
        <para>
-        This option is currently available only on systems that support the
-        non-standard <symbol>POLLRDHUP</symbol> extension to the
-        <symbol>poll</symbol> system call, including Linux.
+        This option relies on kernel events exposed by Linux, macOS, illumos
+        and the BSD family of operating systems, and is not currently available
+        on other systems.
        </para>
        <para>
         If the value is specified without units, it is taken as milliseconds.
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 4c37df09cf..d99ab09973 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -204,7 +204,7 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, FeBeWaitSetNEvents);
 	socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
 								   MyProcPort->sock, NULL, NULL);
 	latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
@@ -1959,33 +1959,30 @@ pq_settcpusertimeout(int timeout, Port *port)
 bool
 pq_check_connection(void)
 {
-#if defined(POLLRDHUP)
-	/*
-	 * POLLRDHUP is a Linux extension to poll(2) to detect sockets closed by
-	 * the other end.  We don't have a portable way to do that without
-	 * actually trying to read or write data on other systems.  We don't want
-	 * to read because that would be confused by pipelined queries and COPY
-	 * data. Perhaps in future we'll try to write a heartbeat message instead.
-	 */
-	struct pollfd pollfd;
+	WaitEvent	events[FeBeWaitSetNEvents];
 	int			rc;
 
-	pollfd.fd = MyProcPort->sock;
-	pollfd.events = POLLOUT | POLLIN | POLLRDHUP;
-	pollfd.revents = 0;
+	/*
+	 * It's OK to modify the socket event filter without restoring, because all
+	 * socket wait sites do the same.
+	 */
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
 
-	rc = poll(&pollfd, 1, 0);
+	rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
 
 	if (rc < 0)
 	{
 		ereport(COMMERROR,
 				(errcode_for_socket_access(),
-				 errmsg("could not poll socket: %m")));
+				 errmsg("could not check for closed socket: %m")));
 		return false;
 	}
-	else if (rc == 1 && (pollfd.revents & (POLLHUP | POLLRDHUP)))
-		return false;
-#endif
+
+	for (int i = 0; i < rc; ++i)
+	{
+		if (events[i].events & WL_SOCKET_CLOSED)
+			return false;
+	}
 
 	return true;
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index ee6a838b3a..ca45b36754 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -12169,14 +12169,11 @@ check_huge_page_size(int *newval, void **extra, GucSource source)
 static bool
 check_client_connection_check_interval(int *newval, void **extra, GucSource source)
 {
-#ifndef POLLRDHUP
-	/* Linux only, for now.  See pq_check_connection(). */
-	if (*newval != 0)
+	if (!WaitEventSetCanReportClosed() && *newval != 0)
 	{
-		GUC_check_errdetail("client_connection_check_interval must be set to 0 on platforms that lack POLLRDHUP.");
+		GUC_check_errdetail("client_connection_check_interval must be set to 0 on this platform");
 		return false;
 	}
-#endif
 	return true;
 }
 
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 6b67a2a318..9f168a284f 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -62,6 +62,7 @@ extern WaitEventSet *FeBeWaitSet;
 
 #define FeBeWaitSetSocketPos 0
 #define FeBeWaitSetLatchPos 1
+#define FeBeWaitSetNEvents 3
 
 extern int	StreamServerPort(int family, const char *hostName,
 							 unsigned short portNumber, const char *unixSocketDir,
-- 
2.30.2
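
For reference, the Linux-only check that patch 3 removes amounts to the
direct poll() call below (a simplified reconstruction of the deleted
pqcomm.c code, with a made-up function name; POLLRDHUP is the
non-standard extension that WL_SOCKET_CLOSED maps to in WAIT_USE_POLL
builds):

#define _GNU_SOURCE				/* for POLLRDHUP; must precede headers */
#include <poll.h>
#include <stdbool.h>

/* Sketch of the old pq_check_connection() logic. */
static bool
connection_still_open(int sock)
{
#ifdef POLLRDHUP
	struct pollfd pollfd;

	pollfd.fd = sock;
	pollfd.events = POLLOUT | POLLIN | POLLRDHUP;
	pollfd.revents = 0;

	if (poll(&pollfd, 1, 0) == 1 &&
		(pollfd.revents & (POLLHUP | POLLRDHUP)))
		return false;			/* peer shut down its end */
#endif
	return true;				/* still open, or we can't tell */
}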
