On Wed Oct 22, 2025 at 1:31 AM -03, Masahiko Sawada wrote:
> On Tue, Oct 21, 2025 at 4:16 PM Matheus Alcantara
>> > I think adding a new GUC would be overkill for this fix. As for
>> > dropping old notifications from the queue, we probably don't need to
>> > make it configurable - we could simply drop notifications whose commit
>> > status is no longer available (instead of raising an error).
>> >
>> IIUC, this is about not making vacuum freeze consider the oldest XID
>> in the queue, and instead just removing notifications whose transaction
>> status is no longer available, right? Since we already can't process
>> the notifications once the error happens, it seems a reasonable way to
>> go IMO.
>
> On second thought, simply hiding the error would be worse than our
> current behavior. Users wouldn't know their notifications are being
> dropped, as they often don't check WARNINGs. The more frequently they
> try to freeze XIDs, the more notifications they'd lose. To avoid
> silent discards, they would need to increase
> autovacuum_freeze_max_age to accommodate more clog entries, but
> this increases the risk of XID wraparound. I think the proposed
> approach modifying the vacuum freeze to consider the oldest XID on the
> queue would be better. This has a downside as I mentioned: processes
> in idle-in-transaction state even without backend_xmin and backend_xid
> can still accumulate unconsumed notifications. However, leaving
> transactions in idle-in-transaction state for a long time is bad
> practice anyway. While we might want to consider adding a safeguard
> for this case, I guess it would rarely occur in practice.
>
I'm attaching a v9 patch based on the idea of changing vacuum freeze to
consider the oldest XID in the listen/notify queue. 0001 is Joel's patch,
previously sent in [1], with some small tweaks. 0002 contains the TAP
tests introduced in previous versions by me and by Arseniy. I kept the
tests separate because I'm not sure they are all suitable for
back-patching.

I'm wondering whether 002_aborted_tx_notifies.pl is still needed with
this approach. I think it's not, but perhaps it's still a good test to
keep?

[1] 
https://www.postgresql.org/message-id/25651193-da4e-4185-a564-f2efa6b0c8a4%40app.fastmail.com

--
Matheus Alcantara

From a7e3f43f3091520ff6dcaa7d0b0a3b5a74729f19 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 18:55:25 +0200
Subject: [PATCH v9 1/2] Prevent VACUUM from truncating XIDs still present in
 notification queue

VACUUM's computation of datfrozenxid did not account for transaction IDs
in the LISTEN/NOTIFY queue.  This allowed VACUUM to truncate clog
entries for XIDs that were still referenced by queued notifications,
causing backends to fail in TransactionIdDidCommit when later processing
those notifications.

Fix by adding GetOldestQueuedNotifyXid to find the oldest XID in queued
notifications for the current database, and constraining datfrozenxid so
that it does not advance past that XID.  The function scans from
QUEUE_TAIL, since notifications may have been written before any
listeners existed.

To avoid code duplication, refactor SLRU page-reading code into a new
helper function asyncQueueReadPageToBuffer.
---
 src/backend/commands/async.c  | 139 ++++++++++++++++++++++++++++------
 src/backend/commands/vacuum.c |  12 +++
 src/include/commands/async.h  |   3 +
 3 files changed, 130 insertions(+), 24 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..7c9d7831c9f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1841,6 +1841,44 @@ ProcessNotifyInterrupt(bool flush)
                ProcessIncomingNotify(flush);
 }
 
+/*
+ * Read a page from the SLRU queue into a local buffer.
+ *
+ * Reads the page containing 'pos', copying the data from the current offset
+ * either to the end of the page or up to 'head' (whichever comes first)
+ * into page_buffer.
+ */
+static void
+asyncQueueReadPageToBuffer(QueuePosition pos, QueuePosition head,
+                                                  char *page_buffer)
+{
+       int64           curpage = QUEUE_POS_PAGE(pos);
+       int                     curoffset = QUEUE_POS_OFFSET(pos);
+       int                     slotno;
+       int                     copysize;
+
+       slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+                                           InvalidTransactionId);
+
+       if (curpage == QUEUE_POS_PAGE(head))
+       {
+               /* we only want to read as far as head */
+               copysize = QUEUE_POS_OFFSET(head) - curoffset;
+               if (copysize < 0)
+                       copysize = 0;           /* just for safety */
+       }
+       else
+       {
+               /* fetch all the rest of the page */
+               copysize = QUEUE_PAGESIZE - curoffset;
+       }
+
+       memcpy(page_buffer + curoffset,
+                  NotifyCtl->shared->page_buffer[slotno] + curoffset,
+                  copysize);
+       /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+       LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+}
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
@@ -1932,36 +1970,13 @@ asyncQueueReadAllNotifications(void)
 
                do
                {
-                       int64           curpage = QUEUE_POS_PAGE(pos);
-                       int                     curoffset = QUEUE_POS_OFFSET(pos);
-                       int                     slotno;
-                       int                     copysize;
-
                        /*
                         * We copy the data from SLRU into a local buffer, so as to avoid
                         * holding the SLRU lock while we are examining the entries and
                         * possibly transmitting them to our frontend.  Copy only the part
                         * of the page we will actually inspect.
                         */
-                       slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-                                                           InvalidTransactionId);
-                       if (curpage == QUEUE_POS_PAGE(head))
-                       {
-                               /* we only want to read as far as head */
-                               copysize = QUEUE_POS_OFFSET(head) - curoffset;
-                               if (copysize < 0)
-                                       copysize = 0;   /* just for safety */
-                       }
-                       else
-                       {
-                               /* fetch all the rest of the page */
-                               copysize = QUEUE_PAGESIZE - curoffset;
-                       }
-                       memcpy(page_buffer.buf + curoffset,
-                                  NotifyCtl->shared->page_buffer[slotno] + curoffset,
-                                  copysize);
-                       /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-                       LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+                       asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
 
                        /*
                         * Process messages up to the stop position, end of page, or an
@@ -2097,6 +2112,82 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
        return reachedStop;
 }
 
+/*
+ * Get the oldest XID among notifications for the current database that are
+ * still present in the queue, i.e. between QUEUE_TAIL and QUEUE_HEAD.
+ *
+ * Returns InvalidTransactionId if there are no such notifications.
+ */
+TransactionId
+GetOldestQueuedNotifyXid(void)
+{
+       QueuePosition pos;
+       QueuePosition head;
+       TransactionId oldestXid = InvalidTransactionId;
+
+       /* page_buffer must be adequately aligned, so use a union */
+       union
+       {
+               char            buf[QUEUE_PAGESIZE];
+               AsyncQueueEntry align;
+       }                       page_buffer;
+
+       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+       /*
+        * We must start at QUEUE_TAIL since notification data might have been
+        * written before there were any listening backends.
+        */
+       pos = QUEUE_TAIL;
+       head = QUEUE_HEAD;
+
+       /* If the queue is empty, no XIDs need protection */
+       if (QUEUE_POS_EQUAL(pos, head))
+       {
+               LWLockRelease(NotifyQueueLock);
+               return InvalidTransactionId;
+       }
+
+       while (!QUEUE_POS_EQUAL(pos, head))
+       {
+               int                     curoffset;
+               AsyncQueueEntry *qe;
+
+               /* Read the current page from SLRU into our local buffer */
+               asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
+
+               curoffset = QUEUE_POS_OFFSET(pos);
+
+               /* Process all entries on this page up to head */
+               while (curoffset + QUEUEALIGN(AsyncQueueEntryEmptySize) <= QUEUE_PAGESIZE &&
+                          !QUEUE_POS_EQUAL(pos, head))
+               {
+                       qe = (AsyncQueueEntry *) (page_buffer.buf + curoffset);
+
+                       /*
+                        * Check if this entry is for our database and has a valid XID.
+                        * Only entries for our database matter for our datfrozenxid.
+                        */
+                       if (qe->dboid == MyDatabaseId && TransactionIdIsValid(qe->xid))
+                       {
+                               if (!TransactionIdIsValid(oldestXid) ||
+                                       TransactionIdPrecedes(qe->xid, oldestXid))
+                                       oldestXid = qe->xid;
+                       }
+
+                       /* Advance to next entry */
+                       if (asyncQueueAdvance(&pos, qe->length))
+                               break;                  /* advanced to next page */
+
+                       curoffset = QUEUE_POS_OFFSET(pos);
+               }
+       }
+
+       LWLockRelease(NotifyQueueLock);
+
+       return oldestXid;
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..6c601ce81aa 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
+#include "commands/async.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
@@ -1617,6 +1618,7 @@ vac_update_datfrozenxid(void)
        bool            dirty = false;
        ScanKeyData key[1];
        void       *inplace_state;
+       TransactionId oldestNotifyXid;
 
        /*
         * Restrict this task to one backend per database.  This avoids race
@@ -1733,6 +1735,16 @@ vac_update_datfrozenxid(void)
        if (bogus)
                return;
 
+       /*
+        * Also consider the oldest XID in the notification queue, since backends
+        * will need to call TransactionIdDidCommit() on those XIDs when
+        * processing the notifications.
+        */
+       oldestNotifyXid = GetOldestQueuedNotifyXid();
+       if (TransactionIdIsValid(oldestNotifyXid) &&
+               TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
+               newFrozenXid = oldestNotifyXid;
+
        Assert(TransactionIdIsNormal(newFrozenXid));
        Assert(MultiXactIdIsValid(newMinMulti));
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..ac323ada492 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -26,6 +26,9 @@ extern void NotifyMyFrontEnd(const char *channel,
                                                         const char *payload,
                                                         int32 srcPid);
 
+/* get oldest XID in the notification queue for vacuum freeze */
+extern TransactionId GetOldestQueuedNotifyXid(void);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
-- 
2.51.0

From b4c7c43a6c3f2b573f66f4e5e72f6e485446af3b Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Wed, 22 Oct 2025 11:06:18 -0300
Subject: [PATCH v9 2/2] Add tap tests for listen notify vacuum freeze

---
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 src/test/modules/test_listen_notify/Makefile  | 19 +++++
 .../modules/test_listen_notify/meson.build    | 14 ++++
 .../test_listen_notify/t/001_xid_freeze.pl    | 74 +++++++++++++++++
 .../t/002_aborted_tx_notifies.pl              | 79 +++++++++++++++++++
 6 files changed, 188 insertions(+)
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl
 create mode 100644 src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl

diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 902a7954101..a015c961d35 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
                  test_int128 \
                  test_integerset \
                  test_json_parser \
+                 test_listen_notify \
                  test_lfind \
                  test_lwlock_tranches \
                  test_misc \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 14fc761c4cf..6af33448d7b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -28,6 +28,7 @@ subdir('test_ginpostinglist')
 subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
+subdir('test_listen_notify')
 subdir('test_lfind')
 subdir('test_lwlock_tranches')
 subdir('test_misc')
diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..c1eb4fde370
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,19 @@
+# src/test/modules/test_listen_notify/Makefile
+
+MODULE = test_listen_notify
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support"
+
+TAP_TESTS = 1
+
+EXTRA_INSTALL=src/test/modules/xid_wraparound
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..a68052cd353
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,14 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+      't/002_aborted_tx_notifies.pl'
+    ],
+  },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..a8bbd268c0f
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,74 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Check if the extension xid_wraparound is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node->check_extension('xid_wraparound'))
+{
+       plan skip_all => 'Extension xid_wraparound not installed';
+}
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+       'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,100000) g;'
+);
+$node->safe_psql('postgres',
+       'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Session 2: multiple NOTIFYs, each in its own committed transaction ---
+for my $i (1 .. 10)
+{
+       $node->safe_psql(
+               'postgres', "
+               BEGIN;
+               NOTIFY s, '$i';
+               COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+
+# Execute an update so that the frozen XID of table "t" is advanced to an XID
+# newer than those consumed by consume_xids()
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember the current datfrozenxid before vacuum freeze, to check later that
+# it has advanced.
+my $datfrozenxid = $node->safe_psql('postgres',
+       "select datfrozenxid from pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+       "vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after vacuum freeze to ensure that it has advanced,
+# while the commit status of the queued notifications can still be looked up.
+my $datfrozenxid_freeze = $node->safe_psql('postgres',
+       "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datfrozenxid_freeze > $datfrozenxid, 'datfrozenxid is advanced');
+
+# On Session 1, commit and ensure that all notifications are received
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+       $notifications_count++;
+       like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+done_testing();
diff --git a/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
new file mode 100644
index 00000000000..dae7a24f5b2
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
@@ -0,0 +1,79 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# This test checks that listeners do not receive notifications from an
+# aborted transaction, even if those notifications have already been added to
+# the listen/notify queue. To reproduce this, we rely on the fact that
+# serializable conflicts are checked after a transaction adds its
+# notifications to the queue.
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', 'CREATE TABLE t1 (a bigserial);');
+
+# Listener
+my $psql_listener = $node->background_psql('postgres');
+$psql_listener->query_safe('LISTEN ch;');
+
+# Session1. Start SERIALIZABLE tx and add two notifications.
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe("
+       BEGIN ISOLATION LEVEL SERIALIZABLE;
+       SELECT * FROM t1;
+       INSERT INTO t1 DEFAULT VALUES;
+       NOTIFY ch,'committed_0';
+       NOTIFY ch,'committed_1';
+");
+
+# Session2. Start SERIALIZABLE tx, add a notification and introduce a conflict
+# with session1.
+my $psql_session2 = $node->background_psql('postgres', on_error_stop => 0);
+$psql_session2->query_safe("
+       BEGIN ISOLATION LEVEL SERIALIZABLE;
+       SELECT * FROM t1;
+       INSERT INTO t1 DEFAULT VALUES;
+");
+
+# Send notifications that should never be delivered, as the session2
+# transaction will be aborted.
+my $message = 'aborted_' . 'a' x 1000;
+for (my $i = 0; $i < 10; $i++) {
+    $psql_session2->query_safe("NOTIFY ch, '$i$message'");
+}
+
+# Session1 should be committed successfully. Listeners must receive session1
+# notifications.
+$psql_session1->query_safe("COMMIT;");
+
+# Session2 should be aborted due to the conflict with session1. Transaction
+# is aborted after adding notifications to the listen/notify queue, but
+# listeners should not receive session2 notifications.
+$psql_session2->query("COMMIT;");
+
+# Send more notifications after the aborted transaction
+$node->safe_psql('postgres', "NOTIFY ch, 'committed_2';");
+$node->safe_psql('postgres', "NOTIFY ch, 'committed_3';");
+
+# fetch notifications
+my $res = $psql_listener->query_safe('begin; commit;');
+
+# check received notifications
+my @lines = split('\n', $res);
+is(@lines, 4, 'received all committed notifications');
+for (my $i = 0; $i < 4; $i++) {
+    like($lines[$i], qr/Asynchronous notification "ch" with payload "committed_$i" received/);
+}
+
+ok($psql_listener->quit);
+ok($psql_session1->quit);
+ok($psql_session2->quit);
+
+done_testing();
-- 
2.51.0
