On Mon Sep 1, 2025 at 11:06 AM -03, Jacques Combrink wrote:
> TLDR:
> active listener on one database causes notify on another database to get
> stuck.
> At no point could I get a stuck notify if I don't have a listener on at
> least one other database than the one I am notifying on. See the Extra
> weirdness section.
> At no point do you need to have any other queries running; an idle in
> transaction session is never needed to hit bad timing with the vacuum.
>
> I hope I explained everything well enough so that one of you smart
> people can find and fix the problem.
>
The long-running transaction steps are just one example showing that we can
lose notifications when using the first patch from Daniil that Alex shared
in [1]. The steps that you've shared are another way to trigger the issue,
similar to the steps that Alex also shared in [1].

All these different ways to trigger the error share the same underlying
problem: if a notification stays in the queue long enough for VACUUM FREEZE
to run and truncate the clog files that contain the status of the
notification's transaction, the error will happen.

The patch that I attached in [2] aims to fix this issue. While testing it
with the steps that you've shared, I found a stack overflow bug in
AsyncQueueIterNextNotification() caused by the number of notifications. I'm
attaching a new version that fixes this bug; I tried to reproduce your steps
with this new version and the issue seems to be fixed.

Note that notifications added without any previous LISTEN will block the
xid advance during VACUUM FREEZE until there is a listener on the database
that owns these notifications. The XXX comment in vacuum.c is about this
problem.

[1] https://www.postgresql.org/message-id/CAK98qZ3wZLE-RZJN_Y%2BTFjiTRPPFPBwNBpBi5K5CU8hUHkzDpw%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAFY6G8cJm73_MM9SuynZUqtqcaTuepUDgDuvS661oLW7U0dgsg%40mail.gmail.com

--
Matheus Alcantara
From 839b2934fe3d73b78965198d86dba3b97e3696e2 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Tue, 26 Aug 2025 10:09:01 -0300
Subject: [PATCH v2] Consider LISTEN/NOTIFY min xid during VACUUM FREEZE

Previously, if a listener backend was slow to consume notifications (for
example, because it was idle in a transaction) and VACUUM FREEZE ran during
that period and truncated clog files containing the status of the
transactions that sent the queued notifications, the listener backend could
lose those notifications when committing its transaction:
ERROR:  could not access status of transaction 756
DETAIL:  Could not open file "pg_xact/0000": No such file or directory.

This commit fixes the issue by iterating over the notifications remaining in
the async queue, finding the oldest transaction xid among them, and taking
this value into account when VACUUM FREEZE computes the new datfrozenxid.
---
 src/backend/commands/async.c                  | 261 ++++++++++++++++++
 src/backend/commands/vacuum.c                 |  13 +
 src/include/commands/async.h                  |   3 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 src/test/modules/test_listen_notify/Makefile  |  17 ++
 .../modules/test_listen_notify/meson.build    |  13 +
 .../test_listen_notify/t/001_xid_freeze.pl    |  73 +++++
 src/tools/pgindent/typedefs.list              |   1 +
 9 files changed, 383 insertions(+)
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..e986a27eb4e 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -401,6 +401,45 @@ struct NotificationHash
        Notification *event;            /* => the actual Notification struct */
 };
 
+/*
+ * A state-based iterator for consuming notifications (AsyncQueueEntry) from the async queue.
+ *
+ * Note that the iterator walks the async queue based on the "current" and
+ * "head" positions: it starts at "current" and reads until it reaches the
+ * "head" position. For example, to read the notifications for a specific
+ * backend, use that backend's QUEUE_BACKEND_POS as the "current" starting
+ * point and QUEUE_HEAD as "head". To read all notifications still in the
+ * async queue, use QUEUE_TAIL as "current".
+ */
+typedef struct AsyncQueueIterator
+{
+       /* Current queue position of iteration. */
+       QueuePosition current;
+
+       /* how far it will read */
+       QueuePosition head;
+
+       /* Current queue entry being read. */
+       AsyncQueueEntry *current_entry;
+
+       /* Snapshot used to decide which xacts are still in progress. */
+       Snapshot        snapshot;
+
+       /* buffer to read pages from SLRU */
+       union
+       {
+               char            buf[QUEUE_PAGESIZE];
+               AsyncQueueEntry align;
+       }                       page_buffer;
+
+       /* Should read a page from SLRU? */
+       bool            read_next_page;
+
+       /* No more entries to read */
+       bool            done;
+} AsyncQueueIterator;
+
+
 static NotificationList *pendingNotifies = NULL;
 
 /*
@@ -458,6 +497,9 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int     notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+static void AsyncQueueIterInit(AsyncQueueIterator *iter, QueuePosition current, QueuePosition head);
+static AsyncQueueEntry *AsyncQueueIterNextNotification(AsyncQueueIterator *iter);
+
 /*
  * Compute the difference between two queue page numbers.
  * Previously this function accounted for a wraparound.
@@ -2395,3 +2437,222 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
        return check_slru_buffers("notify_buffers", newval);
 }
+
+
+/*
+ * Initializes an AsyncQueueIterator.
+ *
+ * It sets up the state and gets the initial snapshot.
+ */
+static void
+AsyncQueueIterInit(AsyncQueueIterator *iter, QueuePosition current, QueuePosition head)
+{
+       /* Initialize internal state */
+       iter->read_next_page = true;
+       iter->done = false;
+       iter->current = current;
+       iter->head = head;
+
+       /* Get the snapshot we'll use for visibility checks */
+       iter->snapshot = RegisterSnapshot(GetLatestSnapshot());
+}
+
+/*
+ * Returns the next AsyncQueueEntry from the async queue.
+ *
+ * Returns a pointer to the entry on success, otherwise NULL if there are no
+ * more notifications to process, or if an uncommitted notification is found.
+ *
+ * It handles fetching pages from the shared SLRU as needed. The returned
+ * pointer is to a local buffer, so it's only valid until the next call to this
+ * function.
+ */
+static AsyncQueueEntry *
+AsyncQueueIterNextNotification(AsyncQueueIterator *iter)
+{
+       AsyncQueueEntry *qe;
+       QueuePosition thisentry;
+
+       /*
+        * Loop until a valid notification is found or we reach the end of the
+        * queue or an uncommitted transaction.
+        */
+       do
+       {
+               CHECK_FOR_INTERRUPTS();
+
+               /* No more entries to process. */
+               if (iter->done)
+                       return NULL;
+
+               if (QUEUE_POS_EQUAL(iter->current, iter->head))
+               {
+                       /* Nothing to do, there are no more notifications to read. */
+                       iter->done = true;
+                       return NULL;
+               }
+
+               /*
+                * We need to process a page at a time. If we haven't read the current
+                * page yet, or have reached the end of the previous one, read the
+                * next page.
+                */
+               if (iter->read_next_page)
+               {
+                       int64           curpage = QUEUE_POS_PAGE(iter->current);
+                       int                     curoffset = QUEUE_POS_OFFSET(iter->current);
+                       int                     slotno;
+                       int                     copysize;
+
+
+                       /*
+                        * We copy the data from SLRU into a local buffer, so as to avoid
+                        * holding the SLRU lock while we are examining the entries and
+                        * possibly transmitting them to our frontend.  Copy only the part
+                        * of the page we will actually inspect.
+                        */
+                       slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, InvalidTransactionId);
+
+                       /* Determine how much of the page we need to copy */
+                       if (curpage == QUEUE_POS_PAGE(iter->head))
+                       {
+                               /* We only want to read as far as head */
+                               copysize = QUEUE_POS_OFFSET(iter->head) - curoffset;
+                               if (copysize < 0)
+                                       copysize = 0;   /* just for safety */
+                       }
+                       else
+                       {
+                               /* fetch all the rest of the page */
+                               copysize = QUEUE_PAGESIZE - curoffset;
+                       }
+
+                       memcpy(iter->page_buffer.buf,
+                                  NotifyCtl->shared->page_buffer[slotno] + curoffset,
+                                  copysize);
+
+                       /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+                       LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+
+                       /*
+                        * The page has been read; clear the flag so that the next calls
+                        * consume the entries on this page.
+                        */
+                       iter->read_next_page = false;
+                       iter->current_entry = (AsyncQueueEntry *) iter->page_buffer.buf;
+               }
+               else
+               {
+                       /*
+                        * If we are not reading a new page, advance our local entry
+                        * pointer to the next message.
+                        */
+                       iter->current_entry = (AsyncQueueEntry *) ((char *) iter->current_entry + iter->current_entry->length);
+               }
+
+               thisentry = iter->current;
+
+               /* Read the AsyncQueueEntry within our local buffer. */
+               qe = iter->current_entry;
+
+               Assert(qe->length > 0);
+
+               /*
+                * Advance iter->current over this message. If we reached the end of
+                * the page, set read_next_page to true so that the next call reads
+                * the next page.
+                *
+                * TODO(matheus): Incorporate comments from
+                * asyncQueueProcessPageEntries.
+                */
+               iter->read_next_page = asyncQueueAdvance(&iter->current, qe->length);
+
+               /* Ignore messages destined for other databases */
+               if (qe->dboid == MyDatabaseId)
+               {
+                       /* Stop if the sending transaction is still in progress */
+                       if (XidInMVCCSnapshot(qe->xid, iter->snapshot))
+                       {
+                               /*
+                                * The source transaction is still in progress, so we can't
+                                * process this message yet.  Break out of the loop, but first
+                                * back up iter->current so we will reprocess the message next
+                                * time. (Note: it is unlikely but not impossible for
+                                * TransactionIdDidCommit to fail, so we can't really avoid
+                                * this advance-then-back-up behavior when dealing with an
+                                * uncommitted message.)
+                                *
+                                * Note that we must test XidInMVCCSnapshot before we test
+                                * TransactionIdDidCommit, else we might return a message from
+                                * a transaction that is not yet visible to snapshots; compare
+                                * the comments at the head of heapam_visibility.c.
+                                *
+                                * Also, while our own xact won't be listed in the snapshot,
+                                * we need not check for TransactionIdIsCurrentTransactionId
+                                * because our transaction cannot (yet) have queued any
+                                * messages.
+                                */
+
+                               iter->current = thisentry;
+                               return NULL;
+                       }
+                       else if (TransactionIdDidCommit(qe->xid))
+                       {
+                               /* Found a valid notification */
+                               return qe;
+                       }
+                       else
+                       {
+                               /*
+                                * The source transaction aborted or crashed, so we just
+                                * ignore its notifications and go to the next.
+                                */
+                               continue;
+                       }
+               }
+       } while (true);
+}
+
+/*
+ * Cleans up the iterator by unregistering the snapshot.
+ */
+static void
+AsyncQueueIterDestroy(AsyncQueueIterator *iter)
+{
+       UnregisterSnapshot(iter->snapshot);
+}
+
+TransactionId
+AsyncQueueMinXid(void)
+{
+       QueuePosition current;
+       QueuePosition head;
+       AsyncQueueEntry *qe;
+       AsyncQueueIterator iter;
+       TransactionId minXid = MaxTransactionId;
+
+       /*
+        * First advance the global queue tail so we don't need to worry about
+        * notifications already processed by backends.
+        */
+       asyncQueueAdvanceTail();
+
+       /* Fetch current state */
+       LWLockAcquire(NotifyQueueLock, LW_SHARED);
+       current = QUEUE_TAIL;
+       head = QUEUE_HEAD;
+       LWLockRelease(NotifyQueueLock);
+
+       AsyncQueueIterInit(&iter, current, head);
+
+       while ((qe = AsyncQueueIterNextNotification(&iter)) != NULL)
+       {
+               if (qe->xid < minXid)
+                       minXid = qe->xid;
+       }
+
+       AsyncQueueIterDestroy(&iter);
+
+
+       return minXid;
+}
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 733ef40ae7c..d35d6fc8e8a 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
+#include "commands/async.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
@@ -1739,6 +1740,18 @@ vac_update_datfrozenxid(void)
        if (bogus)
                return;
 
+       /*
+        * Listening backends must check the commit status of each queued
+        * notification's transaction before notifying the client.  If listeners
+        * lag in consuming notifications, consider the oldest xid still in the
+        * queue so that its transaction status remains accessible.
+        *
+        * XXX(matheus): Maybe add a GUC to prevent lazy listeners or
+        * notifications that were added without listeners from blocking the
+        * VACUUM FREEZE newFrozenXid advance.
+        */
+       newFrozenXid = Min(newFrozenXid, AsyncQueueMinXid());
+
        Assert(TransactionIdIsNormal(newFrozenXid));
        Assert(MultiXactIdIsValid(newMinMulti));
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..3592103a0da 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -46,4 +46,7 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+
+extern TransactionId AsyncQueueMinXid(void);
+
 #endif                                                 /* ASYNC_H */
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 903a8ac151a..4c0160df341 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -28,6 +28,7 @@ SUBDIRS = \
                  test_int128 \
                  test_integerset \
                  test_json_parser \
+                 test_listen_notify \
                  test_lfind \
                  test_misc \
                  test_oat_hooks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 93be0f57289..144379b619b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -27,6 +27,7 @@ subdir('test_ginpostinglist')
 subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
+subdir('test_listen_notify')
 subdir('test_lfind')
 subdir('test_misc')
 subdir('test_oat_hooks')
diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..da1bf5bb1b7
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,17 @@
+# src/test/modules/test_listen_notify/Makefile
+
+MODULE = test_listen_notify
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support"
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..8119e6c761f
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,13 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+    ],
+  },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..79dcd73ed65
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,73 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+       'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,100000) g;'
+);
+$node->safe_psql('postgres',
+       'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Session 2: several NOTIFYs, each in its own committed transaction ---
+for my $i (1 .. 10)
+{
+       $node->safe_psql(
+               'postgres', "
+               BEGIN;
+               NOTIFY s, '$i';
+               COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+
+# Run an UPDATE so that the frozen xid of table "t" is updated to an xid greater
+# than the consume_xids() result
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember the current datfrozenxid before VACUUM FREEZE to ensure that it is advanced.
+my $datafronzenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+       "vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after VACUUM FREEZE to ensure that it is advanced but
+# that we can still get the transaction status of the notifications
+my $datafronzenxid_partial_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datafronzenxid_partial_freeze > $datafronzenxid, 'datfrozenxid is partially advanced');
+
+# On Session 1, commit and ensure that all notifications are received
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+       $notifications_count++;
+       like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+# Execute VACUUM FREEZE on all databases again and ensure that datfrozenxid is fully advanced.
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+       "vacuumdb --all --freeze");
+
+my $datafronzenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datafronzenxid_freeze > $datafronzenxid_partial_freeze, 'datfrozenxid is advanced after notification is consumed');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a13e8162890..14684584cff 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -158,6 +158,7 @@ ArrayToken
 ArrayType
 AsyncQueueControl
 AsyncQueueEntry
+AsyncQueueIterator
 AsyncRequest
 AttInMetadata
 AttStatsSlot
-- 
2.39.5 (Apple Git-154)
