From da210bfc2b62d9a38ea54b94037380144753663a Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Tue, 25 Nov 2025 17:58:28 +0800
Subject: [PATCH v2 1/5] Extend xlogwait infrastructure with write and flush 
 wait types

Add support for waiting on WAL write and flush LSNs in addition to the
existing replay LSN wait type. This provides the foundation for
extending the WAIT FOR command with MODE parameter.

Key changes:
- Add WAIT_LSN_TYPE_WRITE_STANDBY and WAIT_LSN_TYPE_FLUSH_STANDBY to WaitLSNType
- Renamed enum values for consistency (WAIT_LSN_TYPE_REPLAY → WAIT_LSN_TYPE_REPLAY_STANDBY, etc.)
- Add GetCurrentLSNForWaitType() to retrieve current LSN
- Add new wait events WAIT_EVENT_WAIT_FOR_WAL_WRITE and
  WAIT_EVENT_WAIT_FOR_WAL_FLUSH for pg_stat_activity visibility
- Update WaitForLSN() to use GetCurrentLSNForWaitType() internally
---
 src/backend/access/transam/xlog.c             |  2 +-
 src/backend/access/transam/xlogrecovery.c     |  4 +-
 src/backend/access/transam/xlogwait.c         | 84 ++++++++++++++-----
 src/backend/commands/wait.c                   |  2 +-
 .../utils/activity/wait_event_names.txt       |  3 +-
 src/include/access/xlogwait.h                 | 13 ++-
 6 files changed, 81 insertions(+), 27 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 22d0a2e8c3a..4b145515269 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6243,7 +6243,7 @@ StartupXLOG(void)
 	 * Wake up all waiters for replay LSN.  They need to report an error that
 	 * recovery was ended before reaching the target LSN.
 	 */
-	WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment.  This must occur after
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 21b8f179ba0..243c0b368a9 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1846,8 +1846,8 @@ PerformWalRecovery(void)
 			 */
 			if (waitLSNState &&
 				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
-				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
-				WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
+				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY_STANDBY])))
+				WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, XLogRecoveryCtl->lastReplayedEndRecPtr);
 
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
index 98aa5f1e4a2..21823acee9c 100644
--- a/src/backend/access/transam/xlogwait.c
+++ b/src/backend/access/transam/xlogwait.c
@@ -12,25 +12,30 @@
  *		This file implements waiting for WAL operations to reach specific LSNs
  *		on both physical standby and primary servers. The core idea is simple:
  *		every process that wants to wait publishes the LSN it needs to the
- *		shared memory, and the appropriate process (startup on standby, or
- *		WAL writer/backend on primary) wakes it once that LSN has been reached.
+ *		shared memory, and the appropriate process (startup on standby,
+ *		walreceiver on standby, or WAL writer/backend on primary) wakes it
+ *		once that LSN has been reached.
  *
  *		The shared memory used by this module comprises a procInfos
  *		per-backend array with the information of the awaited LSN for each
  *		of the backend processes.  The elements of that array are organized
- *		into a pairing heap waitersHeap, which allows for very fast finding
- *		of the least awaited LSN.
+ *		into pairing heaps (waitersHeap), one for each WaitLSNType, which
+ *		allows for very fast finding of the least awaited LSN for each type.
  *
- *		In addition, the least-awaited LSN is cached as minWaitedLSN.  The
- *		waiter process publishes information about itself to the shared
- *		memory and waits on the latch until it is woken up by the appropriate
- *		process, standby is promoted, or the postmaster	dies.  Then, it cleans
- *		information about itself in the shared memory.
+ *		In addition, the least-awaited LSN for each type is cached in the
+ *		minWaitedLSN array.  The waiter process publishes information about
+ *		itself to the shared memory and waits on the latch until it is woken
+ *		up by the appropriate process, standby is promoted, or the postmaster
+ *		dies.  Then, it cleans information about itself in the shared memory.
  *
- *		On standby servers: After replaying a WAL record, the startup process
- *		first performs a fast path check minWaitedLSN > replayLSN.  If this
- *		check is negative, it checks waitersHeap and wakes up the backend
- *		whose awaited LSNs are reached.
+ *		On standby servers:
+ *		- After replaying a WAL record, the startup process performs a fast
+ *		  path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
+ *		  negative, it checks waitersHeap[REPLAY] and wakes up the backends
+ *		  whose awaited LSNs are reached.
+ *		- After receiving WAL, the walreceiver process performs similar checks
+ *		  against the flush and write LSNs, waking up waiters in the FLUSH
+ *		  and WRITE heaps respectively.
  *
  *		On primary servers: After flushing WAL, the WAL writer or backend
  *		process performs a similar check against the flush LSN and wakes up
@@ -49,6 +54,7 @@
 #include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
@@ -62,6 +68,48 @@ static int	waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
 
 struct WaitLSNState *waitLSNState = NULL;
 
+/*
+ * Wait event for each WaitLSNType, used with WaitLatch() to report
+ * the wait in pg_stat_activity.
+ */
+static const uint32 WaitLSNWaitEvents[] = {
+	[WAIT_LSN_TYPE_REPLAY_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
+	[WAIT_LSN_TYPE_WRITE_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
+	[WAIT_LSN_TYPE_FLUSH_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+	[WAIT_LSN_TYPE_FLUSH_PRIMARY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+};
+
+StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
+				 "WaitLSNWaitEvents must match WaitLSNType enum");
+
+/*
+ * Get the current LSN for the specified wait type.
+ */
+XLogRecPtr
+GetCurrentLSNForWaitType(WaitLSNType lsnType)
+{
+	switch (lsnType)
+	{
+		case WAIT_LSN_TYPE_REPLAY_STANDBY:
+			return GetXLogReplayRecPtr(NULL);
+
+		case WAIT_LSN_TYPE_WRITE_STANDBY:
+			return GetWalRcvWriteRecPtr();
+
+		case WAIT_LSN_TYPE_FLUSH_STANDBY:
+			return GetWalRcvFlushRecPtr(NULL, NULL);
+
+		case WAIT_LSN_TYPE_FLUSH_PRIMARY:
+			return GetFlushRecPtr(NULL);
+
+		case WAIT_LSN_TYPE_COUNT:
+			break;
+	}
+
+	elog(ERROR, "invalid LSN wait type: %d", lsnType);
+	pg_unreachable();
+}
+
 /* Report the amount of shared memory space needed for WaitLSNState. */
 Size
 WaitLSNShmemSize(void)
@@ -341,13 +389,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		int			rc;
 		long		delay_ms = -1;
 
-		if (lsnType == WAIT_LSN_TYPE_REPLAY)
-			currentLSN = GetXLogReplayRecPtr(NULL);
-		else
-			currentLSN = GetFlushRecPtr(NULL);
+		/* Get current LSN for the wait type */
+		currentLSN = GetCurrentLSNForWaitType(lsnType);
 
 		/* Check that recovery is still in-progress */
-		if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
+		if (lsnType != WAIT_LSN_TYPE_FLUSH_PRIMARY && !RecoveryInProgress())
 		{
 			/*
 			 * Recovery was ended, but check if target LSN was already
@@ -376,7 +422,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		CHECK_FOR_INTERRUPTS();
 
 		rc = WaitLatch(MyLatch, wake_events, delay_ms,
-					   (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+					   WaitLSNWaitEvents[lsnType]);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
index a37bddaefb2..43b37095afb 100644
--- a/src/backend/commands/wait.c
+++ b/src/backend/commands/wait.c
@@ -140,7 +140,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	 */
 	Assert(MyProc->xmin == InvalidTransactionId);
 
-	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY_STANDBY, lsn, timeout);
 
 	/*
 	 * Process the result of WaitForLSN().  Throw appropriate error if needed.
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index c1ac71ff7f2..fbcdb92dcfb 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
-WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary."
+WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary or standby."
 WAIT_FOR_WAL_REPLAY	"Waiting for WAL replay to reach a target LSN on a standby."
+WAIT_FOR_WAL_WRITE	"Waiting for WAL write to reach a target LSN on a standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
index e607441d618..9721a7a7195 100644
--- a/src/include/access/xlogwait.h
+++ b/src/include/access/xlogwait.h
@@ -35,9 +35,15 @@ typedef enum
  */
 typedef enum WaitLSNType
 {
-	WAIT_LSN_TYPE_REPLAY = 0,	/* Waiting for replay on standby */
-	WAIT_LSN_TYPE_FLUSH = 1,	/* Waiting for flush on primary */
-	WAIT_LSN_TYPE_COUNT = 2
+	/* Standby wait types (walreceiver/startup wakes) */
+	WAIT_LSN_TYPE_REPLAY_STANDBY = 0,
+	WAIT_LSN_TYPE_WRITE_STANDBY = 1,
+	WAIT_LSN_TYPE_FLUSH_STANDBY = 2,
+
+	/* Primary wait types (WAL writer/backends wake) */
+	WAIT_LSN_TYPE_FLUSH_PRIMARY = 3,
+
+	WAIT_LSN_TYPE_COUNT = 4
 } WaitLSNType;
 
 /*
@@ -96,6 +102,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
 
 extern Size WaitLSNShmemSize(void);
 extern void WaitLSNShmemInit(void);
+extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType);
 extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
 extern void WaitLSNCleanup(void);
 extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,
-- 
2.51.0

