commit c7daefa51118d2041623b14b7f26c9177ac0b6cd
Author: Chao Li (Evan) <lic@highgo.com>
Date:   Sat Oct 25 15:42:26 2025 +0800

    fix race condition

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e8d728e9ce..3c8a640ebed 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -250,6 +250,11 @@ typedef struct QueuePosition
 #define QUEUE_POS_EQUAL(x,y) \
 	((x).page == (y).page && (x).offset == (y).offset)
 
+/* True if x comes before y in queue order.  NB: evaluates x and y more than once. */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 #define QUEUE_POS_IS_ZERO(x) \
 	((x).page == 0 && (x).offset == 0)
 
@@ -287,7 +292,9 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition advisoryPos;	/* position a notifier advises skipping to; (0,0) if none */
 	bool		wakeupPending;	/* signal sent but not yet processed */
+	bool        awakening;	/* set while backend is in asyncQueueReadAllNotifications */
 } QueueBackendStatus;
 
 /*
@@ -348,7 +355,9 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)	(asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_AWAKENING(i)	(asyncQueueControl->backend[i].awakening)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -675,7 +684,9 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+			QUEUE_BACKEND_AWAKENING(i) = false;
 		}
 	}
 
@@ -1954,6 +1965,7 @@ SignalBackends(void)
 	ProcNumber *procnos;
 	int			count;
 	ListCell   *lc;
+	List *interestedProcs = NIL;
 
 	INJECTION_POINT("listen-notify-signal-backends", NULL);
 
@@ -2002,6 +2014,12 @@ SignalBackends(void)
 			int32		pid;
 			QueuePosition pos;
 
+			/* XXX: Use a list to record listeners interested in any of the pending
+			 * channels.  A list is not the best choice, so if we decide to take this
+			 * approach, we can optimize it later by using a hash or bitmap. */
+			if (!list_member_int(interestedProcs, i))
+				interestedProcs = lappend_int(interestedProcs, i);
+
 			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
 				continue;
 
@@ -2054,19 +2072,41 @@ SignalBackends(void)
 		int64		lag;
 		int32		pid;
 
-		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
-			continue;
+		/* XXX we cannot rely on wakeupPending here, because the flag might be set by another notifier. */
+		//if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+		//	continue;
 
 		pos = QUEUE_BACKEND_POS(i);
 
-		/* Direct advancement for idle backends at the old head */
-		if (pendingNotifies != NULL &&
-			QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+		if (!list_member_int(interestedProcs, i))
 		{
-			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
-			pos = queueHeadAfterWrite;
+			/* Direct advancement for idle backends at the old head */
+			if (pendingNotifies != NULL)
+			{
+				if (QUEUE_BACKEND_AWAKENING(i))
+				{
+					/* Backend is mid-read: don't touch its pos, only advise a new
+					 * position for it to adopt when it finishes reading. */
+					if ((QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite) ||
+					     (QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos) && QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))) &&
+						 QUEUE_POS_EQUAL(QUEUE_BACKEND_ADVISORY_POS(i), queueHeadBeforeWrite))
+						QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
+					pos = queueHeadAfterWrite;
+				}
+				else if (QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+				{
+					/* Idle backend at the old head: advance it directly.  Other idle
+					 * backends keep their real pos so the lag check below still applies. */
+					QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+					SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
+					pos = queueHeadAfterWrite;
+				}
+			}
 		}
 
+		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+			continue;
+
 		/* Signal backends that have fallen too far behind */
 		lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
 								 QUEUE_POS_PAGE(pos));
@@ -2321,6 +2361,7 @@ asyncQueueReadAllNotifications(void)
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
 	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+	QUEUE_BACKEND_AWAKENING(MyProcNumber) = true;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
 	LWLockRelease(NotifyQueueLock);
@@ -2330,6 +2371,9 @@ asyncQueueReadAllNotifications(void)
 	if (QUEUE_POS_EQUAL(pos, head))
 	{
 		/* Nothing to do, we have read all notifications already. */
+		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;
+		LWLockRelease(NotifyQueueLock);
 		return;
 	}
 
@@ -2441,21 +2485,29 @@ asyncQueueReadAllNotifications(void)
 												   page_buffer.buf,
 												   snapshot);
 
-		/*
-		 * Update our position in shared memory.  The 'pos' variable now
-		 * holds our new position (advanced past all messages we just
-		 * processed).  This ensures that if we fail while processing
-		 * messages from the next page, we won't reprocess the ones we
-		 * just handled.  It also prevents us from overwriting any direct
-		 * advancement that another backend might have done while we were
-		 * processing messages.
-		 */
-		LWLockAcquire(NotifyQueueLock, LW_SHARED);
-		QUEUE_BACKEND_POS(MyProcNumber) = pos;
-		LWLockRelease(NotifyQueueLock);
-
+		/*
+		 * If a notifier has advised a position ahead of ours, stop reading;
+		 * the advised position is adopted after the loop.  Read it under
+		 * NotifyQueueLock: notifiers update it concurrently, and a torn
+		 * read of the QueuePosition could end the loop before we reach head.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+			reachedStop = true;
+		LWLockRelease(NotifyQueueLock);
 	} while (!reachedStop);
 
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+	if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+	{
+		/* a notifier advised a position ahead of ours: skip to it */
+		pos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+	}
+	SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(MyProcNumber), 0, 0);	/* consume the advice */
+	QUEUE_BACKEND_POS(MyProcNumber) = pos;
+	QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;	/* NOTE(review): not reached if the read loop above exits via error - confirm */
+	LWLockRelease(NotifyQueueLock);
+
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);
 }
diff --git a/src/test/authentication/t/008_listen-pos-race.pl b/src/test/authentication/t/008_listen-pos-race.pl
index 060e33ed391..c858e8da524 100644
--- a/src/test/authentication/t/008_listen-pos-race.pl
+++ b/src/test/authentication/t/008_listen-pos-race.pl
@@ -8,7 +8,7 @@ use PostgreSQL::Test::Utils;
 use Time::HiRes qw(usleep);
 use Test::More;
 
-if ($ENV{enable_injection_points} ne 'yes') {
+if (($ENV{enable_injection_points} // '') ne 'yes') {    # parens needed: 'ne' binds tighter than '//'
     plan skip_all => 'Injection points not supported by this build';
 }
 
@@ -18,6 +18,7 @@ $node->start;
 
 
 if (!$node->check_extension('injection_points')) {
+	$node->stop;
     plan skip_all => 'Extension injection_points not installed';
 }
 
