On Wed Oct 22, 2025 at 12:49 PM -03, Álvaro Herrera wrote:
> On 2025-Oct-22, Matheus Alcantara wrote:
>
>> I'm wondering if the 002_aborted_tx_notifies.pl is still needed with
>> this architecture being used. I think that it's not, but perhaps is a
>> good test to keep it?
>
> I'd rather have tests than not, but I'd think it needs to be behind
> PG_TEST_EXTRA because of things like
>
> +$node->safe_psql('postgres', 'select consume_xids(10000000);');
>
Attached is v10, which puts the test behind PG_TEST_EXTRA. Should we enable
it somewhere so that it gets executed on the buildfarm?

--
Matheus Alcantara

From a7e3f43f3091520ff6dcaa7d0b0a3b5a74729f19 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 18:55:25 +0200
Subject: [PATCH v10 1/2] Prevent VACUUM from truncating XIDs still present in
 notification queue

VACUUM's computation of datfrozenxid did not account for transaction IDs
in the LISTEN/NOTIFY queue.  This allowed VACUUM to truncate clog
entries for XIDs that were still referenced by queued notifications,
causing backends to fail in TransactionIdDidCommit when later processing
those notifications.

Fix by adding GetOldestQueuedNotifyXid to find the oldest XID in queued
notifications for the current database, and constraining datfrozenxid to
not pass that.  The function scans from QUEUE_TAIL, since notifications
may have been written before any listeners existed.

To avoid code duplication, refactor SLRU page-reading code into a new
helper function asyncQueueReadPageToBuffer.
---
 src/backend/commands/async.c  | 139 ++++++++++++++++++++++++++++------
 src/backend/commands/vacuum.c |  12 +++
 src/include/commands/async.h  |   3 +
 3 files changed, 130 insertions(+), 24 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..7c9d7831c9f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1841,6 +1841,44 @@ ProcessNotifyInterrupt(bool flush)
                ProcessIncomingNotify(flush);
 }
 
+/*
+ * Copy part of a notification queue page from SLRU into page_buffer.
+ *
+ * The copied range starts at pos's offset within its page and ends either at
+ * the end of that page or, if 'head' lies on the same page, at head.  The
+ * bytes are stored at the same offset in page_buffer, which must therefore
+ * be at least QUEUE_PAGESIZE bytes long.
+ */
+static void
+asyncQueueReadPageToBuffer(QueuePosition pos, QueuePosition head,
+						   char *page_buffer)
+{
+	int64		pageno = QUEUE_POS_PAGE(pos);
+	int			offset = QUEUE_POS_OFFSET(pos);
+	int			nbytes;
+	int			slot;
+
+	slot = SimpleLruReadPage_ReadOnly(NotifyCtl, pageno,
+									  InvalidTransactionId);
+
+	if (pageno != QUEUE_POS_PAGE(head))
+	{
+		/* head is on another page: take the remainder of this one */
+		nbytes = QUEUE_PAGESIZE - offset;
+	}
+	else
+	{
+		/* read only as far as head, never a negative amount */
+		nbytes = Max(QUEUE_POS_OFFSET(head) - offset, 0);
+	}
+
+	memcpy(page_buffer + offset,
+		   NotifyCtl->shared->page_buffer[slot] + offset,
+		   nbytes);
+
+	/* SimpleLruReadPage_ReadOnly() returned with the bank lock held */
+	LWLockRelease(SimpleLruGetBankLock(NotifyCtl, pageno));
+}
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
@@ -1932,36 +1970,13 @@ asyncQueueReadAllNotifications(void)
 
                do
                {
-                       int64           curpage = QUEUE_POS_PAGE(pos);
-                       int                     curoffset = 
QUEUE_POS_OFFSET(pos);
-                       int                     slotno;
-                       int                     copysize;
-
                        /*
                         * We copy the data from SLRU into a local buffer, so 
as to avoid
                         * holding the SLRU lock while we are examining the 
entries and
                         * possibly transmitting them to our frontend.  Copy 
only the part
                         * of the page we will actually inspect.
                         */
-                       slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-                                                                               
                InvalidTransactionId);
-                       if (curpage == QUEUE_POS_PAGE(head))
-                       {
-                               /* we only want to read as far as head */
-                               copysize = QUEUE_POS_OFFSET(head) - curoffset;
-                               if (copysize < 0)
-                                       copysize = 0;   /* just for safety */
-                       }
-                       else
-                       {
-                               /* fetch all the rest of the page */
-                               copysize = QUEUE_PAGESIZE - curoffset;
-                       }
-                       memcpy(page_buffer.buf + curoffset,
-                                  NotifyCtl->shared->page_buffer[slotno] + 
curoffset,
-                                  copysize);
-                       /* Release lock that we got from 
SimpleLruReadPage_ReadOnly() */
-                       LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+                       asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
 
                        /*
                         * Process messages up to the stop position, end of 
page, or an
@@ -2097,6 +2112,82 @@ asyncQueueProcessPageEntries(volatile QueuePosition 
*current,
        return reachedStop;
 }
 
+/*
+ * Get the oldest XID among the notifications for the current database that
+ * are still present in the notification queue (from QUEUE_TAIL up to
+ * QUEUE_HEAD).  vac_update_datfrozenxid() uses this so that datfrozenxid is
+ * not advanced past an XID whose commit status a listener may still need.
+ *
+ * Returns InvalidTransactionId if there are no such notifications.
+ */
+TransactionId
+GetOldestQueuedNotifyXid(void)
+{
+	QueuePosition pos;
+	QueuePosition head;
+	TransactionId oldestXid = InvalidTransactionId;
+
+	/* page_buffer must be adequately aligned, so use a union */
+	union
+	{
+		char		buf[QUEUE_PAGESIZE];
+		AsyncQueueEntry align;
+	}			page_buffer;
+
+	/*
+	 * Shared mode suffices: QUEUE_HEAD and QUEUE_TAIL only move under an
+	 * exclusive NotifyQueueLock, so the range we scan stays put (and is not
+	 * truncated) without blocking listeners that take the lock shared.
+	 */
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+
+	/*
+	 * We must start at QUEUE_TAIL since notification data might have been
+	 * written before there were any listening backends.  If the queue is
+	 * empty (tail == head) the loop below does nothing.
+	 */
+	pos = QUEUE_TAIL;
+	head = QUEUE_HEAD;
+
+	while (!QUEUE_POS_EQUAL(pos, head))
+	{
+		int			curoffset;
+
+		/* Read the current page from SLRU into our local buffer */
+		asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
+
+		curoffset = QUEUE_POS_OFFSET(pos);
+
+		/* Process all entries on this page up to head */
+		while (curoffset + QUEUEALIGN(AsyncQueueEntryEmptySize) <= QUEUE_PAGESIZE &&
+			   !QUEUE_POS_EQUAL(pos, head))
+		{
+			AsyncQueueEntry *qe = (AsyncQueueEntry *) (page_buffer.buf + curoffset);
+
+			/*
+			 * Only entries for our database matter for our datfrozenxid, and
+			 * only entries carrying a valid XID need protecting.
+			 */
+			if (qe->dboid == MyDatabaseId && TransactionIdIsValid(qe->xid))
+			{
+				if (!TransactionIdIsValid(oldestXid) ||
+					TransactionIdPrecedes(qe->xid, oldestXid))
+					oldestXid = qe->xid;
+			}
+
+			/* Advance to the next entry; stop once we move to a new page */
+			if (asyncQueueAdvance(&pos, qe->length))
+				break;
+
+			curoffset = QUEUE_POS_OFFSET(pos);
+		}
+	}
+
+	LWLockRelease(NotifyQueueLock);
+
+	return oldestXid;
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..6c601ce81aa 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
+#include "commands/async.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
@@ -1617,6 +1618,7 @@ vac_update_datfrozenxid(void)
        bool            dirty = false;
        ScanKeyData key[1];
        void       *inplace_state;
+       TransactionId oldestNotifyXid;
 
        /*
         * Restrict this task to one backend per database.  This avoids race
@@ -1733,6 +1735,16 @@ vac_update_datfrozenxid(void)
        if (bogus)
                return;
 
+       /*
+        * Also consider the oldest XID in the notification queue, since 
backends
+        * will need to call TransactionIdDidCommit() on those XIDs when
+        * processing the notifications.
+        */
+       oldestNotifyXid = GetOldestQueuedNotifyXid();
+       if (TransactionIdIsValid(oldestNotifyXid) &&
+               TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
+               newFrozenXid = oldestNotifyXid;
+
        Assert(TransactionIdIsNormal(newFrozenXid));
        Assert(MultiXactIdIsValid(newMinMulti));
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..ac323ada492 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -26,6 +26,9 @@ extern void NotifyMyFrontEnd(const char *channel,
                                                         const char *payload,
                                                         int32 srcPid);
 
+/* get oldest XID in the notification queue for vacuum freeze */
+extern TransactionId GetOldestQueuedNotifyXid(void);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
-- 
2.51.0

From 0a644d3d7061ff316b11a8245118b7d70f7e986d Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Wed, 22 Oct 2025 11:06:18 -0300
Subject: [PATCH v10 2/2] Add TAP tests for LISTEN/NOTIFY and VACUUM FREEZE

---
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 src/test/modules/test_listen_notify/Makefile  | 19 +++++
 .../modules/test_listen_notify/meson.build    | 14 ++++
 .../test_listen_notify/t/001_xid_freeze.pl    | 74 ++++++++++++++++
 .../t/002_aborted_tx_notifies.pl              | 84 +++++++++++++++++++
 6 files changed, 193 insertions(+)
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl
 create mode 100644 
src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl

diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 902a7954101..a015c961d35 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
 		  test_int128 \
 		  test_integerset \
 		  test_json_parser \
 		  test_lfind \
+		  test_listen_notify \
 		  test_lwlock_tranches \
 		  test_misc \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 14fc761c4cf..6af33448d7b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -28,6 +28,7 @@ subdir('test_ginpostinglist')
 subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
 subdir('test_lfind')
+subdir('test_listen_notify')
 subdir('test_lwlock_tranches')
 subdir('test_misc')
diff --git a/src/test/modules/test_listen_notify/Makefile 
b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..c1eb4fde370
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,19 @@
+# src/test/modules/test_listen_notify/Makefile
+
+# TAP tests only: there is no C code here, so no module is built.
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support"
+
+TAP_TESTS = 1
+
+EXTRA_INSTALL=src/test/modules/xid_wraparound
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build 
b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..a68052cd353
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,13 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+      't/002_aborted_tx_notifies.pl',
+    ],
+  },
+}
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl 
b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..a8bbd268c0f
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,90 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+# Check that VACUUM FREEZE does not advance datfrozenxid (and so allow clog
+# truncation) past the XIDs of notifications that are still waiting in the
+# LISTEN/NOTIFY queue, so that a listener can still deliver them afterwards.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Consuming ten million XIDs is expensive, so only run when requested.
+if (!$ENV{PG_TEST_EXTRA} || $ENV{PG_TEST_EXTRA} !~ /\blisten_notify\b/)
+{
+	plan skip_all => "test listen_notify not enabled in PG_TEST_EXTRA";
+}
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Check if the extension xid_wraparound is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node->check_extension('xid_wraparound'))
+{
+	plan skip_all => 'Extension xid_wraparound not installed';
+}
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+	'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,100000) g;'
+);
+$node->safe_psql('postgres',
+	'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Session 1: listen on 's' and leave the session idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;');
+$psql_session1->query_safe('begin;');
+
+# --- Session 2: send several notifications, each in its own transaction
+for my $i (1 .. 10)
+{
+	$node->safe_psql(
+		'postgres', "
+		BEGIN;
+		NOTIFY s, '$i';
+		COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+
+# Execute update so the frozen xid of "t" table is updated to a xid greater
+# than consume_xids() result
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember current datfrozenxid before vacuum freeze to ensure that it is
+# advanced.
+my $datfrozenxid = $node->safe_psql('postgres',
+	"select datfrozenxid from pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after vacuum freeze to ensure that it is advanced
+# while the commit status of the queued notifications can still be checked.
+my $datfrozenxid_freeze = $node->safe_psql('postgres',
+	"select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datfrozenxid_freeze > $datfrozenxid, 'datfrozenxid is advanced');
+
+# On Session 1, commit and ensure that all the notifications are received
+my $res = $psql_session1->query_safe('commit;');
+my $notifications_count = 0;
+foreach my $line (split /\n/, $res)
+{
+	$notifications_count++;
+	like($line,
+		qr/Asynchronous notification "s" with payload "$notifications_count" received/,
+		"notification $notifications_count received");
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+$psql_session1->quit;
+
+done_testing();
diff --git a/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl 
b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
new file mode 100644
index 00000000000..464c959a5a9
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
@@ -0,0 +1,89 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Only run when "listen_notify" is listed in PG_TEST_EXTRA.
+my $test_extra = $ENV{PG_TEST_EXTRA} // '';
+if ($test_extra !~ /\blisten_notify\b/)
+{
+	plan skip_all => "test listen_notify not enabled in PG_TEST_EXTRA";
+}
+
+# Listeners must not receive notifications from an aborted transaction, even
+# if that transaction already added them to the listen/notify queue.  To get
+# there, we rely on serializable conflicts being checked after the
+# transaction has put its notifications into the queue.
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', 'CREATE TABLE t1 (a bigserial);');
+
+# Listener
+my $psql_listener = $node->background_psql('postgres');
+$psql_listener->query_safe('LISTEN ch;');
+
+# Session 1: SERIALIZABLE transaction that queues two notifications.
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe("
+	BEGIN ISOLATION LEVEL SERIALIZABLE;
+	SELECT * FROM t1;
+	INSERT INTO t1 DEFAULT VALUES;
+	NOTIFY ch,'committed_0';
+	NOTIFY ch,'committed_1';
+");
+
+# Session 2: SERIALIZABLE transaction whose reads and writes conflict with
+# session 1.
+my $psql_session2 = $node->background_psql('postgres', on_error_stop => 0);
+$psql_session2->query_safe("
+	BEGIN ISOLATION LEVEL SERIALIZABLE;
+	SELECT * FROM t1;
+	INSERT INTO t1 DEFAULT VALUES;
+");
+
+# Queue notifications that must never be delivered, because session 2's
+# transaction is going to be aborted.
+my $message = 'aborted_' . 'a' x 1000;
+foreach my $n (0 .. 9)
+{
+	$psql_session2->query_safe("NOTIFY ch, '$n$message'");
+}
+
+# Session 1 commits successfully, so listeners must see its notifications.
+$psql_session1->query_safe("COMMIT;");
+
+# Session 2 is aborted because of the conflict with session 1.  Its
+# notifications are already in the queue, but must not be delivered.
+$psql_session2->query("COMMIT;");
+
+# Send more notifications after the aborted transaction.
+foreach my $n (2 .. 3)
+{
+	$node->safe_psql('postgres', "NOTIFY ch, 'committed_$n';");
+}
+
+# Fetch the notifications delivered to the listener.
+my $res = $psql_listener->query_safe('begin; commit;');
+
+# Check that exactly the committed notifications arrived, in order.
+my @lines = split /\n/, $res;
+is(scalar @lines, 4, 'received all committed notifications');
+foreach my $i (0 .. 3)
+{
+	like($lines[$i],
+		qr/Asynchronous notification "ch" with payload "committed_$i" received/);
+}
+
+foreach my $session ($psql_listener, $psql_session1, $psql_session2)
+{
+	ok($session->quit);
+}
+
+done_testing();
-- 
2.51.0

Reply via email to