From 3ed2c16c7d4b03514d9828c96818a9b0ac6c279b Mon Sep 17 00:00:00 2001
From: Daniil Davidov <d.davydov@postgrespro.ru>
Date: Mon, 4 Aug 2025 17:49:23 +0700
Subject: [PATCH] Advance tail of async queue before updating datfrozenxid in
 vacuum

---
 src/backend/commands/async.c  |  9 ++++-----
 src/backend/commands/vacuum.c | 10 ++++++++++
 src/include/commands/async.h  |  2 ++
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..5504132cf1b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -450,7 +450,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
 										 char *page_buffer,
 										 Snapshot snapshot);
-static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
 static void AddEventToPendingNotifies(Notification *n);
@@ -1025,7 +1024,7 @@ AtCommit_Notify(void)
 	if (tryAdvanceTail)
 	{
 		tryAdvanceTail = false;
-		asyncQueueAdvanceTail();
+		AsyncQueueAdvanceTail();
 	}
 
 	/* And clean up */
@@ -1483,7 +1482,7 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS)
 	double		usage;
 
 	/* Advance the queue tail so we don't report a too-large result */
-	asyncQueueAdvanceTail();
+	AsyncQueueAdvanceTail();
 
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
 	usage = asyncQueueUsage();
@@ -2104,8 +2103,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
  * This is (usually) called during CommitTransaction(), so it's important for
  * it to have very low probability of failure.
  */
-static void
-asyncQueueAdvanceTail(void)
+void
+AsyncQueueAdvanceTail(void)
 {
 	QueuePosition min;
 	int64		oldtailpage;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 733ef40ae7c..aedae9bf94d 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
+#include "commands/async.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
@@ -1632,6 +1633,15 @@ vac_update_datfrozenxid(void)
 	 */
 	LockDatabaseFrozenIds(ExclusiveLock);
 
+	/*
+	 * Note, that messages in async queue store transaction id of their senders.
+	 * We must explicitly advance tail of async queue so that new channel
+	 * listeners don't have to process old messages (during
+	 * asyncQueueReadAllNotifications call), i.e. don't have to check their
+	 * transactions statuses (that might be already gone after clog truncate).
+	 */
+	AsyncQueueAdvanceTail();
+
 	/*
 	 * Initialize the "min" calculation with
 	 * GetOldestNonRemovableTransactionId(), which is a reasonable
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..47275f6f959 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -43,6 +43,8 @@ extern void AtPrepare_Notify(void);
 /* signal handler for inbound notifies (PROCSIG_NOTIFY_INTERRUPT) */
 extern void HandleNotifyInterrupt(void);
 
+extern void AsyncQueueAdvanceTail(void);
+
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
-- 
2.43.0

