Intro
==========
The main purpose of the feature is to achieve
read-your-writes-consistency, while using async replica for reads and
primary for writes. In that case lsn of last modification is stored
inside application. We cannot store this lsn inside database, since
reads are distributed across all replicas and primary.


Two implementations of one feature
==========
We left two different solutions. Help me please to choose the best.


1) Classic (wait_classic_v7.patch)
https://www.postgresql.org/message-id/3cc883048264c2e9af022033925ff8db%40postgrespro.ru
Synopsis
==========
advantages: multiple wait events, separate WAIT FOR statement
disadvantages: new words in grammar



WAIT FOR  [ANY | ALL] event [, ...]
BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
    [ WAIT FOR [ANY | ALL] event [, ...]]
event:
LSN value
TIMEOUT number_of_milliseconds
timestamp



2) After style: Kyotaro and Freund (wait_after_within_v6.patch)
https://www.postgresql.org/message-id/d3ff2e363af60b345f82396992595a03%40postgrespro.ru
Synopsis
==========
advantages: no new words in grammar
disadvantages: a little harder to understand, fewer events to wait



AFTER lsn_event [ WITHIN delay_milliseconds ] [, ...]
BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
    [ AFTER lsn_event [ WITHIN delay_milliseconds ]]
START [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
    [ AFTER lsn_event [ WITHIN delay_milliseconds ]]


Examples
==========

primary         standby
-------         --------
            postgresql.conf
            recovery_min_apply_delay = 10s

CREATE TABLE tbl AS SELECT generate_series(1,10) AS a;
INSERT INTO tbl VALUES (generate_series(11, 20));
SELECT pg_current_wal_lsn();

            BEGIN WAIT FOR LSN '0/3002AE8';
            SELECT * FROM tbl; // read fresh insertions
            COMMIT;

Rebased and ready for review.

--
Ivan Kartyshov
Postgres Professional: www.postgrespro.com
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 4a42999b18..657a217e27 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY update             SYSTEM "update.sgml">
 <!ENTITY vacuum             SYSTEM "vacuum.sgml">
 <!ENTITY values             SYSTEM "values.sgml">
+<!ENTITY wait               SYSTEM "wait.sgml">
 
 <!-- applications and utilities -->
 <!ENTITY clusterdb          SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..b3af16c09f 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_for_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+    WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+    LSN lsn_value
+    TIMEOUT number_of_milliseconds
+    timestamp
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..1b54ed2084 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_for_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+    WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+    LSN lsn_value
+    TIMEOUT number_of_milliseconds
+    timestamp
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/ref/wait.sgml b/doc/src/sgml/ref/wait.sgml
new file mode 100644
index 0000000000..26cae3ad85
--- /dev/null
+++ b/doc/src/sgml/ref/wait.sgml
@@ -0,0 +1,146 @@
+<!--
+doc/src/sgml/ref/wait.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-wait">
+ <indexterm zone="sql-wait">
+  <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle>WAIT FOR</refentrytitle>
+  <manvolnum>7</manvolnum>
+  <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>WAIT FOR</refname>
+  <refpurpose>wait for the target <acronym>LSN</acronym> to be replayed or for specified time to pass</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>where <replaceable class="parameter">event</replaceable> is one of:</phrase>
+    LSN value
+    TIMEOUT number_of_milliseconds
+    timestamp
+
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>'
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ALL LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>, TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ANY LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <command>WAIT FOR</command> provides a simple interprocess communication
+   mechanism to wait for timestamp, timeout or the target log sequence number
+   (<acronym>LSN</acronym>) on standby in <productname>PostgreSQL</productname>
+   databases with master-standby asynchronous replication. When run with the
+   <replaceable>LSN</replaceable> option, the <command>WAIT FOR</command>
+   command waits for the specified <acronym>LSN</acronym> to be replayed.
+   If no timestamp or timeout was specified, wait time is unlimited.
+   Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+   by shutting down the <literal>postgres</literal> server.
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Parameters</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><replaceable class="parameter">LSN</replaceable></term>
+    <listitem>
+     <para>
+      Specify the target log sequence number to wait for.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time interval to wait for the LSN to be replayed.
+      The specified <replaceable>wait_timeout</replaceable> must be an integer
+      and is measured in milliseconds.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>UNTIL TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time to wait for the LSN to be replayed.
+      The specified <replaceable>wait_time</replaceable> must be timestamp.
+     </para>
+    </listitem>
+   </varlistentry>
+
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   Run <literal>WAIT FOR</literal> from <application>psql</application>,
+   limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE:  LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Limit <acronym>LSN</acronym> wait time to 500000 milliseconds,
+   and then cancel the command if <acronym>LSN</acronym> was not reached:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+NOTICE:  LSN is not reached. Try to increase wait time.
+ERROR:  canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+  <title>Compatibility</title>
+
+  <para>
+   There is no <command>WAIT FOR</command> statement in the SQL
+   standard.
+  </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index aa94f6adf6..04e17620dd 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -216,6 +216,7 @@
    &update;
    &vacuum;
    &values;
+   &wait;
 
  </reference>
 
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 853b540945..02906c586d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1825,6 +1826,13 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for,
+			 * set latches in shared memory array to notify the waiter.
+			 */
+			if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWait())
+				WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abd..3f06dc5341 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'wait.c',
 )
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..8256240fa9
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,403 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements WAIT FOR, which allows waiting for events such as
+ *	  time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2024, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure */
+typedef struct
+{
+	int			backend_maxid;
+	pg_atomic_uint64	min_lsn;
+	slock_t		mutex;
+	XLogRecPtr	waited_lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/* Add the event of the current backend to the shared memory array */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	if (state->backend_maxid < MyProcNumber)
+		state->backend_maxid = MyProcNumber;
+
+	state->waited_lsn[MyProcNumber] = lsn_to_wait;
+
+	if (lsn_to_wait < pg_atomic_read_u64(&state->min_lsn))
+		pg_atomic_write_u64(&state->min_lsn,lsn_to_wait);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ * Check:
+ * 1) nomal|smart|fast|immediate stop
+ * 2) SIGKILL and SIGTERM
+ */
+static void
+DeleteEvent(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete = state->waited_lsn[MyProcNumber];
+
+	state->waited_lsn[MyProcNumber] = InvalidXLogRecPtr;
+
+	SpinLockAcquire(&state->mutex);
+
+	/* If we need to choose the next min_lsn, update state->min_lsn */
+	if (pg_atomic_read_u64(&state->min_lsn) == lsn_to_delete)
+	{
+		pg_atomic_write_u64(&state->min_lsn,PG_UINT64_MAX);
+		for (i = 1; i <= state->backend_maxid; i++)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+				state->waited_lsn[i] < pg_atomic_read_u64(&state->min_lsn))
+				pg_atomic_write_u64(&state->min_lsn,state->waited_lsn[i]);
+	}
+
+	if (state->backend_maxid == MyProcNumber)
+		for (i = (MyProcNumber); i >= 1; i--)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr)
+			{
+				state->backend_maxid = i;
+				break;
+			}
+
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitState, waited_lsn);
+	size = add_size(size, mul_size(MaxBackends, sizeof(XLogRecPtr)));
+	return size;
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	uint32		i;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+
+		for (i = 0; i < MaxBackends; i++)
+			state->waited_lsn[i] = InvalidXLogRecPtr;
+
+		state->backend_maxid = 0;
+		pg_atomic_init_u64(&state->min_lsn,PG_UINT64_MAX);
+	}
+}
+
+/* Set all latches in shared memory to signal that new LSN has been replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	uint32		i;
+	int 		backend_maxid;
+	PGPROC	   *backend;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (i = 1; i <= backend_maxid; i++)
+	{
+		backend = ProcNumberGetProc(i);
+		if (state->waited_lsn[i] != 0)
+		{
+			if (backend && state->waited_lsn[i] <= cur_lsn)
+				SetLatch(&backend->procLatch);
+		}
+	}
+}
+
+/* Get minimal LSN that will be next */
+XLogRecPtr
+GetMinWait(void)
+{
+	return state ? pg_atomic_read_u64(&state->min_lsn) : PG_UINT64_MAX;
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed,
+ * postmaster dies or timeout happens.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	int			latch_events;
+	float8		endtime;
+	uint32		res = 0;
+
+	if (!RecoveryInProgress())
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Work only in standby mode")));
+		return false;
+	}
+
+#define GetNowFloat()	((float8) GetCurrentTimestamp() / 1000000.0)
+	endtime = GetNowFloat() + secs;
+
+	latch_events = WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+	if (lsn != InvalidXLogRecPtr)
+	{
+		/* Just check if we reached */
+		if (lsn < cur_lsn || secs < 0)
+			return (lsn < cur_lsn);
+
+		latch_events |= WL_LATCH_SET;
+		AddEvent(lsn);
+	}
+	else if (!secs)
+		return 1;
+
+	for (;;)
+	{
+		int			rc;
+		float8		delay = 0;
+		long		delay_ms;
+
+		/* If LSN has been replayed */
+		if (lsn && lsn <= cur_lsn)
+			break;
+
+		if (secs > 0)
+			delay = endtime - GetNowFloat();
+		else if (secs == 0)
+			/*
+			* If we wait forever, then 1 minute timeout to check
+			* for Interupts.
+			*/
+			delay = 60;
+
+		if (delay > 0.0)
+			delay_ms = (long) ceil(delay * 1000.0);
+		else
+			break;
+
+		/*
+		 * If received an interruption from CHECK_FOR_INTERRUPTS,
+		 * then delete the current event from array.
+		 */
+		if (InterruptPending)
+		{
+			if (lsn != InvalidXLogRecPtr)
+				DeleteEvent();
+			ProcessInterrupts();
+		}
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+
+		if (lsn && rc & WL_LATCH_SET)
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+	}
+
+	if (lsn != InvalidXLogRecPtr)
+		DeleteEvent();
+
+	if (lsn != InvalidXLogRecPtr && lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+	else
+		res = 1;
+
+	return res;
+}
+
+/*
+ * Get the amount of seconds left till the specified time.
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+	int			ret;
+	float8		val;
+	Oid			types[] = { time->consttype };
+	Datum		values[] = { time->constvalue };
+	char		nulls[] = { " " };
+	Datum		result;
+	bool		isnull;
+
+	SPI_connect();
+
+	if (time->consttype == 1083)
+		ret = SPI_execute_with_args("select extract (epoch from ($1 - now()::time))",
+									1, types, values, nulls, true, 0);
+	else if (time->consttype == 1266)
+		ret = SPI_execute_with_args("select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))",
+									1, types, values, nulls, true, 0);
+	else
+		ret = SPI_execute_with_args("select extract (epoch from ($1 - now()))",
+									1, types, values, nulls, true, 0);
+
+	Assert(ret >= 0);
+	result = SPI_getbinval(SPI_tuptable->vals[0],
+						   SPI_tuptable->tupdesc,
+						   1, &isnull);
+
+	Assert(!isnull);
+	val = DatumGetFloat8(result);
+
+	elog(INFO, "time: %f", val);
+
+	SPI_finish();
+	return val;
+}
+
+/* Implementation of WAIT FOR */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+	ListCell   *events;
+	TupleDesc	tupdesc;
+	TupOutputState *tstate;
+	float8		delay = 0;
+	float8		final_delay = 0;
+	XLogRecPtr	lsn = InvalidXLogRecPtr;
+	XLogRecPtr	final_lsn = InvalidXLogRecPtr;
+	bool		has_lsn = false;
+	bool		wait_forever = true;
+	int			res = 0;
+
+	if (stmt->strategy == WAIT_FOR_ANY)
+	{
+		/* Prepare to find minimum LSN and delay */
+		final_delay = DBL_MAX;
+		final_lsn = PG_UINT64_MAX;
+	}
+
+	/* Extract options from the statement node tree */
+	foreach(events, stmt->events)
+	{
+		WaitStmt   *event = (WaitStmt *) lfirst(events);
+
+		/* LSN to wait for */
+		if (event->lsn)
+		{
+			has_lsn = true;
+			lsn = DatumGetLSN(
+						DirectFunctionCall1(pg_lsn_in,
+							CStringGetDatum(event->lsn)));
+
+			/*
+			 * When waiting for ALL, select max LSN to wait for.
+			 * When waiting for ANY, select min LSN to wait for.
+			 */
+			if ((stmt->strategy == WAIT_FOR_ALL && final_lsn <= lsn) ||
+				(stmt->strategy == WAIT_FOR_ANY && final_lsn > lsn))
+			{
+				final_lsn = lsn;
+			}
+		}
+
+		/* Time delay to wait for */
+		if (event->time || event->delay)
+		{
+			wait_forever = false;
+
+			if (event->delay)
+				delay = event->delay / 1000.0;
+
+			if (event->time)
+			{
+				Const *time = (Const *) event->time;
+				delay = WaitTimeResolve(time);
+			}
+
+			/*
+			 * When waiting for ALL, select max delay to wait for.
+			 * When waiting for ANY, select min delay to wait for.
+			 */
+			if ((stmt->strategy == WAIT_FOR_ALL && final_delay <= delay) ||
+				(stmt->strategy == WAIT_FOR_ANY && final_delay > delay))
+			{
+				final_delay = delay;
+			}
+		}
+	}
+	if (wait_forever)
+		final_delay = 0;
+	if (!has_lsn)
+		final_lsn = InvalidXLogRecPtr;
+
+	res = WaitUtility(final_lsn, final_delay);
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+	/* Send it */
+	do_text_output_oneline(tstate, res?"t":"f");
+	end_tup_output(tstate);
+	return res;
+}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 2255314c51..18be92ddbf 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -83,6 +83,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
 										 CreateTableAsStmt *stmt);
 static Query *transformCallStmt(ParseState *pstate,
 								CallStmt *stmt);
+static void transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
 static void transformLockingClause(ParseState *pstate, Query *qry,
 								   LockingClause *lc, bool pushedDown);
 #ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -403,7 +404,21 @@ transformStmt(ParseState *pstate, Node *parseTree)
 			result = transformCallStmt(pstate,
 									   (CallStmt *) parseTree);
 			break;
-
+		case T_WaitStmt:
+			transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+			result = makeNode(Query);
+			result->commandType = CMD_UTILITY;
+			result->utilityStmt = (Node *) parseTree;
+			break;
+		case T_TransactionStmt:
+			{
+				TransactionStmt *stmt = (TransactionStmt *) parseTree;
+				if ((stmt->kind == TRANS_STMT_BEGIN ||
+						stmt->kind == TRANS_STMT_START) && stmt->wait)
+					transformWaitForStmt(pstate, (WaitStmt *) stmt->wait);
+			}
+			/* no break here - we want to fall through to the default */
+			/* FALLTHROUGH */
 		default:
 
 			/*
@@ -3560,6 +3575,23 @@ applyLockingClause(Query *qry, Index rtindex,
 	qry->rowMarks = lappend(qry->rowMarks, rc);
 }
 
+/*
+ * transformWaitForStmt -
+ *	transform the WAIT FOR clause of the BEGIN statement
+ *	transform the WAIT FOR statement (TODO: remove this line if we don't keep it)
+ */
+static void
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+	ListCell   *events;
+
+	foreach(events, stmt->events)
+	{
+		WaitStmt   *event = (WaitStmt *) lfirst(events);
+		event->time = transformExpr(pstate, event->time, EXPR_KIND_OTHER);
+	}
+}
+
 /*
  * Coverage testing for raw_expression_tree_walker().
  *
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 130f7fc7c3..dfdf6d2d78 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -312,7 +312,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 		SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
 		UnlistenStmt UpdateStmt VacuumStmt
 		VariableResetStmt VariableSetStmt VariableShowStmt
-		ViewStmt CheckPointStmt CreateConversionStmt
+		ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
 		DeallocateStmt PrepareStmt ExecuteStmt
 		DropOwnedStmt ReassignOwnedStmt
 		AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -537,7 +537,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	case_expr case_arg when_clause case_default
 %type <list>	when_clause_list
 %type <node>	opt_search_clause opt_cycle_clause
-%type <ival>	sub_type opt_materialized
+%type <ival>	sub_type wait_strategy opt_materialized
 %type <node>	NumericOnly
 %type <list>	NumericOnly_list
 %type <alias>	alias_clause opt_alias_clause opt_alias_clause_for_join_using
@@ -645,6 +645,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
+%type <list>		wait_list
+%type <node>		WaitEvent wait_for
 
 %type <node>	json_format_clause
 				json_format_clause_opt
@@ -730,7 +732,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
 
 	MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE METHOD
 	MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -764,7 +766,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P SYSTEM_USER
 
 	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+	TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
 	TREAT TRIGGER TRIM TRUE_P
 	TRUNCATE TRUSTED TYPE_P TYPES_P
 
@@ -774,7 +776,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAIT WHEN WHERE WHITESPACE_P WINDOW
+	WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -1103,6 +1106,7 @@ stmt:
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| WaitStmt
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -10934,12 +10938,13 @@ TransactionStmt:
 					n->location = -1;
 					$$ = (Node *) n;
 				}
-			| START TRANSACTION transaction_mode_list_or_empty
+			| START TRANSACTION transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 
 					n->kind = TRANS_STMT_START;
 					n->options = $3;
+					n->wait = $4;
 					n->location = -1;
 					$$ = (Node *) n;
 				}
@@ -11038,12 +11043,13 @@ TransactionStmt:
 		;
 
 TransactionStmtLegacy:
-			BEGIN_P opt_transaction transaction_mode_list_or_empty
+			BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 
 					n->kind = TRANS_STMT_BEGIN;
 					n->options = $3;
+					n->wait = $4;
 					n->location = -1;
 					$$ = (Node *) n;
 				}
@@ -15875,6 +15881,74 @@ xml_passing_mech:
 			| BY VALUE_P
 		;
 
+/*****************************************************************************
+ *
+ *		QUERY:
+ *				WAIT FOR <event> [, <event> ...]
+ *				event is one of:
+ *					LSN value
+ *					TIMEOUT delay
+ *					timestamp
+ *
+ *****************************************************************************/
+WaitStmt:
+			WAIT FOR wait_strategy wait_list
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->strategy = $3;
+					n->events = $4;
+					$$ = (Node *)n;
+				}
+			;
+wait_for:
+			WAIT FOR wait_strategy wait_list
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->strategy = $3;
+					n->events = $4;
+					$$ = (Node *)n;
+				}
+			| /* EMPTY */		{ $$ = NULL; };
+
+wait_strategy:
+			ALL					{ $$ = WAIT_FOR_ALL; }
+			| ANY				{ $$ = WAIT_FOR_ANY; }
+			| /* EMPTY */		{ $$ = WAIT_FOR_ALL; }
+		;
+
+wait_list:
+			WaitEvent					{ $$ = list_make1($1); }
+			| wait_list ',' WaitEvent	{ $$ = lappend($1, $3); }
+			| wait_list WaitEvent		{ $$ = lappend($1, $2); }
+		;
+
+WaitEvent:
+			LSN Sconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = $2;
+					n->delay = 0;
+					n->time = NULL;
+					$$ = (Node *)n;
+				}
+			| TIMEOUT Iconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = NULL;
+					n->delay = $2;
+					n->time = NULL;
+					$$ = (Node *)n;
+				}
+			| ConstDatetime Sconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = NULL;
+					n->delay = 0;
+					n->time = makeStringConstCast($2, @2, $1);
+					$$ = (Node *)n;
+				}
+			;
+
 
 /*
  * Aggregate decoration clauses
@@ -17279,6 +17353,7 @@ unreserved_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LSN
 			| MAPPING
 			| MATCH
 			| MATCHED
@@ -17411,6 +17486,7 @@ unreserved_keyword:
 			| TEMPORARY
 			| TEXT_P
 			| TIES
+			| TIMEOUT
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -17437,6 +17513,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAIT
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
@@ -17869,6 +17946,7 @@ bare_label_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LSN
 			| MAPPING
 			| MATCH
 			| MATCHED
@@ -18031,6 +18109,7 @@ bare_label_keyword:
 			| THEN
 			| TIES
 			| TIME
+			| TIMEOUT
 			| TIMESTAMP
 			| TRAILING
 			| TRANSACTION
@@ -18068,6 +18147,7 @@ bare_label_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAIT
 			| WHEN
 			| WHITESPACE_P
 			| WORK
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index fcfac335a5..485b6b34b7 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -21,6 +21,7 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/procnumber.h"
 #include "utils/guc_hooks.h"
 #include "utils/memutils.h"
 #include "utils/resowner.h"
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 521ed5418c..e1cbc7e49c 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, WaitEventExtensionShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
+	size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -357,6 +359,11 @@ CreateOrAttachShmemStructs(void)
 	StatsShmemInit();
 	WaitEventExtensionShmemInit();
 	InjectionPointShmemInit();
+
+	/*
+	 * Init array of Latches in shared memory for WAIT
+	 */
+	WaitShmemInit();
 }
 
 /*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 83f86a42f7..51e86d8425 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include <float.h>
 
 #include "access/reloptions.h"
 #include "access/twophase.h"
@@ -56,6 +57,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -265,6 +267,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
 		case T_LoadStmt:
 		case T_PrepareStmt:
 		case T_UnlistenStmt:
+		case T_WaitStmt:
 		case T_VariableSetStmt:
 			{
 				/*
@@ -605,6 +608,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 					case TRANS_STMT_START:
 						{
 							ListCell   *lc;
+							WaitStmt   *waitstmt = (WaitStmt *) stmt->wait;
+
+							/* If needed to WAIT FOR something but failed */
+							if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+								break;
 
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
@@ -1062,6 +1070,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				break;
 			}
 
+		case T_WaitStmt:
+			{
+				WaitStmt   *stmt = (WaitStmt *) parsetree;
+				WaitMain(stmt, dest);
+				break;
+			}
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2840,6 +2855,10 @@ CreateCommandTag(Node *parsetree)
 			tag = CMDTAG_NOTIFY;
 			break;
 
+		case T_WaitStmt:
+			tag = CMDTAG_WAIT;
+			break;
+
 		case T_ListenStmt:
 			tag = CMDTAG_LISTEN;
 			break;
@@ -3488,6 +3507,10 @@ GetCommandLogLevel(Node *parsetree)
 			lev = LOGSTMT_ALL;
 			break;
 
+		case T_WaitStmt:
+			lev = LOGSTMT_ALL;
+			break;
+
 		case T_ListenStmt:
 			lev = LOGSTMT_ALL;
 			break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..ba022a5e84
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2024, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+
+#endif   /* WAIT_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 24f5c06bb6..ba5128c116 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3523,6 +3523,7 @@ typedef struct TransactionStmt
 	/* for two-phase-commit related commands */
 	char	   *gid pg_node_attr(query_jumble_ignore);
 	bool		chain;			/* AND CHAIN option */
+	Node	   *wait;			/* WAIT clause: list of events to wait for */
 	/* token location, or -1 if unknown */
 	int			location pg_node_attr(query_jumble_location);
 } TransactionStmt;
@@ -4074,4 +4075,26 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+/* ----------------------
+ *		WAIT FOR Statement + WAIT FOR clause of BEGIN statement
+ *		TODO: if we only pick one, remove the other
+ * ----------------------
+ */
+
+typedef enum WaitForStrategy
+{
+	WAIT_FOR_ANY = 0,
+	WAIT_FOR_ALL
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+	NodeTag		type;
+	WaitForStrategy strategy;
+	List	   *events;		/* used as a pointer to the next WAIT event */
+	char	   *lsn;		/* WAIT FOR LSN */
+	int			delay;		/* WAIT FOR TIMEOUT */
+	Node	   *time;		/* WAIT FOR TIMESTAMP or TIME */
+} WaitStmt;
+
 #endif							/* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 2331acac09..ae7b65526b 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -260,6 +260,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD, BARE_LABEL)
@@ -433,6 +434,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD, BARE_LABEL)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD, BARE_LABEL)
@@ -473,6 +475,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 7fdcec6dd9..295cd6ff3a 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
 PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
 PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
 PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT FOR", false, false, false)
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index c67249500e..bad8b38c87 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -50,6 +50,8 @@ tests += {
       't/039_end_of_wal.pl',
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
+      't/042_begin_wait.pl',
+      't/043_wait.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/042_begin_wait.pl b/src/test/recovery/t/042_begin_wait.pl
new file mode 100644
index 0000000000..d56ac8399a
--- /dev/null
+++ b/src/test/recovery/t/042_begin_wait.pl
@@ -0,0 +1,147 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that WAIT is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN WAIT FOR LSN '$lsn1'");
+$node_standby->safe_psql('postgres', "COMMIT");
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_ge('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns eq "t", "standby reached the same LSN as primary after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+	"WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn2' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "WAIT FOR" command
+#===============================================================================
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ WAIT FOR LSN '$lsn3';
+	SELECT max(a) FROM wait_test;
+	BEGIN WAIT FOR LSN '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# For that, make sure that the older max value is repeated twice in the results.
+my $count = () = $standby_results =~ /40/g;
+ok($count eq 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+ok($compare_lsns ge 0,
+	"WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+ok($compare_lsns lt 0,
+	"WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_ge('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+ok($compare_lsns eq "t",
+	"WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/test/recovery/t/043_wait.pl b/src/test/recovery/t/043_wait.pl
new file mode 100644
index 0000000000..80f3419be0
--- /dev/null
+++ b/src/test/recovery/t/043_wait.pl
@@ -0,0 +1,145 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that WAIT is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_ge('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns eq "t", "standby reached the same LSN as primary after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+	"WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"WAIT FOR LSN '$lsn2' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '$lsn3';
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+	SELECT max(a) FROM wait_test;
+	WAIT FOR LSN '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# For that, make sure that the older max value is repeated twice in the results.
+my $count = () = $standby_results =~ /40/g;
+ok($count eq 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+	"WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+ok($compare_lsns ge 0,
+	"WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+ok($compare_lsns lt 0,
+	"WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+	"WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_ge('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+ok($compare_lsns eq "t",
+	"WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/doc/src/sgml/ref/after.sgml b/doc/src/sgml/ref/after.sgml
new file mode 100644
index 0000000000..def0ec6be3
--- /dev/null
+++ b/doc/src/sgml/ref/after.sgml
@@ -0,0 +1,118 @@
+<!--
+doc/src/sgml/ref/after.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-wait">
+ <indexterm zone="sql-wait">
+  <primary>AFTER</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle>AFTER</refentrytitle>
+  <manvolnum>7</manvolnum>
+  <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>AFTER</refname>
+  <refpurpose>AFTER the target <acronym>LSN</acronym> to be replayed and for specified time to timeout</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+AFTER <replaceable class="parameter">LSN</replaceable> [WITHIN timeout_in_milliseconds]
+
+AFTER LSN '<replaceable class="parameter">lsn_number</replaceable>'
+AFTER LSN '<replaceable class="parameter">lsn_number</replaceable>' WITHIN <replaceable class="parameter">wait_timeout</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <command>AFTER</command> provides a simple interprocess communication
+   mechanism to wait for the target log sequence number 
+   (<acronym>LSN</acronym>) on standby in <productname>PostgreSQL</productname>
+   databases with master-standby asynchronous replication. <command>AFTER</command>
+   command waits for the specified <acronym>LSN</acronym> to be replayed.
+   If no timeout was specified, wait time is unlimited.
+   Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+   by shutting down the <literal>postgres</literal> server.
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Parameters</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><replaceable class="parameter">LSN</replaceable></term>
+    <listitem>
+     <para>
+      Specify the target log sequence number to wait for.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time interval to AFTER the LSN to be replayed.
+      The specified <replaceable>wait_timeout</replaceable> must be an integer
+      and is measured in milliseconds.
+     </para>
+    </listitem>
+   </varlistentry>
+
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   Run <literal>AFTER</literal> from <application>psql</application>,
+   limiting wait time to 10000 milliseconds:
+
+<screen>
+AFTER '0/3F07A6B1' WITHIN 10000;
+NOTICE:  LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+AFTER '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Limit <acronym>LSN</acronym> wait time to 500000 milliseconds,
+   and then cancel the command if <acronym>LSN</acronym> was not reached:
+<screen>
+AFTER LSN '0/3F0FF791' WITHIN 500000;
+^CCancel request sent
+NOTICE:  LSN is not reached. Try to increase wait time.
+ERROR:  canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+</refsect1>
+
+</refentry>
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 4a42999b18..ff2e6e0f3f 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -6,6 +6,7 @@ Complete list of usable sgml source files in this directory.
 
 <!-- SQL commands -->
 <!ENTITY abort              SYSTEM "abort.sgml">
+<!ENTITY abort              SYSTEM "after.sgml">
 <!ENTITY alterAggregate     SYSTEM "alter_aggregate.sgml">
 <!ENTITY alterCollation     SYSTEM "alter_collation.sgml">
 <!ENTITY alterConversion    SYSTEM "alter_conversion.sgml">
@@ -188,6 +189,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY update             SYSTEM "update.sgml">
 <!ENTITY vacuum             SYSTEM "vacuum.sgml">
 <!ENTITY values             SYSTEM "values.sgml">
+<!ENTITY wait               SYSTEM "wait.sgml">
 
 <!-- applications and utilities -->
 <!ENTITY clusterdb          SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..a2794763b1 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+    AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN number_of_milliseconds ]
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..46a3bcf1a8 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+    AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN number_of_milliseconds ]
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index aa94f6adf6..7c94cf9de1 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -34,6 +34,7 @@
   </partintro>
 
    &abort;
+   &after;
    &alterAggregate;
    &alterCollation;
    &alterConversion;
@@ -216,6 +217,7 @@
    &update;
    &vacuum;
    &values;
+   &wait;
 
  </reference>
 
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 853b540945..736124208f 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1825,6 +1826,15 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			* If we replayed an LSN that someone was waiting for,
+			* set latches in shared memory array to notify the waiter.
+			*/
+			if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWait())
+			{
+				 WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+			}
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abd..3f06dc5341 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'wait.c',
 )
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..6f6749118d
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,298 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements WAIT FOR, which allows waiting for events such as
+ *	  time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2024, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure */
+typedef struct
+{
+	int			backend_maxid;
+	pg_atomic_uint64	min_lsn;
+	slock_t		mutex;
+	XLogRecPtr	waited_lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/* Add the event of the current backend to the shared memory array */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	if (state->backend_maxid < MyProcNumber)
+		state->backend_maxid = MyProcNumber;
+
+	state->waited_lsn[MyProcNumber] = lsn_to_wait;
+
+	if (lsn_to_wait < pg_atomic_read_u64(&state->min_lsn))
+		pg_atomic_write_u64(&state->min_lsn,lsn_to_wait);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ * Check:
+ * 1) nomal|smart|fast|immediate stop
+ * 2) SIGKILL and SIGTERM
+ */
+static void
+DeleteEvent(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete = state->waited_lsn[MyProcNumber];
+
+	state->waited_lsn[MyProcNumber] = InvalidXLogRecPtr;
+
+	SpinLockAcquire(&state->mutex);
+
+	/* If we need to choose the next min_lsn, update state->min_lsn */
+	if (pg_atomic_read_u64(&state->min_lsn) == lsn_to_delete)
+	{
+		pg_atomic_write_u64(&state->min_lsn,PG_UINT64_MAX);
+		for (i = 1; i <= state->backend_maxid; i++)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+				state->waited_lsn[i] < pg_atomic_read_u64(&state->min_lsn))
+				pg_atomic_write_u64(&state->min_lsn,state->waited_lsn[i]);
+	}
+
+	if (state->backend_maxid == MyProcNumber)
+		for (i = (MyProcNumber); i >= 1; i--)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr)
+			{
+				state->backend_maxid = i;
+				break;
+			}
+
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitState, waited_lsn);
+	size = add_size(size, mul_size(MaxBackends, sizeof(XLogRecPtr)));
+	return size;
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	uint32		i;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+
+		for (i = 0; i < MaxBackends; i++)
+			state->waited_lsn[i] = InvalidXLogRecPtr;
+
+		state->backend_maxid = 0;
+		pg_atomic_init_u64(&state->min_lsn,PG_UINT64_MAX);
+	}
+}
+
+/* Set all latches in shared memory to signal that new LSN has been replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	uint32		i;
+	int 		backend_maxid;
+	PGPROC	   *backend;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (i = 1; i <= backend_maxid; i++)
+	{
+		backend = ProcNumberGetProc(i);
+		if (state->waited_lsn[i] != 0)
+		{
+			if (backend && state->waited_lsn[i] <= cur_lsn)
+				SetLatch(&backend->procLatch);
+		}
+	}
+}
+
+/* Get minimal LSN that will be next */
+XLogRecPtr
+GetMinWait(void)
+{
+	return state ? pg_atomic_read_u64(&state->min_lsn) : PG_UINT64_MAX;
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed,
+ * postmaster dies or timeout happens.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	int			latch_events;
+	float8		endtime;
+	uint32		res = 0;
+
+	if (!RecoveryInProgress())
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Work only in standby mode")));
+		return false;
+	}
+
+#define GetNowFloat()	((float8) GetCurrentTimestamp() / 1000000.0)
+	endtime = GetNowFloat() + secs;
+
+	latch_events = WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+	if (lsn != InvalidXLogRecPtr)
+	{
+		/* Just check if we reached */
+		if (lsn < cur_lsn || secs < 0)
+			return (lsn < cur_lsn);
+
+		latch_events |= WL_LATCH_SET;
+		AddEvent(lsn);
+	}
+	else if (!secs)
+		return 1;
+
+	for (;;)
+	{
+		int			rc;
+		float8		delay = 0;
+		long		delay_ms;
+
+		/* If LSN has been replayed */
+		if (lsn && lsn <= cur_lsn)
+			break;
+
+		if (secs > 0)
+			delay = endtime - GetNowFloat();
+		else if (secs == 0)
+			/*
+			* If we wait forever, then 1 minute timeout to check
+			* for Interupts.
+			*/
+			delay = 60;
+
+		if (delay > 0.0)
+			delay_ms = (long) ceil(delay * 1000.0);
+		else
+			break;
+
+		/*
+		 * If received an interruption from CHECK_FOR_INTERRUPTS,
+		 * then delete the current event from array.
+		 */
+		if (InterruptPending)
+		{
+			if (lsn != InvalidXLogRecPtr)
+				DeleteEvent();
+			ProcessInterrupts();
+		}
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+
+		if (lsn && rc & WL_LATCH_SET)
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+	}
+
+	if (lsn != InvalidXLogRecPtr)
+		DeleteEvent();
+
+	if (lsn != InvalidXLogRecPtr && lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+	else
+		res = 1;
+
+	return res;
+}
+
+/* Implementation of WAIT FOR */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+	TupleDesc	tupdesc;
+	TupOutputState *tstate;
+	XLogRecPtr	lsn = InvalidXLogRecPtr;
+	int		res = 0;
+
+	lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+												 CStringGetDatum(stmt->lsn)));
+	res = WaitUtility(lsn, stmt->delay);
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+	/* Send it */
+	do_text_output_oneline(tstate, res?"t":"f");
+	end_tup_output(tstate);
+	return res;
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 130f7fc7c3..37c425dbe4 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -312,7 +312,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 		SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
 		UnlistenStmt UpdateStmt VacuumStmt
 		VariableResetStmt VariableSetStmt VariableShowStmt
-		ViewStmt CheckPointStmt CreateConversionStmt
+		ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
 		DeallocateStmt PrepareStmt ExecuteStmt
 		DropOwnedStmt ReassignOwnedStmt
 		AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -645,6 +645,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
+%type <ival>		wait_time
+%type <node>		wait_for
 
 %type <node>	json_format_clause
 				json_format_clause_opt
@@ -1103,6 +1105,7 @@ stmt:
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| WaitStmt
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -10934,12 +10937,13 @@ TransactionStmt:
 					n->location = -1;
 					$$ = (Node *) n;
 				}
-			| START TRANSACTION transaction_mode_list_or_empty
+			| START TRANSACTION transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 
 					n->kind = TRANS_STMT_START;
 					n->options = $3;
+					n->wait = $4;
 					n->location = -1;
 					$$ = (Node *) n;
 				}
@@ -11038,12 +11042,13 @@ TransactionStmt:
 		;
 
 TransactionStmtLegacy:
-			BEGIN_P opt_transaction transaction_mode_list_or_empty
+			BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 
 					n->kind = TRANS_STMT_BEGIN;
 					n->options = $3;
+					n->wait = $4;
 					n->location = -1;
 					$$ = (Node *) n;
 				}
@@ -15875,6 +15880,37 @@ xml_passing_mech:
 			| BY VALUE_P
 		;
 
+/*****************************************************************************
+ *
+ *		QUERY:
+ *				AFTER LSN_value [WITHIN delay timestamp]
+ *
+ *****************************************************************************/
+WaitStmt:
+			AFTER Sconst wait_time
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = $2;
+					n->delay = $3;
+					$$ = (Node *)n;
+				}
+		;
+wait_for:
+			AFTER Sconst wait_time
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = $2;
+					n->delay = $3;
+					$$ = (Node *)n;
+				}
+			| /* EMPTY */		{ $$ = NULL; }
+		;
+
+wait_time:
+			WITHIN Iconst		{ $$ = $2; }
+			| /* EMPTY */           { $$ = 0; }
+		;
+
 
 /*
  * Aggregate decoration clauses
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 521ed5418c..9be97737b5 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, WaitEventExtensionShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
+	size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -244,6 +246,11 @@ CreateSharedMemoryAndSemaphores(void)
 	/* Initialize subsystems */
 	CreateOrAttachShmemStructs();
 
+	/*
+	 * Init array of Latches in shared memory for wait lsn
+	 */
+	WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 83f86a42f7..c7f01c0a17 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -56,6 +56,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -265,6 +266,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
 		case T_LoadStmt:
 		case T_PrepareStmt:
 		case T_UnlistenStmt:
+		case T_WaitStmt:
 		case T_VariableSetStmt:
 			{
 				/*
@@ -605,6 +607,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 					case TRANS_STMT_START:
 						{
 							ListCell   *lc;
+							WaitStmt   *waitstmt = (WaitStmt *) stmt->wait;
+
+							/* If needed to WAIT FOR something but failed */
+							if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+								break;
 
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
@@ -1062,6 +1069,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				break;
 			}
 
+		case T_WaitStmt:
+			{
+				WaitStmt   *stmt = (WaitStmt *) parsetree;
+				WaitMain(stmt, dest);
+				break;
+			}
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2840,6 +2854,10 @@ CreateCommandTag(Node *parsetree)
 			tag = CMDTAG_NOTIFY;
 			break;
 
+		case T_WaitStmt:
+			tag = CMDTAG_WAIT;
+			break;
+
 		case T_ListenStmt:
 			tag = CMDTAG_LISTEN;
 			break;
@@ -3488,6 +3506,10 @@ GetCommandLogLevel(Node *parsetree)
 			lev = LOGSTMT_ALL;
 			break;
 
+		case T_WaitStmt:
+			lev = LOGSTMT_ALL;
+			break;
+
 		case T_ListenStmt:
 			lev = LOGSTMT_ALL;
 			break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..ba022a5e84
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2024, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+
+#endif   /* WAIT_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 24f5c06bb6..ed6ac96660 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3523,6 +3523,7 @@ typedef struct TransactionStmt
 	/* for two-phase-commit related commands */
 	char	   *gid pg_node_attr(query_jumble_ignore);
 	bool		chain;			/* AND CHAIN option */
+	Node		*wait;			/* wait lsn clause */
 	/* token location, or -1 if unknown */
 	int			location pg_node_attr(query_jumble_location);
 } TransactionStmt;
@@ -4074,4 +4075,16 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+/* ----------------------
+ *		AFTER Statement + AFTER clause of BEGIN statement
+ * ----------------------
+ */
+
+typedef struct WaitStmt
+{
+	NodeTag		type;
+	char	   *lsn;		/* LSN */
+	int			delay;		/* TIMEOUT */
+} WaitStmt;
+
 #endif							/* PARSENODES_H */
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 7fdcec6dd9..567139963a 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
 PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
 PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
 PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "AFTER", false, false, false)
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index c67249500e..8dd8dc3f80 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -50,6 +50,8 @@ tests += {
       't/039_end_of_wal.pl',
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
+      't/042_begin_after.pl',
+      't/043_after.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/042_begin_after.pl b/src/test/recovery/t/042_begin_after.pl
new file mode 100644
index 0000000000..bf81cf14bc
--- /dev/null
+++ b/src/test/recovery/t/042_begin_after.pl
@@ -0,0 +1,87 @@
+# Checks waiting for lsn on standby AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that AFTER works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that AFTER is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN AFTER '$lsn1'");
+$node_standby->safe_psql('postgres', "ROLLBACK");
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_ge('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns eq "t", "standby reached the same LSN as primary AFTER");
+
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "AFTER" command
+#===============================================================================
+# We need to check that AFTER works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, AFTER which ensures a max value of 40.
+# Inside the transaction, AFTER that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ AFTER '$lsn3';
+	SELECT max(a) FROM wait_test;
+	BEGIN AFTER '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "AFTER works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after AFTER.
+# For that, make sure that the older max value is repeated twice in the results.
+my $count = () = $standby_results =~ /40/g;
+ok($count eq 2, "transaction isolation level doesn't get broken due to AFTER");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/test/recovery/t/043_after.pl b/src/test/recovery/t/043_after.pl
new file mode 100644
index 0000000000..afe26d2122
--- /dev/null
+++ b/src/test/recovery/t/043_after.pl
@@ -0,0 +1,99 @@
+# Checks waiting for lsn on standby AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay        = 2;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that AFTER works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that AFTER is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "AFTER '$lsn1'");
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_ge('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns eq "t", "standby reached the same LSN as primary AFTER");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"AFTER '$lsn2' WITHIN 1");
+ok($reached_lsn eq "f", "AFTER doesn't reach LSN if given too little wait time");
+
+
+
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, AFTER which ensures a max value of 40.
+# Inside the transaction, AFTER that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	AFTER '$lsn3';
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+	SELECT max(a) FROM wait_test;
+	AFTER '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "AFTER works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after AFTER.
+# For that, make sure that the older max value is repeated twice in the results.
+my $count = () = $standby_results =~ /40/g;
+ok($count eq 2, "transaction isolation level doesn't get broken due to AFTER");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();

Reply via email to