On Wed Aug 6, 2025 at 7:44 AM -03, Álvaro Herrera wrote:
>> My questions:
>>
>> 1. Is it acceptable to drop notifications from the async queue if
>> there are no active listeners? There might still be notifications that
>> haven’t been read by any previous listener.
>
> I'm somewhat wary of this idea -- could these inactive listeners become
> active later and expect to be able to read their notifies?
>
I'm a bit worried about this too.
>> 2. If the answer to 1 is no, how can we teach VACUUM to respect the
>> minimum xid stored in all AsyncQueueEntries?
>
> Maybe we can have AsyncQueueAdvanceTail return the oldest XID of
> listeners, and back off the pg_clog truncation based on that. This
> could be done by having a new boolean argument that says to look up the
> XID from the PGPROC using BackendPidGetProc(QUEUE_BACKEND_PID) (which
> would only be passed true by vac_update_datfrozenxid(), to avoid
> overhead by other callers), then collect the oldest of those and return
> it.
>
The problem with only considering the oldest XID of listeners is that,
IIUC, we may have notifications without listeners. In that case we can
still get this error, because when LISTEN is executed we loop over the
AsyncQueueEntry items in asyncQueueProcessPageEntries() and call
TransactionIdDidCommit(), which raises the error before
IsListeningOn(channel) is ever called.

Another option would be to add a minXid field to AsyncQueueControl, update
it in the asyncQueueProcessPageEntries() and asyncQueueAddEntries()
routines, and then check it in vac_update_datfrozenxid().

> This does create the problem that an inactive listener could cause the
> XID counter to stay far in the past. Maybe we could try to avoid this
> by adding more signalling (e.g, AsyncQueueAdvanceTail() itself could
> send PROCSIG_NOTIFY_INTERRUPT signal?), and terminating backends that
> are way overdue on reading notifies. I'm not sure if this is really
> needed or useful; consider a backend stuck on SIGSTOP (debugger or
> whatever): it will just sit there forever.
>
The idea I've proposed has the same problem: if a listener takes too
long to consume a message, we would prevent VACUUM FREEZE from advancing
the xid.
To bound how long a slow listener can hold VACUUM back, I think we could
have two GUCs: one to enable or disable the oldest-xmin check on the async
queue, and a second to control how far we are willing to keep VACUUM from
freezing past the oldest async queue xid. If the min xid is older than
this limit allows, we ignore it and truncate anyway.

I've written a draft patch that plays with the idea; see attached.

-- 
Matheus Alcantara
From 7ec33dfc14174e9a4424a1738977601a4fd13bf5 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths....@pm.me>
Date: Sat, 9 Aug 2025 13:51:21 -0300
Subject: [PATCH v0] Consider async queue min xid on VACUUM FREEZE

---
 src/backend/commands/async.c  | 58 +++++++++++++++++++++++++++++++++++
 src/backend/commands/vacuum.c | 11 +++++++
 src/include/commands/async.h  |  3 ++
 3 files changed, 72 insertions(+)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..fff055601a1 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -287,6 +287,7 @@ typedef struct AsyncQueueControl
 								 * tail.page */
 	ProcNumber	firstListener;	/* id of first listener, or
 								 * INVALID_PROC_NUMBER */
+	TransactionId minXid;
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
@@ -458,6 +459,9 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+static void StoreEntryXid(TransactionId xid);
+static void ReleaseEntryXid(TransactionId xid);
+
 /*
  * Compute the difference between two queue page numbers.
  * Previously this function accounted for a wraparound.
@@ -521,6 +525,7 @@ AsyncShmemInit(void)
 		QUEUE_STOP_PAGE = 0;
 		QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
 		asyncQueueControl->lastQueueFillWarn = 0;
+		asyncQueueControl->minXid = MaxTransactionId;
 		for (int i = 0; i < MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
@@ -1428,6 +1433,12 @@ asyncQueueAddEntries(ListCell *nextNotify)
 			   &qe,
 			   qe.length);
 
+		/*
+		 * Remember the notification xid so that vacuum does not freeze past
+		 * this xid.
+		 */
+		StoreEntryXid(qe.xid);
+
 		/* Advance queue_head appropriately, and detect if page is full */
 		if (asyncQueueAdvance(&(queue_head), qe.length))
 		{
@@ -2077,6 +2088,12 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 					char	   *payload = qe->data + strlen(channel) + 1;
 
 					NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+					/*
+					 * Notification was sent, so release the notification xid
+					 * so that vacuum can freeze past this notification.
+					 */
+					ReleaseEntryXid(qe->xid);
 				}
 			}
 			else
@@ -2395,3 +2412,44 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Return the minimum notification xid on the queue.
+ *
+ * TODO(matheus): handle an empty queue, so that we don't stay stuck in
+ * the past with a notification that was already sent.
+ */
+TransactionId
+AsyncQueueMinXid(void)
+{
+	return asyncQueueControl->minXid;
+}
+
+/*
+ * Subroutine of asyncQueueAddEntries
+ *
+ * We are holding NotifyQueueLock already from the caller.
+ */
+static void
+StoreEntryXid(TransactionId xid)
+{
+	if (xid < asyncQueueControl->minXid)
+		asyncQueueControl->minXid = xid;
+}
+
+/*
+ * Subroutine of asyncQueueProcessPageEntries
+ *
+ * TODO(matheus): maybe update the code comment in
+ * asyncQueueReadAllNotifications() when calling
+ * asyncQueueProcessPageEntries().
+ *
+ */
+static void
+ReleaseEntryXid(TransactionId xid)
+{
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+	if (xid > asyncQueueControl->minXid)
+		asyncQueueControl->minXid = xid;
+	LWLockRelease(NotifyQueueLock);
+}
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 733ef40ae7c..f0d6d868e60 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -21,6 +21,7 @@
  *
  *-------------------------------------------------------------------------
  */
+#include "commands/async.h"
 #include "postgres.h"
 
 #include <math.h>
@@ -1739,6 +1740,16 @@ vac_update_datfrozenxid(void)
 	if (bogus)
 		return;
 
+	/*
+	 * We need to check the transaction status of notifications before
+	 * notifying the client; if there is lag in consuming the notifications,
+	 * we need to consider the oldest notification xid on the queue so that
+	 * its transaction status can still be accessed.
+	 *
+	 * XXX(matheus): Wrap this behavior into a GUC?
+	 */
+	newFrozenXid = Min(newFrozenXid, AsyncQueueMinXid());
+
 	Assert(TransactionIdIsNormal(newFrozenXid));
 	Assert(MultiXactIdIsValid(newMinMulti));
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..512a8976126 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -13,6 +13,7 @@
 #ifndef ASYNC_H
 #define ASYNC_H
 
+#include <c.h>
 #include <signal.h>
 
 extern PGDLLIMPORT bool Trace_notify;
@@ -46,4 +47,6 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+extern TransactionId AsyncQueueMinXid(void);
+
 #endif							/* ASYNC_H */
-- 
2.39.5 (Apple Git-154)