On Tue, Feb 8, 2022 at 1:51 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> (and one day we should make it dynamic and change udata to hold
> an index instead of a pointer...)

Here's a patch like that.

I'd originally sketched this out for another project, but I don't
think I need it for that anymore.  After this exchange I couldn't
resist fleshing it out for a commitfest, just on usability grounds.
Thoughts?
From 7d011ed1de06ee6aad850f72399f0d3c9217458f Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 11 Feb 2022 10:39:46 +1300
Subject: [PATCH] Expand WaitEventSet dynamically.

Previously, CreateWaitEventSet() took an argument to say how many events
could be added.  Teach it to grow automatically instead.  Also expose
WaitEventSetReserve() for callers that know how many events they need
and would like to avoid reallocation.

Discussion: https://postgr.es/m/CA%2BhUKG%2BiaLp6ETo%2Bu0632yVUGO6eSMx4hKBbcc7zy88thBQy%3Dg%40mail.gmail.com
---
 src/backend/executor/nodeAppend.c  |   2 +-
 src/backend/libpq/pqcomm.c         |   2 +-
 src/backend/postmaster/pgstat.c    |   2 +-
 src/backend/postmaster/syslogger.c |   2 +-
 src/backend/storage/ipc/latch.c    | 201 +++++++++++++++--------------
 src/include/storage/latch.h        |   3 +-
 6 files changed, 110 insertions(+), 102 deletions(-)

diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 7937f1c88f..3ad0441756 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1029,7 +1029,7 @@ ExecAppendAsyncEventWait(AppendState *node)
 	/* We should never be called when there are no valid async subplans. */
 	Assert(node->as_nasyncremain > 0);
 
-	node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
+	node->as_eventset = CreateWaitEventSet(CurrentMemoryContext);
 	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
 					  NULL, NULL);
 
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 22eb04948e..99fcc447b0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -204,7 +204,7 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext);
 	socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
 								   MyProcPort->sock, NULL, NULL);
 	latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 0646f53098..b4e791146c 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3528,7 +3528,7 @@ PgstatCollectorMain(int argc, char *argv[])
 	pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
 
 	/* Prepare to wait for our latch or data in our socket. */
-	wes = CreateWaitEventSet(CurrentMemoryContext, 3);
+	wes = CreateWaitEventSet(CurrentMemoryContext);
 	AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
 	AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
 	AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL);
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index 25e2131e31..d4a9a8af34 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -311,7 +311,7 @@ SysLoggerMain(int argc, char *argv[])
 	 * syslog pipe, which implies that all other backends have exited
 	 * (including the postmaster).
 	 */
-	wes = CreateWaitEventSet(CurrentMemoryContext, 2);
+	wes = CreateWaitEventSet(CurrentMemoryContext);
 	AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
 #ifndef WIN32
 	AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 5bb609b368..b5974d0f06 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -84,9 +84,32 @@
 #error "no wait set implementation available"
 #endif
 
+/* Each implementation needs an array of a certain object type. */
+#if defined(WAIT_USE_EPOLL)
+typedef struct epoll_event WaitEventImpl;
+#elif defined(WAIT_USE_KQUEUE)
+typedef struct kevent WaitEventImpl;
+#elif defined(WAIT_USE_POLL)
+typedef struct pollfd WaitEventImpl;
+#elif defined(WAIT_USE_WIN32)
+typedef HANDLE WaitEventImpl;
+#endif
+
+/* Windows needs an extra element in events_impl. */
+#if defined(WAIT_USE_WIN32)
+#define EXTRA_EVENT_IMPL 1
+#else
+#define EXTRA_EVENT_IMPL 0
+#endif
+
+/* Can be set lower to exercise reallocation code. */
+#define INITIAL_WAIT_EVENT_SET_SIZE 4
+
 /* typedef in latch.h */
 struct WaitEventSet
 {
+	MemoryContext context;
+
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
 
@@ -112,26 +135,17 @@ struct WaitEventSet
 	 */
 	bool		exit_on_postmaster_death;
 
+	/*
+	 * Implementation-specific array with nevents_space entries, plus one more
+	 * on Windows, where element [0] holds pgwin32_signal_event.
+	 */
+	WaitEventImpl *events_impl;
+
 #if defined(WAIT_USE_EPOLL)
 	int			epoll_fd;
-	/* epoll_wait returns events in a user provided arrays, allocate once */
-	struct epoll_event *epoll_ret_events;
 #elif defined(WAIT_USE_KQUEUE)
 	int			kqueue_fd;
-	/* kevent returns events in a user provided arrays, allocate once */
-	struct kevent *kqueue_ret_events;
 	bool		report_postmaster_not_running;
-#elif defined(WAIT_USE_POLL)
-	/* poll expects events to be waited on every poll() call, prepare once */
-	struct pollfd *pollfds;
-#elif defined(WAIT_USE_WIN32)
-
-	/*
-	 * Array of windows events. The first element always contains
-	 * pgwin32_signal_event, so the remaining elements are offset by one (i.e.
-	 * event->pos + 1).
-	 */
-	HANDLE	   *handles;
 #endif
 };

@@ -293,7 +307,7 @@ InitializeLatchWaitSet(void)
 	Assert(LatchWaitSet == NULL);
 
 	/* Set up the WaitEventSet used by WaitLatch(). */
-	LatchWaitSet = CreateWaitEventSet(TopMemoryContext, 2);
+	LatchWaitSet = CreateWaitEventSet(TopMemoryContext);
 	latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
 								  MyLatch, NULL);
 	if (IsUnderPostmaster)
@@ -502,7 +516,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
 	int			ret = 0;
 	int			rc;
 	WaitEvent	event;
-	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext);
 
 	if (wakeEvents & WL_TIMEOUT)
 		Assert(timeout >= 0);
@@ -674,63 +688,55 @@ ResetLatch(Latch *latch)
 }
 
 /*
- * Create a WaitEventSet with space for nevents different events to wait for.
- *
- * These events can then be efficiently waited upon together, using
- * WaitEventSetWait().
+ * Ensure that 'set' has room for at least 'nevents' events, growing its
+ * arrays if necessary.  Existing contents are copied and the old arrays
+ * freed, so event positions stay valid but pointers into them do not.
  */
-WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+void
+WaitEventSetReserve(WaitEventSet *set, int nevents)
 {
-	WaitEventSet *set;
-	char	   *data;
-	Size		sz = 0;
+	WaitEventImpl *new_events_impl;
+	WaitEvent  *new_events;
 
-	/*
-	 * Use MAXALIGN size/alignment to guarantee that later uses of memory are
-	 * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
-	 * platforms, but earlier allocations like WaitEventSet and WaitEvent
-	 * might not sized to guarantee that when purely using sizeof().
-	 */
-	sz += MAXALIGN(sizeof(WaitEventSet));
-	sz += MAXALIGN(sizeof(WaitEvent) * nevents);
-
-#if defined(WAIT_USE_EPOLL)
-	sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
-#elif defined(WAIT_USE_KQUEUE)
-	sz += MAXALIGN(sizeof(struct kevent) * nevents);
-#elif defined(WAIT_USE_POLL)
-	sz += MAXALIGN(sizeof(struct pollfd) * nevents);
-#elif defined(WAIT_USE_WIN32)
-	/* need space for the pgwin32_signal_event */
-	sz += MAXALIGN(sizeof(HANDLE) * (nevents + 1));
-#endif
+	if (nevents <= set->nevents_space)
+		return;
 
-	data = (char *) MemoryContextAllocZero(context, sz);
+	new_events = MemoryContextAllocZero(set->context,
+										sizeof(WaitEvent) * nevents);
+	if (set->events)
+	{
+		memcpy(new_events, set->events, sizeof(WaitEvent) * set->nevents);
+		pfree(set->events);
+	}
 
-	set = (WaitEventSet *) data;
-	data += MAXALIGN(sizeof(WaitEventSet));
+	new_events_impl = MemoryContextAllocZero(set->context,
+											 sizeof(WaitEventImpl) *
+											 (nevents + EXTRA_EVENT_IMPL));
+	if (set->events_impl)
+	{
+		memcpy(new_events_impl, set->events_impl,
+			   sizeof(WaitEventImpl) * (set->nevents + EXTRA_EVENT_IMPL));
+		pfree(set->events_impl);
+	}
 
-	set->events = (WaitEvent *) data;
-	data += MAXALIGN(sizeof(WaitEvent) * nevents);
+	set->events = new_events;
+	set->events_impl = new_events_impl;
+	set->nevents_space = nevents;
+}
 
-#if defined(WAIT_USE_EPOLL)
-	set->epoll_ret_events = (struct epoll_event *) data;
-	data += MAXALIGN(sizeof(struct epoll_event) * nevents);
-#elif defined(WAIT_USE_KQUEUE)
-	set->kqueue_ret_events = (struct kevent *) data;
-	data += MAXALIGN(sizeof(struct kevent) * nevents);
-#elif defined(WAIT_USE_POLL)
-	set->pollfds = (struct pollfd *) data;
-	data += MAXALIGN(sizeof(struct pollfd) * nevents);
-#elif defined(WAIT_USE_WIN32)
-	set->handles = (HANDLE) data;
-	data += MAXALIGN(sizeof(HANDLE) * nevents);
-#endif
+/*
+ * Create a WaitEventSet.
+ *
+ * These events can then be efficiently waited upon together, using
+ * WaitEventSetWait().
+ */
+WaitEventSet *
+CreateWaitEventSet(MemoryContext context)
+{
+	WaitEventSet *set;
 
-	set->latch = NULL;
-	set->nevents_space = nevents;
-	set->exit_on_postmaster_death = false;
+	set = MemoryContextAllocZero(context, sizeof(WaitEventSet));
+	set->context = context;
 
 #if defined(WAIT_USE_EPOLL)
 	if (!AcquireExternalFD())
@@ -775,9 +773,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	 *
 	 * Note: pgwin32_signal_event should be first to ensure that it will be
 	 * reported when multiple events are set.  We want to guarantee that
-	 * pending signals are serviced.
+	 * pending signals are serviced.  Therefore we need to reserve space
+	 * eagerly in this implementation.
 	 */
-	set->handles[0] = pgwin32_signal_event;
+	WaitEventSetReserve(set, INITIAL_WAIT_EVENT_SET_SIZE);
+	set->events_impl[0] = pgwin32_signal_event;
 	StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
 #endif
 
@@ -821,11 +821,15 @@ FreeWaitEventSet(WaitEventSet *set)
 		{
 			/* Clean up the event object we created for the socket */
 			WSAEventSelect(cur_event->fd, NULL, 0);
-			WSACloseEvent(set->handles[cur_event->pos + 1]);
+			WSACloseEvent(set->events_impl[cur_event->pos + 1]);
 		}
 	}
 #endif
 
+	if (set->events)
+		pfree(set->events);
+	if (set->events_impl)
+		pfree(set->events_impl);
 	pfree(set);
 }
 
@@ -863,7 +867,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 {
 	WaitEvent  *event;
 
-	/* not enough space */
+	/* make sure we have enough space */
+	if (set->nevents == set->nevents_space)
+		WaitEventSetReserve(set,
+							Max(INITIAL_WAIT_EVENT_SET_SIZE,
+								set->nevents_space * 2));
 	Assert(set->nevents < set->nevents_space);
 
 	if (events == WL_EXIT_ON_PM_DEATH)
@@ -1024,8 +1032,8 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 	struct epoll_event epoll_ev;
 	int			rc;
 
-	/* pointer to our event, returned by epoll_wait */
-	epoll_ev.data.ptr = event;
+	/* index to our event, returned by epoll_wait */
+	epoll_ev.data.u32 = event->pos;
 	/* always wait for errors */
 	epoll_ev.events = EPOLLERR | EPOLLHUP;
 
@@ -1069,7 +1077,7 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 static void
 WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 {
-	struct pollfd *pollfd = &set->pollfds[event->pos];
+	struct pollfd *pollfd = &set->events_impl[event->pos];
 
 	pollfd->revents = 0;
 	pollfd->fd = event->fd;
@@ -1101,12 +1109,11 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 #if defined(WAIT_USE_KQUEUE)
 
 /*
- * On most BSD family systems, the udata member of struct kevent is of type
- * void *, so we could directly convert to/from WaitEvent *.  Unfortunately,
- * NetBSD has it as intptr_t, so here we wallpaper over that difference with
- * an lvalue cast.
+ * On most BSD systems, udata is of type void *.  On NetBSD it's intptr_t.
+ * Wallpaper over that difference with an lvalue cast, so we can load and
+ * store event position.
  */
-#define AccessWaitEvent(k_ev) (*((WaitEvent **)(&(k_ev)->udata)))
+#define AccessWaitEventPos(k_ev) (*((intptr_t *)(&(k_ev)->udata)))
 
 static inline void
 WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action,
@@ -1117,7 +1124,7 @@ WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action,
 	k_ev->flags = action;
 	k_ev->fflags = 0;
 	k_ev->data = 0;
-	AccessWaitEvent(k_ev) = event;
+	AccessWaitEventPos(k_ev) = event->pos;
 }
 
 static inline void
@@ -1129,7 +1136,7 @@ WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event)
 	k_ev->flags = EV_ADD;
 	k_ev->fflags = NOTE_EXIT;
 	k_ev->data = 0;
-	AccessWaitEvent(k_ev) = event;
+	AccessWaitEventPos(k_ev) = event->pos;
 }
 
 static inline void
@@ -1141,7 +1148,7 @@ WaitEventAdjustKqueueAddLatch(struct kevent *k_ev, WaitEvent *event)
 	k_ev->flags = EV_ADD;
 	k_ev->fflags = 0;
 	k_ev->data = 0;
-	AccessWaitEvent(k_ev) = event;
+	AccessWaitEventPos(k_ev) = event->pos;
 }
 
 /*
@@ -1251,7 +1258,7 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 static void
 WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
 {
-	HANDLE	   *handle = &set->handles[event->pos + 1];
+	HANDLE	   *handle = &set->events_impl[event->pos + 1];
 
 	if (event->events == WL_LATCH_SET)
 	{
@@ -1446,7 +1453,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	struct epoll_event *cur_epoll_event;
 
 	/* Sleep */
-	rc = epoll_wait(set->epoll_fd, set->epoll_ret_events,
+	rc = epoll_wait(set->epoll_fd, set->events_impl,
 					nevents, cur_timeout);
 
 	/* Check return code */
@@ -1474,13 +1481,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	 * until they're either all processed, or we've returned all the events
 	 * the caller desired.
 	 */
-	for (cur_epoll_event = set->epoll_ret_events;
-		 cur_epoll_event < (set->epoll_ret_events + rc) &&
+	for (cur_epoll_event = set->events_impl;
+		 cur_epoll_event < (set->events_impl + rc) &&
 		 returned_events < nevents;
 		 cur_epoll_event++)
 	{
-		/* epoll's data pointer is set to the associated WaitEvent */
-		cur_event = (WaitEvent *) cur_epoll_event->data.ptr;
+		/* epoll's data.u32 is the index of the associated WaitEvent */
+		cur_event = (WaitEvent *) &set->events[cur_epoll_event->data.u32];

 		occurred_events->pos = cur_event->pos;
 		occurred_events->user_data = cur_event->user_data;
@@ -1598,7 +1605,10 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,

-	/* Sleep */
+	/*
+	 * Sleep.  events_impl has room for only nevents_space results, which may
+	 * be fewer than the caller's nevents, so clamp the request.
+	 */
 	rc = kevent(set->kqueue_fd, NULL, 0,
-				set->kqueue_ret_events, nevents,
+				set->events_impl, Min(nevents, set->nevents_space),
 				timeout_p);

 	/* Check return code */
@@ -1626,13 +1633,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	 * until they're either all processed, or we've returned all the events
 	 * the caller desired.
 	 */
-	for (cur_kqueue_event = set->kqueue_ret_events;
-		 cur_kqueue_event < (set->kqueue_ret_events + rc) &&
+	for (cur_kqueue_event = set->events_impl;
+		 cur_kqueue_event < (set->events_impl + rc) &&
 		 returned_events < nevents;
 		 cur_kqueue_event++)
 	{
-		/* kevent's udata points to the associated WaitEvent */
-		cur_event = AccessWaitEvent(cur_kqueue_event);
+		/* kevent's udata is the index of the associated WaitEvent */
+		cur_event = &set->events[AccessWaitEventPos(cur_kqueue_event)];
 
 		occurred_events->pos = cur_event->pos;
 		occurred_events->user_data = cur_event->user_data;
@@ -1715,7 +1722,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	struct pollfd *cur_pollfd;
 
 	/* Sleep */
-	rc = poll(set->pollfds, set->nevents, (int) cur_timeout);
+	rc = poll(set->events_impl, set->nevents, (int) cur_timeout);
 
 	/* Check return code */
 	if (rc < 0)
@@ -1737,7 +1744,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 		return -1;
 	}
 
-	for (cur_event = set->events, cur_pollfd = set->pollfds;
+	for (cur_event = set->events, cur_pollfd = set->events_impl;
 		 cur_event < (set->events + set->nevents) &&
 		 returned_events < nevents;
 		 cur_event++, cur_pollfd++)
@@ -1888,7 +1895,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	 *
 	 * Need to wait for ->nevents + 1, because signal handle is in [0].
 	 */
-	rc = WaitForMultipleObjects(set->nevents + 1, set->handles, FALSE,
+	rc = WaitForMultipleObjects(set->nevents + 1, set->events_impl, FALSE,
 								cur_timeout);
 
 	/* Check return code */
@@ -1924,7 +1931,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 		 * We cannot use set->latch->event to reset the fired event if we
 		 * aren't waiting on this latch now.
 		 */
-		if (!ResetEvent(set->handles[cur_event->pos + 1]))
+		if (!ResetEvent(set->events_impl[cur_event->pos + 1]))
 			elog(ERROR, "ResetEvent failed: error code %lu", GetLastError());
 
 		if (set->latch && set->latch->is_set)
@@ -1957,7 +1964,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	else if (cur_event->events & WL_SOCKET_MASK)
 	{
 		WSANETWORKEVENTS resEvents;
-		HANDLE		handle = set->handles[cur_event->pos + 1];
+		HANDLE		handle = set->events_impl[cur_event->pos + 1];
 
 		Assert(cur_event->fd);
 
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 3aa7b33834..9d4c857249 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -165,7 +165,7 @@ extern void SetLatch(Latch *latch);
 extern void ResetLatch(Latch *latch);
 extern void ShutdownLatchSupport(void);
 
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context);
 extern void FreeWaitEventSet(WaitEventSet *set);
 extern int	AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
 							  Latch *latch, void *user_data);
@@ -179,6 +179,7 @@ extern int	WaitLatch(Latch *latch, int wakeEvents, long timeout,
 extern int	WaitLatchOrSocket(Latch *latch, int wakeEvents,
 							  pgsocket sock, long timeout, uint32 wait_event_info);
 extern void InitializeLatchWaitSet(void);
+extern void WaitEventSetReserve(WaitEventSet *set, int nevents);
 extern int	GetNumRegisteredWaitEvents(WaitEventSet *set);
 
 #endif							/* LATCH_H */
-- 
2.30.2

Reply via email to