From 73513f3aacddb8e9d8762215a6c2000fc21f1589 Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Tue, 16 Dec 2025 10:50:22 +0800
Subject: [PATCH v8 2/4] Add MODE option to WAIT FOR LSN command

Extend the WAIT FOR LSN command with an optional MODE option in the
WITH clause that specifies which LSN type to wait for:

  WAIT FOR LSN '<lsn>' [WITH (MODE '<mode>', ...)]

where mode can be:
- 'standby_replay' (default): Wait for WAL to be replayed to the specified LSN
- 'standby_write': Wait for WAL to be written (received) to the specified LSN
- 'standby_flush': Wait for WAL to be flushed to disk at the specified LSN
- 'primary_flush': Wait for WAL to be flushed to disk on the primary server

The default mode is 'standby_replay', matching the original behavior when MODE
is not specified. This follows the pattern used by COPY and EXPLAIN
commands where options are specified as string values in the WITH clause.

Modes are explicitly named to distinguish between primary and standby operations:
- Standby modes ('standby_replay', 'standby_write', 'standby_flush') can only
  be used during recovery (on a standby server)
- Primary mode ('primary_flush') can only be used on a primary server

The 'standby_write' and 'standby_flush' modes are useful for scenarios where
applications need to ensure WAL has been received or persisted on the standby
without necessarily waiting for replay to complete. The 'primary_flush' mode
allows waiting for WAL to be flushed on the primary server.

Also includes:
- Documentation updates for the new syntax and mode descriptions
- Test coverage for all four modes including error cases and concurrent waiters
- Wakeup logic in walreceiver for standby write/flush waiters
- Wakeup logic in WAL writer for primary flush waiters
---
 doc/src/sgml/ref/wait_for.sgml          | 213 +++++++++---
 src/backend/access/transam/xlog.c       |  22 +-
 src/backend/commands/wait.c             |  96 +++++-
 src/backend/replication/walreceiver.c   |  18 ++
 src/test/recovery/t/049_wait_for_lsn.pl | 411 ++++++++++++++++++++++--
 5 files changed, 673 insertions(+), 87 deletions(-)

diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml
index 3b8e842d1de..df72b3327c8 100644
--- a/doc/src/sgml/ref/wait_for.sgml
+++ b/doc/src/sgml/ref/wait_for.sgml
@@ -16,17 +16,23 @@ PostgreSQL documentation
 
  <refnamediv>
   <refname>WAIT FOR</refname>
-  <refpurpose>wait for target <acronym>LSN</acronym> to be replayed, optionally with a timeout</refpurpose>
+  <refpurpose>wait for WAL to reach a target <acronym>LSN</acronym></refpurpose>
  </refnamediv>
 
  <refsynopsisdiv>
 <synopsis>
-WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
+WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>'
+    [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
 
 <phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase>
 
+    MODE '<replaceable class="parameter">mode</replaceable>'
     TIMEOUT '<replaceable class="parameter">timeout</replaceable>'
     NO_THROW
+
+<phrase>and <replaceable class="parameter">mode</replaceable> can be:</phrase>
+
+    standby_replay | standby_write | standby_flush | primary_flush
 </synopsis>
  </refsynopsisdiv>
 
@@ -34,20 +40,27 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Description</title>
 
   <para>
-    Waits until recovery replays <parameter>lsn</parameter>.
-    If no <parameter>timeout</parameter> is specified or it is set to
-    zero, this command waits indefinitely for the
-    <parameter>lsn</parameter>.
-    On timeout, or if the server is promoted before
-    <parameter>lsn</parameter> is reached, an error is emitted,
-    unless <literal>NO_THROW</literal> is specified in the WITH clause.
-    If <parameter>NO_THROW</parameter> is specified, then the command
-    doesn't throw errors.
+   Waits until the specified <parameter>lsn</parameter> is reached
+   according to the specified <parameter>mode</parameter>,
+   which determines whether to wait for WAL to be written, flushed, or replayed.
+   If no <parameter>timeout</parameter> is specified or it is set to
+   zero, this command waits indefinitely for the
+   <parameter>lsn</parameter>.
+  </para>
+
+  <para>
+   On timeout, an error is emitted unless <literal>NO_THROW</literal>
+   is specified in the WITH clause. For standby modes
+   (<literal>standby_replay</literal>, <literal>standby_write</literal>,
+   <literal>standby_flush</literal>), an error is also emitted if the
+   server is promoted before the <parameter>lsn</parameter> is reached.
+   If <parameter>NO_THROW</parameter> is specified, the command returns
+   a status string instead of throwing errors.
   </para>
 
   <para>
-    The possible return values are <literal>success</literal>,
-    <literal>timeout</literal>, and <literal>not in recovery</literal>.
+   The possible return values are <literal>success</literal>,
+   <literal>timeout</literal>, and <literal>not in recovery</literal>.
   </para>
  </refsect1>
 
@@ -72,6 +85,65 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
       The following parameters are supported:
 
       <variablelist>
+       <varlistentry>
+        <term><literal>MODE</literal> '<replaceable class="parameter">mode</replaceable>'</term>
+        <listitem>
+         <para>
+          Specifies the type of LSN processing to wait for. If not specified,
+          the default is <literal>standby_replay</literal>. The valid modes are:
+         </para>
+         <itemizedlist>
+          <listitem>
+           <para>
+            <literal>standby_replay</literal>: Wait for the LSN to be replayed
+            (applied to the database) on a standby server. After successful
+            completion, <function>pg_last_wal_replay_lsn()</function> will
+            return a value greater than or equal to the target LSN. This mode
+            can only be used during recovery.
+           </para>
+          </listitem>
+          <listitem>
+           <para>
+            <literal>standby_write</literal>: Wait for the WAL containing the
+            LSN to be received from the primary and written to disk on a
+            standby server, but not yet flushed. This is faster than
+            <literal>standby_flush</literal> but provides weaker durability
+            guarantees since the data may still be in operating system
+            buffers. After successful completion, the
+            <structfield>written_lsn</structfield> column in
+            <link linkend="monitoring-pg-stat-wal-receiver-view">
+            <structname>pg_stat_wal_receiver</structname></link> will show
+            a value greater than or equal to the target LSN. This mode can
+            only be used during recovery.
+           </para>
+          </listitem>
+          <listitem>
+           <para>
+            <literal>standby_flush</literal>: Wait for the WAL containing the
+            LSN to be received from the primary and flushed to disk on a
+            standby server. This provides a durability guarantee without
+            waiting for the WAL to be applied. After successful completion,
+            <function>pg_last_wal_receive_lsn()</function> will return a
+            value greater than or equal to the target LSN. This value is
+            also available as the <structfield>flushed_lsn</structfield>
+            column in <link linkend="monitoring-pg-stat-wal-receiver-view">
+            <structname>pg_stat_wal_receiver</structname></link>. This mode
+            can only be used during recovery.
+           </para>
+          </listitem>
+          <listitem>
+           <para>
+            <literal>primary_flush</literal>: Wait for the WAL containing the
+            LSN to be flushed to disk on a primary server. After successful
+            completion, <function>pg_current_wal_flush_lsn()</function> will
+            return a value greater than or equal to the target LSN. This mode
+            can only be used on a primary server (not during recovery).
+           </para>
+          </listitem>
+         </itemizedlist>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>TIMEOUT</literal> '<replaceable class="parameter">timeout</replaceable>'</term>
         <listitem>
@@ -135,9 +207,12 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
     <listitem>
      <para>
       This return value denotes that the database server is not in a recovery
-      state.  This might mean either the database server was not in recovery
-      at the moment of receiving the command, or it was promoted before
-      reaching the target <parameter>lsn</parameter>.
+      state. This might mean either the database server was not in recovery
+      at the moment of receiving the command (i.e., executed on a primary),
+      or it was promoted before reaching the target <parameter>lsn</parameter>.
+      In the promotion case, this status indicates a timeline change occurred,
+      and the application should re-evaluate whether the target LSN is still
+      relevant.
      </para>
     </listitem>
    </varlistentry>
@@ -148,25 +223,34 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Notes</title>
 
   <para>
-    <command>WAIT FOR</command> command waits till
-    <parameter>lsn</parameter> to be replayed on standby.
-    That is, after this command execution, the value returned by
-    <function>pg_last_wal_replay_lsn</function> should be greater or equal
-    to the <parameter>lsn</parameter> value.  This is useful to achieve
-    read-your-writes-consistency, while using async replica for reads and
-    primary for writes.  In that case, the <acronym>lsn</acronym> of the last
-    modification should be stored on the client application side or the
-    connection pooler side.
+   <command>WAIT FOR</command> waits until the specified
+   <parameter>lsn</parameter> is reached according to the specified
+   <parameter>mode</parameter>. The <literal>standby_replay</literal> mode
+   waits for the LSN to be replayed (applied to the database), which is
+   useful to achieve read-your-writes consistency while using an async
+   replica for reads and the primary for writes. The
+   <literal>standby_flush</literal> mode waits for the WAL to be flushed
+   to durable storage on the replica, providing a durability guarantee
+   without waiting for replay. The <literal>standby_write</literal> mode
+   waits for the WAL to be written to the operating system, which is
+   faster than flush but provides weaker durability guarantees. The
+   <literal>primary_flush</literal> mode waits for WAL to be flushed on
+   a primary server. In all cases, the <acronym>LSN</acronym> of the last
+   modification should be stored on the client application side or the
+   connection pooler side.
   </para>
 
   <para>
-    <command>WAIT FOR</command> command should be called on standby.
-    If a user runs <command>WAIT FOR</command> on primary, it
-    will error out unless <parameter>NO_THROW</parameter> is specified in the WITH clause.
-    However, if <command>WAIT FOR</command> is
-    called on primary promoted from standby and <literal>lsn</literal>
-    was already replayed, then the <command>WAIT FOR</command> command just
-    exits immediately.
+   The standby modes (<literal>standby_replay</literal>,
+   <literal>standby_write</literal>, <literal>standby_flush</literal>)
+   can only be used during recovery, and <literal>primary_flush</literal>
+   can only be used on a primary server. Using the wrong mode for the
+   current server state will result in an error. If a standby is promoted
+   while waiting with a standby mode, the command will return
+   <literal>not in recovery</literal> (or throw an error if
+   <literal>NO_THROW</literal> is not specified). Promotion creates a new
+   timeline, and the LSN being waited for may refer to WAL from the old
+   timeline.
   </para>
 
 </refsect1>
@@ -175,21 +259,21 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Examples</title>
 
   <para>
-    You can use <command>WAIT FOR</command> command to wait for
-    the <type>pg_lsn</type> value.  For example, an application could update
-    the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
-    changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
-    on primary server to get the <acronym>lsn</acronym> given that
-    <varname>synchronous_commit</varname> could be set to
-    <literal>off</literal>.
+   You can use <command>WAIT FOR</command> command to wait for
+   the <type>pg_lsn</type> value.  For example, an application could update
+   the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
+   changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
+   on primary server to get the <acronym>lsn</acronym> given that
+   <varname>synchronous_commit</varname> could be set to
+   <literal>off</literal>.
 
    <programlisting>
 postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
 UPDATE 100
 postgres=# SELECT pg_current_wal_insert_lsn();
-pg_current_wal_insert_lsn
---------------------
-0/306EE20
+ pg_current_wal_insert_lsn
+---------------------------
+ 0/306EE20
 (1 row)
 </programlisting>
 
@@ -200,7 +284,7 @@ pg_current_wal_insert_lsn
 <programlisting>
 postgres=# WAIT FOR LSN '0/306EE20';
  status
---------
+---------
  success
 (1 row)
 postgres=# SELECT * FROM movie WHERE genre = 'Drama';
@@ -211,7 +295,43 @@ postgres=# SELECT * FROM movie WHERE genre = 'Drama';
   </para>
 
   <para>
-    If the target LSN is not reached before the timeout, the error is thrown.
+   Wait for flush (data durable on replica):
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_flush');
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   Wait for write with timeout:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_write', TIMEOUT '100ms', NO_THROW);
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   Wait for flush on primary:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'primary_flush');
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   If the target LSN is not reached before the timeout, an error is thrown:
 
 <programlisting>
 postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s');
@@ -221,11 +341,12 @@ ERROR:  timed out while waiting for target LSN 0/306EE20 to be replayed; current
 
   <para>
    The same example uses <command>WAIT FOR</command> with
-   <parameter>NO_THROW</parameter> option.
+   <parameter>NO_THROW</parameter> option:
+
 <programlisting>
 postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW);
  status
---------
+---------
  timeout
 (1 row)
 </programlisting>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fdb92deac57..da96b627228 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2918,6 +2918,14 @@ XLogFlush(XLogRecPtr record)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone was waiting for, notify the waiters.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >=
+		 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_PRIMARY_FLUSH])))
+		WaitLSNWakeup(WAIT_LSN_TYPE_PRIMARY_FLUSH, LogwrtResult.Flush);
+
 	/*
 	 * If we still haven't flushed to the request point then we have a
 	 * problem; most likely, the requested flush point is past end of XLOG.
@@ -3100,6 +3108,14 @@ XLogBackgroundFlush(void)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone was waiting for, notify the waiters.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >=
+		 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_PRIMARY_FLUSH])))
+		WaitLSNWakeup(WAIT_LSN_TYPE_PRIMARY_FLUSH, LogwrtResult.Flush);
+
 	/*
 	 * Great, done. To take some work off the critical path, try to initialize
 	 * as many of the no-longer-needed WAL buffers for future use as we can.
@@ -6277,10 +6293,12 @@ StartupXLOG(void)
 	WakeupCheckpointer();
 
 	/*
-	 * Wake up all waiters for replay LSN.  They need to report an error that
-	 * recovery was ended before reaching the target LSN.
+	 * Wake up all waiters.  They need to report an error that recovery was
+	 * ended before reaching the target LSN.
 	 */
 	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment.  This must occur after
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
index dd2570cb787..a85c3b0de98 100644
--- a/src/backend/commands/wait.c
+++ b/src/backend/commands/wait.c
@@ -2,7 +2,7 @@
  *
  * wait.c
  *	  Implements WAIT FOR, which allows waiting for events such as
- *	  time passing or LSN having been replayed on replica.
+ *	  time passing or LSN having been replayed, flushed, or written.
  *
  * Portions Copyright (c) 2025, PostgreSQL Global Development Group
  *
@@ -15,6 +15,7 @@
 
 #include <math.h>
 
+#include "access/xlog.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogwait.h"
 #include "commands/defrem.h"
@@ -28,18 +29,39 @@
 #include "utils/snapmgr.h"
 
 
+/*
+ * Type descriptor for WAIT FOR LSN wait types, used for error messages.
+ */
+typedef struct WaitLSNTypeDesc
+{
+	const char *noun;			/* Mode name: "standby_replay",
+								 * "standby_write", "standby_flush",
+								 * "primary_flush" */
+	const char *verb;			/* Past participle: "replayed", "written",
+								 * "flushed" */
+}			WaitLSNTypeDesc;
+
+static const WaitLSNTypeDesc WaitLSNTypeDescs[] = {
+	[WAIT_LSN_TYPE_STANDBY_REPLAY] = {"standby_replay", "replayed"},
+	[WAIT_LSN_TYPE_STANDBY_WRITE] = {"standby_write", "written"},
+	[WAIT_LSN_TYPE_STANDBY_FLUSH] = {"standby_flush", "flushed"},
+	[WAIT_LSN_TYPE_PRIMARY_FLUSH] = {"primary_flush", "flushed"},
+};
+
 void
 ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 {
 	XLogRecPtr	lsn;
 	int64		timeout = 0;
 	WaitLSNResult waitLSNResult;
+	WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
 	bool		throw = true;
 	TupleDesc	tupdesc;
 	TupOutputState *tstate;
 	const char *result = "<unset>";
 	bool		timeout_specified = false;
 	bool		no_throw_specified = false;
+	bool		mode_specified = false;
 
 	/* Parse and validate the mandatory LSN */
 	lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
@@ -47,7 +69,32 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 
 	foreach_node(DefElem, defel, stmt->options)
 	{
-		if (strcmp(defel->defname, "timeout") == 0)
+		if (strcmp(defel->defname, "mode") == 0)
+		{
+			char	   *mode_str;
+
+			if (mode_specified)
+				errorConflictingDefElem(defel, pstate);
+			mode_specified = true;
+
+			mode_str = defGetString(defel);
+
+			if (pg_strcasecmp(mode_str, "standby_replay") == 0)
+				lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
+			else if (pg_strcasecmp(mode_str, "standby_write") == 0)
+				lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
+			else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
+				lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
+			else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
+				lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("unrecognized value for WAIT option \"MODE\": \"%s\"",
+								mode_str),
+						 parser_errposition(pstate, defel->location)));
+		}
+		else if (strcmp(defel->defname, "timeout") == 0)
 		{
 			char	   *timeout_str;
 			const char *hintmsg;
@@ -107,8 +154,8 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	}
 
 	/*
-	 * We are going to wait for the LSN replay.  We should first care that we
-	 * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+	 * We are going to wait for the LSN.  We should first care that we don't
+	 * hold a snapshot and correspondingly our MyProc->xmin is invalid.
 	 * Otherwise, our snapshot could prevent the replay of WAL records
 	 * implying a kind of self-deadlock.  This is the reason why WAIT FOR is a
 	 * command, not a procedure or function.
@@ -140,7 +187,22 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	 */
 	Assert(MyProc->xmin == InvalidTransactionId);
 
-	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_STANDBY_REPLAY, lsn, timeout);
+	/*
+	 * Validate that the requested mode matches the current server state.
+	 * Primary modes can only be used on a primary.
+	 */
+	if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
+	{
+		if (RecoveryInProgress())
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("recovery is in progress"),
+					 errhint("Waiting for primary_flush can only be done on a primary server. "
+							 "Use standby_flush mode on a standby server.")));
+	}
+
+	/* Now wait for the LSN */
+	waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
 
 	/*
 	 * Process the result of WaitForLSN().  Throw appropriate error if needed.
@@ -154,11 +216,18 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 
 		case WAIT_LSN_RESULT_TIMEOUT:
 			if (throw)
+			{
+				const		WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType];
+				XLogRecPtr	currentLSN = GetCurrentLSNForWaitType(lsnType);
+
 				ereport(ERROR,
 						errcode(ERRCODE_QUERY_CANCELED),
-						errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
+						errmsg("timed out while waiting for target LSN %X/%08X to be %s; current %s LSN %X/%08X",
 							   LSN_FORMAT_ARGS(lsn),
-							   LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+							   desc->verb,
+							   desc->noun,
+							   LSN_FORMAT_ARGS(currentLSN)));
+			}
 			else
 				result = "timeout";
 			break;
@@ -166,20 +235,27 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 		case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
 			if (throw)
 			{
+				const		WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType];
+
 				if (PromoteIsTriggered())
 				{
+					XLogRecPtr	currentLSN = GetCurrentLSNForWaitType(lsnType);
+
 					ereport(ERROR,
 							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							errmsg("recovery is not in progress"),
-							errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
+							errdetail("Recovery ended before target LSN %X/%08X was %s; last %s LSN %X/%08X.",
 									  LSN_FORMAT_ARGS(lsn),
-									  LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+									  desc->verb,
+									  desc->noun,
+									  LSN_FORMAT_ARGS(currentLSN)));
 				}
 				else
 					ereport(ERROR,
 							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							errmsg("recovery is not in progress"),
-							errhint("Waiting for the replay LSN can only be executed during recovery."));
+							errhint("Waiting for the %s LSN can only be executed during recovery.",
+									desc->noun));
 			}
 			else
 				result = "not in recovery";
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ac802ae85b4..404d348da37 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -57,6 +57,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
 #include "catalog/pg_authid.h"
 #include "funcapi.h"
 #include "libpq/pqformat.h"
@@ -965,6 +966,14 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 	/* Update shared-memory status */
 	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
 
+	/*
+	 * If we wrote an LSN that someone was waiting for, notify the waiters.
+	 */
+	if (waitLSNState &&
+		(LogstreamResult.Write >=
+		 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_WRITE])))
+		WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, LogstreamResult.Write);
+
 	/*
 	 * Close the current segment if it's fully written up in the last cycle of
 	 * the loop, to create its archive notification file soon. Otherwise WAL
@@ -1004,6 +1013,15 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		}
 		SpinLockRelease(&walrcv->mutex);
 
+		/*
+		 * If we flushed an LSN that someone was waiting for, notify the
+		 * waiters.
+		 */
+		if (waitLSNState &&
+			(LogstreamResult.Flush >=
+			 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_FLUSH])))
+			WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, LogstreamResult.Flush);
+
 		/* Signal the startup process and walsender that new WAL has arrived */
 		WakeupRecovery();
 		if (AllowCascadeReplication())
diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl
index e0ddb06a2f0..e41aad45e28 100644
--- a/src/test/recovery/t/049_wait_for_lsn.pl
+++ b/src/test/recovery/t/049_wait_for_lsn.pl
@@ -1,5 +1,6 @@
-# Checks waiting for the LSN replay on standby using
-# the WAIT FOR command.
+# Checks waiting for the LSN using the WAIT FOR command.
+# Tests standby modes (standby_replay/standby_write/standby_flush) on standby
+# and primary_flush mode on primary.
 use strict;
 use warnings FATAL => 'all';
 
@@ -7,6 +8,42 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+# Helper functions to control walreceiver for testing wait conditions.
+# These allow us to stop WAL streaming so waiters block, then resume it.
+my $saved_primary_conninfo;
+
+sub stop_walreceiver
+{
+	my ($node) = @_;
+	$saved_primary_conninfo = $node->safe_psql(
+		'postgres', qq[
+		SELECT pg_catalog.quote_literal(setting)
+		FROM pg_settings
+		WHERE name = 'primary_conninfo';
+	]);
+	$node->safe_psql(
+		'postgres', qq[
+		ALTER SYSTEM SET primary_conninfo = '';
+		SELECT pg_reload_conf();
+	]);
+
+	$node->poll_query_until('postgres',
+		"SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+}
+
+sub resume_walreceiver
+{
+	my ($node) = @_;
+	$node->safe_psql(
+		'postgres', qq[
+		ALTER SYSTEM SET primary_conninfo = $saved_primary_conninfo;
+		SELECT pg_reload_conf();
+	]);
+
+	$node->poll_query_until('postgres',
+		"SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+}
+
 # Initialize primary node
 my $node_primary = PostgreSQL::Test::Cluster->new('primary');
 $node_primary->init(allows_streaming => 1);
@@ -62,7 +99,52 @@ $output = $node_standby->safe_psql(
 ok((split("\n", $output))[-1] eq 30,
 	"standby reached the same LSN as primary");
 
-# 3. Check that waiting for unreachable LSN triggers the timeout.  The
+# 3. Check that WAIT FOR works with standby_write, standby_flush, and
+# primary_flush modes.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn_write =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '${lsn_write}' WITH (MODE 'standby_write', timeout '1d');
+	SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${lsn_write}'::pg_lsn);
+]);
+
+ok( (split("\n", $output))[-1] >= 0,
+	"standby wrote WAL up to target LSN after WAIT FOR with MODE 'standby_write'"
+);
+
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn_flush =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '${lsn_flush}' WITH (MODE 'standby_flush', timeout '1d');
+	SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '${lsn_flush}'::pg_lsn);
+]);
+
+ok( (split("\n", $output))[-1] >= 0,
+	"standby flushed WAL up to target LSN after WAIT FOR with MODE 'standby_flush'"
+);
+
+# Check primary_flush mode on primary
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn_primary_flush =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_primary->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '${lsn_primary_flush}' WITH (MODE 'primary_flush', timeout '1d');
+	SELECT pg_lsn_cmp(pg_current_wal_flush_lsn(), '${lsn_primary_flush}'::pg_lsn);
+]);
+
+ok( (split("\n", $output))[-1] >= 0,
+	"primary flushed WAL up to target LSN after WAIT FOR with MODE 'primary_flush'"
+);
+
+# 4. Check that waiting for unreachable LSN triggers the timeout.  The
 # unreachable LSN must be well in advance.  So WAL records issued by
 # the concurrent autovacuum could not affect that.
 my $lsn3 =
@@ -88,14 +170,26 @@ $output = $node_standby->safe_psql(
 	WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]);
 ok($output eq "timeout", "WAIT FOR returns correct status after timeout");
 
-# 4. Check that WAIT FOR triggers an error if called on primary,
-# within another function, or inside a transaction with an isolation level
-# higher than READ COMMITTED.
+# 5. Check mode validation: standby modes error on primary, primary mode errors
+# on standby, and primary_flush works on primary.  Also check that WAIT FOR
+# triggers an error if called within another function or inside a transaction
+# with an isolation level higher than READ COMMITTED.
+
+# Test standby_flush on primary - should error
+$node_primary->psql(
+	'postgres',
+	"WAIT FOR LSN '${lsn3}' WITH (MODE 'standby_flush');",
+	stderr => \$stderr);
+ok($stderr =~ /recovery is not in progress/,
+	"get an error when running standby_flush on the primary");
 
-$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';",
+# Test primary_flush on standby - should error
+$node_standby->psql(
+	'postgres',
+	"WAIT FOR LSN '${lsn3}' WITH (MODE 'primary_flush');",
 	stderr => \$stderr);
-ok( $stderr =~ /recovery is not in progress/,
-	"get an error when running on the primary");
+ok($stderr =~ /recovery is in progress/,
+	"get an error when running primary_flush on the standby");
 
 $node_standby->psql(
 	'postgres',
@@ -125,7 +219,7 @@ ok( $stderr =~
 	  /WAIT FOR must be only called without an active or registered snapshot/,
 	"get an error when running within another function");
 
-# 5. Check parameter validation error cases on standby before promotion
+# 6. Check parameter validation error cases on standby before promotion
 my $test_lsn =
   $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
 
@@ -208,10 +302,26 @@ $node_standby->psql(
 ok( $stderr =~ /option "invalid_option" not recognized/,
 	"get error for invalid WITH clause option");
 
-# 6. Check the scenario of multiple LSN waiters.  We make 5 background
-# psql sessions each waiting for a corresponding insertion.  When waiting is
-# finished, stored procedures logs if there are visible as many rows as
-# should be.
+# Test invalid MODE value
+$node_standby->psql(
+	'postgres',
+	"WAIT FOR LSN '${test_lsn}' WITH (MODE 'invalid');",
+	stderr => \$stderr);
+ok($stderr =~ /unrecognized value for WAIT option "MODE": "invalid"/,
+	"get error for invalid MODE value");
+
+# Test duplicate MODE parameter
+$node_standby->psql(
+	'postgres',
+	"WAIT FOR LSN '${test_lsn}' WITH (MODE 'standby_replay', MODE 'standby_write');",
+	stderr => \$stderr);
+ok( $stderr =~ /conflicting or redundant options/,
+	"get error for duplicate MODE parameter");
+
+# 7a. Check the scenario of multiple standby_replay waiters.  We make 5
+# background psql sessions each waiting for a corresponding insertion.  When
+# waiting is finished, stored procedures logs if there are visible as many
+# rows as should be.
 $node_primary->safe_psql(
 	'postgres', qq[
 CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
@@ -225,8 +335,17 @@ CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
   END
 \$\$
 LANGUAGE plpgsql;
+
+CREATE FUNCTION log_wait_done(prefix text, i int) RETURNS void AS \$\$
+  BEGIN
+    RAISE LOG '% %', prefix, i;
+  END
+\$\$
+LANGUAGE plpgsql;
 ]);
+
 $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
 my @psql_sessions;
 for (my $i = 0; $i < 5; $i++)
 {
@@ -243,6 +362,7 @@ for (my $i = 0; $i < 5; $i++)
 		SELECT log_count(${i});
 	]);
 }
+
 my $log_offset = -s $node_standby->logfile;
 $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
 for (my $i = 0; $i < 5; $i++)
@@ -251,23 +371,246 @@ for (my $i = 0; $i < 5; $i++)
 	$psql_sessions[$i]->quit;
 }
 
-ok(1, 'multiple LSN waiters reported consistent data');
+ok(1, 'multiple standby_replay waiters reported consistent data');
+
+# 7b. Check the scenario of multiple standby_write waiters.
+# Stop walreceiver to ensure waiters actually block.
+stop_walreceiver($node_standby);
+
+# Generate WAL on primary (standby won't receive it yet)
+my @write_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (100 + ${i});");
+	$write_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start standby_write waiters (they will block since walreceiver is stopped)
+my @write_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+	$write_sessions[$i] = $node_standby->background_psql('postgres');
+	$write_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$write_lsns[$i]' WITH (MODE 'standby_write', timeout '1d');
+		SELECT log_wait_done('write_done', $i);
+	]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+	"SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalWrite'"
+);
+
+# Restore walreceiver to unblock waiters
+my $write_log_offset = -s $node_standby->logfile;
+resume_walreceiver($node_standby);
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_standby->wait_for_log("write_done $i", $write_log_offset);
+	$write_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was written up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '$write_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+	"multiple standby_write waiters: standby wrote WAL up to target LSN");
+
+# 7c. Check the scenario of multiple standby_flush waiters.
+# Stop walreceiver to ensure waiters actually block.
+stop_walreceiver($node_standby);
+
+# Generate WAL on primary (standby won't receive it yet)
+my @flush_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (200 + ${i});");
+	$flush_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start standby_flush waiters (they will block since walreceiver is stopped)
+my @flush_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+	$flush_sessions[$i] = $node_standby->background_psql('postgres');
+	$flush_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$flush_lsns[$i]' WITH (MODE 'standby_flush', timeout '1d');
+		SELECT log_wait_done('flush_done', $i);
+	]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+	"SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalFlush'"
+);
+
+# Restore walreceiver to unblock waiters
+my $flush_log_offset = -s $node_standby->logfile;
+resume_walreceiver($node_standby);
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_standby->wait_for_log("flush_done $i", $flush_log_offset);
+	$flush_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was flushed up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '$flush_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+	"multiple standby_flush waiters: standby flushed WAL up to target LSN");
+
+# 7d. Check the scenario of mixed standby mode waiters (standby_replay,
+# standby_write, standby_flush) running concurrently.  We start 6 sessions:
+# 2 for each mode, all waiting for the same target LSN.  We stop the
+# walreceiver and pause replay to ensure all waiters block.  Then we resume
+# replay and restart the walreceiver to verify they unblock and complete
+# correctly.
+
+# Stop walreceiver first to ensure we can control the flow without hanging
+# (stopping it after pausing replay can hang if the startup process is paused).
+stop_walreceiver($node_standby);
+
+# Pause replay
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
+# Generate WAL on primary
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(301, 310));");
+my $mixed_target_lsn =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+# Start 6 waiters: 2 for each mode
+my @mixed_sessions;
+my @mixed_modes = ('standby_replay', 'standby_write', 'standby_flush');
+for (my $i = 0; $i < 6; $i++)
+{
+	$mixed_sessions[$i] = $node_standby->background_psql('postgres');
+	$mixed_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '${mixed_target_lsn}' WITH (MODE '$mixed_modes[$i % 3]', timeout '1d');
+		SELECT log_wait_done('mixed_done', $i);
+	]);
+}
+
+# Verify all waiters are blocked
+$node_standby->poll_query_until('postgres',
+	"SELECT count(*) = 6 FROM pg_stat_activity WHERE wait_event LIKE 'WaitForWal%'"
+);
+
+# Resume replay (waiters should still be blocked as no WAL has arrived)
+my $mixed_log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+$node_standby->poll_query_until('postgres',
+	"SELECT NOT pg_is_wal_replay_paused();");
+
+# Restore walreceiver to allow WAL to arrive
+resume_walreceiver($node_standby);
 
-# 7. Check that the standby promotion terminates the wait on LSN.  Start
-# waiting for an unreachable LSN then promote.  Check the log for the relevant
-# error message.  Also, check that waiting for already replayed LSN doesn't
-# cause an error even after promotion.
+# Wait for all sessions to complete and close them
+for (my $i = 0; $i < 6; $i++)
+{
+	$node_standby->wait_for_log("mixed_done $i", $mixed_log_offset);
+	$mixed_sessions[$i]->quit;
+}
+
+# Verify all modes reached the target LSN
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+	       pg_lsn_cmp(pg_last_wal_receive_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+	       pg_lsn_cmp(pg_last_wal_replay_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0;
+]);
+
+ok($output eq 't',
+	"mixed mode waiters: all modes completed and reached target LSN");
+
+# 7e. Check the scenario of multiple primary_flush waiters on primary.
+# We start 5 background sessions waiting for different LSNs with primary_flush
+# mode.  Each waiter logs when done.
+my @primary_flush_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (400 + ${i});");
+	$primary_flush_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+}
+
+my $primary_flush_log_offset = -s $node_primary->logfile;
+
+# Start primary_flush waiters
+my @primary_flush_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+	$primary_flush_sessions[$i] = $node_primary->background_psql('postgres');
+	$primary_flush_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$primary_flush_lsns[$i]' WITH (MODE 'primary_flush', timeout '1d');
+		SELECT log_wait_done('primary_flush_done', $i);
+	]);
+}
+
+# The WAL should already be flushed, so waiters should complete quickly
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_primary->wait_for_log("primary_flush_done $i",
+		$primary_flush_log_offset);
+	$primary_flush_sessions[$i]->quit;
+}
+
+# Verify on primary that WAL was flushed up to the target LSN
+$output = $node_primary->safe_psql('postgres',
+	"SELECT pg_lsn_cmp(pg_current_wal_flush_lsn(), '$primary_flush_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+	"multiple primary_flush waiters: primary flushed WAL up to target LSN");
+
+# 8. Check that the standby promotion terminates all standby wait modes.  Start
+# waiting for unreachable LSNs with standby_replay, standby_write, and
+# standby_flush modes, then promote.  Check the log for the relevant error
+# messages.  Also, check that waiting for already replayed LSN doesn't cause
+# an error even after promotion.
 my $lsn4 =
   $node_primary->safe_psql('postgres',
 	"SELECT pg_current_wal_insert_lsn() + 10000000000");
+
 my $lsn5 =
   $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
-my $psql_session = $node_standby->background_psql('postgres');
-$psql_session->query_until(
-	qr/start/, qq[
-	\\echo start
-	WAIT FOR LSN '${lsn4}';
-]);
+
+# Start background sessions waiting for unreachable LSN with all modes
+my @wait_modes = ('standby_replay', 'standby_write', 'standby_flush');
+my @wait_sessions;
+for (my $i = 0; $i < 3; $i++)
+{
+	$wait_sessions[$i] = $node_standby->background_psql('postgres');
+	$wait_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '${lsn4}' WITH (MODE '$wait_modes[$i]');
+	]);
+}
 
 # Make sure standby will be promoted at least at the primary insert LSN we
 # have just observed.  Use pg_switch_wal() to force the insert LSN to be
@@ -277,9 +620,16 @@ $node_primary->wait_for_catchup($node_standby);
 
 $log_offset = -s $node_standby->logfile;
 $node_standby->promote;
-$node_standby->wait_for_log('recovery is not in progress', $log_offset);
 
-ok(1, 'got error after standby promote');
+# Wait for all three sessions to get the error (each mode has distinct message)
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was written/,
+	$log_offset);
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was flushed/,
+	$log_offset);
+$node_standby->wait_for_log(
+	qr/Recovery ended before target LSN.*was replayed/, $log_offset);
+
+ok(1, 'promotion interrupted all wait modes');
 
 $node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");
 
@@ -295,8 +645,11 @@ ok($output eq "not in recovery",
 $node_standby->stop;
 $node_primary->stop;
 
-# If we send \q with $psql_session->quit the command can be sent to the session
+# If we send \q with $session->quit the command can be sent to the session
 # already closed. So \q is in initial script, here we only finish IPC::Run.
-$psql_session->{run}->finish;
+for (my $i = 0; $i < 3; $i++)
+{
+	$wait_sessions[$i]->{run}->finish;
+}
 
 done_testing();
-- 
2.51.0

