From 1367c1f3322b93190fcd4ca70ab309efd8556c77 Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Tue, 25 Nov 2025 19:11:54 +0800
Subject: [PATCH v2] Add MODE parameter to WAIT FOR LSN command

Extend the WAIT FOR LSN command with an optional MODE parameter that
specifies which LSN type to wait for:

  WAIT FOR LSN '<lsn>' [MODE { REPLAY | WRITE | FLUSH }] [WITH (...)]

- REPLAY (default): Wait for WAL to be replayed to the specified LSN
- WRITE: Wait for WAL to be written (received) to the specified LSN
- FLUSH: Wait for WAL to be flushed to disk at the specified LSN

The default mode is REPLAY, matching the original behavior when MODE
is not specified. This follows the pattern used by LOCK command where
the mode parameter is optional with a sensible default.

The WRITE and FLUSH modes are useful for scenarios where applications
need to ensure WAL has been received or persisted on the standby
without necessarily waiting for replay to complete.

Also includes:
- Documentation updates for the new syntax and refactoring
  of existing WAIT FOR command documentation
- Test coverage for all three modes including mixed concurrent waiters
- Wakeup logic in walreceiver for WRITE/FLUSH waiters
---
 doc/src/sgml/ref/wait_for.sgml          | 188 +++++++++++----
 src/backend/access/transam/xlog.c       |   6 +-
 src/backend/commands/wait.c             |  64 ++++-
 src/backend/parser/gram.y               |  21 +-
 src/backend/replication/walreceiver.c   |  19 ++
 src/include/nodes/parsenodes.h          |  11 +
 src/include/parser/kwlist.h             |   2 +
 src/test/recovery/t/049_wait_for_lsn.pl | 299 ++++++++++++++++++++++--
 8 files changed, 523 insertions(+), 87 deletions(-)

diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml
index 3b8e842d1de..a5e7f6c6fe9 100644
--- a/doc/src/sgml/ref/wait_for.sgml
+++ b/doc/src/sgml/ref/wait_for.sgml
@@ -16,12 +16,13 @@ PostgreSQL documentation
 
  <refnamediv>
   <refname>WAIT FOR</refname>
-  <refpurpose>wait for target <acronym>LSN</acronym> to be replayed, optionally with a timeout</refpurpose>
+  <refpurpose>wait for WAL to reach a target <acronym>LSN</acronym> on a replica</refpurpose>
  </refnamediv>
 
  <refsynopsisdiv>
 <synopsis>
-WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
+WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ MODE { REPLAY | FLUSH | WRITE } ]
+    [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
 
 <phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase>
 
@@ -34,20 +35,22 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Description</title>
 
   <para>
-    Waits until recovery replays <parameter>lsn</parameter>.
-    If no <parameter>timeout</parameter> is specified or it is set to
-    zero, this command waits indefinitely for the
-    <parameter>lsn</parameter>.
-    On timeout, or if the server is promoted before
-    <parameter>lsn</parameter> is reached, an error is emitted,
-    unless <literal>NO_THROW</literal> is specified in the WITH clause.
-    If <parameter>NO_THROW</parameter> is specified, then the command
-    doesn't throw errors.
+   Waits until the specified <parameter>lsn</parameter> is reached
+   according to the specified <parameter>mode</parameter>,
+   which determines whether to wait for WAL to be written, flushed, or replayed.
+   If no <parameter>timeout</parameter> is specified or it is set to
+   zero, this command waits indefinitely for the
+   <parameter>lsn</parameter>.
+   On timeout, or if the server is promoted before
+   <parameter>lsn</parameter> is reached, an error is emitted,
+   unless <literal>NO_THROW</literal> is specified in the WITH clause.
+   If <parameter>NO_THROW</parameter> is specified, then the command
+   doesn't throw errors.
   </para>
 
   <para>
-    The possible return values are <literal>success</literal>,
-    <literal>timeout</literal>, and <literal>not in recovery</literal>.
+   The possible return values are <literal>success</literal>,
+   <literal>timeout</literal>, and <literal>not in recovery</literal>.
   </para>
  </refsect1>
 
@@ -64,6 +67,57 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>MODE</literal></term>
+    <listitem>
+     <para>
+      Specifies the type of LSN processing to wait for. If not specified,
+      the default is <literal>REPLAY</literal>. The valid modes are:
+     </para>
+
+     <variablelist>
+      <varlistentry>
+       <term><literal>REPLAY</literal></term>
+       <listitem>
+        <para>
+         Wait for the LSN to be replayed (applied to the database).
+         After successful completion, <function>pg_last_wal_replay_lsn()</function>
+         will return a value greater than or equal to the target LSN.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term><literal>FLUSH</literal></term>
+       <listitem>
+        <para>
+         Wait for the WAL containing the LSN to be received from the
+         primary and synced to durable storage via <function>fsync()</function>.
+         This provides a durability guarantee without waiting for the WAL
+         to be applied. After successful completion,
+         <function>pg_last_wal_receive_lsn()</function> will return a value
+         greater than or equal to the target LSN.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term><literal>WRITE</literal></term>
+       <listitem>
+        <para>
+         Wait for the WAL containing the LSN to be received from the
+         primary and passed to the operating system via <function>write()</function>.
+         This is faster than <literal>FLUSH</literal> but provides weaker
+         durability guarantees since the data may still be in OS buffers.
+         After successful completion, <function>pg_last_wal_write_lsn()</function>
+         will return a value greater than or equal to the target LSN.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>WITH ( <replaceable class="parameter">option</replaceable> [, ...] )</literal></term>
     <listitem>
@@ -135,9 +189,12 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
     <listitem>
      <para>
       This return value denotes that the database server is not in a recovery
-      state.  This might mean either the database server was not in recovery
-      at the moment of receiving the command, or it was promoted before
-      reaching the target <parameter>lsn</parameter>.
+      state. This might mean either the database server was not in recovery
+      at the moment of receiving the command (i.e., executed on a primary),
+      or it was promoted before reaching the target <parameter>lsn</parameter>.
+      In the promotion case, this status indicates a timeline change occurred,
+      and the application should re-evaluate whether the target LSN is still
+      relevant.
      </para>
     </listitem>
    </varlistentry>
@@ -148,25 +205,33 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Notes</title>
 
   <para>
-    <command>WAIT FOR</command> command waits till
-    <parameter>lsn</parameter> to be replayed on standby.
-    That is, after this command execution, the value returned by
-    <function>pg_last_wal_replay_lsn</function> should be greater or equal
-    to the <parameter>lsn</parameter> value.  This is useful to achieve
-    read-your-writes-consistency, while using async replica for reads and
-    primary for writes.  In that case, the <acronym>lsn</acronym> of the last
-    modification should be stored on the client application side or the
-    connection pooler side.
+   <command>WAIT FOR</command> waits until the specified
+   <parameter>lsn</parameter> is reached according to the specified
+   <parameter>mode</parameter>. The <literal>REPLAY</literal> mode waits
+   for the LSN to be replayed (applied to the database), which is useful
+   to achieve read-your-writes consistency while using an async replica
+   for reads and the primary for writes. The <literal>FLUSH</literal> mode
+   waits for the WAL to be flushed to durable storage on the replica,
+   providing a durability guarantee without waiting for replay. The
+   <literal>WRITE</literal> mode waits for the WAL to be written to the
+   operating system, which is faster than flush but provides weaker
+   durability guarantees. In all cases, the <acronym>LSN</acronym> of the
+   last modification should be stored on the client application side or
+   the connection pooler side.
   </para>
 
   <para>
-    <command>WAIT FOR</command> command should be called on standby.
-    If a user runs <command>WAIT FOR</command> on primary, it
-    will error out unless <parameter>NO_THROW</parameter> is specified in the WITH clause.
-    However, if <command>WAIT FOR</command> is
-    called on primary promoted from standby and <literal>lsn</literal>
-    was already replayed, then the <command>WAIT FOR</command> command just
-    exits immediately.
+   <command>WAIT FOR</command> should be called on a standby.
+   If a user runs <command>WAIT FOR</command> on the primary, it
+   will error out unless <parameter>NO_THROW</parameter> is specified
+   in the WITH clause. However, if <command>WAIT FOR</command> is
+   called on a primary promoted from standby and <literal>lsn</literal>
+   was already reached, then the <command>WAIT FOR</command> command
+   just exits immediately. If the replica is promoted while waiting,
+   the command will return <literal>not in recovery</literal> (or throw
+   an error if <literal>NO_THROW</literal> is not specified). Promotion
+   creates a new timeline, and the LSN being waited for may refer to
+   WAL from the old timeline.
   </para>
 
 </refsect1>
@@ -175,21 +240,21 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Examples</title>
 
   <para>
-    You can use <command>WAIT FOR</command> command to wait for
-    the <type>pg_lsn</type> value.  For example, an application could update
-    the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
-    changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
-    on primary server to get the <acronym>lsn</acronym> given that
-    <varname>synchronous_commit</varname> could be set to
-    <literal>off</literal>.
+   You can use <command>WAIT FOR</command> command to wait for
+   the <type>pg_lsn</type> value.  For example, an application could update
+   the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
+   changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
+   on primary server to get the <acronym>lsn</acronym> given that
+   <varname>synchronous_commit</varname> could be set to
+   <literal>off</literal>.
 
    <programlisting>
 postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
 UPDATE 100
 postgres=# SELECT pg_current_wal_insert_lsn();
-pg_current_wal_insert_lsn
---------------------
-0/306EE20
+ pg_current_wal_insert_lsn
+---------------------------
+ 0/306EE20
 (1 row)
 </programlisting>
 
@@ -198,9 +263,9 @@ pg_current_wal_insert_lsn
    changes made on primary should be guaranteed to be visible on replica.
 
 <programlisting>
-postgres=# WAIT FOR LSN '0/306EE20';
+postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY;
  status
---------
+---------
  success
 (1 row)
 postgres=# SELECT * FROM movie WHERE genre = 'Drama';
@@ -211,21 +276,46 @@ postgres=# SELECT * FROM movie WHERE genre = 'Drama';
   </para>
 
   <para>
-    If the target LSN is not reached before the timeout, the error is thrown.
+   Wait for flush (data durable on replica):
 
 <programlisting>
-postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s');
+postgres=# WAIT FOR LSN '0/306EE20' MODE FLUSH;
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   Wait for write with timeout:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' MODE WRITE WITH (TIMEOUT '100ms', NO_THROW);
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   If the target LSN is not reached before the timeout, an error is thrown:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '0.1s');
 ERROR:  timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
 </programlisting>
   </para>
 
   <para>
    The same example uses <command>WAIT FOR</command> with
-   <parameter>NO_THROW</parameter> option.
+   <parameter>NO_THROW</parameter> option:
+
 <programlisting>
-postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW);
+postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '100ms', NO_THROW);
  status
---------
+---------
  timeout
 (1 row)
 </programlisting>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4b145515269..5b2a262ff8e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6240,10 +6240,12 @@ StartupXLOG(void)
 	LWLockRelease(ControlFileLock);
 
 	/*
-	 * Wake up all waiters for replay LSN.  They need to report an error that
-	 * recovery was ended before reaching the target LSN.
+	 * Wake up all waiters.  They need to report an error that recovery was
+	 * ended before reaching the target LSN.
 	 */
 	WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_WRITE_STANDBY, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment.  This must occur after
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
index 43b37095afb..05ad84fdb5b 100644
--- a/src/backend/commands/wait.c
+++ b/src/backend/commands/wait.c
@@ -2,7 +2,7 @@
  *
  * wait.c
  *	  Implements WAIT FOR, which allows waiting for events such as
- *	  time passing or LSN having been replayed on replica.
+ *	  time passing or LSN having been replayed, flushed, or written.
  *
  * Portions Copyright (c) 2025, PostgreSQL Global Development Group
  *
@@ -15,6 +15,7 @@
 
 #include <math.h>
 
+#include "access/xlog.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogwait.h"
 #include "commands/defrem.h"
@@ -28,12 +29,28 @@
 #include "utils/snapmgr.h"
 
 
+/*
+ * Type descriptor for WAIT FOR LSN wait types, used for error messages.
+ */
+typedef struct WaitLSNTypeDesc
+{
+	const char *noun;			/* "replay", "flush", "write" */
+	const char *verb;			/* "replayed", "flushed", "written" */
+}			WaitLSNTypeDesc;
+
+static const WaitLSNTypeDesc WaitLSNTypeDescs[] = {
+	[WAIT_LSN_TYPE_REPLAY_STANDBY] = {"replay", "replayed"},
+	[WAIT_LSN_TYPE_WRITE_STANDBY] = {"write", "written"},
+	[WAIT_LSN_TYPE_FLUSH_STANDBY] = {"flush", "flushed"},
+};
+
 void
 ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 {
 	XLogRecPtr	lsn;
 	int64		timeout = 0;
 	WaitLSNResult waitLSNResult;
+	WaitLSNType lsnType;
 	bool		throw = true;
 	TupleDesc	tupdesc;
 	TupOutputState *tstate;
@@ -45,6 +62,22 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
 										  CStringGetDatum(stmt->lsn_literal)));
 
+	/* Convert parse-time WaitLSNMode to runtime WaitLSNType */
+	switch (stmt->mode)
+	{
+		case WAIT_LSN_MODE_REPLAY:
+			lsnType = WAIT_LSN_TYPE_REPLAY_STANDBY;
+			break;
+		case WAIT_LSN_MODE_WRITE:
+			lsnType = WAIT_LSN_TYPE_WRITE_STANDBY;
+			break;
+		case WAIT_LSN_MODE_FLUSH:
+			lsnType = WAIT_LSN_TYPE_FLUSH_STANDBY;
+			break;
+		default:
+			elog(ERROR, "unrecognized wait mode: %d", stmt->mode);
+	}
+
 	foreach_node(DefElem, defel, stmt->options)
 	{
 		if (strcmp(defel->defname, "timeout") == 0)
@@ -107,8 +140,8 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	}
 
 	/*
-	 * We are going to wait for the LSN replay.  We should first care that we
-	 * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+	 * We are going to wait for the LSN.  We should first care that we don't
+	 * hold a snapshot and correspondingly our MyProc->xmin is invalid.
 	 * Otherwise, our snapshot could prevent the replay of WAL records
 	 * implying a kind of self-deadlock.  This is the reason why WAIT FOR is a
 	 * command, not a procedure or function.
@@ -140,7 +173,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	 */
 	Assert(MyProc->xmin == InvalidTransactionId);
 
-	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY_STANDBY, lsn, timeout);
+	waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
 
 	/*
 	 * Process the result of WaitForLSN().  Throw appropriate error if needed.
@@ -154,11 +187,18 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 
 		case WAIT_LSN_RESULT_TIMEOUT:
 			if (throw)
+			{
+				const		WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType];
+				XLogRecPtr	currentLSN = GetCurrentLSNForWaitType(lsnType);
+
 				ereport(ERROR,
 						errcode(ERRCODE_QUERY_CANCELED),
-						errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
+						errmsg("timed out while waiting for target LSN %X/%08X to be %s; current %s LSN %X/%08X",
 							   LSN_FORMAT_ARGS(lsn),
-							   LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+							   desc->verb,
+							   desc->noun,
+							   LSN_FORMAT_ARGS(currentLSN)));
+			}
 			else
 				result = "timeout";
 			break;
@@ -166,20 +206,26 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 		case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
 			if (throw)
 			{
+				const		WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType];
+				XLogRecPtr	currentLSN = GetCurrentLSNForWaitType(lsnType);
+
 				if (PromoteIsTriggered())
 				{
 					ereport(ERROR,
 							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							errmsg("recovery is not in progress"),
-							errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
+							errdetail("Recovery ended before target LSN %X/%08X was %s; last %s LSN %X/%08X.",
 									  LSN_FORMAT_ARGS(lsn),
-									  LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+									  desc->verb,
+									  desc->noun,
+									  LSN_FORMAT_ARGS(currentLSN)));
 				}
 				else
 					ereport(ERROR,
 							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							errmsg("recovery is not in progress"),
-							errhint("Waiting for the replay LSN can only be executed during recovery."));
+							errhint("Waiting for the %s LSN can only be executed during recovery.",
+									desc->noun));
 			}
 			else
 				result = "not in recovery";
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c3a0a354a9c..57b3e91893c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -640,6 +640,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <windef>	window_definition over_clause window_specification
 				opt_frame_clause frame_extent frame_bound
 %type <ival>	null_treatment opt_window_exclusion_clause
+%type <ival>	opt_wait_lsn_mode
 %type <str>		opt_existing_window_name
 %type <boolean> opt_if_not_exists
 %type <boolean> opt_unique_null_treatment
@@ -729,7 +730,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	ESCAPE EVENT EXCEPT EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
 	EXPRESSION EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FLUSH FOLLOWING FOR
 	FORCE FOREIGN FORMAT FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -770,7 +771,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	QUOTE QUOTES
 
 	RANGE READ REAL REASSIGN RECURSIVE REF_P REFERENCES REFERENCING
-	REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA
+	REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLAY REPLICA
 	RESET RESPECT_P RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP
 	ROUTINE ROUTINES ROW ROWS RULE
 
@@ -16489,15 +16490,23 @@ xml_passing_mech:
  *****************************************************************************/
 
 WaitStmt:
-			WAIT FOR LSN_P Sconst opt_wait_with_clause
+			WAIT FOR LSN_P Sconst opt_wait_lsn_mode opt_wait_with_clause
 				{
 					WaitStmt *n = makeNode(WaitStmt);
 					n->lsn_literal = $4;
-					n->options = $5;
+					n->mode = $5;
+					n->options = $6;
 					$$ = (Node *) n;
 				}
 			;
 
+opt_wait_lsn_mode:
+			MODE REPLAY			{ $$ = WAIT_LSN_MODE_REPLAY; }
+			| MODE FLUSH		{ $$ = WAIT_LSN_MODE_FLUSH; }
+			| MODE WRITE		{ $$ = WAIT_LSN_MODE_WRITE; }
+			| /*EMPTY*/			{ $$ = WAIT_LSN_MODE_REPLAY; }
+			;
+
 opt_wait_with_clause:
 			WITH '(' utility_option_list ')'		{ $$ = $3; }
 			| /*EMPTY*/								{ $$ = NIL; }
@@ -17937,6 +17946,7 @@ unreserved_keyword:
 			| FILTER
 			| FINALIZE
 			| FIRST_P
+			| FLUSH
 			| FOLLOWING
 			| FORCE
 			| FORMAT
@@ -18071,6 +18081,7 @@ unreserved_keyword:
 			| RENAME
 			| REPEATABLE
 			| REPLACE
+			| REPLAY
 			| REPLICA
 			| RESET
 			| RESPECT_P
@@ -18524,6 +18535,7 @@ bare_label_keyword:
 			| FINALIZE
 			| FIRST_P
 			| FLOAT_P
+			| FLUSH
 			| FOLLOWING
 			| FORCE
 			| FOREIGN
@@ -18706,6 +18718,7 @@ bare_label_keyword:
 			| RENAME
 			| REPEATABLE
 			| REPLACE
+			| REPLAY
 			| REPLICA
 			| RESET
 			| RESTART
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 4217fc54e2e..be2971408e7 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -57,6 +57,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
 #include "catalog/pg_authid.h"
 #include "funcapi.h"
 #include "libpq/pqformat.h"
@@ -965,6 +966,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 	/* Update shared-memory status */
 	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
 
+	/*
+	 * If we wrote an LSN that someone was waiting for then walk over the
+	 * shared memory array and set latches to notify the waiters.
+	 */
+	if (waitLSNState &&
+		(LogstreamResult.Write >=
+		 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_WRITE_STANDBY])))
+		WaitLSNWakeup(WAIT_LSN_TYPE_WRITE_STANDBY, LogstreamResult.Write);
+
 	/*
 	 * Close the current segment if it's fully written up in the last cycle of
 	 * the loop, to create its archive notification file soon. Otherwise WAL
@@ -1004,6 +1014,15 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		}
 		SpinLockRelease(&walrcv->mutex);
 
+		/*
+		 * If we flushed an LSN that someone was waiting for then walk over
+		 * the shared memory array and set latches to notify the waiters.
+		 */
+		if (waitLSNState &&
+			(LogstreamResult.Flush >=
+			 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_FLUSH_STANDBY])))
+			WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, LogstreamResult.Flush);
+
 		/* Signal the startup process and walsender that new WAL has arrived */
 		WakeupRecovery();
 		if (AllowCascadeReplication())
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index d14294a4ece..bbaf3242ccb 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4385,10 +4385,21 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+/*
+ * WaitLSNMode - MODE parameter for WAIT FOR command
+ */
+typedef enum WaitLSNMode
+{
+	WAIT_LSN_MODE_REPLAY,		/* Wait for LSN replay on standby */
+	WAIT_LSN_MODE_WRITE,		/* Wait for LSN write on standby */
+	WAIT_LSN_MODE_FLUSH			/* Wait for LSN flush on standby */
+}			WaitLSNMode;
+
 typedef struct WaitStmt
 {
 	NodeTag		type;
 	char	   *lsn_literal;	/* LSN string from grammar */
+	WaitLSNMode mode;			/* Wait mode: REPLAY/FLUSH/WRITE */
 	List	   *options;		/* List of DefElem nodes */
 } WaitStmt;
 
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 5d4fe27ef96..7ad8b11b725 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -176,6 +176,7 @@ PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("finalize", FINALIZE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD, BARE_LABEL)
+PG_KEYWORD("flush", FLUSH, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("following", FOLLOWING, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("for", FOR, RESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("force", FORCE, UNRESERVED_KEYWORD, BARE_LABEL)
@@ -378,6 +379,7 @@ PG_KEYWORD("release", RELEASE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("rename", RENAME, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("replay", REPLAY, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("respect", RESPECT_P, UNRESERVED_KEYWORD, AS_LABEL)
diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl
index e0ddb06a2f0..6c9a463775b 100644
--- a/src/test/recovery/t/049_wait_for_lsn.pl
+++ b/src/test/recovery/t/049_wait_for_lsn.pl
@@ -1,4 +1,4 @@
-# Checks waiting for the LSN replay on standby using
+# Checks waiting for the LSN replay/write/flush on standby using
 # the WAIT FOR command.
 use strict;
 use warnings FATAL => 'all';
@@ -62,7 +62,34 @@ $output = $node_standby->safe_psql(
 ok((split("\n", $output))[-1] eq 30,
 	"standby reached the same LSN as primary");
 
-# 3. Check that waiting for unreachable LSN triggers the timeout.  The
+# 3. Check that WAIT FOR works with WRITE and FLUSH modes.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn_write =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '${lsn_write}' MODE WRITE WITH (timeout '1d');
+	SELECT pg_lsn_cmp(pg_last_wal_write_lsn(), '${lsn_write}'::pg_lsn);
+]);
+
+ok((split("\n", $output))[-1] >= 0,
+	"standby wrote WAL up to target LSN after WAIT FOR MODE WRITE");
+
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn_flush =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '${lsn_flush}' MODE FLUSH WITH (timeout '1d');
+	SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '${lsn_flush}'::pg_lsn);
+]);
+
+ok((split("\n", $output))[-1] >= 0,
+	"standby flushed WAL up to target LSN after WAIT FOR MODE FLUSH");
+
+# 4. Check that waiting for unreachable LSN triggers the timeout.  The
 # unreachable LSN must be well in advance.  So WAL records issued by
 # the concurrent autovacuum could not affect that.
 my $lsn3 =
@@ -88,7 +115,7 @@ $output = $node_standby->safe_psql(
 	WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]);
 ok($output eq "timeout", "WAIT FOR returns correct status after timeout");
 
-# 4. Check that WAIT FOR triggers an error if called on primary,
+# 5. Check that WAIT FOR triggers an error if called on primary,
 # within another function, or inside a transaction with an isolation level
 # higher than READ COMMITTED.
 
@@ -125,7 +152,7 @@ ok( $stderr =~
 	  /WAIT FOR must be only called without an active or registered snapshot/,
 	"get an error when running within another function");
 
-# 5. Check parameter validation error cases on standby before promotion
+# 6. Check parameter validation error cases on standby before promotion
 my $test_lsn =
   $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
 
@@ -208,7 +235,7 @@ $node_standby->psql(
 ok( $stderr =~ /option "invalid_option" not recognized/,
 	"get error for invalid WITH clause option");
 
-# 6. Check the scenario of multiple LSN waiters.  We make 5 background
+# 7a. Check the scenario of multiple REPLAY waiters.  We make 5 background
 # psql sessions each waiting for a corresponding insertion.  When waiting is
 # finished, stored procedures logs if there are visible as many rows as
 # should be.
@@ -239,7 +266,7 @@ for (my $i = 0; $i < 5; $i++)
 	$psql_sessions[$i]->query_until(
 		qr/start/, qq[
 		\\echo start
-		WAIT FOR LSN '${lsn}';
+		WAIT FOR LSN '${lsn}' MODE REPLAY;
 		SELECT log_count(${i});
 	]);
 }
@@ -251,23 +278,239 @@ for (my $i = 0; $i < 5; $i++)
 	$psql_sessions[$i]->quit;
 }
 
-ok(1, 'multiple LSN waiters reported consistent data');
+ok(1, 'multiple REPLAY waiters reported consistent data');
+
+# 7b. Check the scenario of multiple WRITE waiters.
+# Stop walreceiver to ensure waiters actually block.
+my $orig_conninfo = $node_standby->safe_psql('postgres',
+	"SELECT setting FROM pg_settings WHERE name = 'primary_conninfo'");
+$node_standby->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET primary_conninfo = '';
+	SELECT pg_reload_conf();
+]);
+$node_standby->poll_query_until('postgres',
+	"SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+
+# Generate WAL on primary (standby won't receive it yet)
+my @write_lsns;
+for (my $i = 0; $i < 3; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (100 + ${i});");
+	$write_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start WRITE waiters (they will block since walreceiver is stopped)
+my @write_sessions;
+for (my $i = 0; $i < 3; $i++)
+{
+	$write_sessions[$i] = $node_standby->background_psql('postgres');
+	$write_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$write_lsns[$i]' MODE WRITE WITH (timeout '1d');
+		DO \$\$ BEGIN RAISE LOG 'write_done %', $i; END \$\$;
+	]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+	"SELECT count(*) = 3 FROM pg_stat_activity WHERE wait_event = 'WaitForWalWrite'"
+);
+
+# Restore walreceiver to unblock waiters
+my $write_log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET primary_conninfo = '$orig_conninfo';
+	SELECT pg_reload_conf();
+]);
+$node_standby->poll_query_until('postgres',
+	"SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 3; $i++)
+{
+	$node_standby->wait_for_log("write_done $i", $write_log_offset);
+	$write_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was written up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp(pg_last_wal_write_lsn(), '$write_lsns[2]'::pg_lsn);");
+
+ok($output >= 0,
+	"multiple WRITE waiters: standby wrote WAL up to target LSN");
+
+# 7c. Check the scenario of multiple FLUSH waiters.
+# Stop walreceiver to ensure waiters actually block.
+$node_standby->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET primary_conninfo = '';
+	SELECT pg_reload_conf();
+]);
+$node_standby->poll_query_until('postgres',
+	"SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+
+# Generate WAL on primary (standby won't receive it yet)
+my @flush_lsns;
+for (my $i = 0; $i < 3; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (200 + ${i});");
+	$flush_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start FLUSH waiters (they will block since walreceiver is stopped)
+my @flush_sessions;
+for (my $i = 0; $i < 3; $i++)
+{
+	$flush_sessions[$i] = $node_standby->background_psql('postgres');
+	$flush_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$flush_lsns[$i]' MODE FLUSH WITH (timeout '1d');
+		DO \$\$ BEGIN RAISE LOG 'flush_done %', $i; END \$\$;
+	]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+	"SELECT count(*) = 3 FROM pg_stat_activity WHERE wait_event = 'WaitForWalFlush'"
+);
+
+# Restore walreceiver to unblock waiters
+my $flush_log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET primary_conninfo = '$orig_conninfo';
+	SELECT pg_reload_conf();
+]);
+$node_standby->poll_query_until('postgres',
+	"SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 3; $i++)
+{
+	$node_standby->wait_for_log("flush_done $i", $flush_log_offset);
+	$flush_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was flushed up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '$flush_lsns[2]'::pg_lsn);"
+);
+
+ok($output >= 0,
+	"multiple FLUSH waiters: standby flushed WAL up to target LSN");
+
+# 7d. Check the scenario of mixed mode waiters (REPLAY, WRITE, FLUSH)
+# running concurrently.  We start 6 sessions: 2 for each mode, all waiting
+# for the same target LSN.  We stop the walreceiver and pause replay to
+# ensure all waiters block.  Then we resume replay and restart the
+# walreceiver to verify they unblock and complete correctly.
+
+# Stop walreceiver first to ensure we can control the flow without hanging
+# (stopping it after pausing replay can hang if the startup process is paused).
+my $orig_conninfo_7d = $node_standby->safe_psql('postgres',
+	"SELECT setting FROM pg_settings WHERE name = 'primary_conninfo'");
+$node_standby->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET primary_conninfo = '';
+	SELECT pg_reload_conf();
+]);
+$node_standby->poll_query_until('postgres',
+	"SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+
+# Pause replay
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
+# Generate WAL on primary
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(301, 310));");
+my $mixed_target_lsn =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+# Start 6 waiters: 2 for each mode
+my @mixed_sessions;
+my @mixed_modes = ('REPLAY', 'WRITE', 'FLUSH');
+for (my $i = 0; $i < 6; $i++)
+{
+	$mixed_sessions[$i] = $node_standby->background_psql('postgres');
+	$mixed_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '${mixed_target_lsn}' MODE $mixed_modes[$i % 3] WITH (timeout '1d');
+		DO \$\$ BEGIN RAISE LOG 'mixed_done %', $i; END \$\$;
+	]);
+}
+
+# Verify all waiters are blocked
+$node_standby->poll_query_until('postgres',
+	"SELECT count(*) = 6 FROM pg_stat_activity WHERE wait_event LIKE 'WaitForWal%'"
+);
+
+# Resume replay (waiters should still be blocked as no WAL has arrived)
+my $mixed_log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+$node_standby->poll_query_until('postgres',
+	"SELECT NOT pg_is_wal_replay_paused();");
+
+# Restore walreceiver to allow WAL to arrive
+$node_standby->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET primary_conninfo = '$orig_conninfo_7d';
+	SELECT pg_reload_conf();
+]);
+$node_standby->poll_query_until('postgres',
+	"SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+
+# Wait for all sessions to complete and close them
+for (my $i = 0; $i < 6; $i++)
+{
+	$node_standby->wait_for_log("mixed_done $i", $mixed_log_offset);
+	$mixed_sessions[$i]->quit;
+}
 
-# 7. Check that the standby promotion terminates the wait on LSN.  Start
-# waiting for an unreachable LSN then promote.  Check the log for the relevant
-# error message.  Also, check that waiting for already replayed LSN doesn't
-# cause an error even after promotion.
+# Verify all modes reached the target LSN
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	SELECT pg_lsn_cmp(pg_last_wal_write_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+	       pg_lsn_cmp(pg_last_wal_receive_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+	       pg_lsn_cmp(pg_last_wal_replay_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0;
+]);
+
+ok($output eq 't',
+	"mixed mode waiters: all modes completed and reached target LSN");
+
+# 8. Check that the standby promotion terminates all wait modes.  Start
+# waiting for unreachable LSNs with REPLAY, WRITE, and FLUSH modes, then
+# promote.  Check the log for the relevant error messages.  Also, check that
+# waiting for already replayed LSN doesn't cause an error even after promotion.
 my $lsn4 =
   $node_primary->safe_psql('postgres',
 	"SELECT pg_current_wal_insert_lsn() + 10000000000");
+
 my $lsn5 =
   $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
-my $psql_session = $node_standby->background_psql('postgres');
-$psql_session->query_until(
-	qr/start/, qq[
-	\\echo start
-	WAIT FOR LSN '${lsn4}';
-]);
+
+# Start background sessions waiting for unreachable LSN with all modes
+my @wait_modes = ('REPLAY', 'WRITE', 'FLUSH');
+my @wait_sessions;
+for (my $i = 0; $i < 3; $i++)
+{
+	$wait_sessions[$i] = $node_standby->background_psql('postgres');
+	$wait_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '${lsn4}' MODE $wait_modes[$i];
+	]);
+}
 
 # Make sure standby will be promoted at least at the primary insert LSN we
 # have just observed.  Use pg_switch_wal() to force the insert LSN to be
@@ -277,17 +520,24 @@ $node_primary->wait_for_catchup($node_standby);
 
 $log_offset = -s $node_standby->logfile;
 $node_standby->promote;
-$node_standby->wait_for_log('recovery is not in progress', $log_offset);
 
-ok(1, 'got error after standby promote');
+# Wait for all three sessions to get the error (each mode has distinct message)
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was written/,
+	$log_offset);
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was flushed/,
+	$log_offset);
+$node_standby->wait_for_log(
+	qr/Recovery ended before target LSN.*was replayed/, $log_offset);
 
-$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");
+ok(1, 'promotion interrupted all wait modes');
+
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}' MODE REPLAY;");
 
 ok(1, 'wait for already replayed LSN exits immediately even after promotion');
 
 $output = $node_standby->safe_psql(
 	'postgres', qq[
-	WAIT FOR LSN '${lsn4}' WITH (timeout '10ms', no_throw);]);
+	WAIT FOR LSN '${lsn4}' MODE REPLAY WITH (timeout '10ms', no_throw);]);
 ok($output eq "not in recovery",
 	"WAIT FOR returns correct status after standby promotion");
 
@@ -295,8 +545,11 @@ ok($output eq "not in recovery",
 $node_standby->stop;
 $node_primary->stop;
 
-# If we send \q with $psql_session->quit the command can be sent to the session
+# If we send \q with $session->quit the command can be sent to the session
 # already closed. So \q is in initial script, here we only finish IPC::Run.
-$psql_session->{run}->finish;
+for (my $i = 0; $i < 3; $i++)
+{
+	$wait_sessions[$i]->{run}->finish;
+}
 
 done_testing();
-- 
2.51.0

