On 2020-04-03 21:51, Anna Akenteva wrote:
I did some code cleanup and added tests - both for the standalone WAIT
FOR statement and for WAIT FOR as a part of BEGIN. The new patch is
attached.

I did more cleanup and optimized the code that waits for events on a latch.
I also rebased the patch.

--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index ab71176cdf..6b4e45bc68 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -187,6 +187,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY update             SYSTEM "update.sgml">
 <!ENTITY vacuum             SYSTEM "vacuum.sgml">
 <!ENTITY values             SYSTEM "values.sgml">
+<!ENTITY wait               SYSTEM "wait.sgml">
 
 <!-- applications and utilities -->
 <!ENTITY clusterdb          SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index c23bbfb4e7..cfee3c8f10 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+    WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+    LSN lsn_value
+    TIMEOUT number_of_milliseconds
+    timestamp
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index d6cd1d4177..0a2ea7e80b 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+    WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+    LSN lsn_value
+    TIMEOUT number_of_milliseconds
+    timestamp
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/ref/wait.sgml b/doc/src/sgml/ref/wait.sgml
new file mode 100644
index 0000000000..26cae3ad85
--- /dev/null
+++ b/doc/src/sgml/ref/wait.sgml
@@ -0,0 +1,146 @@
+<!--
+doc/src/sgml/ref/wait.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-wait">
+ <indexterm zone="sql-wait">
+  <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle>WAIT FOR</refentrytitle>
+  <manvolnum>7</manvolnum>
+  <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>WAIT FOR</refname>
+  <refpurpose>wait for the target <acronym>LSN</acronym> to be replayed or for the specified time to pass</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>where <replaceable class="parameter">event</replaceable> is one of:</phrase>
+    LSN value
+    TIMEOUT number_of_milliseconds
+    timestamp
+
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>'
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ALL LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>, TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ANY LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <command>WAIT FOR</command> provides a simple interprocess communication
+   mechanism to wait for a timestamp, a timeout, or a target log sequence
+   number (<acronym>LSN</acronym>) on a standby in <productname>PostgreSQL</productname>
+   databases with master-standby asynchronous replication. When run with the
+   <literal>LSN</literal> option, the <command>WAIT FOR</command>
+   command waits for the specified <acronym>LSN</acronym> to be replayed.
+   If no timestamp or timeout is specified, the wait time is unlimited.
+   Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+   by shutting down the <literal>postgres</literal> server.
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Parameters</title>
+
+  <variablelist>
+   <varlistentry>
+    <term>LSN <replaceable class="parameter">lsn_number</replaceable></term>
+    <listitem>
+     <para>
+      Specify the target log sequence number to wait for.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time interval to wait for the LSN to be replayed.
+      The specified <replaceable>wait_timeout</replaceable> must be an integer
+      and is measured in milliseconds.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time to wait for the LSN to be replayed.
+      The specified <replaceable>wait_time</replaceable> must be a timestamp.
+     </para>
+    </listitem>
+   </varlistentry>
+
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   Run <literal>WAIT FOR</literal> from <application>psql</application>,
+   limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE:  LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Limit <acronym>LSN</acronym> wait time to 500000 milliseconds,
+   and then cancel the command if <acronym>LSN</acronym> was not reached:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+NOTICE:  LSN is not reached. Try to increase wait time.
+ERROR:  canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+  <title>Compatibility</title>
+
+  <para>
+   There is no <command>WAIT FOR</command> statement in the SQL
+   standard.
+  </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index d25a77b13c..dbd40f66ed 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -215,6 +215,7 @@
    &update;
    &vacuum;
    &values;
+   &wait;
 
  </reference>
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index abf954ba39..d2856c8894 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
 #include "catalog/pg_database.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/controldata_utils.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
@@ -7332,6 +7333,15 @@ StartupXLOG(void)
 					break;
 				}
 
+				/*
+				 * If we replayed an LSN that someone was waiting for,
+				 * set latches in shared memory array to notify the waiter.
+				 */
+				if (XLogCtl->lastReplayedEndRecPtr >= GetMinWait())
+				{
+					WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+				}
+
 				/* Else, try to fetch the next WAL record */
 				record = ReadRecord(xlogreader, LOG, false);
 			} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3ce6..9b310926c1 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -57,6 +57,7 @@ OBJS = \
 	user.o \
 	vacuum.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..ac9905e632
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,394 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements WAIT FOR, which allows waiting for events such as
+ *	  time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlog.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure: registry of the LSNs that backends wait for */
+typedef struct
+{
+	int			backend_maxid;	/* upper bound for scans of waited_lsn[] */
+	XLogRecPtr	min_lsn;	/* lowest awaited LSN; PG_UINT64_MAX if none */
+	slock_t		mutex;	/* taken while backend_maxid and min_lsn are updated */
+	XLogRecPtr	waited_lsn[FLEXIBLE_ARRAY_MEMBER];	/* indexed by backend id; InvalidXLogRecPtr = unused */
+} WaitState;
+
+static volatile WaitState *state;
+
+/* Register lsn_to_wait in this backend's slot of the shared memory array */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	state->waited_lsn[MyBackendId] = lsn_to_wait;
+
+	/* Keep the scan bound and the cached minimum in sync with the new entry */
+	if (MyBackendId > state->backend_maxid)
+		state->backend_maxid = MyBackendId;
+	if (state->min_lsn > lsn_to_wait)
+		state->min_lsn = lsn_to_wait;
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ * The slot is read and cleared while holding the mutex, so the min_lsn and
+ * backend_maxid bookkeeping cannot interleave with AddEvent() or with a
+ * concurrent DeleteEvent() in another backend.
+ * TODO: Consider state cleanup on backend failure: smart/fast/immediate
+ * stop, SIGKILL and SIGTERM.
+ */
+static void
+DeleteEvent(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete;
+
+	SpinLockAcquire(&state->mutex);
+	lsn_to_delete = state->waited_lsn[MyBackendId];
+	state->waited_lsn[MyBackendId] = InvalidXLogRecPtr;
+
+	/* If we need to choose the next min_lsn, update state->min_lsn */
+	if (state->min_lsn == lsn_to_delete)
+	{
+		state->min_lsn = PG_UINT64_MAX;
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+				state->waited_lsn[i] < state->min_lsn)
+				state->min_lsn = state->waited_lsn[i];
+	}
+
+	/* We were the highest waiter: fall back to the next one, or to none */
+	if (state->backend_maxid == MyBackendId)
+	{
+		for (i = MyBackendId - 1; i >= 2; i--)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr)
+				break;
+		state->backend_maxid = (i >= 2) ? i : 0;
+	}
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Compute how much shared memory WaitState needs: the fixed-size header
+ * followed by MaxBackends + 1 XLogRecPtr slots.
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		header_size = offsetof(WaitState, waited_lsn);
+	Size		slots_size = mul_size(MaxBackends + 1, sizeof(XLogRecPtr));
+
+	return add_size(header_size, slots_size);
+}
+
+/* Attach to the WaitState shared memory area, initializing it on first use */
+void
+WaitShmemInit(void)
+{
+	bool		already_exists;
+	uint32		slot;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &already_exists);
+	if (already_exists)
+		return;
+
+	/* First caller: nobody is waiting yet, so every slot starts out empty */
+	SpinLockInit(&state->mutex);
+	state->backend_maxid = 0;
+	state->min_lsn = PG_UINT64_MAX;
+
+	for (slot = 0; slot < (MaxBackends + 1); slot++)
+		state->waited_lsn[slot] = InvalidXLogRecPtr;
+}
+
+/* Set the latch of every waiting backend whose target LSN is <= cur_lsn */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	uint32		slot;
+	int 		last_slot;
+
+	/* Snapshot the scan bound under the mutex; the scan itself is unlocked */
+	SpinLockAcquire(&state->mutex);
+	last_slot = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (slot = 2; slot <= last_slot; slot++)
+	{
+		PGPROC	   *proc = BackendIdGetProc(slot);
+
+		/* Slot must be in use, the PGPROC valid, and the LSN replayed */
+		if (state->waited_lsn[slot] != 0 && proc &&
+			state->waited_lsn[slot] <= cur_lsn)
+			SetLatch(&proc->procLatch);
+	}
+}
+
+/* Get the lowest awaited LSN (PG_UINT64_MAX if none); read without mutex */
+XLogRecPtr
+GetMinWait(void)
+{
+	return state->min_lsn;
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed,
+ * postmaster dies or timeout happens.
+ *
+ * lsn is the LSN to wait for, or InvalidXLogRecPtr to only wait for time.
+ * secs is the timeout in seconds: 0 means no timeout, negative means "do
+ * not wait, just report whether lsn was replayed".  Returns 1 on success
+ * and 0 if lsn was not reached ("reached" means lsn <= replayed LSN).
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	int			latch_events = WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+	float8		endtime;
+
+#define GetNowFloat()	((float8) GetCurrentTimestamp() / 1000000.0)
+	endtime = GetNowFloat() + secs;
+
+	if (lsn != InvalidXLogRecPtr)
+	{
+		/* Just check if we reached */
+		if (lsn <= cur_lsn || secs < 0)
+			return (lsn <= cur_lsn);
+
+		latch_events |= WL_LATCH_SET;
+		AddEvent(lsn);
+	}
+	else if (!secs)
+		return 1;
+
+	for (;;)
+	{
+		float8		delay = 0;
+		long		delay_ms;
+
+		if (secs > 0)
+			delay = endtime - GetNowFloat();
+		else if (secs == 0)
+			delay = 60;		/* no timeout: still wake up once a minute */
+
+		if (delay > 0.0)
+			delay_ms = (long) ceil(delay * 1000.0);
+		else
+			break;
+
+		/*
+		 * ProcessInterrupts() may throw, so take our event out of the shared
+		 * array first; if it returns, register the event again so that the
+		 * startup process keeps waking us up.
+		 */
+		if (InterruptPending)
+		{
+			if (lsn != InvalidXLogRecPtr)
+				DeleteEvent();
+			ProcessInterrupts();
+			if (lsn != InvalidXLogRecPtr)
+				AddEvent(lsn);
+		}
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		/* Look again: replay may have advanced before we (re)registered */
+		if (lsn != InvalidXLogRecPtr)
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+		if (lsn != InvalidXLogRecPtr && lsn <= cur_lsn)
+			break;
+
+		if (WaitLatch(MyLatch, latch_events, delay_ms,
+					  WAIT_EVENT_CLIENT_READ) & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	if (lsn != InvalidXLogRecPtr)
+	{
+		DeleteEvent();
+		/* Final look, so that a timeout exit does not judge a stale value */
+		cur_lsn = GetXLogReplayRecPtr(NULL);
+		if (lsn > cur_lsn)
+		{
+			elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+			return 0;
+		}
+	}
+	return 1;
+}
+
+/*
+ * Get the number of seconds left until the given time (negative if past).
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+	const char *query;
+	float8		val;
+	Oid			types[] = { time->consttype };
+	Datum		values[] = { time->constvalue };
+	char		nulls[] = { " " };
+	Datum		result;
+	bool		isnull;
+
+	/* TIME and TIMETZ carry no date, so compare them with the time of day */
+	if (time->consttype == TIMEOID)
+		query = "select extract (epoch from ($1 - now()::time))";
+	else if (time->consttype == TIMETZOID)
+		query = "select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))";
+	else
+		query = "select extract (epoch from ($1 - now()))";
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	/* Asserts vanish in production builds, so check the outcome for real */
+	if (SPI_execute_with_args(query, 1, types, values, nulls, true, 0) < 0 ||
+		SPI_processed != 1)
+		elog(ERROR, "could not resolve time to wait for");
+
+	result = SPI_getbinval(SPI_tuptable->vals[0],
+						   SPI_tuptable->tupdesc,
+						   1, &isnull);
+	if (isnull)
+		elog(ERROR, "time to wait for must not be null");
+	val = DatumGetFloat8(result);
+	SPI_finish();
+	return val;
+}
+
+/*
+ * Implementation of WAIT FOR: fold the listed events into one target LSN and
+ * one delay, wait for them, and send a one-row "LSN reached" result to dest.
+ */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+	ListCell   *events;
+	TupleDesc	tupdesc;
+	TupOutputState *tstate;
+	float8		delay = 0;
+	float8		final_delay = 0;
+	XLogRecPtr	lsn = InvalidXLogRecPtr;
+	XLogRecPtr	final_lsn = InvalidXLogRecPtr;
+	bool		has_lsn = false;
+	bool		wait_forever = true;
+	int			res = 0;
+
+	/* Extract options from the statement node tree */
+	foreach(events, stmt->events)
+	{
+		WaitStmt   *event = (WaitStmt *) lfirst(events);
+
+		/* LSN to wait for */
+		if (event->lsn)
+		{
+			lsn = DatumGetLSN(
+						DirectFunctionCall1(pg_lsn_in,
+							CStringGetDatum(event->lsn)));
+
+			/*
+			 * The first LSN is taken as is.  After that, when waiting for
+			 * ALL select max LSN, when waiting for ANY select min LSN.
+			 */
+			if (!has_lsn ||
+				(stmt->strategy == WAIT_FOR_ALL && final_lsn < lsn) ||
+				(stmt->strategy == WAIT_FOR_ANY && final_lsn > lsn))
+				final_lsn = lsn;
+			has_lsn = true;
+		}
+
+		/* Time delay to wait for */
+		if (event->time || event->delay)
+		{
+			if (event->delay)
+				delay = event->delay / 1000.0;
+
+			if (event->time)
+			{
+				if (!IsA(event->time, Const))
+					elog(ERROR, "WAIT FOR time must be a constant");
+				delay = WaitTimeResolve((Const *) event->time);
+			}
+
+			/*
+			 * The first delay is taken as is, even if it is negative (a time
+			 * in the past).  After that, when waiting for ALL select max
+			 * delay, when waiting for ANY select min delay.
+			 */
+			if (wait_forever ||
+				(stmt->strategy == WAIT_FOR_ALL && final_delay < delay) ||
+				(stmt->strategy == WAIT_FOR_ANY && final_delay > delay))
+				final_delay = delay;
+			wait_forever = false;
+		}
+	}
+
+	/*
+	 * WaitUtility() reads 0 as "no timeout" and a negative value as "do not
+	 * wait", so a deadline that has already passed must not be passed as 0.
+	 */
+	if (wait_forever)
+		final_delay = 0;
+	else if (final_delay <= 0)
+		final_delay = -1;
+
+	res = WaitUtility(final_lsn, final_delay);
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+	/* Send it */
+	do_text_output_oneline(tstate, res?"t":"f");
+	end_tup_output(tstate);
+	return res;
+}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index f4aecdcbcd..4c2c2735e2 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2762,6 +2762,18 @@ _outDefElem(StringInfo str, const DefElem *node)
 	WRITE_LOCATION_FIELD(location);
 }
 
+static void
+_outWaitStmt(StringInfo str, const WaitStmt *node)
+{
+	WRITE_NODE_TYPE("WAITSTMT");
+
+	WRITE_STRING_FIELD(lsn);	/* LSN text of a single event, if any */
+	WRITE_INT_FIELD(delay);		/* TIMEOUT value of a single event */
+	WRITE_NODE_FIELD(events);	/* nested per-event WaitStmt nodes */
+	WRITE_NODE_FIELD(time);		/* time expression of a single event */
+	WRITE_ENUM_FIELD(strategy, WaitForStrategy);
+}
+
 static void
 _outTableLikeClause(StringInfo str, const TableLikeClause *node)
 {
@@ -4308,6 +4320,9 @@ outNode(StringInfo str, const void *obj)
 			case T_PartitionRangeDatum:
 				_outPartitionRangeDatum(str, obj);
 				break;
+			case T_WaitStmt:
+				_outWaitStmt(str, obj);
+				break;
 
 			default:
 
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 6676412842..08e2649b9d 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -78,6 +78,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
 										 CreateTableAsStmt *stmt);
 static Query *transformCallStmt(ParseState *pstate,
 								CallStmt *stmt);
+static void transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
 static void transformLockingClause(ParseState *pstate, Query *qry,
 								   LockingClause *lc, bool pushedDown);
 #ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -326,7 +327,20 @@ transformStmt(ParseState *pstate, Node *parseTree)
 			result = transformCallStmt(pstate,
 									   (CallStmt *) parseTree);
 			break;
-
+		case T_WaitStmt:
+			transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+			result = makeNode(Query);
+			result->commandType = CMD_UTILITY;
+			result->utilityStmt = (Node *) parseTree;
+			break;
+		case T_TransactionStmt:
+			{
+				TransactionStmt *stmt = (TransactionStmt *) parseTree;
+				if ((stmt->kind == TRANS_STMT_BEGIN ||
+						stmt->kind == TRANS_STMT_START) && stmt->wait)
+					transformWaitForStmt(pstate, (WaitStmt *) stmt->wait);
+			}
+			/* no break here - we want to fall through to the default */
 		default:
 
 			/*
@@ -2981,6 +2995,23 @@ applyLockingClause(Query *qry, Index rtindex,
 	qry->rowMarks = lappend(qry->rowMarks, rc);
 }
 
+/*
+ * transformWaitForStmt -
+ *	run parse analysis on the time expression of each WAIT FOR event; used
+ *	for the standalone statement and for the WAIT FOR clause of BEGIN/START
+ */
+static void
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+	ListCell   *cell;
+
+	foreach(cell, stmt->events)
+	{
+		WaitStmt   *wait_event = (WaitStmt *) lfirst(cell);
+		wait_event->time = transformExpr(pstate, wait_event->time, EXPR_KIND_OTHER);
+	}
+}
+
 /*
  * Coverage testing for raw_expression_tree_walker().
  *
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3449c26bd1..917bda51ac 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -276,7 +276,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 		SecLabelStmt SelectStmt TransactionStmt TruncateStmt
 		UnlistenStmt UpdateStmt VacuumStmt
 		VariableResetStmt VariableSetStmt VariableShowStmt
-		ViewStmt CheckPointStmt CreateConversionStmt
+		ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
 		DeallocateStmt PrepareStmt ExecuteStmt
 		DropOwnedStmt ReassignOwnedStmt
 		AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -488,7 +488,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <list>	row explicit_row implicit_row type_list array_expr_list
 %type <node>	case_expr case_arg when_clause case_default
 %type <list>	when_clause_list
-%type <ival>	sub_type opt_materialized
+%type <ival>	sub_type wait_strategy opt_materialized
 %type <value>	NumericOnly
 %type <list>	NumericOnly_list
 %type <alias>	alias_clause opt_alias_clause
@@ -592,6 +592,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
+%type <list>		wait_list
+%type <node>		WaitEvent wait_for
 
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
@@ -661,7 +663,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -692,7 +694,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
 
 	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+	TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
 	TREAT TRIGGER TRIM TRUE_P
 	TRUNCATE TRUSTED TYPE_P TYPES_P
 
@@ -702,7 +704,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAIT WHEN WHERE WHITESPACE_P WINDOW
+	WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -956,6 +959,7 @@ stmt :
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| WaitStmt
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -9946,18 +9950,20 @@ TransactionStmt:
 					n->chain = $3;
 					$$ = (Node *)n;
 				}
-			| BEGIN_P opt_transaction transaction_mode_list_or_empty
+			| BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 					n->kind = TRANS_STMT_BEGIN;
 					n->options = $3;
+					n->wait = $4;
 					$$ = (Node *)n;
 				}
-			| START TRANSACTION transaction_mode_list_or_empty
+			| START TRANSACTION transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 					n->kind = TRANS_STMT_START;
 					n->options = $3;
+					n->wait = $4;
 					$$ = (Node *)n;
 				}
 			| COMMIT opt_transaction opt_transaction_chain
@@ -14187,6 +14193,74 @@ xml_passing_mech:
 			| BY VALUE_P
 		;
 
+/*****************************************************************************
+ *
+ *		QUERY:
+ *				WAIT FOR [ANY | ALL] <event> [[,] <event> ...]
+ *				event is one of:
+ *					LSN 'value'
+ *					TIMEOUT delay_in_milliseconds
+ *					<datetime type> 'value'
+ *
+ *****************************************************************************/
+WaitStmt:
+			WAIT FOR wait_strategy wait_list
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->strategy = $3;
+					n->events = $4;
+					$$ = (Node *)n;
+				}
+			;
+wait_for:
+			WAIT FOR wait_strategy wait_list
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->strategy = $3;
+					n->events = $4;
+					$$ = (Node *)n;
+				}
+			| /* EMPTY */		{ $$ = NULL; };
+
+wait_strategy:
+			ALL					{ $$ = WAIT_FOR_ALL; }
+			| ANY				{ $$ = WAIT_FOR_ANY; }
+			| /* EMPTY */		{ $$ = WAIT_FOR_ALL; }
+		;
+
+wait_list:
+			WaitEvent					{ $$ = list_make1($1); }
+			| wait_list ',' WaitEvent	{ $$ = lappend($1, $3); }
+			| wait_list WaitEvent		{ $$ = lappend($1, $2); }	/* comma is optional */
+		;
+
+WaitEvent:
+			LSN Sconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = $2;
+					n->delay = 0;
+					n->time = NULL;
+					$$ = (Node *)n;
+				}
+			| TIMEOUT Iconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = NULL;
+					n->delay = $2;
+					n->time = NULL;
+					$$ = (Node *)n;
+				}
+			| ConstDatetime Sconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = NULL;
+					n->delay = 0;
+					n->time = makeStringConstCast($2, @2, $1);
+					$$ = (Node *)n;
+				}
+			;
+
 
 /*
  * Aggregate decoration clauses
@@ -15338,6 +15412,7 @@ unreserved_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LSN
 			| MAPPING
 			| MATCH
 			| MATERIALIZED
@@ -15465,6 +15540,7 @@ unreserved_keyword:
 			| TEMPORARY
 			| TEXT_P
 			| TIES
+			| TIMEOUT
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -15491,6 +15567,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAIT
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..bb8af34980 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/twophase.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
 	SyncScanShmemInit();
 	AsyncShmemInit();
 
+	/*
+	 * Init array of Latches in shared memory for WAIT
+	 */
+	WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index b1f7f6e2d0..700a13f05b 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include <float.h>
 
 #include "access/htup_details.h"
 #include "access/reloptions.h"
@@ -57,6 +58,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -70,6 +72,9 @@
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
 
 /* Hook for plugins to get control in ProcessUtility() */
 ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -268,6 +273,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
 		case T_LoadStmt:
 		case T_PrepareStmt:
 		case T_UnlistenStmt:
+		case T_WaitStmt:
 		case T_VariableSetStmt:
 			{
 				/*
@@ -591,6 +597,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 					case TRANS_STMT_START:
 						{
 							ListCell   *lc;
+							WaitStmt   *waitstmt = (WaitStmt *) stmt->wait;
+
+							/* If needed to WAIT FOR something but failed */
+							if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+								break;
 
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
@@ -1062,6 +1073,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				break;
 			}
 
+		case T_WaitStmt:
+			{
+				WaitStmt   *stmt = (WaitStmt *) parsetree;
+				WaitMain(stmt, dest);
+				break;
+			}
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2718,6 +2736,10 @@ CreateCommandTag(Node *parsetree)
 			tag = CMDTAG_NOTIFY;
 			break;
 
+		case T_WaitStmt:
+			tag = CMDTAG_WAIT;
+			break;
+
 		case T_ListenStmt:
 			tag = CMDTAG_LISTEN;
 			break;
@@ -3357,6 +3379,10 @@ GetCommandLogLevel(Node *parsetree)
 			lev = LOGSTMT_ALL;
 			break;
 
+		case T_WaitStmt:
+			lev = LOGSTMT_ALL;
+			break;
+
 		case T_ListenStmt:
 			lev = LOGSTMT_ALL;
 			break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..0270160d44
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "access/xlogdefs.h"	/* XLogRecPtr */
+#include "nodes/parsenodes.h"	/* WaitStmt, Const */
+#include "tcop/dest.h"			/* DestReceiver */
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+#endif   /* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 8a76afe8cc..348de76c5f 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -488,6 +488,7 @@ typedef enum NodeTag
 	T_DropReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
+	T_WaitStmt,
 	T_SQLCmd,
 
 	/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index cd6f1be643..bfed954d21 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3058,6 +3058,7 @@ typedef struct TransactionStmt
 	char	   *savepoint_name; /* for savepoint commands */
 	char	   *gid;			/* for two-phase-commit related commands */
 	bool		chain;			/* AND CHAIN option */
+	Node		*wait;			/* WAIT clause: list of events to wait for */
 } TransactionStmt;
 
 /* ----------------------
@@ -3567,4 +3568,26 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+/* ----------------------
+ *		WAIT FOR Statement + WAIT FOR clause of BEGIN statement
+ *		TODO: if we only pick one, remove the other
+ * ----------------------
+ */
+
+typedef enum WaitForStrategy
+{
+	WAIT_FOR_ANY = 0,			/* WAIT FOR ANY event [, ...] */
+	WAIT_FOR_ALL				/* WAIT FOR ALL event [, ...] */
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+	NodeTag		type;
+	WaitForStrategy strategy;	/* WAIT FOR ANY or WAIT FOR ALL */
+	List	   *events;		/* used as a pointer to the next WAIT event */
+	char	   *lsn;		/* WAIT FOR LSN */
+	int			delay;		/* WAIT FOR TIMEOUT, in milliseconds */
+	Node	   *time;		/* WAIT FOR TIMESTAMP or TIME */
+} WaitStmt;
+
 #endif							/* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 08f22ce211..6e1848fe4c 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -410,6 +411,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -450,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 8ef0f55e74..430bb5c717 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -216,3 +216,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
 PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
 PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
 PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT FOR", false, false, false)
diff --git a/src/test/recovery/t/020_begin_wait.pl b/src/test/recovery/t/020_begin_wait.pl
new file mode 100644
index 0000000000..9bec57ee8f
--- /dev/null
+++ b/src/test/recovery/t/020_begin_wait.pl
@@ -0,0 +1,145 @@
+# Checks WAIT FOR as a clause of BEGIN
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+use Time::HiRes qw(time);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# Add some content and take a backup
+$node_master->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = get_new_node('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Make sure that WAIT FOR LSN works: add new content to master and memorize
+# master's new LSN, then wait for master's LSN on standby. Prove that WAIT is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it has reached master's LSN.
+# WAIT only promises replay >= target, and later WAL may already be applied.
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0, "standby reached master's LSN after WAIT");
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time(); # Time::HiRes gives sub-second resolution
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+	"WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+cmp_ok($time_waited, '>=', $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn2' TIMEOUT 1");
+is($reached_lsn, "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "WAIT FOR" command
+#===============================================================================
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ WAIT FOR LSN '$lsn3';
+	SELECT max(a) FROM wait_test;
+	BEGIN WAIT FOR LSN '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach master's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $lsn4_pattern = qr/\Q$lsn4\E/;
+like($standby_results, $lsn4_pattern, "WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# For that, make sure that the older max value is repeated twice in the results.
+# Match whole lines only, so that digits inside the LSN line cannot count.
+my $count = () = $standby_results =~ /^40$/mg;
+is($count, 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+cmp_ok($compare_lsns, '<', 0,
+	"WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_master->stop;
diff --git a/src/test/recovery/t/021_wait.pl b/src/test/recovery/t/021_wait.pl
new file mode 100644
index 0000000000..c270e78574
--- /dev/null
+++ b/src/test/recovery/t/021_wait.pl
@@ -0,0 +1,144 @@
+# Checks the standalone WAIT FOR statement
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+use Time::HiRes qw(time);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# Add some content and take a backup
+$node_master->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = get_new_node('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Make sure that WAIT FOR LSN works: add new content to master and memorize
+# master's new LSN, then wait for master's LSN on standby. Prove that WAIT is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it has reached master's LSN.
+# WAIT only promises replay >= target, and later WAL may already be applied.
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0, "standby reached master's LSN after WAIT");
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time(); # Time::HiRes gives sub-second resolution
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+	"WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+cmp_ok($time_waited, '>=', $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"WAIT FOR LSN '$lsn2' TIMEOUT 1");
+is($reached_lsn, "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '$lsn3';
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+	SELECT max(a) FROM wait_test;
+	WAIT FOR LSN '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach master's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $lsn4_pattern = qr/\Q$lsn4\E/;
+like($standby_results, $lsn4_pattern, "WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# For that, make sure that the older max value is repeated twice in the results.
+# Match whole lines only, so that digits inside the LSN line cannot count.
+my $count = () = $standby_results =~ /^40$/mg;
+is($count, 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+	"WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+cmp_ok($compare_lsns, '<', 0,
+	"WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+	"WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_master->stop;

Reply via email to