On 31/10/2025 01:27, Joel Jacobson wrote:
> On Fri, Oct 31, 2025, at 00:08, Joel Jacobson wrote:
>> On Thu, Oct 30, 2025, at 14:25, Heikki Linnakangas wrote:
>>> Joel, since you've been working on some optimizations in this area too,
>>> would you happen to have some suitable performance test scripts for this?
>> Glad you asked. I'm actually working on a benchmark+correctness tester.
>> It's very much work-in-progress though, don't look too much at the code,
>> or your eyes will bleed.
>> It's a combined benchmark + correctness tester, that verifies that only
>> the expected notifications are received on the expected connections,
>> while at the same time doing timing measurements.
> To run multiple pg_bench_lino processes in parallel to simulate
> concurrent workloads, I realized the randomization of the channel names
> and payloads was not random enough to avoid collisions. New version
> attached that uses real UUIDs for channel names and payloads.
Thanks! Here's a sketch for holding the bank lock across
TransactionIdDidCommit() calls. In quick testing with your test program,
I can't see any performance difference. However, I'm not quite sure what
options I should be using to stress this. My gut feeling is that it's
fine, but it'd be nice to construct a real worst-case test to be
sure.
There are some opportunities for micro-optimizations here:
* IsListeningOn() is kind of expensive if a backend is listening on
multiple channels. We really should turn that into a hash table. As the
patch stands, I'm doing the IsListeningOn() calls while holding the bank
lock, but unless we speed up IsListeningOn() in those degenerate cases,
we should perhaps call it only after copying and releasing the lock.
* If IsListeningOn() is made fast, it might make sense to call it before
TransactionIdDidCommit().
* Implement the hint bits.
I don't know how much those matter. Again, a test case would be nice.
I'll work more on performance testing of this next week, if no one else
picks that up.
- Heikki
From 74384374a24334616ac427fdcfeed5199ee1fc43 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <[email protected]>
Date: Fri, 31 Oct 2025 18:27:25 +0200
Subject: [PATCH 1/2] Fix bug where we truncated CLOG that was still needed by
LISTEN/NOTIFY
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
The async notification queue contains the XID of the sender, and when
processing notifications we call TransactionIdDidCommit() on the
XID. But we had no safeguards to prevent the CLOG segments containing
those XIDs from being truncated. As a result, if a backend for some
reason didn't process the notifications for a long time, or when a new
backend issued LISTEN, you could get an error like:
test=# listen c21;
ERROR: 58P01: could not access status of transaction 14279685
DETAIL: Could not open file "pg_xact/000D": No such file or directory.
LOCATION: SlruReportIOError, slru.c:1087
This was first reported by Sergey Zhuravlev in 2021, with many other
people hitting the same issue later. I believe the bug goes back all
the way to commit d1e027221d, which introduced the SLRU-based async
notification queue.
Thanks to:
- Alexandra Wang, Daniil Davydov, Andrei Varashen and Jacques Combrink for
investigating and providing reproducible test cases,
- Matheus Alcantara and Arseniy Mukhin for earlier proposed patches to
fix this,
- Álvaro Herrera and Masahiko Sawada for reviewing said earlier patches,
- Yura Sokolov aka funny-falcon for the idea of marking transactions as
committed in the notification queue, and
- Joel Jacobson for the final patch version. I hope I didn't forget anyone.
Author: Joel Jacobson, Arseniy Mukhin
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/cak98qz3wzle-rzjn_y%[email protected]
---
src/backend/commands/async.c | 114 ++++++++++++++++++
src/backend/commands/vacuum.c | 7 ++
src/include/commands/async.h | 3 +
src/test/isolation/expected/async-notify.out | 33 ++++-
src/test/isolation/specs/async-notify.spec | 24 ++++
src/test/modules/xid_wraparound/meson.build | 1 +
.../xid_wraparound/t/004_notify_freeze.pl | 67 ++++++++++
7 files changed, 248 insertions(+), 1 deletion(-)
create mode 100644 src/test/modules/xid_wraparound/t/004_notify_freeze.pl
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..ba06234dc8e 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -2168,6 +2168,120 @@ asyncQueueAdvanceTail(void)
LWLockRelease(NotifyQueueTailLock);
}
+/*
+ * AsyncNotifyFreezeXids
+ *
+ * Prepare the async notification queue for CLOG truncation by freezing
+ * transaction IDs that are about to become inaccessible.
+ *
+ * This function is called by VACUUM before advancing datfrozenxid. It scans
+ * the notification queue and replaces XIDs that would become inaccessible
+ * after CLOG truncation with special markers:
+ * - Committed transactions are set to FrozenTransactionId
+ * - Aborted/crashed transactions are set to InvalidTransactionId
+ *
+ * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
+ * pages will be truncated. If XID < newFrozenXid, it cannot still be running
+ * (or it would have held back newFrozenXid through ProcArray).
+ * Therefore, if TransactionIdDidCommit returns false, we know the transaction
+ * either aborted explicitly or crashed, and we can safely mark it invalid.
+ */
+void
+AsyncNotifyFreezeXids(TransactionId newFrozenXid)
+{
+ QueuePosition pos;
+ QueuePosition head;
+ int64 curpage = -1;
+ int slotno = -1;
+ char *page_buffer = NULL;
+ bool page_dirty = false;
+
+ /*
+ * Acquire locks in the correct order to avoid deadlocks. As per the
+ * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
+ * bank locks.
+ *
+ * We only need SHARED mode since we're just reading the head/tail
+ * positions, not modifying them.
+ */
+ LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
+ LWLockAcquire(NotifyQueueLock, LW_SHARED);
+
+ pos = QUEUE_TAIL;
+ head = QUEUE_HEAD;
+
+ /* Release NotifyQueueLock early; we only needed it to read the positions */
+ LWLockRelease(NotifyQueueLock);
+
+ /*
+ * Scan the queue from tail to head, freezing XIDs as needed. We hold
+ * NotifyQueueTailLock throughout to ensure the tail doesn't move while
+ * we're working.
+ */
+ while (!QUEUE_POS_EQUAL(pos, head))
+ {
+ AsyncQueueEntry *qe;
+ TransactionId xid;
+ int64 pageno = QUEUE_POS_PAGE(pos);
+ int offset = QUEUE_POS_OFFSET(pos);
+
+ /* If we need a different page, release old lock and get new one */
+ if (pageno != curpage)
+ {
+ LWLock *lock;
+
+ /* Release previous page if any */
+ if (slotno >= 0)
+ {
+ if (page_dirty)
+ {
+ NotifyCtl->shared->page_dirty[slotno] = true;
+ page_dirty = false;
+ }
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+ }
+
+ lock = SimpleLruGetBankLock(NotifyCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+ InvalidTransactionId);
+ page_buffer = NotifyCtl->shared->page_buffer[slotno];
+ curpage = pageno;
+ }
+
+ qe = (AsyncQueueEntry *) (page_buffer + offset);
+ xid = qe->xid;
+
+ if (TransactionIdIsNormal(xid) &&
+ TransactionIdPrecedes(xid, newFrozenXid))
+ {
+ if (TransactionIdDidCommit(xid))
+ {
+ qe->xid = FrozenTransactionId;
+ page_dirty = true;
+ }
+ else
+ {
+ qe->xid = InvalidTransactionId;
+ page_dirty = true;
+ }
+ }
+
+ /* Advance to next entry */
+ asyncQueueAdvance(&pos, qe->length);
+ }
+
+ /* Release final page lock if we acquired one */
+ if (slotno >= 0)
+ {
+ if (page_dirty)
+ NotifyCtl->shared->page_dirty[slotno] = true;
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+ }
+
+ LWLockRelease(NotifyQueueTailLock);
+}
+
/*
* ProcessIncomingNotify
*
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..e785dd55ce5 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_inherits.h"
+#include "commands/async.h"
#include "commands/cluster.h"
#include "commands/defrem.h"
#include "commands/progress.h"
@@ -1941,6 +1942,12 @@ vac_truncate_clog(TransactionId frozenXID,
return;
}
+ /*
+ * Freeze any old transaction IDs in the async notification queue before
+ * CLOG truncation.
+ */
+ AsyncNotifyFreezeXids(frozenXID);
+
/*
* Advance the oldest value for commit timestamps before truncating, so
* that if a user requests a timestamp for a transaction we're truncating
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..aaec7314c10 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -46,4 +46,7 @@ extern void HandleNotifyInterrupt(void);
/* process interrupts */
extern void ProcessNotifyInterrupt(bool flush);
+/* freeze old transaction IDs in notify queue (called by VACUUM) */
+extern void AsyncNotifyFreezeXids(TransactionId newFrozenXid);
+
#endif /* ASYNC_H */
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..20d5763f319 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 4 sessions
starting permutation: listenc notify1 notify2 notify3 notifyf
step listenc: LISTEN c1; LISTEN c2;
@@ -104,6 +104,37 @@ step l2commit: COMMIT;
listener2: NOTIFY "c1" with payload "" from notifier
step l2stop: UNLISTEN *;
+starting permutation: llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck
+step llisten: LISTEN c1; LISTEN c2;
+step n1begins: BEGIN ISOLATION LEVEL SERIALIZABLE;
+step n1select: SELECT * FROM t1;
+a
+-
+(0 rows)
+
+step n1insert: INSERT INTO t1 DEFAULT VALUES;
+step notify1: NOTIFY c1;
+step n2begins: BEGIN ISOLATION LEVEL SERIALIZABLE;
+step n2select: SELECT * FROM t1;
+a
+-
+(0 rows)
+
+step n2insert: INSERT INTO t1 DEFAULT VALUES;
+step n2notify1: NOTIFY c1, 'n2_payload';
+step n1commit: COMMIT;
+step n2commit: COMMIT;
+ERROR: could not serialize access due to read/write dependencies among transactions
+step notify1: NOTIFY c1;
+step lcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+listener: NOTIFY "c1" with payload "" from notifier
+listener: NOTIFY "c1" with payload "" from notifier
+
starting permutation: llisten lbegin usage bignotify usage
step llisten: LISTEN c1; LISTEN c2;
step lbegin: BEGIN;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..51b7ad43849 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -5,6 +5,10 @@
# Note we assume that each step is delivered to the backend as a single Query
# message so it will run as one transaction.
+# The t1 table is used to provoke a serialization conflict
+setup { CREATE TABLE t1 (a bigserial); }
+teardown { DROP TABLE t1; }
+
session notifier
step listenc { LISTEN c1; LISTEN c2; }
step notify1 { NOTIFY c1; }
@@ -33,8 +37,21 @@ step notifys1 {
}
step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
+step n1begins { BEGIN ISOLATION LEVEL SERIALIZABLE; }
+step n1select { SELECT * FROM t1; }
+step n1insert { INSERT INTO t1 DEFAULT VALUES; }
+step n1commit { COMMIT; }
teardown { UNLISTEN *; }
+# The notifier2 session is used to provoke a serialization conflict with notifier
+
+session notifier2
+step n2begins { BEGIN ISOLATION LEVEL SERIALIZABLE; }
+step n2select { SELECT * FROM t1; }
+step n2insert { INSERT INTO t1 DEFAULT VALUES; }
+step n2commit { COMMIT; }
+step n2notify1 { NOTIFY c1, 'n2_payload'; }
+
# The listener session is used for cross-backend notify checks.
session listener
@@ -73,6 +90,13 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck
# and notify queue is not empty
permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
+# Check that listeners ignore notifications from an aborted
+# transaction, even if the notifications have already been added to the
+# listen/notify queue. To reproduce this, we rely on the fact that
+# serialization conflicts are checked after the transaction adds its
+# notifications to the queue.
+
+permutation llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck
+
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
# after submitting notifications while another connection is listening for
# those notifications and waiting inside an active transaction. We have to
diff --git a/src/test/modules/xid_wraparound/meson.build b/src/test/modules/xid_wraparound/meson.build
index f7dada67f67..3aec430df8c 100644
--- a/src/test/modules/xid_wraparound/meson.build
+++ b/src/test/modules/xid_wraparound/meson.build
@@ -30,6 +30,7 @@ tests += {
't/001_emergency_vacuum.pl',
't/002_limits.pl',
't/003_wraparounds.pl',
+ 't/004_notify_freeze.pl',
],
},
}
diff --git a/src/test/modules/xid_wraparound/t/004_notify_freeze.pl b/src/test/modules/xid_wraparound/t/004_notify_freeze.pl
new file mode 100644
index 00000000000..50824e7b5d7
--- /dev/null
+++ b/src/test/modules/xid_wraparound/t/004_notify_freeze.pl
@@ -0,0 +1,67 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+#
+# Test freezing the XIDs in the async notification queue
+#
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+if (!$ENV{PG_TEST_EXTRA} || $ENV{PG_TEST_EXTRA} !~ /\bxid_wraparound\b/)
+{
+ plan skip_all => "test xid_wraparound not enabled in PG_TEST_EXTRA";
+}
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+ 'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Send multiple notifications from other sessions ---
+for my $i (1 .. 10)
+{
+ $node->safe_psql(
+ 'postgres', "
+ BEGIN;
+ NOTIFY s, '$i';
+ COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation, and one more with 'txid_current' to
+# bump up the freeze horizon.
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+$node->safe_psql('postgres', 'select txid_current()');
+
+# Remember the current datfrozenxid before VACUUM FREEZE, to check that it advances.
+my $datfrozenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+ "vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after VACUUM FREEZE and check that it has advanced.
+my $datfrozenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datfrozenxid_freeze > $datfrozenxid, 'datfrozenxid is advanced');
+
+# On Session 1, commit and ensure that all the notifications are received. (This depends
+# on correctly freezing the XIDs in the pending notification entries.)
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+ $notifications_count++;
+ like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+done_testing();
--
2.47.3
From 7b5052f4b6add84e78b39ef59ac7cb26e315c238 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <[email protected]>
Date: Fri, 31 Oct 2025 18:30:26 +0200
Subject: [PATCH 2/2] Hold SLRU bank lock across TransactionIdDidCommit in
NOTIFY processing
Per Tom Lane's idea
---
src/backend/commands/async.c | 101 ++++++++++++++++-------------------
1 file changed, 47 insertions(+), 54 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index ba06234dc8e..b5dee75af48 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -448,7 +448,6 @@ static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
- char *page_buffer,
Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
@@ -1854,13 +1853,6 @@ asyncQueueReadAllNotifications(void)
QueuePosition head;
Snapshot snapshot;
- /* page_buffer must be adequately aligned, so use a union */
- union
- {
- char buf[QUEUE_PAGESIZE];
- AsyncQueueEntry align;
- } page_buffer;
-
/* Fetch current state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
@@ -1932,37 +1924,6 @@ asyncQueueReadAllNotifications(void)
do
{
- int64 curpage = QUEUE_POS_PAGE(pos);
- int curoffset = QUEUE_POS_OFFSET(pos);
- int slotno;
- int copysize;
-
- /*
- * We copy the data from SLRU into a local buffer, so as to avoid
- * holding the SLRU lock while we are examining the entries and
- * possibly transmitting them to our frontend. Copy only the part
- * of the page we will actually inspect.
- */
- slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
- InvalidTransactionId);
- if (curpage == QUEUE_POS_PAGE(head))
- {
- /* we only want to read as far as head */
- copysize = QUEUE_POS_OFFSET(head) - curoffset;
- if (copysize < 0)
- copysize = 0; /* just for safety */
- }
- else
- {
- /* fetch all the rest of the page */
- copysize = QUEUE_PAGESIZE - curoffset;
- }
- memcpy(page_buffer.buf + curoffset,
- NotifyCtl->shared->page_buffer[slotno] + curoffset,
- copysize);
- /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
- LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
-
/*
* Process messages up to the stop position, end of page, or an
* uncommitted message.
@@ -1978,9 +1939,7 @@ asyncQueueReadAllNotifications(void)
* rewrite pages under us. Especially we don't want to hold a lock
* while sending the notifications to the frontend.
*/
- reachedStop = asyncQueueProcessPageEntries(&pos, head,
- page_buffer.buf,
- snapshot);
+ reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
} while (!reachedStop);
}
PG_FINALLY();
@@ -2000,12 +1959,9 @@ asyncQueueReadAllNotifications(void)
* Fetch notifications from the shared queue, beginning at position current,
* and deliver relevant ones to my frontend.
*
- * The current page must have been fetched into page_buffer from shared
- * memory. (We could access the page right in shared memory, but that
- * would imply holding the SLRU bank lock throughout this routine.)
- *
- * We stop if we reach the "stop" position, or reach a notification from an
- * uncommitted transaction, or reach the end of the page.
+ * This function processes the notifications on one page, the page that
+ * 'current' points to. We stop if we reach the "stop" position, or reach a
+ * notification from an uncommitted transaction, or reach the end of the page.
*
* The function returns true once we have reached the stop position or an
* uncommitted notification, and false if we have finished with the page.
@@ -2015,16 +1971,35 @@ asyncQueueReadAllNotifications(void)
static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
- char *page_buffer,
Snapshot snapshot)
{
+ int64 curpage = QUEUE_POS_PAGE(*current);
+ int slotno;
+ char *page_buffer;
bool reachedStop = false;
bool reachedEndOfPage;
- AsyncQueueEntry *qe;
+
+ /*
+ * We copy the entries into a local buffer, so as to avoid holding the
+ * SLRU lock while we transmit them to our frontend.
+ *
+ * The local buffer must be adequately aligned, so use a union.
+ */
+ union
+ {
+ char buf[QUEUE_PAGESIZE];
+ AsyncQueueEntry align;
+ } scratch;
+ char *scratch_end = scratch.buf;
+
+ slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+ InvalidTransactionId);
+ page_buffer = NotifyCtl->shared->page_buffer[slotno];
do
{
QueuePosition thisentry = *current;
+ AsyncQueueEntry *qe;
if (QUEUE_POS_EQUAL(thisentry, stop))
break;
@@ -2073,10 +2048,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
if (IsListeningOn(channel))
{
- /* payload follows channel name */
- char *payload = qe->data + strlen(channel) + 1;
-
- NotifyMyFrontEnd(channel, payload, qe->srcPid);
+ memcpy(scratch_end, qe, qe->length);
+ scratch_end += qe->length;
}
}
else
@@ -2091,6 +2064,26 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
/* Loop back if we're not at end of page */
} while (!reachedEndOfPage);
+ /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+
+ /*
+ * Now that we have let go of the SLRU bank lock, send the notifications
+ * to our frontend.
+ */
+ Assert(scratch_end - scratch.buf <= BLCKSZ);
+ for (char *p = scratch.buf; p < scratch_end;)
+ {
+ AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
+ char *channel = qe->data;
+ /* payload follows channel name */
+ char *payload = qe->data + strlen(channel) + 1;
+
+ NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+ p += qe->length;
+ }
+
if (QUEUE_POS_EQUAL(*current, stop))
reachedStop = true;
--
2.47.3