Hi,

On Sat, Oct 11, 2025 at 11:02 AM Xuneng Zhou <[email protected]> wrote:
>
> Hi,
>
> The following is the split patch set. There are certain limitations to
> this simplification effort, particularly in patch 2. The
> read_local_xlog_page_guts callback demands more functionality from the
> facility than the WAIT FOR patch — specifically, it must wait for WAL
> flush events, though it does not require timeout handling. In some
> sense, parts of patch 3 can be viewed as a superset of the WAIT FOR
> patch, since it installs wake-up hooks in more locations. Unlike the
> WAIT FOR patch, which only needs wake-ups triggered by replay,
> read_local_xlog_page_guts must also handle wake-ups triggered by WAL
> flushes.
>
> Workload characteristics play a key role here. A sorted dlist performs
> well when insertions and removals occur in order, achieving O(1)
> complexity in the best case. In synchronous replication, insertion
> patterns seem generally monotonic with commit LSNs, though not
> strictly ordered due to timing variations and contention. When most
> insertions remain ordered, a dlist can be efficient. However, as the
> number of elements grows and out-of-order insertions become more
> frequent, the insertion cost can degrade to O(n) more often.
>
> By contrast, a pairing heap maintains stable O(1) insertion for both
> ordered and disordered inputs, with amortized O(log n) removals. Since
> LSNs in the WAIT FOR command are likely to arrive in a non-sequential
> fashion, the pairing heap introduced in v6 provides more predictable
> performance under such workloads.
>
> At this stage (v7), no consolidation between syncrep and xlogwait has
> been implemented. This is mainly because the dlist and pairing heap
> each works well under different workloads — neither is likely to be
> universally optimal. Introducing the facility with a pairing heap
> first seems reasonable, as it offers flexibility for future
> refactoring: we could later replace dlist with a heap or adopt a
> modular design depending on observed workload characteristics.
>

v8-0002 removed the early fast-path check before addLSNWaiter() in
WaitForLSNReplay, since the likelihood of a server state change in that
window is small relative to the branching cost and the added code complexity.

Best,
Xuneng
From 32dab7ed64eecb62adce6b1d124b1fa389515e74 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Fri, 10 Oct 2025 16:35:38 +0800
Subject: [PATCH v8 2/3] Add infrastructure for efficient LSN waiting

Implement a new facility that allows processes to wait for WAL to reach
specific LSNs, both on primary (waiting for flush) and standby (waiting
for replay) servers.

The implementation uses shared memory with per-backend information
organized into pairing heaps, allowing O(1) access to the minimum
waited LSN. This enables fast-path checks: after replaying or flushing
WAL, the startup process or WAL writer can quickly determine if any
waiters need to be awakened.

Key components:
- New xlogwait.c/h module with WaitForLSNReplay() and WaitForLSNFlush()
- Separate pairing heaps for replay and flush waiters
- WaitLSN lightweight lock for coordinating shared state
- Wait events WAIT_FOR_WAL_REPLAY and WAIT_FOR_WAL_FLUSH for monitoring

This infrastructure can be used by features that need to wait for WAL
operations to complete.

Discussion:
https://www.postgresql.org/message-id/flat/CAPpHfdsjtZLVzxjGT8rJHCYbM0D5dwkO+BBjcirozJ6nYbOW8Q@mail.gmail.com
https://www.postgresql.org/message-id/flat/CABPTF7UNft368x-RgOXkfj475OwEbp%2BVVO-wEXz7StgjD_%3D6sw%40mail.gmail.com

Author: Kartyshov Ivan <[email protected]>
Author: Alexander Korotkov <[email protected]>
Author: Xuneng Zhou <[email protected]>

Reviewed-by: Michael Paquier <[email protected]>
Reviewed-by: Peter Eisentraut <[email protected]>
Reviewed-by: Dilip Kumar <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Alexander Lakhin <[email protected]>
Reviewed-by: Bharath Rupireddy <[email protected]>
Reviewed-by: Euler Taveira <[email protected]>
Reviewed-by: Heikki Linnakangas <[email protected]>
Reviewed-by: Kyotaro Horiguchi <[email protected]>
---
 src/backend/access/transam/Makefile           |   3 +-
 src/backend/access/transam/meson.build        |   1 +
 src/backend/access/transam/xlogwait.c         | 525 ++++++++++++++++++
 src/backend/storage/ipc/ipci.c                |   3 +
 .../utils/activity/wait_event_names.txt       |   3 +
 src/include/access/xlogwait.h                 | 112 ++++
 src/include/storage/lwlocklist.h              |   1 +
 7 files changed, 627 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/access/transam/xlogwait.c
 create mode 100644 src/include/access/xlogwait.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 661c55a9db7..a32f473e0a2 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -36,7 +36,8 @@ OBJS = \
 	xlogreader.o \
 	xlogrecovery.o \
 	xlogstats.o \
-	xlogutils.o
+	xlogutils.o \
+	xlogwait.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index e8ae9b13c8e..74a62ab3eab 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -24,6 +24,7 @@ backend_sources += files(
   'xlogrecovery.c',
   'xlogstats.c',
   'xlogutils.c',
+  'xlogwait.c',
 )
 
 # used by frontend programs to build a frontend xlogreader
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
new file mode 100644
index 00000000000..4faed65765c
--- /dev/null
+++ b/src/backend/access/transam/xlogwait.c
@@ -0,0 +1,525 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.c
+ *	  Implements waiting for WAL operations to reach specific LSNs.
+ *	  Used by internal WAL reading operations.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/transam/xlogwait.c
+ *
+ * NOTES
+ *		This file implements waiting for WAL operations to reach specific LSNs
+ *		on both physical standby and primary servers. The core idea is simple:
+ *		every process that wants to wait publishes the LSN it needs to the
+ *		shared memory, and the appropriate process (startup on standby, or
+ *		WAL writer/backend on primary) wakes it once that LSN has been reached.
+ *
+ *		The shared memory used by this module comprises a procInfos
+ *		per-backend array with the information of the awaited LSN for each
+ *		of the backend processes.  The elements of that array are organized
+ *		into a pairing heap waitersHeap, which allows for very fast finding
+ *		of the least awaited LSN.
+ *
+ *		In addition, the least-awaited LSN is cached as minWaitedLSN.  The
+ *		waiter process publishes information about itself in shared memory
+ *		and waits on its latch until it is woken up by the appropriate
+ *		process, the standby is promoted, or the postmaster dies.  Then it
+ *		clears its information from shared memory.
+ *
+ *		On standby servers: After replaying a WAL record, the startup process
+ *		first performs the fast-path check minWaitedLSN > replayLSN.  If that
+ *		check fails, it walks waitersHeap and wakes up the backends whose
+ *		awaited LSNs have been reached.
+ *
+ *		On primary servers: After flushing WAL, the WAL writer or backend
+ *		process performs a similar check against the flush LSN and wakes up
+ *		waiters whose target flush LSNs have been reached.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "access/xlog.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+
+static int	waitlsn_replay_cmp(const pairingheap_node *a, const pairingheap_node *b,
+						void *arg);
+
+static int	waitlsn_flush_cmp(const pairingheap_node *a, const pairingheap_node *b,
+						void *arg);
+
+struct WaitLSNState *waitLSNState = NULL;
+
+/* Report the amount of shared memory space needed for WaitLSNState. */
+Size
+WaitLSNShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitLSNState, procInfos);
+	size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
+	return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory. */
+void
+WaitLSNShmemInit(void)
+{
+	bool		found;
+
+	waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+														  WaitLSNShmemSize(),
+														  &found);
+	if (!found)
+	{
+		/* Initialize replay heap and tracking */
+		pg_atomic_init_u64(&waitLSNState->minWaitedReplayLSN, PG_UINT64_MAX);
+		pairingheap_initialize(&waitLSNState->replayWaitersHeap, waitlsn_replay_cmp, (void *)(uintptr_t)WAIT_LSN_REPLAY);
+
+		/* Initialize flush heap and tracking */
+		pg_atomic_init_u64(&waitLSNState->minWaitedFlushLSN, PG_UINT64_MAX);
+		pairingheap_initialize(&waitLSNState->flushWaitersHeap, waitlsn_flush_cmp, (void *)(uintptr_t)WAIT_LSN_FLUSH);
+
+		/* Initialize process info array */
+		memset(&waitLSNState->procInfos, 0,
+			   (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
+	}
+}
+
+/*
+ * Comparison function for the replay waiters heap.  Waiting processes are
+ * ordered by LSN, so that the waiter with the smallest LSN is at the top.
+ */
+static int
+waitlsn_replay_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+	const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, replayHeapNode, a);
+	const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, replayHeapNode, b);
+
+	if (aproc->waitLSN < bproc->waitLSN)
+		return 1;
+	else if (aproc->waitLSN > bproc->waitLSN)
+		return -1;
+	else
+		return 0;
+}
+
+/*
+ * Comparison function for the flush waiters heap.  Waiting processes are
+ * ordered by LSN, so that the waiter with the smallest LSN is at the top.
+ */
+static int
+waitlsn_flush_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+	const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, flushHeapNode, a);
+	const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, flushHeapNode, b);
+
+	if (aproc->waitLSN < bproc->waitLSN)
+		return 1;
+	else if (aproc->waitLSN > bproc->waitLSN)
+		return -1;
+	else
+		return 0;
+}
+
+/*
+ * Update minimum waited LSN for the specified operation type
+ */
+static void
+updateMinWaitedLSN(WaitLSNOperation operation)
+{
+	XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
+
+	if (operation == WAIT_LSN_REPLAY)
+	{
+		if (!pairingheap_is_empty(&waitLSNState->replayWaitersHeap))
+		{
+			pairingheap_node *node = pairingheap_first(&waitLSNState->replayWaitersHeap);
+			WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, replayHeapNode, node);
+			minWaitedLSN = procInfo->waitLSN;
+		}
+		pg_atomic_write_u64(&waitLSNState->minWaitedReplayLSN, minWaitedLSN);
+	}
+	else /* WAIT_LSN_FLUSH */
+	{
+		if (!pairingheap_is_empty(&waitLSNState->flushWaitersHeap))
+		{
+			pairingheap_node *node = pairingheap_first(&waitLSNState->flushWaitersHeap);
+			WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, flushHeapNode, node);
+			minWaitedLSN = procInfo->waitLSN;
+		}
+		pg_atomic_write_u64(&waitLSNState->minWaitedFlushLSN, minWaitedLSN);
+	}
+}
+
+/*
+ * Add current process to appropriate waiters heap based on operation type
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn, WaitLSNOperation operation)
+{
+	WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+
+	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+	procInfo->procno = MyProcNumber;
+	procInfo->waitLSN = lsn;
+
+	if (operation == WAIT_LSN_REPLAY)
+	{
+		Assert(!procInfo->inReplayHeap);
+		pairingheap_add(&waitLSNState->replayWaitersHeap, &procInfo->replayHeapNode);
+		procInfo->inReplayHeap = true;
+		updateMinWaitedLSN(WAIT_LSN_REPLAY);
+	}
+	else /* WAIT_LSN_FLUSH */
+	{
+		Assert(!procInfo->inFlushHeap);
+		pairingheap_add(&waitLSNState->flushWaitersHeap, &procInfo->flushHeapNode);
+		procInfo->inFlushHeap = true;
+		updateMinWaitedLSN(WAIT_LSN_FLUSH);
+	}
+
+	LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Remove current process from appropriate waiters heap based on operation type
+ */
+static void
+deleteLSNWaiter(WaitLSNOperation operation)
+{
+	WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+
+	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+	if (operation == WAIT_LSN_REPLAY && procInfo->inReplayHeap)
+	{
+		pairingheap_remove(&waitLSNState->replayWaitersHeap, &procInfo->replayHeapNode);
+		procInfo->inReplayHeap = false;
+		updateMinWaitedLSN(WAIT_LSN_REPLAY);
+	}
+	else if (operation == WAIT_LSN_FLUSH && procInfo->inFlushHeap)
+	{
+		pairingheap_remove(&waitLSNState->flushWaitersHeap, &procInfo->flushHeapNode);
+		procInfo->inFlushHeap = false;
+		updateMinWaitedLSN(WAIT_LSN_FLUSH);
+	}
+
+	LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Size of the static array of procs to wake up in wakeupWaiters(), allocated
+ * on the stack.  It should be enough to handle most cases in one iteration.
+ */
+#define	WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
+
+/*
+ * Remove waiters whose LSN has been reached from the heap and set their
+ * latches.  If InvalidXLogRecPtr is given, remove all waiters from the heap
+ * and set latches for all waiters.
+ *
+ * This function first accumulates the waiters to wake up into an array, then
+ * wakes them up without holding WaitLSNLock.  The array size is static and
+ * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE.  That should be more than enough
+ * to wake up all the waiters at once in the vast majority of cases.  However,
+ * if there are more waiters, this function will loop to process them in
+ * multiple chunks.
+ */
+static void
+wakeupWaiters(WaitLSNOperation operation, XLogRecPtr currentLSN)
+{
+	ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
+	int numWakeUpProcs;
+	int i;
+	pairingheap *heap;
+
+	/* Select appropriate heap */
+	heap = (operation == WAIT_LSN_REPLAY) ?
+		   &waitLSNState->replayWaitersHeap :
+		   &waitLSNState->flushWaitersHeap;
+
+	do
+	{
+		numWakeUpProcs = 0;
+		LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+		/*
+		 * Iterate the waiters heap until we find an LSN not yet reached.
+		 * Record the procnos to wake up; send wakeups after releasing the lock.
+		 */
+		while (!pairingheap_is_empty(heap))
+		{
+			pairingheap_node *node = pairingheap_first(heap);
+			WaitLSNProcInfo *procInfo;
+
+			/* Get procInfo using appropriate heap node */
+			if (operation == WAIT_LSN_REPLAY)
+				procInfo = pairingheap_container(WaitLSNProcInfo, replayHeapNode, node);
+			else
+				procInfo = pairingheap_container(WaitLSNProcInfo, flushHeapNode, node);
+
+			if (!XLogRecPtrIsInvalid(currentLSN) && procInfo->waitLSN > currentLSN)
+				break;
+
+			Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
+			wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
+			(void) pairingheap_remove_first(heap);
+
+			/* Update appropriate flag */
+			if (operation == WAIT_LSN_REPLAY)
+				procInfo->inReplayHeap = false;
+			else
+				procInfo->inFlushHeap = false;
+
+			if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
+				break;
+		}
+
+		updateMinWaitedLSN(operation);
+		LWLockRelease(WaitLSNLock);
+
+		/*
+		 * Set latches for the processes whose awaited LSNs have been reached.
+		 * As this is a time-consuming operation, we do it outside of
+		 * WaitLSNLock.  This is actually fine because procLatch is never
+		 * freed, so at worst we might set the wrong process's (or no
+		 * process's) latch.
+		 */
+		for (i = 0; i < numWakeUpProcs; i++)
+			SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
+
+	} while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
+}
+
+/*
+ * Wake up processes waiting for replay LSN to reach currentLSN
+ */
+void
+WaitLSNWakeupReplay(XLogRecPtr currentLSN)
+{
+	/* Fast path check */
+	if (pg_atomic_read_u64(&waitLSNState->minWaitedReplayLSN) > currentLSN)
+		return;
+
+	wakeupWaiters(WAIT_LSN_REPLAY, currentLSN);
+}
+
+/*
+ * Wake up processes waiting for flush LSN to reach currentLSN
+ */
+void
+WaitLSNWakeupFlush(XLogRecPtr currentLSN)
+{
+	/* Fast path check */
+	if (pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN) > currentLSN)
+		return;
+
+	wakeupWaiters(WAIT_LSN_FLUSH, currentLSN);
+}
+
+/*
+ * Clean up LSN waiters for exiting process
+ */
+void
+WaitLSNCleanup(void)
+{
+	if (waitLSNState)
+	{
+		/*
+		 * We do a fast-path check of the heap flags without the lock.  These
+		 * flags are set to true only by the process itself, so we can only
+		 * get a false positive, which is then eliminated by a recheck inside
+		 * deleteLSNWaiter().
+		 */
+		if (waitLSNState->procInfos[MyProcNumber].inReplayHeap)
+			deleteLSNWaiter(WAIT_LSN_REPLAY);
+		if (waitLSNState->procInfos[MyProcNumber].inFlushHeap)
+			deleteLSNWaiter(WAIT_LSN_FLUSH);
+	}
+}
+
+/*
+ * Wait using MyLatch till the given LSN is replayed, the replica gets
+ * promoted, or the postmaster dies.
+ *
+ * Returns WAIT_LSN_RESULT_SUCCESS if the target LSN was replayed.
+ * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if not running in recovery, or if
+ * the replica was promoted before the target LSN was replayed.
+ */
+WaitLSNResult
+WaitForLSNReplay(XLogRecPtr targetLSN)
+{
+	XLogRecPtr	currentLSN;
+	int			wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(waitLSNState);
+
+	/*
+	 * Add ourselves to the replay waiters heap so that the startup process
+	 * can wake us up once the target LSN is replayed.
+	 */
+	addLSNWaiter(targetLSN, WAIT_LSN_REPLAY);
+
+	for (;;)
+	{
+		int			rc;
+
+		/* Recheck that recovery is still in-progress */
+		if (!RecoveryInProgress())
+		{
+			/*
+			 * Recovery has ended, but recheck whether the target LSN was
+			 * already replayed.  See the comment regarding deleteLSNWaiter()
+			 * below.
+			 */
+			deleteLSNWaiter(WAIT_LSN_REPLAY);
+			currentLSN = GetXLogReplayRecPtr(NULL);
+			if (PromoteIsTriggered() && targetLSN <= currentLSN)
+				return WAIT_LSN_RESULT_SUCCESS;
+			return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
+		}
+		else
+		{
+			/* Check if the awaited LSN has been replayed */
+			currentLSN = GetXLogReplayRecPtr(NULL);
+			if (targetLSN <= currentLSN)
+				break;
+		}
+
+		CHECK_FOR_INTERRUPTS();
+
+		rc = WaitLatch(MyLatch, wake_events, -1,
+					   WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
+
+		/*
+		 * Emergency bailout if postmaster has died.  This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (rc & WL_POSTMASTER_DEATH)
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating connection due to unexpected postmaster exit"),
+					 errcontext("while waiting for LSN replay")));
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	/*
+	 * Delete our process from the shared memory replay heap.  We might have
+	 * already been deleted by the startup process.  The 'inReplayHeap' flag
+	 * prevents double deletion.
+	 */
+	deleteLSNWaiter(WAIT_LSN_REPLAY);
+
+	return WAIT_LSN_RESULT_SUCCESS;
+}
+
+/*
+ * Wait until targetLSN has been flushed on a primary server.
+ * Returns only after the condition is satisfied or on FATAL exit.
+ */
+void
+WaitForLSNFlush(XLogRecPtr targetLSN)
+{
+	XLogRecPtr	currentLSN;
+	int			wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(waitLSNState);
+
+	/* Should have a valid proc number */
+	Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
+
+	/* We can only wait for flush when we are not in recovery */
+	Assert(!RecoveryInProgress());
+
+	/* Quick exit if already flushed */
+	currentLSN = GetFlushRecPtr(NULL);
+	if (targetLSN <= currentLSN)
+		return;
+
+	/* Add to flush waiters */
+	addLSNWaiter(targetLSN, WAIT_LSN_FLUSH);
+
+	/* Wait loop */
+	for (;;)
+	{
+		int			rc;
+
+		/* Check if the waited LSN has been flushed */
+		currentLSN = GetFlushRecPtr(NULL);
+		if (targetLSN <= currentLSN)
+			break;
+
+		CHECK_FOR_INTERRUPTS();
+
+		rc = WaitLatch(MyLatch, wake_events, -1,
+					   WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+
+		/*
+		 * Emergency bailout if postmaster has died. This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (rc & WL_POSTMASTER_DEATH)
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating connection due to unexpected postmaster exit"),
+					 errcontext("while waiting for LSN flush")));
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	/*
+	 * Delete our process from the shared memory flush heap.  We might have
+	 * already been deleted by the waker process.  The 'inFlushHeap' flag
+	 * prevents double deletion.
+	 */
+	deleteLSNWaiter(WAIT_LSN_FLUSH);
+
+	return;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2fa045e6b0f..10ffce8d174 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -24,6 +24,7 @@
 #include "access/twophase.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -150,6 +151,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
 	size = add_size(size, AioShmemSize());
+	size = add_size(size, WaitLSNShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -343,6 +345,7 @@ CreateOrAttachShmemStructs(void)
 	WaitEventCustomShmemInit();
 	InjectionPointShmemInit();
 	AioShmemInit();
+	WaitLSNShmemInit();
 }
 
 /*
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7553f6eacef..c1ac71ff7f2 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,6 +89,8 @@ LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
+WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary."
+WAIT_FOR_WAL_REPLAY	"Waiting for WAL replay to reach a target LSN on a standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
@@ -355,6 +357,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
+WaitLSN	"Waiting to read or update shared Wait-for-LSN state."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
new file mode 100644
index 00000000000..441bf475b4d
--- /dev/null
+++ b/src/include/access/xlogwait.h
@@ -0,0 +1,112 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.h
+ *	  Declarations for LSN replay waiting routines.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogwait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOG_WAIT_H
+#define XLOG_WAIT_H
+
+#include "lib/pairingheap.h"
+#include "port/atomics.h"
+#include "postgres.h"
+#include "storage/procnumber.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+/*
+ * Result statuses for WaitForLSNReplay().
+ */
+typedef enum
+{
+	WAIT_LSN_RESULT_SUCCESS,	/* Target LSN is reached */
+	WAIT_LSN_RESULT_NOT_IN_RECOVERY,	/* Recovery ended before or during our
+										 * wait */
+} WaitLSNResult;
+
+/*
+ * Wait operation types for LSN waiting facility.
+ */
+typedef enum WaitLSNOperation
+{
+	WAIT_LSN_REPLAY,	/* Waiting for replay on standby */
+	WAIT_LSN_FLUSH		/* Waiting for flush on primary */
+} WaitLSNOperation;
+
+/*
+ * WaitLSNProcInfo - the shared memory structure holding information about a
+ * single process that may wait for LSN operations.  An item of the
+ * waitLSNState->procInfos array.
+ */
+typedef struct WaitLSNProcInfo
+{
+	/* LSN, which this process is waiting for */
+	XLogRecPtr	waitLSN;
+
+	/* Process to wake up once the waitLSN is reached */
+	ProcNumber	procno;
+
+	/* Type-safe heap membership flags */
+	bool		inReplayHeap;	/* In replay waiters heap */
+	bool		inFlushHeap;	/* In flush waiters heap */
+
+	/* Separate heap nodes for type safety */
+	pairingheap_node replayHeapNode;
+	pairingheap_node flushHeapNode;
+} WaitLSNProcInfo;
+
+/*
+ * WaitLSNState - the shared memory state for the LSN waiting facility.
+ */
+typedef struct WaitLSNState
+{
+	/*
+	 * The minimum replay LSN value some process is waiting for.  Used for
+	 * fast-path checks of whether any waiters need waking after replaying a
+	 * WAL record.  Can be read lock-free; updates protected by WaitLSNLock.
+	 */
+	pg_atomic_uint64 minWaitedReplayLSN;
+
+	/*
+	 * A pairing heap of replay waiting processes ordered by LSN values (least LSN is
+	 * on top).  Protected by WaitLSNLock.
+	 */
+	pairingheap replayWaitersHeap;
+
+	/*
+	 * The minimum flush LSN value some process is waiting for.  Used for
+	 * fast-path checks of whether any waiters need waking after flushing
+	 * WAL.  Can be read lock-free; updates protected by WaitLSNLock.
+	 */
+	pg_atomic_uint64 minWaitedFlushLSN;
+
+	/*
+	 * A pairing heap of flush waiting processes ordered by LSN values (least LSN is
+	 * on top).  Protected by WaitLSNLock.
+	 */
+	pairingheap flushWaitersHeap;
+
+	/*
+	 * An array with per-process information, indexed by the process number.
+	 * Protected by WaitLSNLock.
+	 */
+	WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+
+extern PGDLLIMPORT WaitLSNState *waitLSNState;
+
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNWakeupReplay(XLogRecPtr currentLSN);
+extern void WaitLSNWakeupFlush(XLogRecPtr currentLSN);
+extern void WaitLSNCleanup(void);
+extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN);
+extern void WaitForLSNFlush(XLogRecPtr targetLSN);
+
+#endif							/* XLOG_WAIT_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..5b0ce383408 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -85,6 +85,7 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, WaitLSN)
 
 /*
  * There also exist several built-in LWLock tranches.  As with the predefined
-- 
2.51.0

From 48abb92fb33628f6eba5bbe865b3b19c24fb716d Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Thu, 9 Oct 2025 10:29:05 +0800
Subject: [PATCH v8 1/3] Add pairingheap_initialize() for shared memory usage

The existing pairingheap_allocate() uses palloc(), which allocates from
process-local memory.  For shared-memory use cases, the pairingheap
structure must be allocated via ShmemAlloc() or embedded in a shared memory
struct.  Add pairingheap_initialize() to initialize an already-allocated
pairingheap structure in place, enabling shared memory usage.

Discussion:
https://www.postgresql.org/message-id/flat/CAPpHfdsjtZLVzxjGT8rJHCYbM0D5dwkO+BBjcirozJ6nYbOW8Q@mail.gmail.com
https://www.postgresql.org/message-id/flat/CABPTF7UNft368x-RgOXkfj475OwEbp%2BVVO-wEXz7StgjD_%3D6sw%40mail.gmail.com

Author: Kartyshov Ivan <[email protected]>
Author: Alexander Korotkov <[email protected]>

Reviewed-by: Michael Paquier <[email protected]>
Reviewed-by: Peter Eisentraut <[email protected]>
Reviewed-by: Dilip Kumar <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Alexander Lakhin <[email protected]>
Reviewed-by: Bharath Rupireddy <[email protected]>
Reviewed-by: Euler Taveira <[email protected]>
Reviewed-by: Heikki Linnakangas <[email protected]>
Reviewed-by: Kyotaro Horiguchi <[email protected]>
Reviewed-by: Xuneng Zhou <[email protected]>
---
 src/backend/lib/pairingheap.c | 18 ++++++++++++++++--
 src/include/lib/pairingheap.h |  3 +++
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/src/backend/lib/pairingheap.c b/src/backend/lib/pairingheap.c
index 0aef8a88f1b..fa8431f7946 100644
--- a/src/backend/lib/pairingheap.c
+++ b/src/backend/lib/pairingheap.c
@@ -44,12 +44,26 @@ pairingheap_allocate(pairingheap_comparator compare, void *arg)
 	pairingheap *heap;
 
 	heap = (pairingheap *) palloc(sizeof(pairingheap));
+	pairingheap_initialize(heap, compare, arg);
+
+	return heap;
+}
+
+/*
+ * pairingheap_initialize
+ *
+ * Same as pairingheap_allocate(), but initializes the pairing heap in-place
+ * rather than allocating a new chunk of memory.  Useful for storing the
+ * pairing heap in shared memory.
+ */
+void
+pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare,
+					   void *arg)
+{
 	heap->ph_compare = compare;
 	heap->ph_arg = arg;
 
 	heap->ph_root = NULL;
-
-	return heap;
 }
 
 /*
diff --git a/src/include/lib/pairingheap.h b/src/include/lib/pairingheap.h
index 3c57d3fda1b..567586f2ecf 100644
--- a/src/include/lib/pairingheap.h
+++ b/src/include/lib/pairingheap.h
@@ -77,6 +77,9 @@ typedef struct pairingheap
 
 extern pairingheap *pairingheap_allocate(pairingheap_comparator compare,
 										 void *arg);
+extern void pairingheap_initialize(pairingheap *heap,
+								   pairingheap_comparator compare,
+								   void *arg);
 extern void pairingheap_free(pairingheap *heap);
 extern void pairingheap_add(pairingheap *heap, pairingheap_node *node);
 extern pairingheap_node *pairingheap_first(pairingheap *heap);
-- 
2.51.0

From 6b3d84a211e4e4e5d5d3682b159967bd6278cbc6 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Fri, 10 Oct 2025 16:48:45 +0800
Subject: [PATCH v8 3/3] Improve read_local_xlog_page_guts by replacing polling
 with latch-based waiting.

Replace the inefficient polling loop in read_local_xlog_page_guts with the
latch-based facility developed in the xlogwait module, used when WAL data is
not yet available.  This eliminates CPU-intensive busy waiting and improves
responsiveness by waking processes immediately when their target LSN becomes
available.

Discussion:
https://www.postgresql.org/message-id/flat/CAPpHfdsjtZLVzxjGT8rJHCYbM0D5dwkO+BBjcirozJ6nYbOW8Q@mail.gmail.com
https://www.postgresql.org/message-id/flat/CABPTF7UNft368x-RgOXkfj475OwEbp%2BVVO-wEXz7StgjD_%3D6sw%40mail.gmail.com

Author: Xuneng Zhou <[email protected]>
Author: Kartyshov Ivan <[email protected]>
Author: Alexander Korotkov <[email protected]>

Reviewed-by: Michael Paquier <[email protected]>
Reviewed-by: Peter Eisentraut <[email protected]>
Reviewed-by: Dilip Kumar <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Alexander Lakhin <[email protected]>
Reviewed-by: Bharath Rupireddy <[email protected]>
Reviewed-by: Euler Taveira <[email protected]>
Reviewed-by: Heikki Linnakangas <[email protected]>
Reviewed-by: Kyotaro Horiguchi <[email protected]>
---
 src/backend/access/transam/xact.c         |  6 +++
 src/backend/access/transam/xlog.c         | 25 ++++++++++++
 src/backend/access/transam/xlogrecovery.c | 11 ++++++
 src/backend/access/transam/xlogutils.c    | 47 +++++++++++++++++++----
 src/backend/replication/walsender.c       |  4 --
 src/backend/storage/lmgr/proc.c           |  6 +++
 6 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 2cf3d4e92b7..092e197eba3 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -31,6 +31,7 @@
 #include "access/xloginsert.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_enum.h"
@@ -2843,6 +2844,11 @@ AbortTransaction(void)
 	 */
 	LWLockReleaseAll();
 
+	/*
+	 * Cleanup waiting for LSN if any.
+	 */
+	WaitLSNCleanup();
+
 	/* Clear wait information and command progress indicator */
 	pgstat_report_wait_end();
 	pgstat_progress_end_command();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index eceab341255..cff53106f76 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -62,6 +62,7 @@
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "backup/basebackup.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
@@ -2912,6 +2913,15 @@ XLogFlush(XLogRecPtr record)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone was waiting for then walk
+	 * over the shared memory array and set latches to notify the
+	 * waiters.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN)))
+		WaitLSNWakeupFlush(LogwrtResult.Flush);
+
 	/*
 	 * If we still haven't flushed to the request point then we have a
 	 * problem; most likely, the requested flush point is past end of XLOG.
@@ -3094,6 +3104,15 @@ XLogBackgroundFlush(void)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone was waiting for then walk
+	 * over the shared memory array and set latches to notify the
+	 * waiters.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN)))
+		WaitLSNWakeupFlush(LogwrtResult.Flush);
+
 	/*
 	 * Great, done. To take some work off the critical path, try to initialize
 	 * as many of the no-longer-needed WAL buffers for future use as we can.
@@ -6225,6 +6244,12 @@ StartupXLOG(void)
 	UpdateControlFile();
 	LWLockRelease(ControlFileLock);
 
+	/*
+	 * Wake up all waiters for replay LSN.  They need to report an error that
+	 * recovery was ended before reaching the target LSN.
+	 */
+	WaitLSNWakeupReplay(InvalidXLogRecPtr);
+
 	/*
 	 * Shutdown the recovery environment.  This must occur after
 	 * RecoverPreparedTransactions() (see notes in lock_twophase_recover())
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 52ff4d119e6..1859d2084e8 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -40,6 +40,7 @@
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
@@ -1838,6 +1839,16 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for then walk
+			 * over the shared memory array and set latches to notify the
+			 * waiters.
+			 */
+			if (waitLSNState &&
+				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
+				 pg_atomic_read_u64(&waitLSNState->minWaitedReplayLSN)))
+				WaitLSNWakeupReplay(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 38176d9688e..df8d4629b6c 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -23,6 +23,7 @@
 #include "access/xlogrecovery.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "storage/fd.h"
 #include "storage/smgr.h"
@@ -880,12 +881,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	loc = targetPagePtr + reqLen;
 
 	/*
-	 * Loop waiting for xlog to be available if necessary
-	 *
-	 * TODO: The walsender has its own version of this function, which uses a
-	 * condition variable to wake up whenever WAL is flushed. We could use the
-	 * same infrastructure here, instead of the check/sleep/repeat style of
-	 * loop.
+	 * Wait for xlog to be available, if necessary.
 	 */
 	while (1)
 	{
@@ -927,7 +923,6 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 
 		if (state->currTLI == currTLI)
 		{
-
 			if (loc <= read_upto)
 				break;
 
@@ -947,7 +942,43 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			}
 
 			CHECK_FOR_INTERRUPTS();
-			pg_usleep(1000L);
+
+			/*
+			 * Wait for the LSN using the method appropriate to the server state.
+			 */
+			if (!RecoveryInProgress())
+			{
+				/* Primary: wait for flush */
+				WaitForLSNFlush(loc);
+			}
+			else
+			{
+				/* Standby: wait for replay */
+				WaitLSNResult result = WaitForLSNReplay(loc);
+
+				switch (result)
+				{
+					case WAIT_LSN_RESULT_SUCCESS:
+						/* LSN was replayed, loop back to recheck timeline */
+						break;
+
+					case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
+						/*
+						 * Promoted while waiting. This is the tricky case.
+						 * We're now a primary, so loop back and use flush
+						 * logic instead of replay logic.
+						 */
+						break;
+
+					default:
+						elog(ERROR, "unexpected WaitForLSNReplay() result: %d",
+							 (int) result);
+				}
+			}
+
+			/*
+			 * Loop back and recheck everything; the timeline might have
+			 * changed while we were waiting.
+			 */
 		}
 		else
 		{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 59822f22b8d..9955e829190 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1022,10 +1022,6 @@ StartReplication(StartReplicationCmd *cmd)
 /*
  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
  * walsender process.
- *
- * Inside the walsender we can do better than read_local_xlog_page,
- * which has to do a plain sleep/busy loop, because the walsender's latch gets
- * set every time WAL is flushed.
  */
 static int
 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 96f29aafc39..26b201eadb8 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -947,6 +948,11 @@ ProcKill(int code, Datum arg)
 	 */
 	LWLockReleaseAll();
 
+	/*
+	 * Cleanup waiting for LSN if any.
+	 */
+	WaitLSNCleanup();
+
 	/* Cancel any pending condition variable sleep, too */
 	ConditionVariableCancelSleep();
 
-- 
2.51.0
