On Tue, Feb 8, 2022 at 1:51 PM Thomas Munro <thomas.mu...@gmail.com> wrote: > (and one day we should make it dynamic and change udata to hold > an index instead of a pointer...)
Here's a patch like that. I'd originally sketched this out for another project, but I don't think I need it for that anymore. After this exchange I couldn't resist fleshing it out for a commitfest, just on usability grounds. Thoughts?
From 7d011ed1de06ee6aad850f72399f0d3c9217458f Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Fri, 11 Feb 2022 10:39:46 +1300 Subject: [PATCH] Expand WaitEventSet dynamically. Previously, CreateWaitEventSet() took an argument to say how many events could be added. Teach it to grow automatically instead. Also expose a "reserve" function in case a caller knows how many it needs and would like to avoid reallocation. Discussion: https://postgr.es/m/CA%2BhUKG%2BiaLp6ETo%2Bu0632yVUGO6eSMx4hKBbcc7zy88thBQy%3Dg%40mail.gmail.com --- src/backend/executor/nodeAppend.c | 2 +- src/backend/libpq/pqcomm.c | 2 +- src/backend/postmaster/pgstat.c | 2 +- src/backend/postmaster/syslogger.c | 2 +- src/backend/storage/ipc/latch.c | 201 +++++++++++++++-------------- src/include/storage/latch.h | 3 +- 6 files changed, 110 insertions(+), 102 deletions(-) diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 7937f1c88f..3ad0441756 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -1029,7 +1029,7 @@ ExecAppendAsyncEventWait(AppendState *node) /* We should never be called when there are no valid async subplans. 
*/ Assert(node->as_nasyncremain > 0); - node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents); + node->as_eventset = CreateWaitEventSet(CurrentMemoryContext); AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 22eb04948e..99fcc447b0 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -204,7 +204,7 @@ pq_init(void) (errmsg("could not set socket to nonblocking mode: %m"))); #endif - FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3); + FeBeWaitSet = CreateWaitEventSet(TopMemoryContext); socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL, NULL); latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 0646f53098..b4e791146c 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3528,7 +3528,7 @@ PgstatCollectorMain(int argc, char *argv[]) pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true); /* Prepare to wait for our latch or data in our socket. */ - wes = CreateWaitEventSet(CurrentMemoryContext, 3); + wes = CreateWaitEventSet(CurrentMemoryContext); AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL); diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index 25e2131e31..d4a9a8af34 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -311,7 +311,7 @@ SysLoggerMain(int argc, char *argv[]) * syslog pipe, which implies that all other backends have exited * (including the postmaster). 
*/ - wes = CreateWaitEventSet(CurrentMemoryContext, 2); + wes = CreateWaitEventSet(CurrentMemoryContext); AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); #ifndef WIN32 AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL); diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 5bb609b368..b5974d0f06 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -84,9 +84,32 @@ #error "no wait set implementation available" #endif +/* Each implementation needs an array of a certain object type. */ +#if defined(WAIT_USE_EPOLL) +typedef struct epoll_event WaitEventImpl; +#elif defined(WAIT_USE_KQUEUE) +typedef struct kevent WaitEventImpl; +#elif defined(WAIT_USE_POLL) +typedef struct pollfd WaitEventImpl; +#elif defined(WAIT_USE_WIN32) +typedef HANDLE WaitEventImpl; +#endif + +/* Windows needs an extra element in events_impl. */ +#if defined(WAIT_USE_WIN32) +#define EXTRA_EVENT_IMPL 1 +#else +#define EXTRA_EVENT_IMPL 0 +#endif + +/* Can be set lower to exercise reallocation code. */ +#define INITIAL_WAIT_EVENT_SET_SIZE 4 + /* typedef in latch.h */ struct WaitEventSet { + MemoryContext context; + int nevents; /* number of registered events */ int nevents_space; /* maximum number of events in this set */ @@ -112,26 +135,17 @@ struct WaitEventSet */ bool exit_on_postmaster_death; + /* + * Array, of nevents_space length, storing implementation-specific + * objects. For Windows this is of size nevents_space + 1. 
+ */ + WaitEventImpl *events_impl; + #if defined(WAIT_USE_EPOLL) int epoll_fd; - /* epoll_wait returns events in a user provided arrays, allocate once */ - struct epoll_event *epoll_ret_events; #elif defined(WAIT_USE_KQUEUE) int kqueue_fd; - /* kevent returns events in a user provided arrays, allocate once */ - struct kevent *kqueue_ret_events; bool report_postmaster_not_running; -#elif defined(WAIT_USE_POLL) - /* poll expects events to be waited on every poll() call, prepare once */ - struct pollfd *pollfds; -#elif defined(WAIT_USE_WIN32) - - /* - * Array of windows events. The first element always contains - * pgwin32_signal_event, so the remaining elements are offset by one (i.e. - * event->pos + 1). - */ - HANDLE *handles; #endif }; @@ -293,7 +307,7 @@ InitializeLatchWaitSet(void) Assert(LatchWaitSet == NULL); /* Set up the WaitEventSet used by WaitLatch(). */ - LatchWaitSet = CreateWaitEventSet(TopMemoryContext, 2); + LatchWaitSet = CreateWaitEventSet(TopMemoryContext); latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); if (IsUnderPostmaster) @@ -502,7 +516,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, int ret = 0; int rc; WaitEvent event; - WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3); + WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext); if (wakeEvents & WL_TIMEOUT) Assert(timeout >= 0); @@ -674,63 +688,47 @@ ResetLatch(Latch *latch) } /* - * Create a WaitEventSet with space for nevents different events to wait for. - * - * These events can then be efficiently waited upon together, using - * WaitEventSetWait(). + * Ensure that enough space exists for 'nevents'. 
*/ -WaitEventSet * -CreateWaitEventSet(MemoryContext context, int nevents) +void +WaitEventSetReserve(WaitEventSet *set, int nevents) { - WaitEventSet *set; - char *data; - Size sz = 0; + WaitEventImpl *new_events_impl; + WaitEvent *new_events; - /* - * Use MAXALIGN size/alignment to guarantee that later uses of memory are - * aligned correctly. E.g. epoll_event might need 8 byte alignment on some - * platforms, but earlier allocations like WaitEventSet and WaitEvent - * might not sized to guarantee that when purely using sizeof(). - */ - sz += MAXALIGN(sizeof(WaitEventSet)); - sz += MAXALIGN(sizeof(WaitEvent) * nevents); - -#if defined(WAIT_USE_EPOLL) - sz += MAXALIGN(sizeof(struct epoll_event) * nevents); -#elif defined(WAIT_USE_KQUEUE) - sz += MAXALIGN(sizeof(struct kevent) * nevents); -#elif defined(WAIT_USE_POLL) - sz += MAXALIGN(sizeof(struct pollfd) * nevents); -#elif defined(WAIT_USE_WIN32) - /* need space for the pgwin32_signal_event */ - sz += MAXALIGN(sizeof(HANDLE) * (nevents + 1)); -#endif + if (nevents <= set->nevents_space) + return; - data = (char *) MemoryContextAllocZero(context, sz); + new_events = MemoryContextAllocZero(set->context, + sizeof(WaitEvent) * nevents); + if (set->events) + memcpy(new_events, set->events, sizeof(WaitEvent) * set->nevents); - set = (WaitEventSet *) data; - data += MAXALIGN(sizeof(WaitEventSet)); + new_events_impl = MemoryContextAllocZero(set->context, + sizeof(WaitEventImpl) * + (nevents + EXTRA_EVENT_IMPL)); + if (set->events_impl) + memcpy(new_events_impl, set->events_impl, + sizeof(WaitEventImpl) * (set->nevents + EXTRA_EVENT_IMPL)); - set->events = (WaitEvent *) data; - data += MAXALIGN(sizeof(WaitEvent) * nevents); + set->events = new_events; + set->events_impl = new_events_impl; + set->nevents_space = nevents; +} -#if defined(WAIT_USE_EPOLL) - set->epoll_ret_events = (struct epoll_event *) data; - data += MAXALIGN(sizeof(struct epoll_event) * nevents); -#elif defined(WAIT_USE_KQUEUE) - set->kqueue_ret_events = 
(struct kevent *) data; - data += MAXALIGN(sizeof(struct kevent) * nevents); -#elif defined(WAIT_USE_POLL) - set->pollfds = (struct pollfd *) data; - data += MAXALIGN(sizeof(struct pollfd) * nevents); -#elif defined(WAIT_USE_WIN32) - set->handles = (HANDLE) data; - data += MAXALIGN(sizeof(HANDLE) * nevents); -#endif +/* + * Create a WaitEventSet. + * + * These events can then be efficiently waited upon together, using + * WaitEventSetWait(). + */ +WaitEventSet * +CreateWaitEventSet(MemoryContext context) +{ + WaitEventSet *set; - set->latch = NULL; - set->nevents_space = nevents; - set->exit_on_postmaster_death = false; + set = MemoryContextAllocZero(context, sizeof(WaitEventSet)); + set->context = context; #if defined(WAIT_USE_EPOLL) if (!AcquireExternalFD()) @@ -775,9 +773,11 @@ CreateWaitEventSet(MemoryContext context, int nevents) * * Note: pgwin32_signal_event should be first to ensure that it will be * reported when multiple events are set. We want to guarantee that - * pending signals are serviced. + * pending signals are serviced. Therefore we need to reserve space + * eagerly in this implementation. 
*/ - set->handles[0] = pgwin32_signal_event; + WaitEventSetReserve(set, INITIAL_WAIT_EVENT_SET_SIZE); + set->events_impl[0] = pgwin32_signal_event; StaticAssertStmt(WSA_INVALID_EVENT == NULL, ""); #endif @@ -821,11 +821,15 @@ FreeWaitEventSet(WaitEventSet *set) { /* Clean up the event object we created for the socket */ WSAEventSelect(cur_event->fd, NULL, 0); - WSACloseEvent(set->handles[cur_event->pos + 1]); + WSACloseEvent(set->events_impl[cur_event->pos + 1]); } } #endif + if (set->events) + pfree(set->events); + if (set->events_impl) + pfree(set->events_impl); pfree(set); } @@ -863,7 +867,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, { WaitEvent *event; - /* not enough space */ + /* make sure we have enough space */ + if (set->nevents == set->nevents_space) + WaitEventSetReserve(set, + Max(INITIAL_WAIT_EVENT_SET_SIZE, + set->nevents_space * 2)); Assert(set->nevents < set->nevents_space); if (events == WL_EXIT_ON_PM_DEATH) @@ -1024,8 +1032,8 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) struct epoll_event epoll_ev; int rc; - /* pointer to our event, returned by epoll_wait */ - epoll_ev.data.ptr = event; + /* index to our event, returned by epoll_wait */ + epoll_ev.data.u32 = event->pos; /* always wait for errors */ epoll_ev.events = EPOLLERR | EPOLLHUP; @@ -1069,7 +1077,7 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) { - struct pollfd *pollfd = &set->pollfds[event->pos]; + struct pollfd *pollfd = &set->events_impl[event->pos]; pollfd->revents = 0; pollfd->fd = event->fd; @@ -1101,12 +1109,11 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) #if defined(WAIT_USE_KQUEUE) /* - * On most BSD family systems, the udata member of struct kevent is of type - * void *, so we could directly convert to/from WaitEvent *. 
Unfortunately, - * NetBSD has it as intptr_t, so here we wallpaper over that difference with - * an lvalue cast. + * On most BSD systems, udata is of type void *. On NetBSD it's intptr_t. + * Wallpaper over that difference with an lvalue cast, so we can load and + * store event position. */ -#define AccessWaitEvent(k_ev) (*((WaitEvent **)(&(k_ev)->udata))) +#define AccessWaitEventPos(k_ev) (*((intptr_t *)(&(k_ev)->udata))) static inline void WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action, @@ -1117,7 +1124,7 @@ WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action, k_ev->flags = action; k_ev->fflags = 0; k_ev->data = 0; - AccessWaitEvent(k_ev) = event; + AccessWaitEventPos(k_ev) = event->pos; } static inline void @@ -1129,7 +1136,7 @@ WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event) k_ev->flags = EV_ADD; k_ev->fflags = NOTE_EXIT; k_ev->data = 0; - AccessWaitEvent(k_ev) = event; + AccessWaitEventPos(k_ev) = event->pos; } static inline void @@ -1141,7 +1148,7 @@ WaitEventAdjustKqueueAddLatch(struct kevent *k_ev, WaitEvent *event) k_ev->flags = EV_ADD; k_ev->fflags = 0; k_ev->data = 0; - AccessWaitEvent(k_ev) = event; + AccessWaitEventPos(k_ev) = event->pos; } /* @@ -1251,7 +1258,7 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) { - HANDLE *handle = &set->handles[event->pos + 1]; + HANDLE *handle = &set->events_impl[event->pos + 1]; if (event->events == WL_LATCH_SET) { @@ -1446,7 +1453,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, struct epoll_event *cur_epoll_event; /* Sleep */ - rc = epoll_wait(set->epoll_fd, set->epoll_ret_events, + rc = epoll_wait(set->epoll_fd, set->events_impl, nevents, cur_timeout); /* Check return code */ @@ -1474,13 +1481,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, * until they're either all processed, or we've returned all the events * the caller 
desired. */ - for (cur_epoll_event = set->epoll_ret_events; - cur_epoll_event < (set->epoll_ret_events + rc) && + for (cur_epoll_event = set->events_impl; + cur_epoll_event < (set->events_impl + rc) && returned_events < nevents; cur_epoll_event++) { - /* epoll's data pointer is set to the associated WaitEvent */ - cur_event = (WaitEvent *) cur_epoll_event->data.ptr; + /* epoll's data u32 is set to the associated WaitEvent */ + cur_event = (WaitEvent *) &set->events[cur_epoll_event->data.u32]; occurred_events->pos = cur_event->pos; occurred_events->user_data = cur_event->user_data; @@ -1598,7 +1605,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, /* Sleep */ rc = kevent(set->kqueue_fd, NULL, 0, - set->kqueue_ret_events, nevents, + set->events_impl, nevents, timeout_p); /* Check return code */ @@ -1626,13 +1633,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, * until they're either all processed, or we've returned all the events * the caller desired. */ - for (cur_kqueue_event = set->kqueue_ret_events; - cur_kqueue_event < (set->kqueue_ret_events + rc) && + for (cur_kqueue_event = set->events_impl; + cur_kqueue_event < (set->events_impl + rc) && returned_events < nevents; cur_kqueue_event++) { - /* kevent's udata points to the associated WaitEvent */ - cur_event = AccessWaitEvent(cur_kqueue_event); + /* kevent's udata is the index of the associated WaitEvent */ + cur_event = &set->events[AccessWaitEventPos(cur_kqueue_event)]; occurred_events->pos = cur_event->pos; occurred_events->user_data = cur_event->user_data; @@ -1715,7 +1722,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, struct pollfd *cur_pollfd; /* Sleep */ - rc = poll(set->pollfds, set->nevents, (int) cur_timeout); + rc = poll(set->events_impl, set->nevents, (int) cur_timeout); /* Check return code */ if (rc < 0) @@ -1737,7 +1744,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, return -1; } - for (cur_event = set->events, cur_pollfd = set->pollfds; + 
for (cur_event = set->events, cur_pollfd = set->events_impl; cur_event < (set->events + set->nevents) && returned_events < nevents; cur_event++, cur_pollfd++) @@ -1888,7 +1895,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, * * Need to wait for ->nevents + 1, because signal handle is in [0]. */ - rc = WaitForMultipleObjects(set->nevents + 1, set->handles, FALSE, + rc = WaitForMultipleObjects(set->nevents + 1, set->events_impl, FALSE, cur_timeout); /* Check return code */ @@ -1924,7 +1931,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, * We cannot use set->latch->event to reset the fired event if we * aren't waiting on this latch now. */ - if (!ResetEvent(set->handles[cur_event->pos + 1])) + if (!ResetEvent(set->events_impl[cur_event->pos + 1])) elog(ERROR, "ResetEvent failed: error code %lu", GetLastError()); if (set->latch && set->latch->is_set) @@ -1957,7 +1964,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, else if (cur_event->events & WL_SOCKET_MASK) { WSANETWORKEVENTS resEvents; - HANDLE handle = set->handles[cur_event->pos + 1]; + HANDLE handle = set->events_impl[cur_event->pos + 1]; Assert(cur_event->fd); diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 3aa7b33834..9d4c857249 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -165,7 +165,7 @@ extern void SetLatch(Latch *latch); extern void ResetLatch(Latch *latch); extern void ShutdownLatchSupport(void); -extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents); +extern WaitEventSet *CreateWaitEventSet(MemoryContext context); extern void FreeWaitEventSet(WaitEventSet *set); extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data); @@ -179,6 +179,7 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout, extern int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info); extern void 
InitializeLatchWaitSet(void); +extern void WaitEventSetReserve(WaitEventSet *set, int nevents); extern int GetNumRegisteredWaitEvents(WaitEventSet *set); #endif /* LATCH_H */ -- 2.30.2