From 071f67d1fae98e397c071dce0b9993b3be0c0e9f Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Tue, 25 Nov 2025 19:11:54 +0800
Subject: [PATCH v1 3/5] Add MODE parameter to WAIT FOR LSN command

Extend the WAIT FOR LSN command with an optional MODE parameter that
specifies which LSN type to wait for:

  WAIT FOR LSN '<lsn>' [MODE { REPLAY | WRITE | FLUSH }] [WITH (...)]

- REPLAY (default): Wait for WAL to be replayed to the specified LSN
- WRITE: Wait for WAL to be written (received) to the specified LSN
- FLUSH: Wait for WAL to be flushed to disk at the specified LSN

The default mode is REPLAY, matching the original behavior when MODE
is not specified. This follows the pattern used by LOCK command where
the mode parameter is optional with a sensible default.

The WRITE and FLUSH modes are useful for scenarios where applications
need to ensure WAL has been received or persisted on the standby
without necessarily waiting for replay to complete.

Also includes:
- Documentation updates for the new syntax and refactoring
  of existing WAIT FOR command documentation
- Test coverage for all three modes including mixed concurrent waiters
- Wakeup logic in walreceiver for WRITE/FLUSH waiters
---
 doc/src/sgml/ref/wait_for.sgml          | 184 ++++++++++++++++------
 src/backend/access/transam/xlog.c       |   6 +-
 src/backend/commands/wait.c             |  59 +++++--
 src/backend/parser/gram.y               |  21 ++-
 src/backend/replication/walreceiver.c   |  19 +++
 src/include/nodes/parsenodes.h          |  16 ++
 src/include/parser/kwlist.h             |   2 +
 src/test/recovery/t/049_wait_for_lsn.pl | 201 +++++++++++++++++++++---
 8 files changed, 422 insertions(+), 86 deletions(-)

diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml
index 3b8e842d1de..efd851149c0 100644
--- a/doc/src/sgml/ref/wait_for.sgml
+++ b/doc/src/sgml/ref/wait_for.sgml
@@ -16,12 +16,13 @@ PostgreSQL documentation
 
  <refnamediv>
   <refname>WAIT FOR</refname>
-  <refpurpose>wait for target <acronym>LSN</acronym> to be replayed, optionally with a timeout</refpurpose>
+  <refpurpose>wait for WAL to reach a target <acronym>LSN</acronym> on a replica</refpurpose>
  </refnamediv>
 
  <refsynopsisdiv>
 <synopsis>
-WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
+WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ MODE { REPLAY | FLUSH | WRITE } ]
+    [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
 
 <phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase>
 
@@ -34,20 +35,22 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Description</title>
 
   <para>
-    Waits until recovery replays <parameter>lsn</parameter>.
-    If no <parameter>timeout</parameter> is specified or it is set to
-    zero, this command waits indefinitely for the
-    <parameter>lsn</parameter>.
-    On timeout, or if the server is promoted before
-    <parameter>lsn</parameter> is reached, an error is emitted,
-    unless <literal>NO_THROW</literal> is specified in the WITH clause.
-    If <parameter>NO_THROW</parameter> is specified, then the command
-    doesn't throw errors.
+   Waits until the specified <parameter>lsn</parameter> is reached
+   according to the specified <parameter>mode</parameter>,
+   which determines whether to wait for WAL to be written, flushed, or replayed.
+   If no <parameter>timeout</parameter> is specified or it is set to
+   zero, this command waits indefinitely for the
+   <parameter>lsn</parameter>.
+   On timeout, or if the server is promoted before
+   <parameter>lsn</parameter> is reached, an error is emitted,
+   unless <literal>NO_THROW</literal> is specified in the WITH clause.
+   If <parameter>NO_THROW</parameter> is specified, then the command
+   doesn't throw errors.
   </para>
 
   <para>
-    The possible return values are <literal>success</literal>,
-    <literal>timeout</literal>, and <literal>not in recovery</literal>.
+   The possible return values are <literal>success</literal>,
+   <literal>timeout</literal>, and <literal>not in recovery</literal>.
   </para>
  </refsect1>
 
@@ -64,6 +67,53 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>MODE</literal></term>
+    <listitem>
+     <para>
+      Specifies the type of LSN processing to wait for. If not specified,
+      the default is <literal>REPLAY</literal>. The valid modes are:
+     </para>
+
+     <variablelist>
+      <varlistentry>
+       <term><literal>REPLAY</literal></term>
+       <listitem>
+        <para>
+         Wait for the LSN to be replayed (applied to the database).
+         After successful completion, <function>pg_last_wal_replay_lsn()</function>
+         will return a value greater than or equal to the target LSN.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term><literal>FLUSH</literal></term>
+       <listitem>
+        <para>
+         Wait for the WAL containing the LSN to be received from the
+         primary and flushed to durable storage on the replica. This
+         provides a durability guarantee without waiting for the WAL
+         to be applied.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term><literal>WRITE</literal></term>
+       <listitem>
+        <para>
+         Wait for the WAL containing the LSN to be received from the
+         primary and written to the operating system on the replica.
+         This is faster than <literal>FLUSH</literal> but provides weaker
+         durability guarantees since the data may still be in OS buffers.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>WITH ( <replaceable class="parameter">option</replaceable> [, ...] )</literal></term>
     <listitem>
@@ -135,9 +185,12 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
     <listitem>
      <para>
       This return value denotes that the database server is not in a recovery
-      state.  This might mean either the database server was not in recovery
-      at the moment of receiving the command, or it was promoted before
-      reaching the target <parameter>lsn</parameter>.
+      state. This might mean either the database server was not in recovery
+      at the moment of receiving the command (i.e., executed on a primary),
+      or it was promoted before reaching the target <parameter>lsn</parameter>.
+      In the promotion case, this status indicates a timeline change occurred,
+      and the application should re-evaluate whether the target LSN is still
+      relevant.
      </para>
     </listitem>
    </varlistentry>
@@ -148,25 +201,33 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Notes</title>
 
   <para>
-    <command>WAIT FOR</command> command waits till
-    <parameter>lsn</parameter> to be replayed on standby.
-    That is, after this command execution, the value returned by
-    <function>pg_last_wal_replay_lsn</function> should be greater or equal
-    to the <parameter>lsn</parameter> value.  This is useful to achieve
-    read-your-writes-consistency, while using async replica for reads and
-    primary for writes.  In that case, the <acronym>lsn</acronym> of the last
-    modification should be stored on the client application side or the
-    connection pooler side.
+   <command>WAIT FOR</command> waits until the specified
+   <parameter>lsn</parameter> is reached according to the specified
+   <parameter>mode</parameter>. The <literal>REPLAY</literal> mode waits
+   for the LSN to be replayed (applied to the database), which is useful
+   to achieve read-your-writes consistency while using an async replica
+   for reads and the primary for writes. The <literal>FLUSH</literal> mode
+   waits for the WAL to be flushed to durable storage on the replica,
+   providing a durability guarantee without waiting for replay. The
+   <literal>WRITE</literal> mode waits for the WAL to be written to the
+   operating system, which is faster than flush but provides weaker
+   durability guarantees. In all cases, the <acronym>LSN</acronym> of the
+   last modification should be stored on the client application side or
+   the connection pooler side.
   </para>
 
   <para>
-    <command>WAIT FOR</command> command should be called on standby.
-    If a user runs <command>WAIT FOR</command> on primary, it
-    will error out unless <parameter>NO_THROW</parameter> is specified in the WITH clause.
-    However, if <command>WAIT FOR</command> is
-    called on primary promoted from standby and <literal>lsn</literal>
-    was already replayed, then the <command>WAIT FOR</command> command just
-    exits immediately.
+   <command>WAIT FOR</command> should be called on a standby.
+   If a user runs <command>WAIT FOR</command> on the primary, it
+   will error out unless <parameter>NO_THROW</parameter> is specified
+   in the WITH clause. However, if <command>WAIT FOR</command> is
+   called on a primary promoted from standby and <literal>lsn</literal>
+   was already reached, then the <command>WAIT FOR</command> command
+   just exits immediately. If the replica is promoted while waiting,
+   the command will return <literal>not in recovery</literal> (or throw
+   an error if <literal>NO_THROW</literal> is not specified). Promotion
+   creates a new timeline, and the LSN being waited for may refer to
+   WAL from the old timeline.
   </para>
 
 </refsect1>
@@ -175,21 +236,21 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Examples</title>
 
   <para>
-    You can use <command>WAIT FOR</command> command to wait for
-    the <type>pg_lsn</type> value.  For example, an application could update
-    the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
-    changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
-    on primary server to get the <acronym>lsn</acronym> given that
-    <varname>synchronous_commit</varname> could be set to
-    <literal>off</literal>.
+   You can use <command>WAIT FOR</command> command to wait for
+   the <type>pg_lsn</type> value.  For example, an application could update
+   the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
+   changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
+   on primary server to get the <acronym>lsn</acronym> given that
+   <varname>synchronous_commit</varname> could be set to
+   <literal>off</literal>.
 
    <programlisting>
 postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
 UPDATE 100
 postgres=# SELECT pg_current_wal_insert_lsn();
-pg_current_wal_insert_lsn
---------------------
-0/306EE20
+ pg_current_wal_insert_lsn
+---------------------------
+ 0/306EE20
 (1 row)
 </programlisting>
 
@@ -198,9 +259,9 @@ pg_current_wal_insert_lsn
    changes made on primary should be guaranteed to be visible on replica.
 
 <programlisting>
-postgres=# WAIT FOR LSN '0/306EE20';
+postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY;
  status
---------
+---------
  success
 (1 row)
 postgres=# SELECT * FROM movie WHERE genre = 'Drama';
@@ -211,21 +272,46 @@ postgres=# SELECT * FROM movie WHERE genre = 'Drama';
   </para>
 
   <para>
-    If the target LSN is not reached before the timeout, the error is thrown.
+   Wait for flush (data durable on replica):
 
 <programlisting>
-postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s');
+postgres=# WAIT FOR LSN '0/306EE20' MODE FLUSH;
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   Wait for write with timeout:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' MODE WRITE WITH (TIMEOUT '100ms', NO_THROW);
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   If the target LSN is not reached before the timeout, an error is thrown:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '0.1s');
 ERROR:  timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
 </programlisting>
   </para>
 
   <para>
    The same example uses <command>WAIT FOR</command> with
-   <parameter>NO_THROW</parameter> option.
+   <parameter>NO_THROW</parameter> option:
+
 <programlisting>
-postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW);
+postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '100ms', NO_THROW);
  status
---------
+---------
  timeout
 (1 row)
 </programlisting>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 22d0a2e8c3a..a4c7a7c2b38 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6240,10 +6240,12 @@ StartupXLOG(void)
 	LWLockRelease(ControlFileLock);
 
 	/*
-	 * Wake up all waiters for replay LSN.  They need to report an error that
-	 * recovery was ended before reaching the target LSN.
+	 * Wake up all waiters.  They need to report an error that recovery was
+	 * ended before reaching the target LSN.
 	 */
 	WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_WRITE, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment.  This must occur after
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
index a37bddaefb2..73876ca5c7c 100644
--- a/src/backend/commands/wait.c
+++ b/src/backend/commands/wait.c
@@ -2,7 +2,7 @@
  *
  * wait.c
  *	  Implements WAIT FOR, which allows waiting for events such as
- *	  time passing or LSN having been replayed on replica.
+ *	  time passing or LSN having been replayed, flushed, or written.
  *
  * Portions Copyright (c) 2025, PostgreSQL Global Development Group
  *
@@ -15,6 +15,7 @@
 
 #include <math.h>
 
+#include "access/xlog.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogwait.h"
 #include "commands/defrem.h"
@@ -28,12 +29,29 @@
 #include "utils/snapmgr.h"
 
 
+/*
+ * Type descriptor for WAIT FOR LSN wait types, used for error messages.
+ */
+typedef struct WaitLSNTypeDesc
+{
+	const char *noun;			/* "replay", "flush", "write" */
+	const char *verb;			/* "replayed", "flushed", "written" */
+}			WaitLSNTypeDesc;
+
+static const WaitLSNTypeDesc WaitLSNTypeDescs[] = {
+	[WAIT_LSN_TYPE_REPLAY] = {"replay", "replayed"},
+	[WAIT_LSN_TYPE_FLUSH_STANDBY] = {"flush", "flushed"},
+	[WAIT_LSN_TYPE_WRITE] = {"write", "written"},
+};
+
+
 void
 ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 {
 	XLogRecPtr	lsn;
 	int64		timeout = 0;
 	WaitLSNResult waitLSNResult;
+	WaitLSNType lsnType;
 	bool		throw = true;
 	TupleDesc	tupdesc;
 	TupOutputState *tstate;
@@ -41,6 +59,16 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	bool		timeout_specified = false;
 	bool		no_throw_specified = false;
 
+	/*
+	 * Convert parse-time WaitLSNMode to runtime WaitLSNType. Values are
+	 * designed to match, so a simple cast is safe.
+	 */
+	lsnType = (WaitLSNType) stmt->mode;
+
+	/* Validate mode value (should never fail if grammar is correct) */
+	Assert(lsnType >= WAIT_LSN_TYPE_REPLAY &&
+		   lsnType < WAIT_LSN_TYPE_FLUSH_PRIMARY);
+
 	/* Parse and validate the mandatory LSN */
 	lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
 										  CStringGetDatum(stmt->lsn_literal)));
@@ -107,8 +135,8 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	}
 
 	/*
-	 * We are going to wait for the LSN replay.  We should first care that we
-	 * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+	 * We are going to wait for the LSN.  We should first care that we don't
+	 * hold a snapshot and correspondingly our MyProc->xmin is invalid.
 	 * Otherwise, our snapshot could prevent the replay of WAL records
 	 * implying a kind of self-deadlock.  This is the reason why WAIT FOR is a
 	 * command, not a procedure or function.
@@ -140,7 +168,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	 */
 	Assert(MyProc->xmin == InvalidTransactionId);
 
-	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+	waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
 
 	/*
 	 * Process the result of WaitForLSN().  Throw appropriate error if needed.
@@ -154,11 +182,18 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 
 		case WAIT_LSN_RESULT_TIMEOUT:
 			if (throw)
+			{
+				const		WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType];
+				XLogRecPtr	currentLSN = GetCurrentLSNForWaitType(lsnType);
+
 				ereport(ERROR,
 						errcode(ERRCODE_QUERY_CANCELED),
-						errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
+						errmsg("timed out while waiting for target LSN %X/%08X to be %s; current %s LSN %X/%08X",
 							   LSN_FORMAT_ARGS(lsn),
-							   LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+							   desc->verb,
+							   desc->noun,
+							   LSN_FORMAT_ARGS(currentLSN)));
+			}
 			else
 				result = "timeout";
 			break;
@@ -166,20 +201,26 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 		case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
 			if (throw)
 			{
+				const		WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType];
+				XLogRecPtr	currentLSN = GetCurrentLSNForWaitType(lsnType);
+
 				if (PromoteIsTriggered())
 				{
 					ereport(ERROR,
 							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							errmsg("recovery is not in progress"),
-							errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
+							errdetail("Recovery ended before target LSN %X/%08X was %s; last %s LSN %X/%08X.",
 									  LSN_FORMAT_ARGS(lsn),
-									  LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+									  desc->verb,
+									  desc->noun,
+									  LSN_FORMAT_ARGS(currentLSN)));
 				}
 				else
 					ereport(ERROR,
 							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							errmsg("recovery is not in progress"),
-							errhint("Waiting for the replay LSN can only be executed during recovery."));
+							errhint("Waiting for the %s LSN can only be executed during recovery.",
+									desc->noun));
 			}
 			else
 				result = "not in recovery";
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c3a0a354a9c..57b3e91893c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -640,6 +640,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <windef>	window_definition over_clause window_specification
 				opt_frame_clause frame_extent frame_bound
 %type <ival>	null_treatment opt_window_exclusion_clause
+%type <ival>	opt_wait_lsn_mode
 %type <str>		opt_existing_window_name
 %type <boolean> opt_if_not_exists
 %type <boolean> opt_unique_null_treatment
@@ -729,7 +730,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	ESCAPE EVENT EXCEPT EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
 	EXPRESSION EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FLUSH FOLLOWING FOR
 	FORCE FOREIGN FORMAT FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -770,7 +771,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	QUOTE QUOTES
 
 	RANGE READ REAL REASSIGN RECURSIVE REF_P REFERENCES REFERENCING
-	REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA
+	REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLAY REPLICA
 	RESET RESPECT_P RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP
 	ROUTINE ROUTINES ROW ROWS RULE
 
@@ -16489,15 +16490,23 @@ xml_passing_mech:
  *****************************************************************************/
 
 WaitStmt:
-			WAIT FOR LSN_P Sconst opt_wait_with_clause
+			WAIT FOR LSN_P Sconst opt_wait_lsn_mode opt_wait_with_clause
 				{
 					WaitStmt *n = makeNode(WaitStmt);
 					n->lsn_literal = $4;
-					n->options = $5;
+					n->mode = $5;
+					n->options = $6;
 					$$ = (Node *) n;
 				}
 			;
 
+opt_wait_lsn_mode:
+			MODE REPLAY			{ $$ = WAIT_LSN_MODE_REPLAY; }
+			| MODE FLUSH		{ $$ = WAIT_LSN_MODE_FLUSH; }
+			| MODE WRITE		{ $$ = WAIT_LSN_MODE_WRITE; }
+			| /*EMPTY*/			{ $$ = WAIT_LSN_MODE_REPLAY; }
+			;
+
 opt_wait_with_clause:
 			WITH '(' utility_option_list ')'		{ $$ = $3; }
 			| /*EMPTY*/								{ $$ = NIL; }
@@ -17937,6 +17946,7 @@ unreserved_keyword:
 			| FILTER
 			| FINALIZE
 			| FIRST_P
+			| FLUSH
 			| FOLLOWING
 			| FORCE
 			| FORMAT
@@ -18071,6 +18081,7 @@ unreserved_keyword:
 			| RENAME
 			| REPEATABLE
 			| REPLACE
+			| REPLAY
 			| REPLICA
 			| RESET
 			| RESPECT_P
@@ -18524,6 +18535,7 @@ bare_label_keyword:
 			| FINALIZE
 			| FIRST_P
 			| FLOAT_P
+			| FLUSH
 			| FOLLOWING
 			| FORCE
 			| FOREIGN
@@ -18706,6 +18718,7 @@ bare_label_keyword:
 			| RENAME
 			| REPEATABLE
 			| REPLACE
+			| REPLAY
 			| REPLICA
 			| RESET
 			| RESTART
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 4217fc54e2e..818049599ed 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -57,6 +57,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
 #include "catalog/pg_authid.h"
 #include "funcapi.h"
 #include "libpq/pqformat.h"
@@ -965,6 +966,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 	/* Update shared-memory status */
 	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
 
+	/*
+	 * If we wrote an LSN that someone was waiting for then walk over the
+	 * shared memory array and set latches to notify the waiters.
+	 */
+	if (waitLSNState &&
+		(LogstreamResult.Write >=
+		 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_WRITE])))
+		WaitLSNWakeup(WAIT_LSN_TYPE_WRITE, LogstreamResult.Write);
+
 	/*
 	 * Close the current segment if it's fully written up in the last cycle of
 	 * the loop, to create its archive notification file soon. Otherwise WAL
@@ -1004,6 +1014,15 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		}
 		SpinLockRelease(&walrcv->mutex);
 
+		/*
+		 * If we flushed an LSN that someone was waiting for then walk over
+		 * the shared memory array and set latches to notify the waiters.
+		 */
+		if (waitLSNState &&
+			(LogstreamResult.Flush >=
+			 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_FLUSH_STANDBY])))
+			WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, LogstreamResult.Flush);
+
 		/* Signal the startup process and walsender that new WAL has arrived */
 		WakeupRecovery();
 		if (AllowCascadeReplication())
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index d14294a4ece..68dc49dc2da 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4385,10 +4385,26 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+/*
+ * WaitLSNMode - MODE parameter for WAIT FOR command
+ *
+ * These values are defined to match WaitLSNType in access/xlogwait.h
+ * for efficient conversion without overhead. The values must be kept
+ * in sync with WaitLSNType.
+ */
+typedef enum WaitLSNMode
+{
+	WAIT_LSN_MODE_REPLAY = 0,	/* Wait for LSN replay on standby */
+	WAIT_LSN_MODE_FLUSH = 1,	/* Wait for LSN flush to disk on standby */
+	WAIT_LSN_MODE_WRITE = 2		/* Wait for LSN write to WAL buffers on
+								 * standby */
+} WaitLSNMode;
+
 typedef struct WaitStmt
 {
 	NodeTag		type;
 	char	   *lsn_literal;	/* LSN string from grammar */
+	WaitLSNMode mode;			/* Wait mode: REPLAY/FLUSH/WRITE */
 	List	   *options;		/* List of DefElem nodes */
 } WaitStmt;
 
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 5d4fe27ef96..7ad8b11b725 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -176,6 +176,7 @@ PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("finalize", FINALIZE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD, BARE_LABEL)
+PG_KEYWORD("flush", FLUSH, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("following", FOLLOWING, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("for", FOR, RESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("force", FORCE, UNRESERVED_KEYWORD, BARE_LABEL)
@@ -378,6 +379,7 @@ PG_KEYWORD("release", RELEASE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("rename", RENAME, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("replay", REPLAY, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("respect", RESPECT_P, UNRESERVED_KEYWORD, AS_LABEL)
diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl
index e0ddb06a2f0..e579b98f019 100644
--- a/src/test/recovery/t/049_wait_for_lsn.pl
+++ b/src/test/recovery/t/049_wait_for_lsn.pl
@@ -1,4 +1,4 @@
-# Checks waiting for the LSN replay on standby using
+# Checks waiting for the LSN replay/write/flush on standby using
 # the WAIT FOR command.
 use strict;
 use warnings FATAL => 'all';
@@ -62,7 +62,40 @@ $output = $node_standby->safe_psql(
 ok((split("\n", $output))[-1] eq 30,
 	"standby reached the same LSN as primary");
 
-# 3. Check that waiting for unreachable LSN triggers the timeout.  The
+# 3. Check that WAIT FOR works with WRITE and FLUSH modes.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn_write =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$node_standby->safe_psql('postgres',
+	"WAIT FOR LSN '${lsn_write}' MODE WRITE WITH (timeout '1d');");
+
+# Verify via pg_stat_replication that standby reported the write
+my $standby_write_lsn = $node_primary->safe_psql(
+	'postgres', qq[
+	SELECT write_lsn FROM pg_stat_replication
+	WHERE application_name = 'standby';
+]);
+
+ok( $node_primary->safe_psql('postgres',
+		"SELECT '${standby_write_lsn}'::pg_lsn >= '${lsn_write}'::pg_lsn") eq
+	  't',
+	"standby wrote WAL up to target LSN after WAIT FOR MODE WRITE");
+
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn_flush =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '${lsn_flush}' MODE FLUSH WITH (timeout '1d');
+	SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '${lsn_flush}'::pg_lsn);
+]);
+
+ok((split("\n", $output))[-1] >= 0,
+	"standby flushed WAL up to target LSN after WAIT FOR MODE FLUSH");
+
+# 4. Check that waiting for unreachable LSN triggers the timeout.  The
 # unreachable LSN must be well in advance.  So WAL records issued by
 # the concurrent autovacuum could not affect that.
 my $lsn3 =
@@ -88,7 +121,7 @@ $output = $node_standby->safe_psql(
 	WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]);
 ok($output eq "timeout", "WAIT FOR returns correct status after timeout");
 
-# 4. Check that WAIT FOR triggers an error if called on primary,
+# 5. Check that WAIT FOR triggers an error if called on primary,
 # within another function, or inside a transaction with an isolation level
 # higher than READ COMMITTED.
 
@@ -125,7 +158,7 @@ ok( $stderr =~
 	  /WAIT FOR must be only called without an active or registered snapshot/,
 	"get an error when running within another function");
 
-# 5. Check parameter validation error cases on standby before promotion
+# 6. Check parameter validation error cases on standby before promotion
 my $test_lsn =
   $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
 
@@ -208,7 +241,7 @@ $node_standby->psql(
 ok( $stderr =~ /option "invalid_option" not recognized/,
 	"get error for invalid WITH clause option");
 
-# 6. Check the scenario of multiple LSN waiters.  We make 5 background
+# 7a. Check the scenario of multiple REPLAY waiters.  We make 5 background
 # psql sessions each waiting for a corresponding insertion.  When waiting is
 # finished, stored procedures logs if there are visible as many rows as
 # should be.
@@ -239,7 +272,7 @@ for (my $i = 0; $i < 5; $i++)
 	$psql_sessions[$i]->query_until(
 		qr/start/, qq[
 		\\echo start
-		WAIT FOR LSN '${lsn}';
+		WAIT FOR LSN '${lsn}' MODE REPLAY;
 		SELECT log_count(${i});
 	]);
 }
@@ -251,23 +284,138 @@ for (my $i = 0; $i < 5; $i++)
 	$psql_sessions[$i]->quit;
 }
 
-ok(1, 'multiple LSN waiters reported consistent data');
+ok(1, 'multiple REPLAY waiters reported consistent data');
+
+# 7b. Check the scenario of multiple WRITE waiters.
+my @write_sessions;
+my @write_lsns;
+for (my $i = 0; $i < 3; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (100 + ${i});");
+	$write_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+	$write_sessions[$i] = $node_standby->background_psql('postgres');
+	$write_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$write_lsns[$i]' MODE WRITE WITH (timeout '1d');
+	]);
+}
+
+# Wait for all WAIT FOR LSN commands to complete
+for (my $i = 0; $i < 3; $i++)
+{
+	$write_sessions[$i]->{run}->finish;
+}
+
+# Verify on standby that WAL was written up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp(pg_last_wal_write_lsn(), '$write_lsns[2]'::pg_lsn);");
 
-# 7. Check that the standby promotion terminates the wait on LSN.  Start
-# waiting for an unreachable LSN then promote.  Check the log for the relevant
-# error message.  Also, check that waiting for already replayed LSN doesn't
-# cause an error even after promotion.
+ok($output >= 0,
+	"multiple WRITE waiters: standby wrote WAL up to target LSN");
+
+# 7c. Check the scenario of multiple FLUSH waiters.
+my @flush_sessions;
+my @flush_lsns;
+for (my $i = 0; $i < 3; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (200 + ${i});");
+	$flush_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+	$flush_sessions[$i] = $node_standby->background_psql('postgres');
+	$flush_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$flush_lsns[$i]' MODE FLUSH WITH (timeout '1d');
+	]);
+}
+
+# Wait for all WAIT FOR LSN commands to complete
+for (my $i = 0; $i < 3; $i++)
+{
+	$flush_sessions[$i]->{run}->finish;
+}
+
+# Verify on standby that WAL was flushed up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '$flush_lsns[2]'::pg_lsn);"
+);
+
+ok($output >= 0,
+	"multiple FLUSH waiters: standby flushed WAL up to target LSN");
+
+# 7d. Check the scenario of mixed mode waiters (REPLAY, WRITE, FLUSH)
+# running concurrently.  We start 6 sessions: 2 for each mode, all waiting
+# for the same target LSN.  When all complete, we verify that the replay LSN
+# (the slowest to advance due to recovery_min_apply_delay) has reached the
+# target.  Since REPLAY waiters block until replay completes, and WRITE/FLUSH
+# complete earlier, successful completion of all sessions proves proper
+# coordination.
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(301, 310));");
+my $mixed_target_lsn =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+my @mixed_sessions;
+my @mixed_modes = ('REPLAY', 'WRITE', 'FLUSH');
+for (my $i = 0; $i < 6; $i++)
+{
+	$mixed_sessions[$i] = $node_standby->background_psql('postgres');
+	$mixed_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '${mixed_target_lsn}' MODE $mixed_modes[$i % 3] WITH (timeout '1d');
+	]);
+}
+
+# Resume replay so REPLAY waiters can complete
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+
+# Wait for all sessions to complete - this blocks until WAIT FOR LSN returns
+for (my $i = 0; $i < 6; $i++)
+{
+	$mixed_sessions[$i]->{run}->finish;
+}
+
+# Verify: if all waiters completed, then the slowest (REPLAY) must have
+# reached the target LSN, which implies WRITE and FLUSH also succeeded
+$output = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${mixed_target_lsn}'::pg_lsn);"
+);
+
+ok($output >= 0,
+	"mixed mode waiters: all modes completed, replay reached target LSN");
+
+# 8. Check that the standby promotion terminates all wait modes.  Start
+# waiting for unreachable LSNs with REPLAY, WRITE, and FLUSH modes, then
+# promote.  Check the log for the relevant error messages.  Also, check that
+# waiting for already replayed LSN doesn't cause an error even after promotion.
 my $lsn4 =
   $node_primary->safe_psql('postgres',
 	"SELECT pg_current_wal_insert_lsn() + 10000000000");
+
 my $lsn5 =
   $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
-my $psql_session = $node_standby->background_psql('postgres');
-$psql_session->query_until(
-	qr/start/, qq[
-	\\echo start
-	WAIT FOR LSN '${lsn4}';
-]);
+
+# Start background sessions waiting for unreachable LSN with all modes
+my @wait_modes = ('REPLAY', 'WRITE', 'FLUSH');
+my @wait_sessions;
+for (my $i = 0; $i < 3; $i++)
+{
+	$wait_sessions[$i] = $node_standby->background_psql('postgres');
+	$wait_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '${lsn4}' MODE $wait_modes[$i];
+	]);
+}
 
 # Make sure standby will be promoted at least at the primary insert LSN we
 # have just observed.  Use pg_switch_wal() to force the insert LSN to be
@@ -277,17 +425,23 @@ $node_primary->wait_for_catchup($node_standby);
 
 $log_offset = -s $node_standby->logfile;
 $node_standby->promote;
+
+# Wait for at least one "recovery is not in progress" error to appear
 $node_standby->wait_for_log('recovery is not in progress', $log_offset);
 
-ok(1, 'got error after standby promote');
+# Verify all three sessions got the error by checking the log contains
+# the error message at least three times (from the promotion point)
+my $log_contents = slurp_file($node_standby->logfile, $log_offset);
+my $error_count = () = $log_contents =~ /recovery is not in progress/g;
+ok($error_count >= 3, 'promotion interrupted all wait modes');
 
-$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}' MODE REPLAY;");
 
 ok(1, 'wait for already replayed LSN exits immediately even after promotion');
 
 $output = $node_standby->safe_psql(
 	'postgres', qq[
-	WAIT FOR LSN '${lsn4}' WITH (timeout '10ms', no_throw);]);
+	WAIT FOR LSN '${lsn4}' MODE REPLAY WITH (timeout '10ms', no_throw);]);
 ok($output eq "not in recovery",
 	"WAIT FOR returns correct status after standby promotion");
 
@@ -295,8 +449,11 @@ ok($output eq "not in recovery",
 $node_standby->stop;
 $node_primary->stop;
 
-# If we send \q with $psql_session->quit the command can be sent to the session
+# If we send \q with $session->quit the command can be sent to the session
 # already closed. So \q is in initial script, here we only finish IPC::Run.
-$psql_session->{run}->finish;
+for (my $i = 0; $i < 3; $i++)
+{
+	$wait_sessions[$i]->{run}->finish;
+}
 
 done_testing();
-- 
2.51.0

