On Sat, Oct 25, 2025 at 3:33 AM Matheus Alcantara <[email protected]> wrote:
>
> On Fri Oct 24, 2025 at 4:36 AM -03, Arseniy Mukhin wrote:
> >> I would expect to add 002_aborted_tx_notifies.pl in a separate patch
> >> since it's not related to this bug fix.
> >>
> >> ---
> >> +# Test checks that listeners do not receive notifications from aborted
> >> +# transaction even if notifications have been added to the listen/notify
> >> +# queue. To reproduce it we use the fact that serializable conflicts
> >> +# are checked after tx adds notifications to the queue.
> >>
> >> I wonder if we could implement this test using the isolation test
> >> instead of the tap test. Is there any reason why you used a tap test
> >> for that?
> >>
> >
> > I agree it's less relevant to the patch now than it was with the new
> > 'committed' field approach. And there is no particular reason why it
> > was implemented as a TAP test actually.. So +1 to move it to separate
> > patch (does it mean to separate thread as well or just separate patch
> > file?) and rewrite as an isolation test (IIUC it's better to use
> > isolation test infrastructure if it's possible). I can try to do it if
> > nobody else does it earlier.
> >
> On the v11 version that I've sent on [1] I've move this test into a
> separate patch, please feel free to implement it as an isolation test if
> you want it.
>
Thank you! I reimplemented the test in 0002 as an isolation test and added a commit message. PFA the new version.

Best regards,
Arseniy Mukhin
From 494975f96d1295b05baf3063ef7aa05ecd2e11f1 Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <[email protected]>
Date: Sat, 25 Oct 2025 15:15:51 +0300
Subject: [PATCH v12 2/2] Add test for listen/notify

This commit adds an isolation test showing that listeners ignore
notifications from aborted transactions even if they were added to the
listen/notify queue.
---
 src/test/isolation/expected/async-notify.out | 33 +++++++++++++++++++-
 src/test/isolation/specs/async-notify.spec   | 24 ++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..20d5763f319 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 4 sessions
 
 starting permutation: listenc notify1 notify2 notify3 notifyf
 step listenc: LISTEN c1; LISTEN c2;
@@ -104,6 +104,37 @@ step l2commit: COMMIT;
 listener2: NOTIFY "c1" with payload "" from notifier
 step l2stop: UNLISTEN *;
 
+starting permutation: llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck
+step llisten: LISTEN c1; LISTEN c2;
+step n1begins: BEGIN ISOLATION LEVEL SERIALIZABLE;
+step n1select: SELECT * FROM t1;
+a
+-
+(0 rows)
+
+step n1insert: INSERT INTO t1 DEFAULT VALUES;
+step notify1: NOTIFY c1;
+step n2begins: BEGIN ISOLATION LEVEL SERIALIZABLE;
+step n2select: SELECT * FROM t1;
+a
+-
+(0 rows)
+
+step n2insert: INSERT INTO t1 DEFAULT VALUES;
+step n2notify1: NOTIFY c1, 'n2_payload';
+step n1commit: COMMIT;
+step n2commit: COMMIT;
+ERROR:  could not serialize access due to read/write dependencies among transactions
+step notify1: NOTIFY c1;
+step lcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+listener: NOTIFY "c1" with payload "" from notifier
+listener: NOTIFY "c1" with payload "" from notifier
+
 starting permutation: llisten lbegin usage bignotify usage
 step llisten: LISTEN c1; LISTEN c2;
 step lbegin: BEGIN;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..51b7ad43849 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -5,6 +5,10 @@
 # Note we assume that each step is delivered to the backend as a single Query
 # message so it will run as one transaction.
 
+# t1 table is used for serializable conflict
+setup { CREATE TABLE t1 (a bigserial); }
+teardown { DROP TABLE t1; }
+
 session notifier
 step listenc { LISTEN c1; LISTEN c2; }
 step notify1 { NOTIFY c1; }
@@ -33,8 +37,21 @@ step notifys1 {
 }
 step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
 step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
+step n1begins { BEGIN ISOLATION LEVEL SERIALIZABLE; }
+step n1select { SELECT * FROM t1; }
+step n1insert { INSERT INTO t1 DEFAULT VALUES; }
+step n1commit { COMMIT; }
 teardown { UNLISTEN *; }
 
+# notifier2 session is used to reproduce serializable conflict with notifier
+
+session notifier2
+step n2begins { BEGIN ISOLATION LEVEL SERIALIZABLE; }
+step n2select { SELECT * FROM t1; }
+step n2insert { INSERT INTO t1 DEFAULT VALUES; }
+step n2commit { COMMIT; }
+step n2notify1 { NOTIFY c1, 'n2_payload'; }
+
 # The listener session is used for cross-backend notify checks.
 
 session listener
@@ -73,6 +90,13 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck
 # and notify queue is not empty
 permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
 
+# Test checks that listeners ignore notifications from aborted
+# transaction even if notifications have been added to the listen/notify
+# queue. To reproduce it we use the fact that serializable conflicts
+# are checked after tx adds notifications to the queue.
+
+permutation llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck
+
 # Verify that pg_notification_queue_usage correctly reports a non-zero result,
 # after submitting notifications while another connection is listening for
 # those notifications and waiting inside an active transaction. We have to
-- 
2.43.0
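To make the behavior this test relies on concrete, here is a minimal standalone C sketch of how a listener treats queue entries, loosely modeled on asyncQueueProcessPageEntries() in async.c. An entry whose transaction is still in progress stops the scan, a committed entry is delivered, and anything else is skipped. That last case covers n2's transaction here, which wrote 'n2_payload' to the queue and then failed the serializable check at commit. The SketchEntry type, the SketchXactStatus enum and sketch_process_entries() are hypothetical: the enum stands in for the XidInMVCCSnapshot() and TransactionIdDidCommit() lookups the real code performs, and entries for other databases (which the real code filters out first) are left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical stand-in for what the listener learns from snapshot + clog. */
typedef enum SketchXactStatus
{
	SKETCH_IN_PROGRESS,			/* XidInMVCCSnapshot() says "still running" */
	SKETCH_COMMITTED,			/* TransactionIdDidCommit() returns true */
	SKETCH_ABORTED				/* neither: aborted, e.g. serialization failure */
} SketchXactStatus;

typedef struct SketchEntry
{
	TransactionId xid;
	SketchXactStatus status;
} SketchEntry;

/*
 * Walk entries in queue order, storing the indexes of deliverable entries in
 * 'delivered' and returning how many there are.  The scan stops at the first
 * in-progress entry; its index (or n if the end was reached) goes to *stop_at.
 */
static size_t
sketch_process_entries(const SketchEntry *entries, size_t n,
					   size_t *delivered, size_t *stop_at)
{
	size_t		ndelivered = 0;
	size_t		i;

	assert(delivered != NULL && stop_at != NULL);

	for (i = 0; i < n; i++)
	{
		if (entries[i].status == SKETCH_IN_PROGRESS)
			break;				/* cannot look past an unfinished writer */
		if (entries[i].status == SKETCH_COMMITTED)
			delivered[ndelivered++] = i;
		/* SKETCH_ABORTED: silently ignored, like the real "else" branch */
	}
	*stop_at = i;
	return ndelivered;
}
```

If you feed it the queue this permutation produces (n1's NOTIFY, n2's aborted NOTIFY, then the final NOTIFY), the sketch delivers entries 0 and 2 only. That matches the two empty-payload notifications in the expected output. In the real code, telling "committed" from "aborted" requires a clog lookup, and that lookup is what patch 1/2 keeps from being truncated away.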
From eaf6da2921d647766a9c2398d59810ab042427ee Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 18:55:25 +0200
Subject: [PATCH v12 1/2] Prevent VACUUM from truncating XIDs still present in
 notification queue

VACUUM's computation of datfrozenxid did not account for transaction IDs
in the LISTEN/NOTIFY queue. This allowed VACUUM to truncate clog entries
for XIDs that were still referenced by queued notifications, causing
backends to fail in TransactionIdDidCommit when later processing those
notifications.

Fix by adding GetOldestNotifyTransactionId to find the oldest XID in
queued notifications for the current database, and constraining
datfrozenxid to not pass that. The function scans from QUEUE_TAIL, since
notifications may have been written before any listeners existed.

To avoid code duplication, refactor SLRU page-reading code into a new
helper function asyncQueueReadPageToBuffer.

Co-authored-by: Matheus Alcantara <[email protected]>
---
 src/backend/commands/async.c                 | 156 ++++++++++++++----
 src/backend/commands/vacuum.c                |  12 ++
 src/include/commands/async.h                 |   3 +
 src/test/modules/Makefile                    |   1 +
 src/test/modules/meson.build                 |   1 +
 .../modules/test_listen_notify/.gitignore    |   4 +
 src/test/modules/test_listen_notify/Makefile |  19 +++
 .../modules/test_listen_notify/meson.build   |  13 ++
 .../test_listen_notify/t/001_xid_freeze.pl   |  89 ++++++++++
 src/tools/pgindent/typedefs.list             |   1 +
 10 files changed, 268 insertions(+), 31 deletions(-)
 create mode 100644 src/test/modules/test_listen_notify/.gitignore
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..8fc1bbba7a2 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -401,6 +401,17 @@ struct NotificationHash
 	Notification *event;		/* => the actual Notification struct */
 };
 
+/*
+ * Page buffers used to read from SLRU cache must be adequately aligned,
+ * so use a union.
+ */
+typedef union
+{
+	char		buf[QUEUE_PAGESIZE];
+	AsyncQueueEntry align;
+} AlignedQueueEntryPage;
+
+
 static NotificationList *pendingNotifies = NULL;
 
 /*
@@ -1841,6 +1852,44 @@ ProcessNotifyInterrupt(bool flush)
 		ProcessIncomingNotify(flush);
 }
 
+/*
+ * Read a page from the SLRU queue into a local buffer.
+ *
+ * Reads the page containing 'pos', copying the data from the current offset
+ * either to the end of the page or up to 'head' (whichever comes first)
+ * into page_buffer.
+ */
+static void
+asyncQueueReadPageToBuffer(QueuePosition pos, QueuePosition head,
+						   char *page_buffer)
+{
+	int64		curpage = QUEUE_POS_PAGE(pos);
+	int			curoffset = QUEUE_POS_OFFSET(pos);
+	int			slotno;
+	int			copysize;
+
+	slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+										InvalidTransactionId);
+
+	if (curpage == QUEUE_POS_PAGE(head))
+	{
+		/* we only want to read as far as head */
+		copysize = QUEUE_POS_OFFSET(head) - curoffset;
+		if (copysize < 0)
+			copysize = 0;	/* just for safety */
+	}
+	else
+	{
+		/* fetch all the rest of the page */
+		copysize = QUEUE_PAGESIZE - curoffset;
+	}
+
+	memcpy(page_buffer + curoffset,
+		   NotifyCtl->shared->page_buffer[slotno] + curoffset,
+		   copysize);
+	/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+	LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+}
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
@@ -1853,13 +1902,7 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition head;
 	Snapshot	snapshot;
-
-	/* page_buffer must be adequately aligned, so use a union */
-	union
-	{
-		char		buf[QUEUE_PAGESIZE];
-		AsyncQueueEntry align;
-	}			page_buffer;
+	AlignedQueueEntryPage page_buffer;
 
 	/* Fetch current state */
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
@@ -1932,36 +1975,13 @@ asyncQueueReadAllNotifications(void)
 
 		do
 		{
-			int64		curpage = QUEUE_POS_PAGE(pos);
-			int			curoffset = QUEUE_POS_OFFSET(pos);
-			int			slotno;
-			int			copysize;
-
 			/*
 			 * We copy the data from SLRU into a local buffer, so as to avoid
 			 * holding the SLRU lock while we are examining the entries and
 			 * possibly transmitting them to our frontend. Copy only the part
 			 * of the page we will actually inspect.
 			 */
-			slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-												InvalidTransactionId);
-			if (curpage == QUEUE_POS_PAGE(head))
-			{
-				/* we only want to read as far as head */
-				copysize = QUEUE_POS_OFFSET(head) - curoffset;
-				if (copysize < 0)
-					copysize = 0;	/* just for safety */
-			}
-			else
-			{
-				/* fetch all the rest of the page */
-				copysize = QUEUE_PAGESIZE - curoffset;
-			}
-			memcpy(page_buffer.buf + curoffset,
-				   NotifyCtl->shared->page_buffer[slotno] + curoffset,
-				   copysize);
-			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-			LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+			asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
 
 			/*
 			 * Process messages up to the stop position, end of page, or an
@@ -2097,6 +2117,80 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * Get the oldest XID in the notification queue that has not yet been
+ * processed by all listening backends.
+ *
+ * Returns InvalidTransactionId if there are no unprocessed notifications or if
+ * all unprocessed notifications were created in databases other than
+ * MyDatabaseId.
+ */
+TransactionId
+GetOldestNotifyTransactionId(void)
+{
+	QueuePosition pos;
+	QueuePosition head;
+	AlignedQueueEntryPage page_buffer;
+	TransactionId oldestXid = InvalidTransactionId;
+
+	/* First advance the shared queue tail pointer */
+	asyncQueueAdvanceTail();
+
+	/*
+	 * We must start at QUEUE_TAIL since notification data might have been
+	 * written before there were any listening backends.
+	 */
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+	pos = QUEUE_TAIL;
+	head = QUEUE_HEAD;
+	LWLockRelease(NotifyQueueLock);
+
+	/* If the queue is empty, no XIDs need protection */
+	if (QUEUE_POS_EQUAL(pos, head))
+		return InvalidTransactionId;
+
+	while (!QUEUE_POS_EQUAL(pos, head))
+	{
+		int			curoffset;
+		AsyncQueueEntry *qe;
+
+		/* Read the current page from SLRU into our local buffer */
+		asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
+
+		curoffset = QUEUE_POS_OFFSET(pos);
+
+		/* Process all entries on this page up to head */
+		for (;;)
+		{
+			bool		reachedEndOfPage;
+
+			qe = (AsyncQueueEntry *) (page_buffer.buf + curoffset);
+
+			/*
+			 * Check if this entry is for our database and has a valid XID.
+			 * Only entries for our database matter for our datfrozenxid.
+			 */
+			if (qe->dboid == MyDatabaseId && TransactionIdIsValid(qe->xid))
+			{
+				if (!TransactionIdIsValid(oldestXid) ||
+					TransactionIdPrecedes(qe->xid, oldestXid))
+					oldestXid = qe->xid;
+			}
+
+			/* Advance to next entry */
+			reachedEndOfPage = asyncQueueAdvance(&pos, qe->length);
+
+			if (reachedEndOfPage || QUEUE_POS_EQUAL(pos, head))
+				break;
+
+
+			curoffset = QUEUE_POS_OFFSET(pos);
+		}
+	}
+
+	return oldestXid;
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers. Truncate pg_notify space if possible.
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..e5fedfb3238 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
+#include "commands/async.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
@@ -1617,6 +1618,7 @@ vac_update_datfrozenxid(void)
 	bool		dirty = false;
 	ScanKeyData key[1];
 	void	   *inplace_state;
+	TransactionId oldestNotifyXid;
 
 	/*
 	 * Restrict this task to one backend per database. This avoids race
@@ -1733,6 +1735,16 @@ vac_update_datfrozenxid(void)
 	if (bogus)
 		return;
 
+	/*
+	 * Also consider the oldest XID in the notification queue, since backends
+	 * will need to call TransactionIdDidCommit() on those XIDs when
+	 * processing the notifications.
+	 */
+	oldestNotifyXid = GetOldestNotifyTransactionId();
+	if (TransactionIdIsValid(oldestNotifyXid) &&
+		TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
+		newFrozenXid = oldestNotifyXid;
+
 	Assert(TransactionIdIsNormal(newFrozenXid));
 	Assert(MultiXactIdIsValid(newMinMulti));
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..0f8f17ad22b 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -26,6 +26,9 @@ extern void NotifyMyFrontEnd(const char *channel,
 							 const char *payload,
 							 int32 srcPid);
 
+/* get oldest XID in the notification queue for vacuum freeze */
+extern TransactionId GetOldestNotifyTransactionId(void);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 902a7954101..a015c961d35 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
 		  test_int128 \
 		  test_integerset \
 		  test_json_parser \
+		  test_listen_notify \
 		  test_lfind \
 		  test_lwlock_tranches \
 		  test_misc \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 14fc761c4cf..6af33448d7b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -28,6 +28,7 @@ subdir('test_ginpostinglist')
 subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
+subdir('test_listen_notify')
 subdir('test_lfind')
 subdir('test_lwlock_tranches')
 subdir('test_misc')
diff --git a/src/test/modules/test_listen_notify/.gitignore b/src/test/modules/test_listen_notify/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_listen_notify/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..c1eb4fde370
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,19 @@
+# src/test/modules/test_listen_notify/Makefile
+
+MODULE = test_listen_notify
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support"
+
+TAP_TESTS = 1
+
+EXTRA_INSTALL=src/test/modules/xid_wraparound
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..ebf9444108a
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,13 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl'
+    ],
+  },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..c5553d6c792
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,89 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+# Test that VACUUM FREEZE doesn't remove clog files that are needed to check the
+# transaction status of notifications that are on the LISTEN/NOTIFY queue
+# during its execution. The VACUUM FREEZE operation should check the oldest xid
+# on the queue during execution.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Check if the extension xid_wraparound is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node->check_extension('xid_wraparound'))
+{
+	plan skip_all => 'Extension xid_wraparound not installed';
+}
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+	'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,10) g;'
+);
+$node->safe_psql('postgres',
+	'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Session 2, multiple NOTIFYs, each committed ---
+for my $i (1 .. 10)
+{
+	$node->safe_psql(
+		'postgres', "
+		BEGIN;
+		NOTIFY s, '$i';
+		COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(1050000);');
+
+# Execute update so the frozen xid of "t" table is updated to an xid greater
+# than consume_xids() result
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember current datfrozenxid before vacuum freeze to ensure that it is advanced.
+my $datfrozenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after vacuum freeze to ensure that it is not advanced
+# to a value greater than the xid used to send the notifications.
+my $datfrozenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+note("$datfrozenxid < $datfrozenxid_freeze");
+ok($datfrozenxid < $datfrozenxid_freeze, 'datfrozenxid is not fully advanced');
+
+# On Session 1, commit and ensure that all notifications are received
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+	$notifications_count++;
+	like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+# Execute vacuum freeze on all databases and ensure that the datfrozenxid is advanced
+
+$datfrozenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+$datfrozenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+
+ok($datfrozenxid_freeze > $datfrozenxid, "datfrozenxid is advanced: $datfrozenxid_freeze > $datfrozenxid");
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c13f92988ee..23f85b73114 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -58,6 +58,7 @@ Aggref
 AggregateInstrumentation
 AlenState
 Alias
+AlignedQueueEntryPage
 AllocBlock
 AllocFreeListLink
 AllocPointer
-- 
2.43.0
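The XID bookkeeping added to vac_update_datfrozenxid() can be illustrated with a small standalone C model. It reproduces three pieces:

- the wraparound-aware comparison used by TransactionIdPrecedes() (signed difference of 32-bit XIDs for normal XIDs);
- a simplified form of the per-database minimum computed by GetOldestNotifyTransactionId();
- the clamp applied to newFrozenXid.

SketchQueueEntry, sketch_oldest_notify_xid() and sketch_clamp_frozen_xid() are hypothetical names. The flat array stands in for the SLRU pages the real function walks from QUEUE_TAIL to QUEUE_HEAD.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint32_t Oid;

#define InvalidTransactionId		((TransactionId) 0)
#define FirstNormalTransactionId	((TransactionId) 3)
#define TransactionIdIsValid(x)		((x) != InvalidTransactionId)
#define TransactionIdIsNormal(x)	((x) >= FirstNormalTransactionId)

/* Wraparound-aware "id1 is older than id2", modeled on transam.c. */
static bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
	if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
		return id1 < id2;
	return (int32_t) (id1 - id2) < 0;
}

/* Hypothetical simplified queue entry: only the fields the scan reads. */
typedef struct SketchQueueEntry
{
	Oid			dboid;
	TransactionId xid;
} SketchQueueEntry;

/* Oldest XID among entries for 'mydb', or InvalidTransactionId if none. */
static TransactionId
sketch_oldest_notify_xid(const SketchQueueEntry *entries, size_t n, Oid mydb)
{
	TransactionId oldest = InvalidTransactionId;

	for (size_t i = 0; i < n; i++)
	{
		if (entries[i].dboid != mydb || !TransactionIdIsValid(entries[i].xid))
			continue;			/* other databases don't constrain ours */
		if (!TransactionIdIsValid(oldest) ||
			TransactionIdPrecedes(entries[i].xid, oldest))
			oldest = entries[i].xid;
	}
	return oldest;
}

/* Hold newFrozenXid back so it never passes a queued notification's XID. */
static TransactionId
sketch_clamp_frozen_xid(TransactionId newFrozenXid,
						TransactionId oldestNotifyXid)
{
	if (TransactionIdIsValid(oldestNotifyXid) &&
		TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
		newFrozenXid = oldestNotifyXid;
	assert(TransactionIdIsNormal(newFrozenXid));
	return newFrozenXid;
}
```

Consider a queue holding XIDs 1000 and 900 for the current database. A newFrozenXid of 2000 is held back to 900, while an already-older value such as 800 is left alone. The modular comparison matters near wraparound. With these definitions, 4000000000 precedes 100, so a plain `<` would pick the wrong minimum.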
