On Tue, Dec 14, 2021 at 11:50 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> On Tue, Dec 14, 2021 at 11:18 AM Thomas Munro <thomas.mu...@gmail.com> wrote:
> > Well, I was trying to avoid bikeshedding an API change just for a
> > hypothetical problem we could solve when the time comes (say, after
> > fixing the more egregious problems with Append's WES usage), but here
> > goes: we could do something like AddWaitEventToSet(FeBeWaitSet,
> > WL_LATCH_SET_LOPRIO, ...) that is translated to WL_LATCH_SET
> > internally but also sets a flag to enable this
> > no-really-please-poll-all-the-things-if-there-is-space behaviour.
That API is probably useless for anything else, and it's just too complicated for what it's doing here. I considered another idea we discussed: if we see a latch event, clear it and try again so that other events can be revealed (rinse and repeat), remembering whether that happened so the latch can be set again at the end. I think that still requires PG_FINALLY() if you want to guarantee not to eat a latch event when WaitEventSetWait() throws. That may be a theoretical point, because things must be pretty broken if WaitEventSetWait() is throwing, but I don't like an egregious lack of exception safety on principle.

So I think I had it better in the beginning: just mute the latch, and then unmute it at the end in a PG_FINALLY() block. I'm back to proposing that short and sweet version, this time with some minor cleanup.
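To make that shape concrete, here is the core of the new pq_check_connection() from the attached 0002 patch, trimmed down to just the mute-and-restore dance (the enclosing function and the declarations of events, result and rc are omitted):

    /* Switch the socket position to watching for closure only. */
    ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);

    /* Mute the latch, so its higher-priority event can't hide the socket event. */
    ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, NULL);

    PG_TRY();
    {
        rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
        for (int i = 0; i < rc; ++i)
        {
            if (events[i].events & WL_SOCKET_CLOSED)
                result = false;
        }
    }
    PG_FINALLY();
    {
        /* Unmute the latch, even if WaitEventSetWait() threw. */
        ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, MyLatch);
    }
    PG_END_TRY();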
From 00d5c758588c6d0c23736898e5dc6aedc79c9315 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:38:40 +1200
Subject: [PATCH v6 1/2] Add WL_SOCKET_CLOSED for socket shutdown events.

Provide a way for WaitEventSet to report that the remote peer has shut
down its socket, independently of whether there is any buffered data
remaining to be read.  This works only on systems where the kernel
exposes that information, namely:

* WAIT_USE_POLL builds using POLLRDHUP, if available
* WAIT_USE_EPOLL builds using EPOLLRDHUP
* WAIT_USE_KQUEUE builds using EV_EOF

Reviewed-by: Zhihong Yu <z...@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyuti...@gmail.com>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 src/backend/storage/ipc/latch.c | 79 +++++++++++++++++++++++++++++----
 src/include/storage/latch.h     |  6 ++-
 2 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 1d893cf863..54e928c564 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -841,6 +841,7 @@ FreeWaitEventSet(WaitEventSet *set)
  * - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
  *   can be combined with other WL_SOCKET_* events (on non-Windows
  *   platforms, this is the same as WL_SOCKET_WRITEABLE)
+ * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
  * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
  *
  * Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1043,12 +1044,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 	else
 	{
 		Assert(event->fd != PGINVALID_SOCKET);
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 
 		if (event->events & WL_SOCKET_READABLE)
 			epoll_ev.events |= EPOLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			epoll_ev.events |= EPOLLOUT;
+		if (event->events & WL_SOCKET_CLOSED)
+			epoll_ev.events |= EPOLLRDHUP;
 	}
 
 	/*
@@ -1087,12 +1092,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 	}
 	else
 	{
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 		pollfd->events = 0;
 		if (event->events & WL_SOCKET_READABLE)
 			pollfd->events |= POLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			pollfd->events |= POLLOUT;
+#ifdef POLLRDHUP
+		if (event->events & WL_SOCKET_CLOSED)
+			pollfd->events |= POLLRDHUP;
+#endif
 	}
 
 	Assert(event->fd != PGINVALID_SOCKET);
@@ -1165,7 +1176,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
 	Assert(event->events == WL_LATCH_SET ||
 		   event->events == WL_POSTMASTER_DEATH ||
-		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+		   (event->events & (WL_SOCKET_READABLE |
+							 WL_SOCKET_WRITEABLE |
+							 WL_SOCKET_CLOSED)));
 
 	if (event->events == WL_POSTMASTER_DEATH)
 	{
@@ -1188,9 +1201,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 		 * old event mask to the new event mask, since kevent treats readable
 		 * and writable as separate events.
 		 */
-		if (old_events & WL_SOCKET_READABLE)
+		if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			old_filt_read = true;
-		if (event->events & WL_SOCKET_READABLE)
+		if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			new_filt_read = true;
 		if (old_events & WL_SOCKET_WRITEABLE)
 			old_filt_write = true;
@@ -1210,7 +1223,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 									 event);
 	}
 
-	Assert(count > 0);
+	/* For WL_SOCKET_READ -> WL_SOCKET_CLOSED, no change needed. */
+	if (count == 0)
+		return;
+	Assert(count <= 2);
 
 	rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
 
@@ -1525,7 +1541,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd != PGINVALID_SOCKET);
 
@@ -1543,6 +1561,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
+			{
+				/* remote peer shut down, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -1668,7 +1693,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			occurred_events++;
 			returned_events++;
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd >= 0);
 
@@ -1679,6 +1706,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_READABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_kqueue_event->filter == EVFILT_READ) &&
+				(cur_kqueue_event->flags & EV_EOF))
+			{
+				/* the remote peer has shut down */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
 				(cur_kqueue_event->filter == EVFILT_WRITE))
 			{
@@ -1789,7 +1824,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			int			errflags = POLLHUP | POLLERR | POLLNVAL;
 
@@ -1809,6 +1846,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+#ifdef POLLRDHUP
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_pollfd->revents & (POLLRDHUP | errflags)))
+			{
+				/* remote peer closed, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+#endif
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -2015,6 +2061,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 }
 #endif
 
+/*
+ * Return whether the current build options can report WL_SOCKET_CLOSED.
+ */
+bool
+WaitEventSetCanReportClosed(void)
+{
+#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
+	defined(WAIT_USE_EPOLL) || \
+	defined(WAIT_USE_KQUEUE)
+	return true;
+#else
+	return false;
+#endif
+}
+
 /*
  * Get the number of wait events registered in a given WaitEventSet.
  */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 44f9368c64..d78ff0bede 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -134,10 +134,11 @@ typedef struct Latch
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
 #endif
-
+#define WL_SOCKET_CLOSED     (1 << 7)
 #define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
 							 WL_SOCKET_WRITEABLE | \
-							 WL_SOCKET_CONNECTED)
+							 WL_SOCKET_CONNECTED | \
+							 WL_SOCKET_CLOSED)
 
 typedef struct WaitEvent
 {
@@ -180,5 +181,6 @@ extern int	WaitLatchOrSocket(Latch *latch, int wakeEvents,
 								  pgsocket sock, long timeout, uint32 wait_event_info);
 extern void InitializeLatchWaitSet(void);
 extern int	GetNumRegisteredWaitEvents(WaitEventSet *set);
+extern bool WaitEventSetCanReportClosed(void);
 
 #endif							/* LATCH_H */
-- 
2.33.1
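For context, here is roughly how a caller elsewhere in the backend could consume the new event. This is an untested sketch, not part of the patch; "sock" stands in for whatever pgsocket the caller is interested in:

    /* Only poll for closure where the kernel can actually report it. */
    if (WaitEventSetCanReportClosed())
    {
        WaitEventSet *wes;
        WaitEvent     ev;

        wes = CreateWaitEventSet(CurrentMemoryContext, 1);
        AddWaitEventToSet(wes, WL_SOCKET_CLOSED, sock, NULL, NULL);

        /* Zero timeout: just ask whether the peer has already shut down. */
        if (WaitEventSetWait(wes, 0, &ev, 1, 0) == 1 &&
            (ev.events & WL_SOCKET_CLOSED))
            elog(LOG, "remote peer has closed the connection");

        FreeWaitEventSet(wes);
    }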
From 494c949b3a9fe9704bb99689a3dc375fa5156f16 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:48:32 +1200
Subject: [PATCH v6 2/2] Use WL_SOCKET_CLOSED for client_connection_check_interval.

Previously we used poll() directly to check for a POLLRDHUP event.
Instead, use the WaitEventSet API to poll the socket for
WL_SOCKET_CLOSED, which knows how to detect this condition on many more
operating systems.

Reviewed-by: Zhihong Yu <z...@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyuti...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 doc/src/sgml/config.sgml     |  6 ++--
 src/backend/libpq/pqcomm.c   | 54 +++++++++++++++++++++---------------
 src/backend/utils/misc/guc.c |  7 ++---
 src/include/libpq/libpq.h    |  1 +
 4 files changed, 37 insertions(+), 31 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index afbb6c35e3..4a6a869460 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1012,9 +1012,9 @@ include_dir 'conf.d'
         the kernel reports that the connection is closed.
        </para>
        <para>
-        This option is currently available only on systems that support the
-        non-standard <symbol>POLLRDHUP</symbol> extension to the
-        <symbol>poll</symbol> system call, including Linux.
+        This option relies on kernel events exposed by Linux, macOS, illumos
+        and the BSD family of operating systems, and is not currently available
+        on other systems.
        </para>
        <para>
         If the value is specified without units, it is taken as milliseconds.
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 4c37df09cf..3b701998b7 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -204,7 +204,7 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, FeBeWaitSetNEvents);
 	socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
 								   MyProcPort->sock, NULL, NULL);
 	latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
@@ -1959,33 +1959,41 @@ pq_settcpusertimeout(int timeout, Port *port)
 bool
 pq_check_connection(void)
 {
-#if defined(POLLRDHUP)
-	/*
-	 * POLLRDHUP is a Linux extension to poll(2) to detect sockets closed by
-	 * the other end.  We don't have a portable way to do that without
-	 * actually trying to read or write data on other systems.  We don't want
-	 * to read because that would be confused by pipelined queries and COPY
-	 * data.  Perhaps in future we'll try to write a heartbeat message instead.
-	 */
-	struct pollfd pollfd;
+	WaitEvent	events[FeBeWaitSetNEvents];
+	bool		result = true;
 	int			rc;
 
-	pollfd.fd = MyProcPort->sock;
-	pollfd.events = POLLOUT | POLLIN | POLLRDHUP;
-	pollfd.revents = 0;
+	/*
+	 * It's OK to modify the socket event filter without restoring, because
+	 * all FeBeWaitSet socket wait sites do the same.
+	 */
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
 
-	rc = poll(&pollfd, 1, 0);
+	/*
+	 * Temporarily silence the latch, because its higher priority event might
+	 * hide the socket event we want to poll for.
+	 */
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, NULL);
 
-	if (rc < 0)
+	PG_TRY();
 	{
-		ereport(COMMERROR,
-				(errcode_for_socket_access(),
-				 errmsg("could not poll socket: %m")));
-		return false;
+		rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
+		for (int i = 0; i < rc; ++i)
+		{
+			if (events[i].events & WL_SOCKET_CLOSED)
+				result = false;
+		}
 	}
-	else if (rc == 1 && (pollfd.revents & (POLLHUP | POLLRDHUP)))
-		return false;
-#endif
+	PG_FINALLY();
+	{
+		/*
+		 * If WaitEventSetWait() reports an error, something must be pretty
+		 * seriously wrong, but we should restore the latch on principle.
+		 */
+		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
+						MyLatch);
+	}
+	PG_END_TRY();
 
-	return true;
+	return result;
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f9504d3aec..57292b2364 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -12129,14 +12129,11 @@ check_huge_page_size(int *newval, void **extra, GucSource source)
 static bool
 check_client_connection_check_interval(int *newval, void **extra, GucSource source)
 {
-#ifndef POLLRDHUP
-	/* Linux only, for now.  See pq_check_connection(). */
-	if (*newval != 0)
+	if (!WaitEventSetCanReportClosed() && *newval != 0)
 	{
-		GUC_check_errdetail("client_connection_check_interval must be set to 0 on platforms that lack POLLRDHUP.");
+		GUC_check_errdetail("client_connection_check_interval must be set to 0 on this platform.");
 		return false;
 	}
-#endif
 	return true;
 }
 
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 6b67a2a318..9f168a284f 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -62,6 +62,7 @@ extern WaitEventSet *FeBeWaitSet;
 
 #define FeBeWaitSetSocketPos 0
 #define FeBeWaitSetLatchPos 1
+#define FeBeWaitSetNEvents 3
 
 extern int	StreamServerPort(int family, const char *hostName,
 							 unsigned short portNumber, const char *unixSocketDir,
-- 
2.33.1
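For anyone wanting to try 0002 out: on the platforms listed in the 0001 commit message it's enough to set the existing GUC to a non-zero interval, for example in postgresql.conf (10s is an arbitrary choice; without units the value is taken as milliseconds):

    client_connection_check_interval = 10s

The periodic check then goes through WaitEventSetWait() and WL_SOCKET_CLOSED instead of a raw poll(), while on unsupported platforms check_client_connection_check_interval() still rejects any non-zero setting.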