From 0ab676629c405a030603274872788c11e7508d59 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 15:42:58 +0100
Subject: [PATCH 3/3] Execute LISTEN/UNLISTEN during PreCommit

---
 src/backend/commands/async.c | 239 +++++++++++++++--------------------
 1 file changed, 100 insertions(+), 139 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index ea6ea9e09a8..36b83896363 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -438,10 +438,9 @@ static inline int64 asyncQueuePageDiff(int64 p, int64 q);
 static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
-static void Exec_UnlistenAllCommit(void);
+static void Exec_Listen(const char *channel);
+static void Exec_Unlisten(const char *channel);
+static void Exec_UnlistenAll(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
@@ -848,7 +847,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 static void
 Async_UnlistenOnExit(int code, Datum arg)
 {
-	Exec_UnlistenAllCommit();
+	Exec_UnlistenAll();
 	asyncQueueUnregister();
 }
 
@@ -877,7 +876,7 @@ AtPrepare_Notify(void)
  *		If there are pending LISTEN actions, make sure we are listed in the
  *		shared-memory listener array.  This must happen before commit to
  *		ensure we don't miss any notifies from transactions that commit
- *		just after ours.
+ *		just after ours.  This function also update listenChannels.
  *
  *		If there are outbound notify requests in the pendingNotifies list,
  *		add them to the global queue.  We do that before commit so that
@@ -894,7 +893,7 @@ PreCommit_Notify(void)
 	if (Trace_notify)
 		elog(DEBUG1, "PreCommit_Notify");
 
-	/* Preflight for any pending listen/unlisten actions */
+	/* Perform any pending listen/unlisten actions */
 	if (pendingActions != NULL)
 	{
 		foreach(p, pendingActions->actions)
@@ -904,18 +903,22 @@ PreCommit_Notify(void)
 			switch (actrec->action)
 			{
 				case LISTEN_LISTEN:
-					Exec_ListenPreCommit();
+					Exec_Listen(actrec->channel);
 					break;
 				case LISTEN_UNLISTEN:
-					/* there is no Exec_UnlistenPreCommit() */
+					Exec_Unlisten(actrec->channel);
 					break;
 				case LISTEN_UNLISTEN_ALL:
-					/* there is no Exec_UnlistenAllPreCommit() */
+					Exec_UnlistenAll();
 					break;
 			}
 		}
 	}
 
+	/* If no longer listening to anything, get out of listener array */
+	if (amRegisteredListener && listenChannels == NIL)
+		asyncQueueUnregister();
+
 	/* Queue any pending notifies (must happen after the above) */
 	if (pendingNotifies)
 	{
@@ -1005,18 +1008,16 @@ PreCommit_Notify(void)
  *
  *		This is called at transaction commit, after committing to clog.
  *
- *		Update listenChannels and clear transaction-local state.
- *
  *		If we issued any notifications in the transaction, send signals to
  *		listening backends (possibly including ourselves) to process them.
  *		Also, if we filled enough queue pages with new notifies, try to
  *		advance the queue tail pointer.
+ *
+ *		Finally, clear transaction-local state.
  */
 void
 AtCommit_Notify(void)
 {
-	ListCell   *p;
-
 	/*
 	 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
 	 * return as soon as possible
@@ -1027,32 +1028,6 @@ AtCommit_Notify(void)
 	if (Trace_notify)
 		elog(DEBUG1, "AtCommit_Notify");
 
-	/* Perform any pending listen/unlisten actions */
-	if (pendingActions != NULL)
-	{
-		foreach(p, pendingActions->actions)
-		{
-			ListenAction *actrec = (ListenAction *) lfirst(p);
-
-			switch (actrec->action)
-			{
-				case LISTEN_LISTEN:
-					Exec_ListenCommit(actrec->channel);
-					break;
-				case LISTEN_UNLISTEN:
-					Exec_UnlistenCommit(actrec->channel);
-					break;
-				case LISTEN_UNLISTEN_ALL:
-					Exec_UnlistenAllCommit();
-					break;
-			}
-		}
-	}
-
-	/* If no longer listening to anything, get out of listener array */
-	if (amRegisteredListener && listenChannels == NIL)
-		asyncQueueUnregister();
-
 	/*
 	 * Send signals to listening backends.  We need do this only if there are
 	 * pending notifies, which were previously added to the shared queue by
@@ -1066,109 +1041,100 @@ AtCommit_Notify(void)
 }
 
 /*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * Exec_Listen --- subroutine for PreCommit_Notify
  *
- * This function must make sure we are ready to catch any incoming messages.
+ * This function must make sure we are ready to catch any incoming messages,
+ * and adds the channel to the list of channels we are listening on.
  */
 static void
-Exec_ListenPreCommit(void)
+Exec_Listen(const char *channel)
 {
 	QueuePosition head;
 	QueuePosition max;
 	ProcNumber	prevListener;
+	MemoryContext oldcontext;
+
+	if (Trace_notify)
+		elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid);
 
 	/*
 	 * Nothing to do if we are already listening to something, nor if we
 	 * already ran this routine in this transaction.
 	 */
-	if (amRegisteredListener)
-		return;
-
-	if (Trace_notify)
-		elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
-
-	/*
-	 * Before registering, make sure we will unlisten before dying. (Note:
-	 * this action does not get undone if we abort later.)
-	 */
-	if (!unlistenExitRegistered)
+	if (!amRegisteredListener)
 	{
-		before_shmem_exit(Async_UnlistenOnExit, 0);
-		unlistenExitRegistered = true;
-	}
+		/*
+		 * Before registering, make sure we will unlisten before dying. (Note:
+		 * this action does not get undone if we abort later.)
+		 */
+		if (!unlistenExitRegistered)
+		{
+			before_shmem_exit(Async_UnlistenOnExit, 0);
+			unlistenExitRegistered = true;
+		}
 
-	/*
-	 * This is our first LISTEN, so establish our pointer.
-	 *
-	 * We set our pointer to the global tail pointer and then move it forward
-	 * over already-committed notifications.  This ensures we cannot miss any
-	 * not-yet-committed notifications.  We might get a few more but that
-	 * doesn't hurt.
-	 *
-	 * In some scenarios there might be a lot of committed notifications that
-	 * have not yet been pruned away (because some backend is being lazy about
-	 * reading them).  To reduce our startup time, we can look at other
-	 * backends and adopt the maximum "pos" pointer of any backend that's in
-	 * our database; any notifications it's already advanced over are surely
-	 * committed and need not be re-examined by us.  (We must consider only
-	 * backends connected to our DB, because others will not have bothered to
-	 * check committed-ness of notifications in our DB.)
-	 *
-	 * We need exclusive lock here so we can look at other backends' entries
-	 * and manipulate the list links.
-	 */
-	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	head = QUEUE_HEAD;
-	max = QUEUE_TAIL;
-	prevListener = INVALID_PROC_NUMBER;
-	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
-	{
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
-			max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
-		/* Also find last listening backend before this one */
-		if (i < MyProcNumber)
-			prevListener = i;
-	}
-	QUEUE_BACKEND_POS(MyProcNumber) = max;
-	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
-	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
-	/* Insert backend into list of listeners at correct position */
-	if (prevListener != INVALID_PROC_NUMBER)
-	{
-		QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
-		QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
-	}
-	else
-	{
-		QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
-		QUEUE_FIRST_LISTENER = MyProcNumber;
-	}
-	LWLockRelease(NotifyQueueLock);
+		/*
+		 * This is our first LISTEN, so establish our pointer.
+		 *
+		 * We set our pointer to the global tail pointer and then move it forward
+		 * over already-committed notifications.  This ensures we cannot miss any
+		 * not-yet-committed notifications.  We might get a few more but that
+		 * doesn't hurt.
+		 *
+		 * In some scenarios there might be a lot of committed notifications that
+		 * have not yet been pruned away (because some backend is being lazy about
+		 * reading them).  To reduce our startup time, we can look at other
+		 * backends and adopt the maximum "pos" pointer of any backend that's in
+		 * our database; any notifications it's already advanced over are surely
+		 * committed and need not be re-examined by us.  (We must consider only
+		 * backends connected to our DB, because others will not have bothered to
+		 * check committed-ness of notifications in our DB.)
+		 *
+		 * We need exclusive lock here so we can look at other backends' entries
+		 * and manipulate the list links.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		head = QUEUE_HEAD;
+		max = QUEUE_TAIL;
+		prevListener = INVALID_PROC_NUMBER;
+		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+		{
+			if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+				max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+			/* Also find last listening backend before this one */
+			if (i < MyProcNumber)
+				prevListener = i;
+		}
+		QUEUE_BACKEND_POS(MyProcNumber) = max;
+		QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
+		QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+		/* Insert backend into list of listeners at correct position */
+		if (prevListener != INVALID_PROC_NUMBER)
+		{
+			QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
+			QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
+		}
+		else
+		{
+			QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
+			QUEUE_FIRST_LISTENER = MyProcNumber;
+		}
+		LWLockRelease(NotifyQueueLock);
 
-	/* Now we are listed in the global array, so remember we're listening */
-	amRegisteredListener = true;
+		/* Now we are listed in the global array, so remember we're listening */
+		amRegisteredListener = true;
 
-	/*
-	 * Try to move our pointer forward as far as possible.  This will skip
-	 * over already-committed notifications, which we want to do because they
-	 * might be quite stale.  Note that we are not yet listening on anything,
-	 * so we won't deliver such notifications to our frontend.  Also, although
-	 * our transaction might have executed NOTIFY, those message(s) aren't
-	 * queued yet so we won't skip them here.
-	 */
-	if (!QUEUE_POS_EQUAL(max, head))
-		asyncQueueReadAllNotifications();
-}
-
-/*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
- *
- * Add the channel to the list of channels we are listening on.
- */
-static void
-Exec_ListenCommit(const char *channel)
-{
-	MemoryContext oldcontext;
+		/*
+		 * Try to move our pointer forward as far as possible.  This will skip
+		 * over already-committed notifications, which we want to do because they
+		 * might be quite stale.  Note that we are not yet listening on anything,
+		 * so we won't deliver such notifications to our frontend.  Also, although
+		 * our transaction might have executed NOTIFY, those message(s) aren't
+		 * queued yet so we won't skip them here.
+		 */
+		if (!QUEUE_POS_EQUAL(max, head))
+			asyncQueueReadAllNotifications();
+	}
 
 	/* Do nothing if we are already listening on this channel */
 	if (IsListeningOn(channel))
@@ -1176,11 +1142,6 @@ Exec_ListenCommit(const char *channel)
 
 	/*
 	 * Add the new channel name to listenChannels.
-	 *
-	 * XXX It is theoretically possible to get an out-of-memory failure here,
-	 * which would be bad because we already committed.  For the moment it
-	 * doesn't seem worth trying to guard against that, but maybe improve this
-	 * later.
 	 */
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 	listenChannels = lappend(listenChannels, pstrdup(channel));
@@ -1188,17 +1149,17 @@ Exec_ListenCommit(const char *channel)
 }
 
 /*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * Exec_Unlisten --- subroutine for AtCommit_Notify
  *
  * Remove the specified channel name from listenChannels.
  */
 static void
-Exec_UnlistenCommit(const char *channel)
+Exec_Unlisten(const char *channel)
 {
 	ListCell   *q;
 
 	if (Trace_notify)
-		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
+		elog(DEBUG1, "Exec_Unlisten(%s,%d)", channel, MyProcPid);
 
 	foreach(q, listenChannels)
 	{
@@ -1219,15 +1180,15 @@ Exec_UnlistenCommit(const char *channel)
 }
 
 /*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenAll --- subroutine for PreCommit_Notify
  *
  *		Unlisten on all channels for this backend.
  */
 static void
-Exec_UnlistenAllCommit(void)
+Exec_UnlistenAll(void)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
+		elog(DEBUG1, "Exec_UnlistenAll(%d)", MyProcPid);
 
 	list_free_deep(listenChannels);
 	listenChannels = NIL;
@@ -1927,7 +1888,7 @@ asyncQueueReadAllNotifications(void)
 	 *
 	 * What we do guarantee is that we'll see all notifications from
 	 * transactions committing after the snapshot we take here.
-	 * Exec_ListenPreCommit has already added us to the listener array,
+	 * Exec_Listen has already added us to the listener array,
 	 * so no not-yet-committed messages can be removed from the queue
 	 * before we see them.
 	 *----------
-- 
2.50.1

