On Wed Aug 6, 2025 at 7:44 AM -03, Álvaro Herrera wrote:
>> My questions:
>> 
>> 1. Is it acceptable to drop notifications from the async queue if
>> there are no active listeners? There might still be notifications that
>> haven’t been read by any previous listener.
>
> I'm somewhat wary of this idea -- could these inactive listeners become
> active later and expect to be able to read their notifies?
>
I'm a bit worried about this too.

>> 2. If the answer to 1 is no, how can we teach VACUUM to respect the
>> minimum xid stored in all AsyncQueueEntries?
>
> Maybe we can have AsyncQueueAdvanceTail return the oldest XID of
> listeners, and back off the pg_clog truncation based on that.  This
> could be done by having a new boolean argument that says to look up the
> XID from the PGPROC using BackendPidGetProc(QUEUE_BACKEND_PID) (which
> would only be passed true by vac_update_datfrozenxid(), to avoid
> overhead by other callers), then collect the oldest of those and return
> it.
>
The problem with only considering the oldest XID of listeners is that,
IIUC, we may have notifications without listeners, and in this case we
may still get this error: when LISTEN is executed we loop through the
AsyncQueueEntry items in asyncQueueProcessPageEntries() and call
TransactionIdDidCommit(), which raises the error before
IsListeningOn(channel) is called.
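To make the ordering concrete, here is a toy model of the per-entry checks. It is not PostgreSQL code: ToyEntry, toy_did_commit(), toy_is_listening_on() and the truncation point are hypothetical stand-ins, and the real loop also checks the database OID and the snapshot first. Because the commit-status lookup runs before the channel filter, an old entry on a channel nobody listens to still hits pg_xact:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t TransactionId;

/* Hypothetical, simplified queue entry: only the fields the loop consults. */
typedef struct
{
    TransactionId xid;
    const char *channel;
} ToyEntry;

/* Hypothetical: oldest xid whose commit status is still kept in pg_xact. */
static TransactionId toy_oldest_clog_xid = 1000;
static int toy_clog_errors = 0;

/*
 * Stand-in for TransactionIdDidCommit().  The real function raises an ERROR
 * when the status has been truncated; here we count the failure and report
 * "not committed".  Plain '<' is used because this toy ignores wraparound.
 */
static bool
toy_did_commit(TransactionId xid)
{
    if (xid < toy_oldest_clog_xid)
    {
        toy_clog_errors++;
        return false;
    }
    return true;
}

/* Stand-in for IsListeningOn(): this backend listens only on "mychan". */
static bool
toy_is_listening_on(const char *channel)
{
    return strcmp(channel, "mychan") == 0;
}

/*
 * Same order of checks as the reading loop: commit status first, channel
 * filter second, so every entry costs a status lookup.
 */
static int
toy_process_entries(const ToyEntry *entries, int n)
{
    int delivered = 0;

    assert(n >= 0);
    for (int i = 0; i < n; i++)
    {
        if (toy_did_commit(entries[i].xid) &&
            toy_is_listening_on(entries[i].channel))
            delivered++;
    }
    return delivered;
}
```

With the queue {500, "otherchan"} followed by {1500, "mychan"} and the truncation point at 1000, one entry is delivered, and the unrelated old entry still accounts for one failed status lookup.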

Another option would be to add a minXid field to AsyncQueueControl,
update this value in the asyncQueueProcessPageEntries() and
asyncQueueAddEntries() routines, and then check it in
vac_update_datfrozenxid().
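A rough sketch of that bookkeeping, assuming a toy in-memory queue rather than the real SLRU pages (ToyQueue, toy_add() and toy_advance_tail() are hypothetical). It compares xids modulo 2^32 the way TransactionIdPrecedes() does. As a variant of updating minXid while reading, it recomputes the minimum when the tail advances (in async.c that would be around asyncQueueAdvanceTail()), since an entry can only be forgotten once every listener has read past it:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define XidIsNormal(xid)         ((xid) >= FirstNormalTransactionId)

/* Same modulo-2^32 rule as PostgreSQL's TransactionIdPrecedes(). */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    if (!XidIsNormal(a) || !XidIsNormal(b))
        return a < b;
    return (int32_t) (a - b) < 0;
}

/* Hypothetical toy queue: live entries are xids[tail .. head-1]. */
#define TOY_QUEUE_SIZE 64

typedef struct
{
    TransactionId xids[TOY_QUEUE_SIZE];
    int         head;
    int         tail;
    TransactionId minXid;       /* InvalidTransactionId when queue is empty */
} ToyQueue;

static void
toy_init(ToyQueue *q)
{
    q->head = q->tail = 0;
    q->minXid = InvalidTransactionId;
}

/* Counterpart of the asyncQueueAddEntries() hook: keep the older xid. */
static void
toy_add(ToyQueue *q, TransactionId xid)
{
    assert(XidIsNormal(xid));
    assert(q->head < TOY_QUEUE_SIZE);
    q->xids[q->head++] = xid;
    if (q->minXid == InvalidTransactionId || xid_precedes(xid, q->minXid))
        q->minXid = xid;
}

/*
 * Called once every listener has moved past new_tail.  The minimum is
 * recomputed over the surviving entries instead of being raised to the xid
 * of whatever entry one backend happened to deliver.
 */
static void
toy_advance_tail(ToyQueue *q, int new_tail)
{
    assert(new_tail >= q->tail && new_tail <= q->head);
    q->tail = new_tail;
    q->minXid = InvalidTransactionId;
    for (int i = q->tail; i < q->head; i++)
    {
        if (q->minXid == InvalidTransactionId ||
            xid_precedes(q->xids[i], q->minXid))
            q->minXid = q->xids[i];
    }
}
```

For example, after adding 4000000000, 10 and 4000000100 (the middle one assigned after wraparound, so logically newer), minXid is 4000000000; once the tail passes the first entry it becomes 4000000100, and an empty queue reports InvalidTransactionId.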

> This does create the problem that an inactive listener could cause the
> XID counter to stay far in the past.  Maybe we could try to avoid this
> by adding more signalling (e.g, AsyncQueueAdvanceTail() itself could
> send PROCSIG_NOTIFY_INTERRUPT signal?), and terminating backends that
> are way overdue on reading notifies.  I'm not sure if this is really
> needed or useful; consider a backend stuck on SIGSTOP (debugger or
> whatever): it will just sit there forever.
>
With the idea I've proposed we could still have this problem: if a
listener takes too long to consume a message, it would prevent VACUUM
FREEZE from advancing the xid. To handle this, I think we could have two
GUCs: one to enable or disable the oldest-xid check on the async queue,
and a second to control how far back the oldest async queue xid may hold
back freezing; if the age of the min xid exceeds this limit, we ignore it
and truncate anyway.
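Something along these lines is what the clamp could look like, as a standalone sketch only: async_queue_hold_frozenxid and async_queue_max_xid_age are hypothetical GUC names (nothing like them exists today), and the age is assumed to be measured against the next xid to be assigned:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define XidIsNormal(xid)         ((xid) >= FirstNormalTransactionId)

/* Same modulo-2^32 rule as PostgreSQL's TransactionIdPrecedes(). */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    if (!XidIsNormal(a) || !XidIsNormal(b))
        return a < b;
    return (int32_t) (a - b) < 0;
}

/* Hypothetical GUCs: enable the check, and cap how old a held xid may be. */
static bool async_queue_hold_frozenxid = true;
static int  async_queue_max_xid_age = 100000000;

/*
 * Decide how far the async queue may hold back the new datfrozenxid.
 * queue_min_xid is the oldest notification xid (InvalidTransactionId if the
 * queue is empty); next_xid is the next xid to be assigned.
 */
static TransactionId
toy_clamp_frozenxid(TransactionId new_frozen_xid,
                    TransactionId queue_min_xid,
                    TransactionId next_xid)
{
    uint32_t    age;

    assert(XidIsNormal(new_frozen_xid));

    if (!async_queue_hold_frozenxid || !XidIsNormal(queue_min_xid))
        return new_frozen_xid;

    /* Unsigned subtraction gives the distance modulo 2^32. */
    age = next_xid - queue_min_xid;
    if (age > (uint32_t) async_queue_max_xid_age)
        return new_frozen_xid;  /* too old: stop protecting it */

    /* Never move the horizon forward, only hold it back. */
    return xid_precedes(queue_min_xid, new_frozen_xid) ?
        queue_min_xid : new_frozen_xid;
}
```

So with next_xid = 9000 and new_frozen_xid = 5000, a queued notification at xid 4000 holds the horizon back to 4000, unless its age (5000) exceeds async_queue_max_xid_age, in which case it is ignored and truncation proceeds.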

I've written a draft patch that plays with this idea; see attached.

--
Matheus Alcantara
From 7ec33dfc14174e9a4424a1738977601a4fd13bf5 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths....@pm.me>
Date: Sat, 9 Aug 2025 13:51:21 -0300
Subject: [PATCH v0] Consider async queue min xid on VACUUM FREEZE

---
 src/backend/commands/async.c  | 58 +++++++++++++++++++++++++++++++++++
 src/backend/commands/vacuum.c | 11 +++++++
 src/include/commands/async.h  |  3 ++
 3 files changed, 72 insertions(+)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..fff055601a1 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -287,6 +287,7 @@ typedef struct AsyncQueueControl
                                                                 * tail.page */
        ProcNumber      firstListener;  /* id of first listener, or
                                                                 * INVALID_PROC_NUMBER */
+       TransactionId minXid;   /* oldest notification xid in the queue */
        TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
        QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
@@ -458,6 +459,9 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int     notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+static void StoreEntryXid(TransactionId xid);
+static void ReleaseEntryXid(TransactionId xid);
+
 /*
  * Compute the difference between two queue page numbers.
  * Previously this function accounted for a wraparound.
@@ -521,6 +525,7 @@ AsyncShmemInit(void)
                QUEUE_STOP_PAGE = 0;
                QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
                asyncQueueControl->lastQueueFillWarn = 0;
+               asyncQueueControl->minXid = MaxTransactionId;
                for (int i = 0; i < MaxBackends; i++)
                {
                        QUEUE_BACKEND_PID(i) = InvalidPid;
@@ -1428,6 +1433,12 @@ asyncQueueAddEntries(ListCell *nextNotify)
                           &qe,
                           qe.length);
 
+               /*
+                * Remember the notification xid so that vacuum does not freeze
+                * past this xid.
+                */
+               StoreEntryXid(qe.xid);
+
                /* Advance queue_head appropriately, and detect if page is full */
                if (asyncQueueAdvance(&(queue_head), qe.length))
                {
@@ -2077,6 +2088,12 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                        char       *payload = qe->data + strlen(channel) + 1;
 
                                        NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+                                       /*
+                                        * The notification was sent, so release its xid
+                                        * so that vacuum can freeze past this notification.
+                                        */
+                                       ReleaseEntryXid(qe->xid);
                                }
                        }
                        else
@@ -2395,3 +2412,44 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
        return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ *  Return the minimum notification xid on the queue.
+ *
+ *  TODO(matheus): handle when the queue is empty, so we don't get locked in
+ *  the past with a notification that was already sent.
+ */
+TransactionId
+AsyncQueueMinXid(void)
+{
+       return asyncQueueControl->minXid;
+}
+
+/*
+ *  Subroutine of asyncQueueAddEntries
+ *
+ *  We are holding NotifyQueueLock already from the caller.
+ */
+static void
+StoreEntryXid(TransactionId xid)
+{
+       if (xid < asyncQueueControl->minXid)
+               asyncQueueControl->minXid = xid;
+}
+
+/*
+ * Subroutine of asyncQueueProcessPageEntries
+ *
+ * TODO(matheus): maybe update the code comment in
+ * asyncQueueReadAllNotifications() when calling
+ * asyncQueueProcessPageEntries().
+ *
+ */
+static void
+ReleaseEntryXid(TransactionId xid)
+{
+       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+       if (xid > asyncQueueControl->minXid)
+               asyncQueueControl->minXid = xid;
+       LWLockRelease(NotifyQueueLock);
+}
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 733ef40ae7c..f0d6d868e60 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -21,6 +21,7 @@
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include "commands/async.h"
 
 #include <math.h>
@@ -1739,6 +1740,16 @@ vac_update_datfrozenxid(void)
        if (bogus)
                return;
 
+       /*
+        * Notifications have their transaction status checked before they
+        * are sent to the client, so if listeners lag behind in consuming
+        * them we must not advance past the oldest notification xid still
+        * in the queue; otherwise its status could no longer be looked up.
+        *
+        * XXX(matheus): Wrap this behavior into a GUC?
+        */
+       newFrozenXid = Min(newFrozenXid, AsyncQueueMinXid());
+
        Assert(TransactionIdIsNormal(newFrozenXid));
        Assert(MultiXactIdIsValid(newMinMulti));
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..512a8976126 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -13,6 +13,7 @@
 #ifndef ASYNC_H
 #define ASYNC_H
 
+#include <c.h>
 #include <signal.h>
 
 extern PGDLLIMPORT bool Trace_notify;
@@ -46,4 +47,6 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+extern TransactionId AsyncQueueMinXid(void);
+
 #endif                                                 /* ASYNC_H */
-- 
2.39.5 (Apple Git-154)
