On Fri, Jan 14, 2022 at 4:35 PM Andres Freund <and...@anarazel.de> wrote:
> The more I think about it, the less I see why we *ever* need to re-arm the
> latch in pq_check_connection() in this approach. pq_check_connection() is only
> used from ProcessInterrupts(), and there are plenty of things inside
> ProcessInterrupts() that can cause latches to be reset (e.g. parallel message
> processing causing log messages to be sent to the client, causing network IO,
> which obviously can do a latch reset).

Thanks for the detailed explanation.  I guess I was being overly
cautious and a little myopic ("leave things exactly the way you found
them"), so that I didn't have to think about any of that.  I see now that
the scenario I was worrying about would be a bug in whatever
latch-wait loop happens to reach this code.  Alright then, here is
just... one... more... patch, this time consuming any latch that gets
in the way and retrying, with no restore.
From d0399282e73ebd47dbead19b56586fff8ba3e9d2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:38:40 +1200
Subject: [PATCH v7 1/2] Add WL_SOCKET_CLOSED for socket shutdown events.

Provide a way for WaitEventSet to report that the remote peer has shut
down its socket, independently of whether there is any buffered data
remaining to be read.  This works only on systems where the kernel
exposes that information, namely:

* WAIT_USE_POLL builds using POLLRDHUP, if available
* WAIT_USE_EPOLL builds using EPOLLRDHUP
* WAIT_USE_KQUEUE builds using EV_EOF

Reviewed-by: Zhihong Yu <z...@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyuti...@gmail.com>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 src/backend/storage/ipc/latch.c | 79 +++++++++++++++++++++++++++++----
 src/include/storage/latch.h     |  6 ++-
 2 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 61c876beff..9a498b0f12 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -841,6 +841,7 @@ FreeWaitEventSet(WaitEventSet *set)
  * - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
  *	 can be combined with other WL_SOCKET_* events (on non-Windows
  *	 platforms, this is the same as WL_SOCKET_WRITEABLE)
+ * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
  * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
  *
  * Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1043,12 +1044,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 	else
 	{
 		Assert(event->fd != PGINVALID_SOCKET);
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 
 		if (event->events & WL_SOCKET_READABLE)
 			epoll_ev.events |= EPOLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			epoll_ev.events |= EPOLLOUT;
+		if (event->events & WL_SOCKET_CLOSED)
+			epoll_ev.events |= EPOLLRDHUP;
 	}
 
 	/*
@@ -1087,12 +1092,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 	}
 	else
 	{
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 		pollfd->events = 0;
 		if (event->events & WL_SOCKET_READABLE)
 			pollfd->events |= POLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			pollfd->events |= POLLOUT;
+#ifdef POLLRDHUP
+		if (event->events & WL_SOCKET_CLOSED)
+			pollfd->events |= POLLRDHUP;
+#endif
 	}
 
 	Assert(event->fd != PGINVALID_SOCKET);
@@ -1165,7 +1176,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
 	Assert(event->events == WL_LATCH_SET ||
 		   event->events == WL_POSTMASTER_DEATH ||
-		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+		   (event->events & (WL_SOCKET_READABLE |
+							 WL_SOCKET_WRITEABLE |
+							 WL_SOCKET_CLOSED)));
 
 	if (event->events == WL_POSTMASTER_DEATH)
 	{
@@ -1188,9 +1201,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 		 * old event mask to the new event mask, since kevent treats readable
 		 * and writable as separate events.
 		 */
-		if (old_events & WL_SOCKET_READABLE)
+		if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			old_filt_read = true;
-		if (event->events & WL_SOCKET_READABLE)
+		if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			new_filt_read = true;
 		if (old_events & WL_SOCKET_WRITEABLE)
 			old_filt_write = true;
@@ -1210,7 +1223,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 									 event);
 	}
 
-	Assert(count > 0);
+	/* For WL_SOCKET_READABLE -> WL_SOCKET_CLOSED, no change needed. */
+	if (count == 0)
+		return;
+
 	Assert(count <= 2);
 
 	rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
@@ -1525,7 +1541,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd != PGINVALID_SOCKET);
 
@@ -1543,6 +1561,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
+			{
+				/* remote peer shut down, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -1668,7 +1693,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			occurred_events++;
 			returned_events++;
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd >= 0);
 
@@ -1679,6 +1706,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_READABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_kqueue_event->filter == EVFILT_READ) &&
+				(cur_kqueue_event->flags & EV_EOF))
+			{
+				/* the remote peer has shut down */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
 				(cur_kqueue_event->filter == EVFILT_WRITE))
 			{
@@ -1789,7 +1824,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			int			errflags = POLLHUP | POLLERR | POLLNVAL;
 
@@ -1809,6 +1846,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+#ifdef POLLRDHUP
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_pollfd->revents & (POLLRDHUP | errflags)))
+			{
+				/* remote peer closed, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+#endif
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -2015,6 +2061,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 }
 #endif
 
+/*
+ * Return whether the current build options can report WL_SOCKET_CLOSED.
+ */
+bool
+WaitEventSetCanReportClosed(void)
+{
+#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
+	defined(WAIT_USE_EPOLL) || \
+	defined(WAIT_USE_KQUEUE)
+	return true;
+#else
+	return false;
+#endif
+}
+
 /*
  * Get the number of wait events registered in a given WaitEventSet.
  */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 3aa7b33834..0dd79d73fa 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -134,10 +134,11 @@ typedef struct Latch
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
 #endif
-
+#define WL_SOCKET_CLOSED	(1 << 7)
 #define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
 							 WL_SOCKET_WRITEABLE | \
-							 WL_SOCKET_CONNECTED)
+							 WL_SOCKET_CONNECTED | \
+							 WL_SOCKET_CLOSED)
 
 typedef struct WaitEvent
 {
@@ -180,5 +181,6 @@ extern int	WaitLatchOrSocket(Latch *latch, int wakeEvents,
 							  pgsocket sock, long timeout, uint32 wait_event_info);
 extern void InitializeLatchWaitSet(void);
 extern int	GetNumRegisteredWaitEvents(WaitEventSet *set);
+extern bool	WaitEventSetCanReportClosed(void);
 
 #endif							/* LATCH_H */
-- 
2.33.1
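
To make 0001's new event concrete, here is a minimal, untested sketch of
how a call site other than pq_check_connection() might poll a socket for
remote closure without blocking (sock is a placeholder pgsocket; the
WaitEventSet calls are the API touched by the patch):

    if (WaitEventSetCanReportClosed())
    {
        WaitEventSet *wes = CreateWaitEventSet(CurrentMemoryContext, 1);
        WaitEvent     ev;

        AddWaitEventToSet(wes, WL_SOCKET_CLOSED, sock, NULL, NULL);

        /* A timeout of 0 polls without waiting. */
        if (WaitEventSetWait(wes, 0, &ev, 1, 0) == 1 &&
            (ev.events & WL_SOCKET_CLOSED))
            elog(LOG, "remote peer has shut down its socket");

        FreeWaitEventSet(wes);
    }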

From d0d472018c1c42a1c4a5233ca18718bc90a82448 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:48:32 +1200
Subject: [PATCH v7 2/2] Use WL_SOCKET_CLOSED for
 client_connection_check_interval.

Previously we used poll() directly to check for a POLLRDHUP event.
Instead, use the WaitEventSet API to poll the socket for
WL_SOCKET_CLOSED, which knows how to detect this condition on many more
operating systems.

Reviewed-by: Zhihong Yu <z...@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyuti...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 doc/src/sgml/config.sgml     |  6 ++---
 src/backend/libpq/pqcomm.c   | 46 ++++++++++++++++++------------------
 src/backend/utils/misc/guc.c |  7 ++----
 src/include/libpq/libpq.h    |  1 +
 4 files changed, 29 insertions(+), 31 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index afbb6c35e3..4a6a869460 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1012,9 +1012,9 @@ include_dir 'conf.d'
         the kernel reports that the connection is closed.
        </para>
        <para>
-        This option is currently available only on systems that support the
-        non-standard <symbol>POLLRDHUP</symbol> extension to the
-        <symbol>poll</symbol> system call, including Linux.
+        This option relies on kernel events exposed by Linux, macOS, illumos,
+        and the BSD family of operating systems, and is not currently available
+        on other systems.
        </para>
        <para>
         If the value is specified without units, it is taken as milliseconds.
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index f05723dc92..1c6e096500 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -204,7 +204,7 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, FeBeWaitSetNEvents);
 	socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
 								   MyProcPort->sock, NULL, NULL);
 	latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
@@ -1959,33 +1959,33 @@ pq_settcpusertimeout(int timeout, Port *port)
 bool
 pq_check_connection(void)
 {
-#if defined(POLLRDHUP)
-	/*
-	 * POLLRDHUP is a Linux extension to poll(2) to detect sockets closed by
-	 * the other end.  We don't have a portable way to do that without
-	 * actually trying to read or write data on other systems.  We don't want
-	 * to read because that would be confused by pipelined queries and COPY
-	 * data. Perhaps in future we'll try to write a heartbeat message instead.
-	 */
-	struct pollfd pollfd;
+	WaitEvent	events[FeBeWaitSetNEvents];
 	int			rc;
 
-	pollfd.fd = MyProcPort->sock;
-	pollfd.events = POLLOUT | POLLIN | POLLRDHUP;
-	pollfd.revents = 0;
-
-	rc = poll(&pollfd, 1, 0);
+	/*
+	 * It's OK to modify the socket event filter without restoring, because
+	 * all FeBeWaitSet socket wait sites do the same.
+	 */
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
 
-	if (rc < 0)
+retry:
+	rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
+	for (int i = 0; i < rc; ++i)
 	{
-		ereport(COMMERROR,
-				(errcode_for_socket_access(),
-				 errmsg("could not poll socket: %m")));
-		return false;
+		if (events[i].events & WL_SOCKET_CLOSED)
+			return false;
+		if (events[i].events & WL_LATCH_SET)
+		{
+			/*
+			 * A latch event might be preventing other events from being
+			 * reported.  Reset it and poll again.  No need to restore it
+			 * because no code should expect latches to survive across
+			 * CHECK_FOR_INTERRUPTS().
+			 */
+			ResetLatch(MyLatch);
+			goto retry;
+		}
 	}
-	else if (rc == 1 && (pollfd.revents & (POLLHUP | POLLRDHUP)))
-		return false;
-#endif
 
 	return true;
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6fc5cbc09a..21c9c04594 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -12129,14 +12129,11 @@ check_huge_page_size(int *newval, void **extra, GucSource source)
 static bool
 check_client_connection_check_interval(int *newval, void **extra, GucSource source)
 {
-#ifndef POLLRDHUP
-	/* Linux only, for now.  See pq_check_connection(). */
-	if (*newval != 0)
+	if (!WaitEventSetCanReportClosed() && *newval != 0)
 	{
-		GUC_check_errdetail("client_connection_check_interval must be set to 0 on platforms that lack POLLRDHUP.");
+		GUC_check_errdetail("client_connection_check_interval must be set to 0 on this platform.");
 		return false;
 	}
-#endif
 	return true;
 }
 
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index f0786e08b4..d348a55812 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -62,6 +62,7 @@ extern WaitEventSet *FeBeWaitSet;
 
 #define FeBeWaitSetSocketPos 0
 #define FeBeWaitSetLatchPos 1
+#define FeBeWaitSetNEvents 3
 
 extern int	StreamServerPort(int family, const char *hostName,
 							 unsigned short portNumber, const char *unixSocketDir,
-- 
2.33.1
