From eb30d509886b4ff1a908b11d23fa46a5ad751e8b Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Sun, 28 Sep 2025 18:44:54 +0800
Subject: [PATCH v4 2/2] Improve read_local_xlog_page_guts by replacing polling
  with latch-based waiting

Replace the polling loop in read_local_xlog_page_guts with latch-based waiting
when WAL data is not yet available.  This eliminates the check/sleep/repeat
cycle and improves responsiveness by waking processes as soon as their target
LSN becomes available.
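
The wait follows the usual latch protocol: publish the awaited LSN in shared
memory, recheck the condition, sleep on the latch, and remove ourselves when
done.  A simplified sketch of the primary-side flow in WaitForLSNFlush
(asserts, the fast-path exit, and postmaster-death handling omitted):

    addLSNWaiter(targetLSN, WAIT_LSN_FLUSH);    /* publish the awaited LSN */
    while (GetFlushRecPtr(NULL) < targetLSN)    /* recheck before sleeping */
    {
        CHECK_FOR_INTERRUPTS();
        WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
                  WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
        ResetLatch(MyLatch);
    }
    deleteLSNWaiter(WAIT_LSN_FLUSH);            /* remove ourselves if still queued */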
---
 src/backend/access/transam/xlog.c             |  20 +-
 src/backend/access/transam/xlogrecovery.c     |   4 +-
 src/backend/access/transam/xlogutils.c        |  48 ++-
 src/backend/access/transam/xlogwait.c         | 322 +++++++++++++-----
 src/backend/replication/walsender.c           |   4 -
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/access/xlogwait.h                 |  58 ++--
 7 files changed, 342 insertions(+), 115 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 36b8ac6b855..76c5ad7ae26 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2913,6 +2913,15 @@ XLogFlush(XLogRecPtr record)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone is waiting for, walk the shared
+	 * memory pairing heap of waiters and set their latches to notify
+	 * them.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN)))
+		WaitLSNWakeupFlush(LogwrtResult.Flush);
+
 	/*
 	 * If we still haven't flushed to the request point then we have a
 	 * problem; most likely, the requested flush point is past end of XLOG.
@@ -3095,6 +3104,15 @@ XLogBackgroundFlush(void)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone is waiting for, walk the shared
+	 * memory pairing heap of waiters and set their latches to notify
+	 * them.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN)))
+		WaitLSNWakeupFlush(LogwrtResult.Flush);
+
 	/*
 	 * Great, done. To take some work off the critical path, try to initialize
 	 * as many of the no-longer-needed WAL buffers for future use as we can.
@@ -6227,7 +6245,7 @@ StartupXLOG(void)
 	 * Wake up all waiters for replay LSN.  They need to report an error that
 	 * recovery was ended before reaching the target LSN.
 	 */
-	WaitLSNWakeup(InvalidXLogRecPtr);
+	WaitLSNWakeupReplay(InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment.  This must occur after
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 824b0942b34..1859d2084e8 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1846,8 +1846,8 @@ PerformWalRecovery(void)
 			 */
 			if (waitLSNState &&
 				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
-				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN)))
-				WaitLSNWakeup(XLogRecoveryCtl->lastReplayedEndRecPtr);
+				 pg_atomic_read_u64(&waitLSNState->minWaitedReplayLSN)))
+				WaitLSNWakeupReplay(XLogRecoveryCtl->lastReplayedEndRecPtr);
 
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 38176d9688e..0ea02a45c6b 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -23,6 +23,7 @@
 #include "access/xlogrecovery.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "storage/fd.h"
 #include "storage/smgr.h"
@@ -880,12 +881,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	loc = targetPagePtr + reqLen;
 
 	/*
-	 * Loop waiting for xlog to be available if necessary
-	 *
-	 * TODO: The walsender has its own version of this function, which uses a
-	 * condition variable to wake up whenever WAL is flushed. We could use the
-	 * same infrastructure here, instead of the check/sleep/repeat style of
-	 * loop.
+	 * Wait for xlog to become available, if necessary.
 	 */
 	while (1)
 	{
@@ -927,7 +923,6 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 
 		if (state->currTLI == currTLI)
 		{
-
 			if (loc <= read_upto)
 				break;
 
@@ -947,7 +942,44 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			}
 
 			CHECK_FOR_INTERRUPTS();
-			pg_usleep(1000L);
+
+			/*
+			 * Wait for the LSN using the method appropriate to the current server state.
+			 */
+			if (!RecoveryInProgress())
+			{
+				/* Primary: wait for flush */
+				WaitForLSNFlush(loc);
+			}
+			else
+			{
+				/* Standby: wait for replay */
+				WaitLSNResult result = WaitForLSNReplay(loc, 0);
+
+				switch (result)
+				{
+					case WAIT_LSN_RESULT_SUCCESS:
+						/* LSN was replayed, loop back to recheck timeline */
+						break;
+
+					case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
+						/*
+						 * The server was promoted while we were waiting.
+						 * We're now a primary, so loop back and use the
+						 * flush path instead of the replay path.
+						 */
+						break;
+
+					default:
+						/* cannot happen, since we wait without a timeout */
+						elog(ERROR, "unexpected WaitForLSNReplay() result: %d", (int) result);
+				}
+			}
+
+			/*
+			 * Loop back and recheck everything; the timeline might have
+			 * changed while we were waiting.
+			 */
 		}
 		else
 		{
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
index 4d831fbfa74..e0ac2620bd5 100644
--- a/src/backend/access/transam/xlogwait.c
+++ b/src/backend/access/transam/xlogwait.c
@@ -1,8 +1,8 @@
 /*-------------------------------------------------------------------------
  *
  * xlogwait.c
- *	  Implements waiting for the given replay LSN, which is used in
- *	  WAIT FOR lsn '...'
+ *	  Implements waiting for WAL operations to reach specific LSNs.
+ *	  Used by WAIT FOR lsn '...' and internal WAL reading operations.
  *
  * Copyright (c) 2025, PostgreSQL Global Development Group
  *
@@ -10,10 +10,11 @@
  *	  src/backend/access/transam/xlogwait.c
  *
  * NOTES
- *		This file implements waiting for the replay of the given LSN on a
- *		physical standby.  The core idea is very small: every backend that
- *		wants to wait publishes the LSN it needs to the shared memory, and
- *		the startup process wakes it once that LSN has been replayed.
+ *		This file implements waiting for WAL operations to reach specific LSNs
+ *		on both physical standby and primary servers. The core idea is simple:
+ *		every process that wants to wait publishes the LSN it needs in shared
+ *		memory, and the appropriate process (the startup process on a standby,
+ *		or a WAL writer or backend on a primary) wakes it once that LSN is reached.
  *
  *		The shared memory used by this module comprises a procInfos
  *		per-backend array with the information of the awaited LSN for each
@@ -23,14 +24,18 @@
  *
  *		In addition, the least-awaited LSN is cached as minWaitedLSN.  The
  *		waiter process publishes information about itself to the shared
- *		memory and waits on the latch before it wakens up by a startup
+ *		memory and waits on the latch until it is woken up by the appropriate
  *		process, timeout is reached, standby is promoted, or the postmaster
  *		dies.  Then, it cleans information about itself in the shared memory.
  *
- *		After replaying a WAL record, the startup process first performs a
- *		fast path check minWaitedLSN > replayLSN.  If this check is negative,
- *		it checks waitersHeap and wakes up the backend whose awaited LSNs
- *		are reached.
+ *		On standby servers: After replaying a WAL record, the startup process
+ *		first performs a fast-path check minWaitedReplayLSN > replayLSN.  If
+ *		this check is negative, it checks replayWaitersHeap and wakes up the
+ *		backends whose awaited LSNs have been reached.
+ *
+ *		On primary servers: After flushing WAL, the WAL writer or backend
+ *		process performs a similar check against the flush LSN and wakes up
+ *		waiters whose target flush LSNs have been reached.
  *
  *-------------------------------------------------------------------------
  */
@@ -81,22 +86,46 @@ WaitLSNShmemInit(void)
 														  &found);
 	if (!found)
 	{
-		pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX);
-		pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL);
+		/* Initialize replay heap and tracking */
+		pg_atomic_init_u64(&waitLSNState->minWaitedReplayLSN, PG_UINT64_MAX);
+		pairingheap_initialize(&waitLSNState->replayWaitersHeap, waitlsn_cmp, (void *)(uintptr_t)WAIT_LSN_REPLAY);
+
+		/* Initialize flush heap and tracking */
+		pg_atomic_init_u64(&waitLSNState->minWaitedFlushLSN, PG_UINT64_MAX);
+		pairingheap_initialize(&waitLSNState->flushWaitersHeap, waitlsn_cmp, (void *)(uintptr_t)WAIT_LSN_FLUSH);
+
+		/* Initialize process info array */
 		memset(&waitLSNState->procInfos, 0,
 			   (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
 	}
 }
 
 /*
- * Comparison function for waitReplayLSN->waitersHeap heap.  Waiting processes are
- * ordered by lsn, so that the waiter with smallest lsn is at the top.
+ * Comparison function for the waiter heaps.  Waiting processes are
+ * ordered by LSN, so that the waiter with the smallest LSN is at the top.
+ * This function works for both the replay and flush heaps.
  */
 static int
 waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
 {
-	const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
-	const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
+	const WaitLSNProcInfo *aproc;
+	const WaitLSNProcInfo *bproc;
+
+	/*
+	 * Determine which heap node member we're comparing.  The replay and
+	 * flush heap nodes live at different offsets within WaitLSNProcInfo,
+	 * so we use the arg parameter to pick the right container.
+	 */
+	if ((uintptr_t)arg == WAIT_LSN_REPLAY)
+	{
+		aproc = pairingheap_const_container(WaitLSNProcInfo, replayHeapNode, a);
+		bproc = pairingheap_const_container(WaitLSNProcInfo, replayHeapNode, b);
+	}
+	else
+	{
+		aproc = pairingheap_const_container(WaitLSNProcInfo, flushHeapNode, a);
+		bproc = pairingheap_const_container(WaitLSNProcInfo, flushHeapNode, b);
+	}
 
 	if (aproc->waitLSN < bproc->waitLSN)
 		return 1;
@@ -107,65 +136,88 @@ waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
 }
 
 /*
- * Update waitReplayLSN->minWaitedLSN according to the current state of
- * waitReplayLSN->waitersHeap.
+ * Update the cached minimum waited LSN for the given operation type.
  */
 static void
-updateMinWaitedLSN(void)
+updateMinWaitedLSN(WaitLSNOperation operation)
 {
-	XLogRecPtr	minWaitedLSN = PG_UINT64_MAX;
+	XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
 
-	if (!pairingheap_is_empty(&waitLSNState->waitersHeap))
+	if (operation == WAIT_LSN_REPLAY)
 	{
-		pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
-
-		minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
+		if (!pairingheap_is_empty(&waitLSNState->replayWaitersHeap))
+		{
+			pairingheap_node *node = pairingheap_first(&waitLSNState->replayWaitersHeap);
+			WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, replayHeapNode, node);
+			minWaitedLSN = procInfo->waitLSN;
+		}
+		pg_atomic_write_u64(&waitLSNState->minWaitedReplayLSN, minWaitedLSN);
+	}
+	else /* WAIT_LSN_FLUSH */
+	{
+		if (!pairingheap_is_empty(&waitLSNState->flushWaitersHeap))
+		{
+			pairingheap_node *node = pairingheap_first(&waitLSNState->flushWaitersHeap);
+			WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, flushHeapNode, node);
+			minWaitedLSN = procInfo->waitLSN;
+		}
+		pg_atomic_write_u64(&waitLSNState->minWaitedFlushLSN, minWaitedLSN);
 	}
-
-	pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN);
 }
 
 /*
- * Put the current process into the heap of LSN waiters.
+ * Add the current process to the waiters heap for the given operation type.
  */
 static void
-addLSNWaiter(XLogRecPtr lsn)
+addLSNWaiter(XLogRecPtr lsn, WaitLSNOperation operation)
 {
 	WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
 
 	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
 
-	Assert(!procInfo->inHeap);
-
 	procInfo->procno = MyProcNumber;
 	procInfo->waitLSN = lsn;
 
-	pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
-	procInfo->inHeap = true;
-	updateMinWaitedLSN();
+	if (operation == WAIT_LSN_REPLAY)
+	{
+		Assert(!procInfo->inReplayHeap);
+		pairingheap_add(&waitLSNState->replayWaitersHeap, &procInfo->replayHeapNode);
+		procInfo->inReplayHeap = true;
+		updateMinWaitedLSN(WAIT_LSN_REPLAY);
+	}
+	else /* WAIT_LSN_FLUSH */
+	{
+		Assert(!procInfo->inFlushHeap);
+		pairingheap_add(&waitLSNState->flushWaitersHeap, &procInfo->flushHeapNode);
+		procInfo->inFlushHeap = true;
+		updateMinWaitedLSN(WAIT_LSN_FLUSH);
+	}
 
 	LWLockRelease(WaitLSNLock);
 }
 
 /*
- * Remove the current process from the heap of LSN waiters if it's there.
+ * Remove the current process from the given operation's waiters heap, if present.
  */
 static void
-deleteLSNWaiter(void)
+deleteLSNWaiter(WaitLSNOperation operation)
 {
 	WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
 
 	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
 
-	if (!procInfo->inHeap)
+	if (operation == WAIT_LSN_REPLAY && procInfo->inReplayHeap)
 	{
-		LWLockRelease(WaitLSNLock);
-		return;
+		pairingheap_remove(&waitLSNState->replayWaitersHeap, &procInfo->replayHeapNode);
+		procInfo->inReplayHeap = false;
+		updateMinWaitedLSN(WAIT_LSN_REPLAY);
+	}
+	else if (operation == WAIT_LSN_FLUSH && procInfo->inFlushHeap)
+	{
+		pairingheap_remove(&waitLSNState->flushWaitersHeap, &procInfo->flushHeapNode);
+		procInfo->inFlushHeap = false;
+		updateMinWaitedLSN(WAIT_LSN_FLUSH);
 	}
-
-	pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode);
-	procInfo->inHeap = false;
-	updateMinWaitedLSN();
 
 	LWLockRelease(WaitLSNLock);
 }
@@ -177,7 +229,7 @@ deleteLSNWaiter(void)
 #define	WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
 
 /*
- * Remove waiters whose LSN has been replayed from the heap and set their
+ * Remove waiters whose LSN has been reached from the heap and set their
  * latches.  If InvalidXLogRecPtr is given, remove all waiters from the heap
  * and set latches for all waiters.
  *
@@ -188,12 +240,18 @@ deleteLSNWaiter(void)
  * if there are more waiters, this function will loop to process them in
  * multiple chunks.
  */
-void
-WaitLSNWakeup(XLogRecPtr currentLSN)
+static void
+wakeupWaiters(WaitLSNOperation operation, XLogRecPtr currentLSN)
 {
-	int			i;
-	ProcNumber	wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
-	int			numWakeUpProcs;
+	ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
+	int numWakeUpProcs;
+	int i;
+	pairingheap *heap;
+
+	/* Select appropriate heap */
+	heap = (operation == WAIT_LSN_REPLAY) ?
+		   &waitLSNState->replayWaitersHeap :
+		   &waitLSNState->flushWaitersHeap;
 
 	do
 	{
@@ -201,35 +259,42 @@ WaitLSNWakeup(XLogRecPtr currentLSN)
 		LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
 
 		/*
-		 * Iterate the pairing heap of waiting processes till we find LSN not
-		 * yet replayed.  Record the process numbers to wake up, but to avoid
-		 * holding the lock for too long, send the wakeups only after
-		 * releasing the lock.
+		 * Iterate the waiters heap until we find an LSN not yet reached.  Record
+		 * the process numbers to wake up, but send the wakeups only after releasing the lock.
 		 */
-		while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
+		while (!pairingheap_is_empty(heap))
 		{
-			pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
-			WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
+			pairingheap_node *node = pairingheap_first(heap);
+			WaitLSNProcInfo *procInfo;
 
-			if (!XLogRecPtrIsInvalid(currentLSN) &&
-				procInfo->waitLSN > currentLSN)
+			/* Get procInfo using appropriate heap node */
+			if (operation == WAIT_LSN_REPLAY)
+				procInfo = pairingheap_container(WaitLSNProcInfo, replayHeapNode, node);
+			else
+				procInfo = pairingheap_container(WaitLSNProcInfo, flushHeapNode, node);
+
+			if (!XLogRecPtrIsInvalid(currentLSN) && procInfo->waitLSN > currentLSN)
 				break;
 
 			Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
 			wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
-			(void) pairingheap_remove_first(&waitLSNState->waitersHeap);
-			procInfo->inHeap = false;
+			(void) pairingheap_remove_first(heap);
+
+			/* Update appropriate flag */
+			if (operation == WAIT_LSN_REPLAY)
+				procInfo->inReplayHeap = false;
+			else
+				procInfo->inFlushHeap = false;
 
 			if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
 				break;
 		}
 
-		updateMinWaitedLSN();
-
+		updateMinWaitedLSN(operation);
 		LWLockRelease(WaitLSNLock);
 
 		/*
-		 * Set latches for processes, whose waited LSNs are already replayed.
+		 * Set latches for the processes whose waited LSNs have already been reached.
 		 * As the time consuming operations, we do this outside of
 		 * WaitLSNLock. This is  actually fine because procLatch isn't ever
 		 * freed, so we just can potentially set the wrong process' (or no
@@ -238,25 +303,54 @@ WaitLSNWakeup(XLogRecPtr currentLSN)
 		for (i = 0; i < numWakeUpProcs; i++)
 			SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
 
-		/* Need to recheck if there were more waiters than static array size. */
-	}
-	while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
+	} while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
+}
+
+/*
+ * Wake up processes waiting for the replay LSN to reach currentLSN.
+ * If InvalidXLogRecPtr is given, wake up all replay waiters.
+ */
+void
+WaitLSNWakeupReplay(XLogRecPtr currentLSN)
+{
+	/* Fast path check; skipped when waking all waiters */
+	if (!XLogRecPtrIsInvalid(currentLSN) &&
+		pg_atomic_read_u64(&waitLSNState->minWaitedReplayLSN) > currentLSN)
+		return;
+
+	wakeupWaiters(WAIT_LSN_REPLAY, currentLSN);
+}
+
+/*
+ * Wake up processes waiting for the flush LSN to reach currentLSN.
+ */
+void
+WaitLSNWakeupFlush(XLogRecPtr currentLSN)
+{
+	/* Fast path check */
+	if (pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN) > currentLSN)
+		return;
+
+	wakeupWaiters(WAIT_LSN_FLUSH, currentLSN);
 }
 
 /*
- * Delete our item from shmem array if any.
+ * Remove this process from the waiter heaps on exit, if present.
  */
 void
 WaitLSNCleanup(void)
 {
-	/*
-	 * We do a fast-path check of the 'inHeap' flag without the lock.  This
-	 * flag is set to true only by the process itself.  So, it's only possible
-	 * to get a false positive.  But that will be eliminated by a recheck
-	 * inside deleteLSNWaiter().
-	 */
-	if (waitLSNState->procInfos[MyProcNumber].inHeap)
-		deleteLSNWaiter();
+	if (waitLSNState)
+	{
+		/*
+		 * We do a fast-path check of the heap flags without the lock.
+		 * These flags are set to true only by the process itself, so it's
+		 * only possible to get a false positive, which is eliminated by a
+		 * recheck inside deleteLSNWaiter().
+		 */
+		if (waitLSNState->procInfos[MyProcNumber].inReplayHeap)
+			deleteLSNWaiter(WAIT_LSN_REPLAY);
+		if (waitLSNState->procInfos[MyProcNumber].inFlushHeap)
+			deleteLSNWaiter(WAIT_LSN_FLUSH);
+	}
 }
 
 /*
@@ -308,11 +402,11 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
 	}
 
 	/*
-	 * Add our process to the pairing heap of waiters.  It might happen that
+	 * Add our process to the replay waiters heap.  It might happen that
 	 * target LSN gets replayed before we do.  Another check at the beginning
 	 * of the loop below prevents the race condition.
 	 */
-	addLSNWaiter(targetLSN);
+	addLSNWaiter(targetLSN, WAIT_LSN_REPLAY);
 
 	for (;;)
 	{
@@ -326,7 +420,7 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
 			 * Recovery was ended, but recheck if target LSN was already
 			 * replayed.  See the comment regarding deleteLSNWaiter() below.
 			 */
-			deleteLSNWaiter();
+			deleteLSNWaiter(WAIT_LSN_REPLAY);
 			currentLSN = GetXLogReplayRecPtr(NULL);
 			if (PromoteIsTriggered() && targetLSN <= currentLSN)
 				return WAIT_LSN_RESULT_SUCCESS;
@@ -372,11 +466,11 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
 	}
 
 	/*
-	 * Delete our process from the shared memory pairing heap.  We might
-	 * already be deleted by the startup process.  The 'inHeap' flag prevents
+	 * Delete our process from the shared memory replay heap.  We might
+	 * already be deleted by the startup process.  The 'inReplayHeap' flag prevents
 	 * us from the double deletion.
 	 */
-	deleteLSNWaiter();
+	deleteLSNWaiter(WAIT_LSN_REPLAY);
 
 	/*
 	 * If we didn't reach the target LSN, we must be exited by timeout.
@@ -386,3 +480,69 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
 
 	return WAIT_LSN_RESULT_SUCCESS;
 }
+
+/*
+ * Wait until targetLSN has been flushed on a primary server.
+ * Returns only once targetLSN has been flushed; exits with FATAL on postmaster death.
+ */
+void
+WaitForLSNFlush(XLogRecPtr targetLSN)
+{
+	XLogRecPtr	currentLSN;
+	int			wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(waitLSNState);
+
+	/* Should have a valid proc number */
+	Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
+
+	/* We can only wait for flush when we are not in recovery */
+	Assert(!RecoveryInProgress());
+
+	/* Quick exit if already flushed */
+	currentLSN = GetFlushRecPtr(NULL);
+	if (targetLSN <= currentLSN)
+		return;
+
+	/* Add to flush waiters */
+	addLSNWaiter(targetLSN, WAIT_LSN_FLUSH);
+
+	/* Wait loop */
+	for (;;)
+	{
+		int			rc;
+
+		/* Check if the waited LSN has been flushed */
+		currentLSN = GetFlushRecPtr(NULL);
+		if (targetLSN <= currentLSN)
+			break;
+
+		CHECK_FOR_INTERRUPTS();
+
+		rc = WaitLatch(MyLatch, wake_events, -1,
+					   WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+
+		/*
+		 * Emergency bailout if postmaster has died. This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (rc & WL_POSTMASTER_DEATH)
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating connection due to unexpected postmaster exit"),
+					 errcontext("while waiting for LSN flush")));
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	/*
+	 * Delete our process from the shared memory flush heap.  We might
+	 * already have been deleted by the process that woke us.  The
+	 * 'inFlushHeap' flag prevents double deletion.
+	 */
+	deleteLSNWaiter(WAIT_LSN_FLUSH);
+
+	return;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 59822f22b8d..9955e829190 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1022,10 +1022,6 @@ StartReplication(StartReplicationCmd *cmd)
 /*
  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
  * walsender process.
- *
- * Inside the walsender we can do better than read_local_xlog_page,
- * which has to do a plain sleep/busy loop, because the walsender's latch gets
- * set every time WAL is flushed.
  */
 static int
 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index eb77924c4be..c1ac71ff7f2 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,6 +89,7 @@ LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
+WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary."
 WAIT_FOR_WAL_REPLAY	"Waiting for WAL replay to reach a target LSN on a standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
index df8202528b9..f9c303a8c7f 100644
--- a/src/include/access/xlogwait.h
+++ b/src/include/access/xlogwait.h
@@ -30,49 +30,67 @@ typedef enum
 										 * wait */
 } WaitLSNResult;
 
+/*
+ * Wait operation types for LSN waiting facility.
+ */
+typedef enum WaitLSNOperation
+{
+	WAIT_LSN_REPLAY,	/* Waiting for replay on standby */
+	WAIT_LSN_FLUSH		/* Waiting for flush on primary */
+} WaitLSNOperation;
+
 /*
  * WaitLSNProcInfo - the shared memory structure representing information
- * about the single process, which may wait for LSN replay.  An item of
- * waitLSN->procInfos array.
+ * about a single process that may wait for an LSN to be reached.  An item of
+ * the waitLSNState->procInfos array.
  */
 typedef struct WaitLSNProcInfo
 {
 	/* LSN, which this process is waiting for */
 	XLogRecPtr	waitLSN;
 
-	/* Process to wake up once the waitLSN is replayed */
+	/* Process to wake up once the waitLSN is reached */
 	ProcNumber	procno;
 
-	/*
-	 * A flag indicating that this item is present in
-	 * waitReplayLSNState->waitersHeap
-	 */
-	bool		inHeap;
+	/* Flags indicating membership in each waiters heap */
+	bool		inReplayHeap;	/* In replay waiters heap */
+	bool		inFlushHeap;	/* In flush waiters heap */
 
-	/*
-	 * A pairing heap node for participation in
-	 * waitReplayLSNState->waitersHeap
-	 */
-	pairingheap_node phNode;
+	/* Pairing heap nodes, one for each waiters heap */
+	pairingheap_node replayHeapNode;
+	pairingheap_node flushHeapNode;
 } WaitLSNProcInfo;
 
 /*
- * WaitLSNState - the shared memory state for the replay LSN waiting facility.
+ * WaitLSNState - the shared memory state for the LSN waiting facility.
  */
 typedef struct WaitLSNState
 {
 	/*
-	 * The minimum LSN value some process is waiting for.  Used for the
+	 * The minimum replay LSN value some process is waiting for.  Used for the
 	 * fast-path checking if we need to wake up any waiters after replaying a
 	 * WAL record.  Could be read lock-less.  Update protected by WaitLSNLock.
 	 */
-	pg_atomic_uint64 minWaitedLSN;
+	pg_atomic_uint64 minWaitedReplayLSN;
+
+	/*
+	 * A pairing heap of processes waiting for replay, ordered by LSN values
+	 * (least LSN is on top).  Protected by WaitLSNLock.
+	 */
+	pairingheap replayWaitersHeap;
+
+	/*
+	 * The minimum flush LSN value some process is waiting for.  Used for the
+	 * fast-path checking if we need to wake up any waiters after flushing
+	 * WAL.  Could be read lock-less.  Update protected by WaitLSNLock.
+	 */
+	pg_atomic_uint64 minWaitedFlushLSN;
 
 	/*
-	 * A pairing heap of waiting processes order by LSN values (least LSN is
+	 * A pairing heap of processes waiting for flush, ordered by LSN values (least LSN is
 	 * on top).  Protected by WaitLSNLock.
 	 */
-	pairingheap waitersHeap;
+	pairingheap flushWaitersHeap;
 
 	/*
 	 * An array with per-process information, indexed by the process number.
@@ -86,8 +104,10 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
 
 extern Size WaitLSNShmemSize(void);
 extern void WaitLSNShmemInit(void);
-extern void WaitLSNWakeup(XLogRecPtr currentLSN);
+extern void WaitLSNWakeupReplay(XLogRecPtr currentLSN);
+extern void WaitLSNWakeupFlush(XLogRecPtr currentLSN);
 extern void WaitLSNCleanup(void);
 extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout);
+extern void WaitForLSNFlush(XLogRecPtr targetLSN);
 
 #endif							/* XLOG_WAIT_H */
-- 
2.51.0

