On Wed, 2023-03-08 at 11:25 +0100, Drouvot, Bertrand wrote:
> - Maybe ConditionVariableEventSleep() should take care of the
> “WaitEventSetWait returns 1 and cvEvent.event == WL_POSTMASTER_DEATH”
> case?

Thank you, done. I think the nearby line was also wrong, returning true
when there was no timeout. I combined the lines and got rid of the
early return so it can check the list and timeout condition like
normal. Attached.

> - Maybe ConditionVariableEventSleep() could accept and deal with the
> CV being NULL?
> I used it in the POC attached to handle logical decoding on the
> primary server case.
> One option should be to create a dedicated CV for that case though.

I don't think it's a good idea to have a CV-based API that doesn't need
a CV. Wouldn't that just be a normal WaitEventSet?

> - In the POC attached I had to add this extra condition “(cv &&
> !RecoveryInProgress())” to avoid waiting on the timeout when there is
> a promotion.
> That makes me think that we may want to add 2 extra parameters (as 2
> functions returning a bool?) to ConditionVariableEventSleep()
> to check whether or not we still want to test the socket or the CV
> wake up in each loop iteration.

That seems like a complex API. Would it work to signal the CV during
promotion instead?

> Also 3 additional remarks:
>
> 1) About InitializeConditionVariableWaitSet() and
> ConditionVariableWaitSetCreate(): I'm not sure about the naming as
> there is no CV yet (they "just" deal with WaitEventSet).

It's a WaitEventSet that contains the conditions always required for
any CV, and allows you to add in more.

> 3)
>
> I wonder if there is no race conditions: ConditionVariableWaitSet is
> being initialized with PGINVALID_SOCKET
> as WL_LATCH_SET and might be also (if IsUnderPostmaster) be
> initialized with PGINVALID_SOCKET as WL_EXIT_ON_PM_DEATH.
>
> So IIUC, the patch is introducing 2 new possible source of wake up.

Those should be the same conditions already required by
ConditionVariableTimedSleep() in master, right?

Regards,
	Jeff Davis

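To make the first point above concrete, here is a rough, standalone model of the decision taken after each wakeup once the early return is gone. It is only a sketch, not code from the patch: the SK_* flags, the SketchOutcome enum, and sketch_after_wait() are hypothetical stand-ins, and the order of the final checks (signaled or event first, then timeout) is an assumption about the part of the loop that the diff does not show.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-in bit flags for this sketch only; not PostgreSQL's definitions. */
#define SK_LATCH_SET		(1 << 0)
#define SK_SOCKET_READABLE	(1 << 1)
#define SK_SOCKET_WRITEABLE	(1 << 2)
#define SK_POSTMASTER_DEATH	(1 << 3)
#define SK_SOCKET_MASK		(SK_SOCKET_READABLE | SK_SOCKET_WRITEABLE)

typedef enum
{
	SK_KEEP_WAITING,			/* go around the loop again */
	SK_RETURN_FALSE,			/* signaled, socket event, or postmaster death */
	SK_RETURN_TRUE				/* timeout expired */
} SketchOutcome;

/*
 * Hypothetical model of one loop iteration after the wait returns.
 *
 * nevents/events mimic what the wait reported; removed_from_wait_list
 * mimics "this process is no longer on the CV's wait list"; timeout_ms is
 * -1 for "no timeout"; elapsed_ms is the time spent waiting so far.
 */
SketchOutcome
sketch_after_wait(int nevents, int events, bool removed_from_wait_list,
				  long timeout_ms, long elapsed_ms)
{
	bool		done = false;

	assert(timeout_ms >= -1);

	/* A socket event or postmaster death must be handed back to the caller. */
	if (nevents == 1 &&
		(events & (SK_SOCKET_MASK | SK_POSTMASTER_DEATH)) != 0)
		done = true;

	/* No early return: the wait-list check still runs as usual. */
	if (removed_from_wait_list)
		done = true;

	if (done)
		return SK_RETURN_FALSE;

	/* Only a real timeout can produce "true"; -1 never does. */
	if (timeout_ms >= 0 && elapsed_ms >= timeout_ms)
		return SK_RETURN_TRUE;

	return SK_KEEP_WAITING;
}
```

In this model a call with no timeout (-1) can never report "timed out", which is the behavior the reply describes as the intended fix; whether the real function orders the checks exactly this way should be verified against the attached patch.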
From 2f05cab9012950ae9290fccbb6366d50fc01553e Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Wed, 1 Mar 2023 20:02:42 -0800
Subject: [PATCH v2] Introduce ConditionVariableEventSleep().

The new API takes a WaitEventSet which can include socket events. The
WaitEventSet must have been created by
ConditionVariableWaitSetCreate(), another new function, so that it
includes the wait events necessary for a condition variable.
---
 src/backend/storage/lmgr/condition_variable.c | 106 ++++++++++++++++--
 src/backend/storage/lmgr/proc.c               |   6 +
 src/backend/utils/init/miscinit.c             |   1 +
 src/include/storage/condition_variable.h      |  10 ++
 4 files changed, 115 insertions(+), 8 deletions(-)

diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 7e2bbf46d9..3dbfa7468b 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -27,9 +27,29 @@
 #include "storage/spin.h"
 #include "utils/memutils.h"
 
+#define ConditionVariableWaitSetLatchPos	0
+
 /* Initially, we are not prepared to sleep on any condition variable. */
 static ConditionVariable *cv_sleep_target = NULL;
 
+/* Used by ConditionVariableSleep() and ConditionVariableTimedSleep(). */
+static WaitEventSet *ConditionVariableWaitSet = NULL;
+
+/*
+ * Initialize the process-local condition variable WaitEventSet.
+ *
+ * This must be called once during startup of any process that can wait on
+ * condition variables, before it issues any ConditionVariableInit() calls.
+ */
+void
+InitializeConditionVariableWaitSet(void)
+{
+	Assert(ConditionVariableWaitSet == NULL);
+
+	ConditionVariableWaitSet = ConditionVariableWaitSetCreate(
+		TopMemoryContext, 0);
+}
+
 /*
  * Initialize a condition variable.
  */
@@ -40,6 +60,51 @@ ConditionVariableInit(ConditionVariable *cv)
 	proclist_init(&cv->wakeup);
 }
 
+/*
+ * Create a WaitEventSet for ConditionVariableEventSleep(). This should be
+ * used when the caller of ConditionVariableEventSleep() would like to wake up
+ * on either the condition variable signal or a socket event. For example:
+ *
+ *   ConditionVariableInit(&cv);
+ *   waitset = ConditionVariableWaitSetCreate(mcxt, 1);
+ *   event_pos = AddWaitEventToSet(waitset, 0, sock, NULL, NULL);
+ *   ...
+ *   ConditionVariablePrepareToSleep(&cv);
+ *   while (...condition not met...)
+ *   {
+ *       socket_wait_events = ...
+ *       ModifyWaitEvent(waitset, event_pos, socket_wait_events, NULL);
+ *       ConditionVariableEventSleep(&cv, waitset, ...);
+ *   }
+ *   ConditionVariableCancelSleep();
+ *
+ * The waitset is created with the standard events for a condition variable,
+ * and room for adding n_socket_events additional socket events. The
+ * initially-filled event positions should not be modified, but added socket
+ * events can be modified. The same waitset can be used for multiple condition
+ * variables as long as the callers of ConditionVariableEventSleep() are
+ * interested in the same sockets.
+ */
+WaitEventSet *
+ConditionVariableWaitSetCreate(MemoryContext mcxt, int n_socket_events)
+{
+	int			latch_pos PG_USED_FOR_ASSERTS_ONLY;
+	int			n_cv_events = IsUnderPostmaster ? 2 : 1;
+	int			nevents = n_cv_events + n_socket_events;
+	WaitEventSet *waitset = CreateWaitEventSet(mcxt, nevents);
+
+	latch_pos = AddWaitEventToSet(waitset, WL_LATCH_SET, PGINVALID_SOCKET,
+								  MyLatch, NULL);
+
+	if (IsUnderPostmaster)
+		AddWaitEventToSet(waitset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+						  NULL, NULL);
+
+	Assert(latch_pos == ConditionVariableWaitSetLatchPos);
+
+	return waitset;
+}
+
 /*
  * Prepare to wait on a given condition variable.
  *
@@ -97,7 +162,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 void
 ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 {
-	(void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
+	(void) ConditionVariableEventSleep(cv, ConditionVariableWaitSet,
+									   -1 /* no timeout */ ,
 									   wait_event_info);
 }
 
@@ -111,11 +177,27 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 bool
 ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 							uint32 wait_event_info)
+{
+	return ConditionVariableEventSleep(cv, ConditionVariableWaitSet, timeout,
+									   wait_event_info);
+}
+
+/*
+ * Wait for a condition variable to be signaled, a timeout to be reached, or a
+ * socket event in the given waitset. The waitset must have been created by
+ * ConditionVariableWaitSetCreate().
+ *
+ * Returns true when timeout expires, otherwise returns false.
+ *
+ * See ConditionVariableSleep() for general usage.
+ */
+bool
+ConditionVariableEventSleep(ConditionVariable *cv, WaitEventSet *waitset,
+							long timeout, uint32 wait_event_info)
 {
 	long		cur_timeout = -1;
 	instr_time	start_time;
 	instr_time	cur_time;
-	int			wait_events;
 
 	/*
 	 * If the caller didn't prepare to sleep explicitly, then do so now and
@@ -147,24 +229,32 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 		INSTR_TIME_SET_CURRENT(start_time);
 		Assert(timeout >= 0 && timeout <= INT_MAX);
 		cur_timeout = timeout;
-		wait_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
 	}
-	else
-		wait_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
 
 	while (true)
 	{
 		bool		done = false;
+		WaitEvent	cvEvent;
+		int			nevents;
 
 		/*
-		 * Wait for latch to be set.  (If we're awakened for some other
-		 * reason, the code below will cope anyway.)
+		 * Wait for latch to be set, or other events which will be handled
+		 * below.
 		 */
-		(void) WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info);
+		nevents = WaitEventSetWait(waitset, cur_timeout, &cvEvent,
+								   1, wait_event_info);
 
 		/* Reset latch before examining the state of the wait list. */
 		ResetLatch(MyLatch);
 
+		/*
+		 * If the wakeup was due to a socket event or postmaster death, then
+		 * we must return to the caller.
+		 */
+		if (nevents == 1 &&
+			(cvEvent.events & (WL_SOCKET_MASK | WL_POSTMASTER_DEATH)) != 0)
+			done = true;
+
 		/*
 		 * If this process has been taken out of the wait list, then we know
 		 * that it has been signaled by ConditionVariableSignal (or
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 22b4278610..ae4a7aecd4 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -440,6 +440,9 @@ InitProcess(void)
 	OwnLatch(&MyProc->procLatch);
 	SwitchToSharedLatch();
 
+	/* Initialize process-local condition variable support */
+	InitializeConditionVariableWaitSet();
+
 	/* now that we have a proc, report wait events to shared memory */
 	pgstat_set_wait_event_storage(&MyProc->wait_event_info);
 
@@ -596,6 +599,9 @@ InitAuxiliaryProcess(void)
 	OwnLatch(&MyProc->procLatch);
 	SwitchToSharedLatch();
 
+	/* Initialize process-local condition variable support */
+	InitializeConditionVariableWaitSet();
+
 	/* now that we have a proc, report wait events to shared memory */
 	pgstat_set_wait_event_storage(&MyProc->wait_event_info);
 
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index a604432126..0545044225 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -40,6 +40,7 @@
 #include "postmaster/interrupt.h"
 #include "postmaster/pgarch.h"
 #include "postmaster/postmaster.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 589bdd323c..94adb54b91 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -22,6 +22,7 @@
 #ifndef CONDITION_VARIABLE_H
 #define CONDITION_VARIABLE_H
 
+#include "storage/latch.h"
 #include "storage/proclist_types.h"
 #include "storage/spin.h"
 
@@ -42,9 +43,14 @@ typedef union ConditionVariableMinimallyPadded
 	char		pad[CV_MINIMAL_SIZE];
 } ConditionVariableMinimallyPadded;
 
+extern void InitializeConditionVariableWaitSet(void);
+
 /* Initialize a condition variable. */
 extern void ConditionVariableInit(ConditionVariable *cv);
 
+extern WaitEventSet *ConditionVariableWaitSetCreate(MemoryContext mcxt,
+													int n_socket_events);
+
 /*
  * To sleep on a condition variable, a process should use a loop which first
  * checks the condition, exiting the loop if it is met, and then calls
@@ -56,6 +62,10 @@ extern void ConditionVariableInit(ConditionVariable *cv);
 extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info);
 extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 										uint32 wait_event_info);
+extern bool ConditionVariableEventSleep(ConditionVariable *cv,
+										WaitEventSet *cvEventSet,
+										long timeout,
+										uint32 wait_event_info);
 extern void ConditionVariableCancelSleep(void);
 
 /*
-- 
2.34.1