Thanks for the review!
On Thu Oct 23, 2025 at 8:42 PM -03, Masahiko Sawada wrote:
> On Wed, Oct 22, 2025 at 10:25 AM Matheus Alcantara
> <[email protected]> wrote:
>>
>> On Wed Oct 22, 2025 at 12:49 PM -03, Álvaro Herrera wrote:
>> > On 2025-Oct-22, Matheus Alcantara wrote:
>> >
>> >> I'm wondering if the 002_aborted_tx_notifies.pl is still needed with
>> >> this architecture being used. I think that it's not, but perhaps is a
>> >> good test to keep it?
>> >
>> > I'd rather have tests than not, but I'd think it needs to be behind
>> > PG_TEST_EXTRA because of things like
>> >
>> > +$node->safe_psql('postgres', 'select consume_xids(10000000);');
>> >
>> Attached v10 with wrapping into PG_TEST_EXTRA.
>
> Thank you for updating the patches!
>
> I've reviewed the patches and here are the comments.
>
> v10-0001-Prevent-VACUUM-from-truncating-XIDs-still-presen.patch:
>
> + *
> + * Returns InvalidTransactionId if there are no unprocessed notifications.
>
> This comment is not accurate since the function returns
> InvalidTransactionId even if all unprocessed notifications are created
> on other databases.
>
Fixed
> ---
> +TransactionId
> +GetOldestQueuedNotifyXid(void)
> +{
>
> How about renaming it to something like GetOldestNotifyTransactionId()?
>
Yeap, sounds better, fixed.
> ---
> + /* page_buffer must be adequately aligned, so use a union */
> + union
> + {
> + char buf[QUEUE_PAGESIZE];
> + AsyncQueueEntry align;
> + } page_buffer;
>
> asyncQueueReadAllNotifications() uses this union too, so how about
> define this type as AlignedQueueEntryPage or something and use it in
> both functions?
>
Good point, fixed.
> ---
> +
> + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
> +
>
> I don't think we need an exclusive lock here.
>
Fixed.
> ---
> In GetOldestQueuedNotifyXid() why do we keep holding NotifyQueueLock
> while calculating the oldest XID in the queue?
>
Yeah, I don't think that it's necessary. The
asyncQueueReadAllNotifications() for example only hold the lock when
reading the QUEUE_BACKEND_POS and QUEUE_HEAD. I think that it's a
similar case here, fixed.
> ---
> +
> + /* Process all entries on this page up to head */
> + while (curoffset + QUEUEALIGN(AsyncQueueEntryEmptySize) <=
> QUEUE_PAGESIZE &&
> + !QUEUE_POS_EQUAL(pos, head))
> + {
> + qe = (AsyncQueueEntry *) (page_buffer.buf + curoffset);
> (snip)
> + /* Advance to next entry */
> + if (asyncQueueAdvance(&pos, qe->length))
> + break; /* advanced to next page */
> +
>
> I think the check "curoffset + QUEUEALIGN(AsyncQueueEntryEmptySize) <=
> QUEUE_PAGESIZE" is essentially the same as the one we do in
> asyncQueueAdvance(). I think we can refactor the inner loop in
> GetOldestQueuedNotifyXid() to something like:
>
> for (;;)
> {
> bool reachedEndOfPage;
>
> qe = (AsyncQueueEntry *) (page_buffer.buf + curoffset);
>
> // check qe->xid here...
>
> reachedEndOfPage = asyncQueueAdvance(&pos, qe->length);
>
> if (reachedEndOfPage || QUEUE_POS_EQUAL(pos, head))
> break;
> }
>
Fixed
> ---
> v10-0002-Add-tap-tests-for-listen-notify-vacuum-freeze.patch:
>
> This new test directory needs to have .gitignore.
>
Fixed
> ---
> +# Copyright (c) 2024-2025, PostgreSQL Global Development Group
> +
>
> It's better to have a short description of this test here.
>
Fixed
> ---
> +use File::Path qw(mkpath);
>
> It seems not necessary.
>
Fixed
> ---
> +$node->safe_psql('postgres',
> + 'CREATE TABLE t AS SELECT g AS a, g+2 AS b from
> generate_series(1,100000) g;'
>
> Why does it need to insert many rows?
>
I think that this value was left when I was writing the first versions of
this test, I don't think that it's necessary. I reduced to just 10 rows.
> ---
> +# --- Session 2, multiple notify's, and commit ---
> +for my $i (1 .. 10)
> +{
> + $node->safe_psql(
> + 'postgres', "
> + BEGIN;
> + NOTIFY s, '$i';
> + COMMIT;");
> +}
>
> Why does it need to send a notification 10 times?
>
I just wanted to test with multiple notifications, we can reduce to two
or three, there is not specific reason to send 10 notifications.
> ---
> +# Consume enough XIDs to trigger truncation
> +$node->safe_psql('postgres', 'select consume_xids(10000000);');
> +
>
> I guess it consumes XID too much to trigger truncation. Given the one
> clog segment file is 256kB in size, it's enough to consume 1,050,000
> XIDs to move to the next segment file.
>
Fixed
> ---
> +# Get the new datfrozenxid after vacuum freeze to ensure that is advanced but
> +# we can still get the notification status of the notification
> +my $datafronzenxid_freeze = $node->safe_psql('postgres', "select
> datfrozenxid from pg_database where datname = 'postgres'");
> +ok($datafronzenxid_freeze > $datafronzenxid, 'datfrozenxid is advanced');
>
> I think this test passes in all cases, so it is meaningless. Instead,
> what we need to check in terms of datfrozenxid is that its value
> doesn't get greater than the XID we used for the notification, even
> after vacuum freeze. I think we remember the XID used for NOTIFY and
> check if the old/new datfrozenxid values are older than that value.
>
I think that was the goal of this test. I've swap the values to make it
more clear. Please let me know if I misunderstood your point here.
> ---
> +is($notifications_count, 10, 'received all committed notifications');
> +
> +done_testing();
>
> After consuming the unconsumed notifications, let's do vacuum freeze
> and check if datfrozenxid now can be advanced.
>
Good point, during this I realize that the datafrozenxid was not being
advanced even after the queue being consumed. This was because when a
backend listener is consuming the notifications it only update their own
queue tail pointer and not the global shared tail pointer, so I've
included a call to asyncQueueAdvanceTail() at the beginning of
GetOldestNotifyTransactionId() to do this.
> ---
> I would expect to add 002_aborted_tx_notifies.pl in a separate patch
> since it's not related to this bug fix.
>
On the new attached version I've moved this test to a separated patch.
> ---
> +# Test checks that listeners do not receive notifications from aborted
> +# transaction even if notifications have been added to the listen/notify
> +# queue. To reproduce it we use the fact that serializable conflicts
> +# are checked after tx adds notifications to the queue.
>
> I wonder if we could implement this test using the isolation test
> instead of the tap test. Is there any reason why you used a tap test
> for that?
>
On this new version I just moved the test into a separated patch but I
agree that we can implement this using the isolation test,
I don't know how we should handle the follow-up patches that Joel have
sent on [1], if we should incoportate on 0001 or not. I still need to
review and test them.
[1]
https://www.postgresql.org/message-id/b6f6cbb8-1903-4ca0-ae64-01d84d1e12a3%40app.fastmail.com
--
Matheus Alcantara
From 37675f6feff30a365de3b26719002b65a81dabb3 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 18:55:25 +0200
Subject: [PATCH v11 1/2] Prevent VACUUM from truncating XIDs still present in
notification queue
VACUUM's computation of datfrozenxid did not account for transaction IDs
in the LISTEN/NOTIFY queue. This allowed VACUUM to truncate clog
entries for XIDs that were still referenced by queued notifications,
causing backends to fail in TransactionIdDidCommit when later processing
those notifications.
Fix by adding GetOldestQueuedNotifyXid to find the oldest XID in queued
notifications for the current database, and constraining datfrozenxid to
not pass that. The function scans from QUEUE_TAIL, since notifications
may have been written before any listeners existed.
To avoid code duplication, refactor SLRU page-reading code into a new
helper function asyncQueueReadPageToBuffer.
Co-authored-by: Matheus Alcantara <[email protected]>
---
src/backend/commands/async.c | 156 ++++++++++++++----
src/backend/commands/vacuum.c | 12 ++
src/include/commands/async.h | 3 +
src/test/modules/Makefile | 1 +
src/test/modules/meson.build | 1 +
.../modules/test_listen_notify/.gitignore | 4 +
src/test/modules/test_listen_notify/Makefile | 19 +++
.../modules/test_listen_notify/meson.build | 13 ++
.../test_listen_notify/t/001_xid_freeze.pl | 89 ++++++++++
src/tools/pgindent/typedefs.list | 1 +
10 files changed, 268 insertions(+), 31 deletions(-)
create mode 100644 src/test/modules/test_listen_notify/.gitignore
create mode 100644 src/test/modules/test_listen_notify/Makefile
create mode 100644 src/test/modules/test_listen_notify/meson.build
create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..8fc1bbba7a2 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -401,6 +401,17 @@ struct NotificationHash
Notification *event; /* => the actual Notification struct */
};
+/*
+ * Page buffers used to read from SLRU cache must be adequately aligned,
+ * so use a union.
+ */
+typedef union
+{
+ char buf[QUEUE_PAGESIZE];
+ AsyncQueueEntry align;
+} AlignedQueueEntryPage;
+
+
static NotificationList *pendingNotifies = NULL;
/*
@@ -1841,6 +1852,44 @@ ProcessNotifyInterrupt(bool flush)
ProcessIncomingNotify(flush);
}
+/*
+ * Read a page from the SLRU queue into a local buffer.
+ *
+ * Reads the page containing 'pos', copying the data from the current offset
+ * either to the end of the page or up to 'head' (whichever comes first)
+ * into page_buffer.
+ */
+static void
+asyncQueueReadPageToBuffer(QueuePosition pos, QueuePosition head,
+ char *page_buffer)
+{
+ int64 curpage = QUEUE_POS_PAGE(pos);
+ int curoffset = QUEUE_POS_OFFSET(pos);
+ int slotno;
+ int copysize;
+
+ slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+
InvalidTransactionId);
+
+ if (curpage == QUEUE_POS_PAGE(head))
+ {
+ /* we only want to read as far as head */
+ copysize = QUEUE_POS_OFFSET(head) - curoffset;
+ if (copysize < 0)
+ copysize = 0; /* just for safety */
+ }
+ else
+ {
+ /* fetch all the rest of the page */
+ copysize = QUEUE_PAGESIZE - curoffset;
+ }
+
+ memcpy(page_buffer + curoffset,
+ NotifyCtl->shared->page_buffer[slotno] + curoffset,
+ copysize);
+ /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+}
/*
* Read all pending notifications from the queue, and deliver appropriate
@@ -1853,13 +1902,7 @@ asyncQueueReadAllNotifications(void)
volatile QueuePosition pos;
QueuePosition head;
Snapshot snapshot;
-
- /* page_buffer must be adequately aligned, so use a union */
- union
- {
- char buf[QUEUE_PAGESIZE];
- AsyncQueueEntry align;
- } page_buffer;
+ AlignedQueueEntryPage page_buffer;
/* Fetch current state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
@@ -1932,36 +1975,13 @@ asyncQueueReadAllNotifications(void)
do
{
- int64 curpage = QUEUE_POS_PAGE(pos);
- int curoffset =
QUEUE_POS_OFFSET(pos);
- int slotno;
- int copysize;
-
/*
* We copy the data from SLRU into a local buffer, so
as to avoid
* holding the SLRU lock while we are examining the
entries and
* possibly transmitting them to our frontend. Copy
only the part
* of the page we will actually inspect.
*/
- slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-
InvalidTransactionId);
- if (curpage == QUEUE_POS_PAGE(head))
- {
- /* we only want to read as far as head */
- copysize = QUEUE_POS_OFFSET(head) - curoffset;
- if (copysize < 0)
- copysize = 0; /* just for safety */
- }
- else
- {
- /* fetch all the rest of the page */
- copysize = QUEUE_PAGESIZE - curoffset;
- }
- memcpy(page_buffer.buf + curoffset,
- NotifyCtl->shared->page_buffer[slotno] +
curoffset,
- copysize);
- /* Release lock that we got from
SimpleLruReadPage_ReadOnly() */
- LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+ asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
/*
* Process messages up to the stop position, end of
page, or an
@@ -2097,6 +2117,80 @@ asyncQueueProcessPageEntries(volatile QueuePosition
*current,
return reachedStop;
}
+/*
+ * Get the oldest XID in the notification queue that has not yet been
+ * processed by all listening backends.
+ *
+ * Returns InvalidTransactionId if there are no unprocessed notifications or if
+ * all unprocessed notifications are created on other databases different from
+ * MyDatabaseId.
+ */
+TransactionId
+GetOldestNotifyTransactionId(void)
+{
+ QueuePosition pos;
+ QueuePosition head;
+ AlignedQueueEntryPage page_buffer;
+ TransactionId oldestXid = InvalidTransactionId;
+
+ /* First advance the shared queue tail pointer */
+ asyncQueueAdvanceTail();
+
+ /*
+ * We must start at QUEUE_TAIL since notification data might have been
+ * written before there were any listening backends.
+ */
+ LWLockAcquire(NotifyQueueLock, LW_SHARED);
+ pos = QUEUE_TAIL;
+ head = QUEUE_HEAD;
+ LWLockRelease(NotifyQueueLock);
+
+ /* If the queue is empty, no XIDs need protection */
+ if (QUEUE_POS_EQUAL(pos, head))
+ return InvalidTransactionId;
+
+ while (!QUEUE_POS_EQUAL(pos, head))
+ {
+ int curoffset;
+ AsyncQueueEntry *qe;
+
+ /* Read the current page from SLRU into our local buffer */
+ asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
+
+ curoffset = QUEUE_POS_OFFSET(pos);
+
+ /* Process all entries on this page up to head */
+ for (;;)
+ {
+ bool reachedEndOfPage;
+
+ qe = (AsyncQueueEntry *) (page_buffer.buf + curoffset);
+
+ /*
+ * Check if this entry is for our database and has a
valid XID.
+ * Only entries for our database matter for our
datfrozenxid.
+ */
+ if (qe->dboid == MyDatabaseId &&
TransactionIdIsValid(qe->xid))
+ {
+ if (!TransactionIdIsValid(oldestXid) ||
+ TransactionIdPrecedes(qe->xid,
oldestXid))
+ oldestXid = qe->xid;
+ }
+
+ /* Advance to next entry */
+ reachedEndOfPage = asyncQueueAdvance(&pos, qe->length);
+
+ if (reachedEndOfPage || QUEUE_POS_EQUAL(pos, head))
+ break;
+
+
+ curoffset = QUEUE_POS_OFFSET(pos);
+ }
+ }
+
+ return oldestXid;
+}
+
/*
* Advance the shared queue tail variable to the minimum of all the
* per-backend tail pointers. Truncate pg_notify space if possible.
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..e5fedfb3238 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_inherits.h"
+#include "commands/async.h"
#include "commands/cluster.h"
#include "commands/defrem.h"
#include "commands/progress.h"
@@ -1617,6 +1618,7 @@ vac_update_datfrozenxid(void)
bool dirty = false;
ScanKeyData key[1];
void *inplace_state;
+ TransactionId oldestNotifyXid;
/*
* Restrict this task to one backend per database. This avoids race
@@ -1733,6 +1735,16 @@ vac_update_datfrozenxid(void)
if (bogus)
return;
+ /*
+ * Also consider the oldest XID in the notification queue, since
backends
+ * will need to call TransactionIdDidCommit() on those XIDs when
+ * processing the notifications.
+ */
+ oldestNotifyXid = GetOldestNotifyTransactionId();
+ if (TransactionIdIsValid(oldestNotifyXid) &&
+ TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
+ newFrozenXid = oldestNotifyXid;
+
Assert(TransactionIdIsNormal(newFrozenXid));
Assert(MultiXactIdIsValid(newMinMulti));
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..0f8f17ad22b 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -26,6 +26,9 @@ extern void NotifyMyFrontEnd(const char *channel,
const char *payload,
int32 srcPid);
+/* get oldest XID in the notification queue for vacuum freeze */
+extern TransactionId GetOldestNotifyTransactionId(void);
+
/* notify-related SQL statements */
extern void Async_Notify(const char *channel, const char *payload);
extern void Async_Listen(const char *channel);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 902a7954101..a015c961d35 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
test_int128 \
test_integerset \
test_json_parser \
+ test_listen_notify \
test_lfind \
test_lwlock_tranches \
test_misc \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 14fc761c4cf..6af33448d7b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -28,6 +28,7 @@ subdir('test_ginpostinglist')
subdir('test_int128')
subdir('test_integerset')
subdir('test_json_parser')
+subdir('test_listen_notify')
subdir('test_lfind')
subdir('test_lwlock_tranches')
subdir('test_misc')
diff --git a/src/test/modules/test_listen_notify/.gitignore
b/src/test/modules/test_listen_notify/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_listen_notify/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_listen_notify/Makefile
b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..c1eb4fde370
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,19 @@
+# src/test/modules/test_listen_notify/Makefile
+
+MODULE = test_listen_notify
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY
support"
+
+TAP_TESTS = 1
+
+EXTRA_INSTALL=src/test/modules/xid_wraparound
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build
b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..ebf9444108a
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,13 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+ 'name': 'test_listen_notify',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_xid_freeze.pl'
+ ],
+ },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..c5553d6c792
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,89 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+# Test that VACUUM FREEZE don't remove clog files that are needed to check the
+# transaction status of notifications that are on the LISTEN/NOTIFY queue
+# during its execution. The VACUUM FREEZE operation should check the oldest xid
+# on the queue during execution.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Check if the extension xid_wraparound is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node->check_extension('xid_wraparound'))
+{
+ plan skip_all => 'Extension xid_wraparound not installed';
+}
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+ 'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,10)
g;'
+);
+$node->safe_psql('postgres',
+ 'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Session 2, multiple notify's, and commit ---
+for my $i (1 .. 10)
+{
+ $node->safe_psql(
+ 'postgres', "
+ BEGIN;
+ NOTIFY s, '$i';
+ COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(1050000);');
+
+# Execute update so the frozen xid of "t" table is updated to a xid greater
+# than consume_xids() result
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember current datfrozenxid before vacuum freeze to ensure that it is
advanced.
+my $datafronzenxid = $node->safe_psql('postgres', "select datfrozenxid from
pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+ "vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after vacuum freeze to ensure that is not advanced
+# to a value greater than the xid used to send the notifications.
+my $datafronzenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid
from pg_database where datname = 'postgres'");
+print("\n\n$datafronzenxid < $datafronzenxid_freeze\n\n");
+ok($datafronzenxid < $datafronzenxid_freeze, 'datfrozenxid is not fully
advanced');
+
+# On Session 1, commit and ensure that the all notifications is received
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+ $notifications_count++;
+ like($i, qr/Asynchronous notification "s" with payload
"$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+# Execute vacuum freeze on all databases and ensure that the datafrozenxid is
advanced
+
+$datafronzenxid = $node->safe_psql('postgres', "select datfrozenxid from
pg_database where datname = 'postgres'");
+
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+ "vacuumdb --all --freeze");
+
+$datafronzenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid
from pg_database where datname = 'postgres'");
+
+ok($datafronzenxid_freeze > $datafronzenxid, "datfrozenxid is advanced:
$datafronzenxid_freeze > $datafronzenxid");
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 43fe3bcd593..a8aa1365382 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -58,6 +58,7 @@ Aggref
AggregateInstrumentation
AlenState
Alias
+AlignedQueueEntryPage
AllocBlock
AllocFreeListLink
AllocPointer
--
2.51.0
From cad38eb3c267ac1d46dfbd1da546500c0985c28b Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Fri, 24 Oct 2025 21:14:38 -0300
Subject: [PATCH v11 2/2] Add more tests for listen notify vacuum freeze
Author: Arseniy Mukhin <[email protected]
---
.../modules/test_listen_notify/meson.build | 3 +-
.../t/002_aborted_tx_notifies.pl | 84 +++++++++++++++++++
2 files changed, 86 insertions(+), 1 deletion(-)
create mode 100644
src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
diff --git a/src/test/modules/test_listen_notify/meson.build
b/src/test/modules/test_listen_notify/meson.build
index ebf9444108a..a68052cd353 100644
--- a/src/test/modules/test_listen_notify/meson.build
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -6,7 +6,8 @@ tests += {
'bd': meson.current_build_dir(),
'tap': {
'tests': [
- 't/001_xid_freeze.pl'
+ 't/001_xid_freeze.pl',
+ 't/002_aborted_tx_notifies.pl'
],
},
}
diff --git a/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
new file mode 100644
index 00000000000..464c959a5a9
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
@@ -0,0 +1,84 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if (!$ENV{PG_TEST_EXTRA} || $ENV{PG_TEST_EXTRA} !~ /\blisten_notify\b/)
+{
+ plan skip_all => "test listen_notify not enabled in PG_TEST_EXTRA";
+}
+
+# Test checks that listeners do not receive notifications from aborted
+# transaction even if notifications have been added to the listen/notify
+# queue. To reproduce it we use the fact that serializable conflicts
+# are checked after tx adds notifications to the queue.
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', 'CREATE TABLE t1 (a bigserial);');
+
+# Listener
+my $psql_listener = $node->background_psql('postgres');
+$psql_listener->query_safe('LISTEN ch;');
+
+# Session1. Start SERIALIZABLE tx and add a notification.
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe("
+ BEGIN ISOLATION LEVEL SERIALIZABLE;
+ SELECT * FROM t1;
+ INSERT INTO t1 DEFAULT VALUES;
+ NOTIFY ch,'committed_0';
+ NOTIFY ch,'committed_1';
+");
+
+# Session2. Start SERIALIZABLE tx, add a notification and introduce a conflict
+# with session1.
+my $psql_session2 = $node->background_psql('postgres', on_error_stop => 0);
+$psql_session2->query_safe("
+ BEGIN ISOLATION LEVEL SERIALIZABLE;
+ SELECT * FROM t1;
+ INSERT INTO t1 DEFAULT VALUES;
+");
+
+# Send notifications that should not be eventually delivered, as session2
+# transaction will be aborted.
+my $message = 'aborted_' . 'a' x 1000;
+for (my $i = 0; $i < 10; $i++) {
+ $psql_session2->query_safe("NOTIFY ch, '$i$message'");
+}
+
+# Session1 should be committed successfully. Listeners must receive session1
+# notifications.
+$psql_session1->query_safe("COMMIT;");
+
+# Session2 should be aborted due to the conflict with session1. Transaction
+# is aborted after adding notifications to the listen/notify queue, but
+# listeners should not receive session2 notifications.
+$psql_session2->query("COMMIT;");
+
+# send more notifications after aborted
+$node->safe_psql('postgres', "NOTIFY ch, 'committed_2';");
+$node->safe_psql('postgres', "NOTIFY ch, 'committed_3';");
+
+# fetch notifications
+my $res = $psql_listener->query_safe('begin; commit;');
+
+# check received notifications
+my @lines = split('\n', $res);
+is(@lines, 4, 'received all committed notifications');
+for (my $i = 0; $i < 4; $i++) {
+ like($lines[$i], qr/Asynchronous notification "ch" with payload
"committed_$i" received/);
+}
+
+ok($psql_listener->quit);
+ok($psql_session1->quit);
+ok($psql_session2->quit);
+
+done_testing();
--
2.51.0