On 2018-03-06 14:50, Simon Riggs wrote:
On 6 March 2018 at 11:24, Dmitry Ivanov <d.iva...@postgrespro.ru> wrote:
In PG11, I propose the following command, sticking mostly to Ants'
syntax, and allowing to wait for multiple events before it returns. It
doesn't hold snapshot and will not get cancelled by Hot Standby.

WAIT FOR event [, event ...] options

event is
LSN value
TIMESTAMP value

options
TIMEOUT delay
UNTIL TIMESTAMP timestamp
(we have both, so people don't need to do math, they can use whichever
they have)


I have a (possibly) dumb question: if we have specified several events,
should WAIT finish if only one of them triggered? It's not immediately
obvious if we're waiting for ALL of them to trigger, or just one will
suffice (ANY). IMO the syntax could be extended to something like:

WAIT FOR [ANY | ALL] event [, event ...] options,

with ANY being the default variant.

+1

Here I made new patch of feature, discussed above.

WAIT FOR - wait statement to pause beneath statements
==========

Synopsis
==========
    WAIT FOR [ANY | SOME | ALL] event [, event ...] options
    and event is:
        LSN value
        TIMESTAMP value

    and options is:
        TIMEOUT delay
        UNTIL TIMESTAMP timestamp
Description
==========
WAIT FOR - make to wait statements (that are beneath) on sleep until
event happens (Don’t process new queries until an event happens).

How to use it
==========
WAIT FOR LSN ‘LSN’ [, timeout in ms];

#Wait until LSN 0/303EC60 will be replayed, or 10 second passed.
WAIT FOR ANY LSN ‘0/303EC60’, TIMEOUT 10000;

#Or same without timeout.
WAIT FOR LSN ‘0/303EC60’;

#Or wait for some timestamp.
WAIT FOR TIMESTAMP '2020-01-02 17:20:19.028161+03';

#Wait until ALL events occur: LSN to be replayed and timestamp
passed.
WAIT FOR ALL LSN ‘0/303EC60’, TIMESTAMP '2020-01-28 11:10:39.021341+03';

Notice: WAIT FOR will release on PostmasterDeath or Interruption events
if they come earlier then LSN or timeout.

Testing the implementation
======================
The implementation was tested with src/test/recovery/t/018_waitfor.pl

--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 8d91f3529e6..1a2bc7dfa93 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -187,6 +187,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY update             SYSTEM "update.sgml">
 <!ENTITY vacuum             SYSTEM "vacuum.sgml">
 <!ENTITY values             SYSTEM "values.sgml">
+<!ENTITY waitfor            SYSTEM "waitfor.sgml">
 
 <!-- applications and utilities -->
 <!ENTITY clusterdb          SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/waitfor.sgml b/doc/src/sgml/ref/waitfor.sgml
new file mode 100644
index 00000000000..8fa2ddb4492
--- /dev/null
+++ b/doc/src/sgml/ref/waitfor.sgml
@@ -0,0 +1,136 @@
+<!--
+doc/src/sgml/ref/waitlsn.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-waitlsn">
+ <indexterm zone="sql-waitlsn">
+  <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle>WAIT FOR</refentrytitle>
+  <manvolnum>7</manvolnum>
+  <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>WAIT FOR</refname>
+  <refpurpose>wait for the target <acronym>LSN</acronym> to be replayed</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' 
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' UNTIL TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable> 
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <command>WAIT FOR</command> provides a simple
+   interprocess communication mechanism to wait for timestamp or the target log sequence
+   number (<acronym>LSN</acronym>) on standby in <productname>PostgreSQL</productname>
+   databases with master-standby asynchronous replication. When run with the
+   <replaceable>LSN</replaceable> option, the <command>WAIT FOR</command> command
+   waits for the specified <acronym>LSN</acronym> to be replayed. By default, wait
+   time is unlimited. Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+   by shutting down the <literal>postgres</literal> server. You can also limit the wait
+   time using the <option>TIMEOUT</option> option.
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Parameters</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><replaceable class="parameter">LSN</replaceable></term>
+    <listitem>
+     <para>
+      Specify the target log sequence number to wait for.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time interval to wait for the LSN to be replayed.
+      The specified <replaceable>wait_timeout</replaceable> must be an integer
+      and is measured in milliseconds.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>UNTIL TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time to wait for the LSN to be replayed.
+      The specified <replaceable>wait_time</replaceable> must be timestamp.
+     </para>
+    </listitem>
+   </varlistentry>
+
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   Run <literal>WAIT FOR</literal> from <application>psql</application>,
+   limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE:  LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Limit <acronym>LSN</acronym> wait time to 500000 milliseconds, and then cancel the command:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+NOTICE:  LSN is not reached. Try to increase wait time.
+ERROR:  canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+  <title>Compatibility</title>
+
+  <para>
+   There is no <command>WAIT FOR</command> statement in the SQL
+   standard.
+  </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index cef09dd38b3..bfe62ee3516 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -215,6 +215,7 @@
    &update;
    &vacuum;
    &values;
+   &waitfor;
 
  </reference>
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 63ab0ab6c2e..2263e930ac6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -40,6 +40,7 @@
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/controldata_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -7245,6 +7246,15 @@ StartupXLOG(void)
 					break;
 				}
 
+				/*
+				 * After update lastReplayedEndRecPtr set Latches in SHMEM array
+				 */
+				if (XLogCtl->lastReplayedEndRecPtr >= GetMinWait())
+				{
+
+					WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+				}
+
 				/* Else, try to fetch the next WAL record */
 				record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
 			} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d64628566d3..ba42e494aa0 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \
 	policy.o portalcmds.o prepare.o proclang.o publicationcmds.o \
 	schemacmds.o seclabel.o sequence.o statscmds.o subscriptioncmds.o \
 	tablecmds.o tablespace.o trigger.o tsearchcmds.o typecmds.o user.o \
-	vacuum.o variable.o view.o
+	vacuum.o variable.o view.o wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..bceffeb600c
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,265 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  wait statment: wait
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2019, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * -------------------------------------------------------------------------
+ * Wait for LSN been replayed on replica
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlog.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add/delete to shmem array */
+static void AddEvent(XLogRecPtr trg_lsn);
+static void DeleteEvent(void);
+
+/* Shared memory structures */
+typedef struct
+{
+	XLogRecPtr	trg_lsn;
+	/* Left here struct BIDState for future compatibility */
+} BIDState;
+
+typedef struct
+{
+	int			backend_maxid;
+	XLogRecPtr	min_lsn;
+	slock_t		mutex;
+	BIDState	event_arr[FLEXIBLE_ARRAY_MEMBER];
+} GlobState;
+
+static volatile GlobState *state;
+
+/* Add event for current backend to shmem array */
+static void
+AddEvent(XLogRecPtr trg_lsn)
+{
+	SpinLockAcquire(&state->mutex);
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+
+	state->event_arr[MyBackendId].trg_lsn = trg_lsn;
+
+	if (trg_lsn < state->min_lsn)
+		state->min_lsn = trg_lsn;
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event for current backend to shared array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ */
+static void
+DeleteEvent(void)
+{
+	int i;
+	XLogRecPtr trg_lsn = state->event_arr[MyBackendId].trg_lsn;
+
+	state->event_arr[MyBackendId].trg_lsn = InvalidXLogRecPtr;
+
+	SpinLockAcquire(&state->mutex);
+	/* Update state->min_lsn iff it is nessesary choosing next min_lsn */
+	if (state->min_lsn == trg_lsn)
+	{
+		state->min_lsn = PG_UINT64_MAX;
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->event_arr[i].trg_lsn != InvalidXLogRecPtr &&
+				state->event_arr[i].trg_lsn < state->min_lsn)
+				state->min_lsn = state->event_arr[i].trg_lsn;
+	}
+
+	if (state->backend_maxid == MyBackendId)
+		for (i = (MyBackendId); i >=2; i--)
+			if (state->event_arr[i].trg_lsn != InvalidXLogRecPtr)
+			{
+				state->backend_maxid = i;
+				break;
+			}
+
+	SpinLockRelease(&state->mutex);
+}
+
+/* Get size of shared memory for GlobState */
+Size
+WaitShmemSize(void)
+{
+	return offsetof(GlobState, event_arr) + sizeof(BIDState) * (MaxBackends+1);
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+	bool	found;
+	uint32	i;
+
+	state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+
+		for (i = 0; i < (MaxBackends+1); i++)
+			state->event_arr[i].trg_lsn = InvalidXLogRecPtr;
+
+		state->backend_maxid = 0;
+		state->min_lsn = PG_UINT64_MAX;
+	}
+}
+
+/* Set all Latches in shared memory cause new LSN being replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	uint32		i;
+	int 		backend_maxid;
+	PGPROC	   *backend;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (i = 2; i <= backend_maxid; i++)
+	{
+		backend = BackendIdGetProc(i);
+		if (state->event_arr[i].trg_lsn != 0)
+		{
+			if (state->event_arr[i].trg_lsn <= cur_lsn)
+				SetLatch(&backend->procLatch);
+		}
+	}
+}
+
+/* Get minimal LSN that will be next */
+XLogRecPtr
+GetMinWait(void)
+{
+	return state->min_lsn;
+}
+
+/*
+ * On WAIT own MyLatch and wait till LSN is replayed, Postmaster death interruption
+ * or timeout.
+ */
+void
+WaitUtility(const char *lsn, const int delay, DestReceiver *dest)
+{
+	XLogRecPtr		trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn)));
+	XLogRecPtr		cur_lsn;
+	int				latch_events;
+	uint64			tdelay = delay;
+	long			secs;
+	int				microsecs;
+	TimestampTz		timer = GetCurrentTimestamp();
+	TupOutputState *tstate;
+	TupleDesc		tupdesc;
+	char		   *value = "f";
+
+	if (delay > 0)
+		latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+	else
+		latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	AddEvent(trg_lsn);
+
+	for (;;)
+	{
+		cur_lsn = GetXLogReplayRecPtr(NULL);
+
+		/* If LSN has been Replayed */
+		if (trg_lsn <= cur_lsn)
+			break;
+
+		/* If the postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		/* If delay time is over */
+		if (latch_events & WL_TIMEOUT)
+		{
+			if (TimestampDifferenceExceeds(timer,GetCurrentTimestamp(),delay))
+				break;
+			TimestampDifference(timer,GetCurrentTimestamp(),&secs, &microsecs);
+			tdelay = delay - (secs*1000 + microsecs/1000);
+		}
+
+		/* Little hack behaviour like SnapshotResetXmin to work outoff snapshot*/
+		MyPgXact->xmin = InvalidTransactionId;
+		WaitLatch(MyLatch, latch_events, tdelay, WAIT_EVENT_CLIENT_READ);
+		ResetLatch(MyLatch);
+
+		/* CHECK_FOR_INTERRUPTS if they come then Delete current event from array */
+		if (InterruptPending)
+		{
+			DeleteEvent();
+			ProcessInterrupts();
+		}
+
+	}
+
+	DeleteEvent();
+
+	if (trg_lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+	else
+		value = "t";
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+	/* Send it */
+	do_text_output_oneline(tstate, value);
+	end_tup_output(tstate);
+}
+
+void
+WaitTimeUtility(const int delay)
+{
+	int				latch_events;
+
+	if (delay < 0)
+		return ;
+
+	latch_events = WL_TIMEOUT | WL_POSTMASTER_DEATH;
+
+	MyPgXact->xmin = InvalidTransactionId;
+	WaitLatch(MyLatch, latch_events, delay, WAIT_EVENT_CLIENT_READ);
+	ResetLatch(MyLatch);
+}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index ecd6a8bae7d..5ccf14ead46 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -78,6 +78,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
 										 CreateTableAsStmt *stmt);
 static Query *transformCallStmt(ParseState *pstate,
 								CallStmt *stmt);
+static Query *transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
 static void transformLockingClause(ParseState *pstate, Query *qry,
 								   LockingClause *lc, bool pushedDown);
 #ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -326,6 +327,9 @@ transformStmt(ParseState *pstate, Node *parseTree)
 			result = transformCallStmt(pstate,
 									   (CallStmt *) parseTree);
 			break;
+		case T_WaitStmt:
+			result = transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+			break;
 
 		default:
 
@@ -2981,6 +2985,20 @@ applyLockingClause(Query *qry, Index rtindex,
 	qry->rowMarks = lappend(qry->rowMarks, rc);
 }
 
+static Query *
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+	Query *result;
+
+	stmt->time = transformExpr(pstate, stmt->time, EXPR_KIND_OTHER);
+
+	result = makeNode(Query);
+	result->commandType = CMD_UTILITY;
+	result->utilityStmt = (Node *) stmt;
+
+	return result;
+}
+
 /*
  * Coverage testing for raw_expression_tree_walker().
  *
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 208b4a1f28a..d13e22fea86 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -276,7 +276,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 		SecLabelStmt SelectStmt TransactionStmt TruncateStmt
 		UnlistenStmt UpdateStmt VacuumStmt
 		VariableResetStmt VariableSetStmt VariableShowStmt
-		ViewStmt CheckPointStmt CreateConversionStmt
+		ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
 		DeallocateStmt PrepareStmt ExecuteStmt
 		DropOwnedStmt ReassignOwnedStmt
 		AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -485,7 +485,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <list>	row explicit_row implicit_row type_list array_expr_list
 %type <node>	case_expr case_arg when_clause case_default
 %type <list>	when_clause_list
-%type <ival>	sub_type opt_materialized
+%type <ival>	sub_type wait_strategy opt_materialized
 %type <value>	NumericOnly
 %type <list>	NumericOnly_list
 %type <alias>	alias_clause opt_alias_clause
@@ -655,7 +655,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -685,7 +685,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
 
 	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+	TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
 	TREAT TRIGGER TRIM TRUE_P
 	TRUNCATE TRUSTED TYPE_P TYPES_P
 
@@ -695,7 +695,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAIT WHEN WHERE WHITESPACE_P WINDOW
+	WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -947,6 +948,7 @@ stmt :
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| WaitStmt
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -14010,6 +14012,88 @@ xml_passing_mech:
 			| BY VALUE_P
 		;
 
+/*****************************************************************************
+ *
+ *		QUERY:
+ *				WAIT FOR <event> [, <event> ...] [option]
+ *				event:
+ *					LSN value
+ *					TIMEOUT value
+ *					TIMESTAMP timestamp
+ *				option:
+ *					TIMEOUT delay
+ *					UNTIL TIMESTAMP timestamp
+ *
+ *****************************************************************************/
+
+WaitStmt:
+			WAIT FOR wait_strategy LSN Sconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->kind = WAIT_FOR_LSN;
+					n->lsn = $5;
+					n->delay = 0;
+					n->time = 0;
+					$$ = (Node *)n;
+				}
+			| WAIT FOR  wait_strategy LSN Sconst TIMEOUT Iconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->kind = WAIT_FOR_MIX;
+					n->lsn = $5;
+					n->delay = $7;
+					n->time = 0;
+					$$ = (Node *)n;
+				}
+			| WAIT FOR  wait_strategy LSN Sconst UNTIL ConstDatetime Sconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->kind = WAIT_FOR_MIX;
+					n->lsn = $5;
+					n->delay = 0;
+					n->time = makeStringConstCast($8, @8, $7);
+					$$ = (Node *)n;
+				}
+			| WAIT FOR wait_strategy ConstDatetime Sconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->kind = WAIT_FOR_TIME;
+					n->lsn = NULL;
+					n->delay = 0;
+					n->time = makeStringConstCast($5, @5, $4);
+					$$ = (Node *)n;
+				}
+		;
+
+wait_strategy:
+			ALL					{ $$ = WAIT_FOR_ALL; }
+			| ANY				{ $$ = WAIT_FOR_ANY; }
+			| SOME				{ $$ = WAIT_FOR_ANY; }
+			| /* EMPTY */		{ $$ = WAIT_FOR_ANY; }
+		;
+/*
+WaitEvent:
+			LSN Sconst WaitOption
+				{
+					TypeName *t = makeTypeNameFromNameList("pg_lsn");
+					t->location = @1;
+					$$ = makeStringConstCast($2, @2, t);
+				}
+			| WaitOption
+				{
+					$$ = $1;
+				}
+		;
+
+WaitOption:
+			ConstDatetime Sconst
+				{
+				$$ = makeStringConstCast($2, @2, $1);
+				}
+			|
+				{ $$ - NIL; }
+		;
+*/
 
 /*
  * Aggregate decoration clauses
@@ -15153,6 +15237,7 @@ unreserved_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LSN
 			| MAPPING
 			| MATCH
 			| MATERIALIZED
@@ -15275,6 +15360,7 @@ unreserved_keyword:
 			| TEMPORARY
 			| TEXT_P
 			| TIES
+			| TIMEOUT
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -15300,6 +15386,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAIT
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index d7d733530ff..05876634d22 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/twophase.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(int port)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(int port)
 	SyncScanShmemInit();
 	AsyncShmemInit();
 
+	/*
+	 * Init array of Latches in SHMEM for Wait
+	 */
+	WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 05ec7f3ac61..a1e07c2c8f0 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,6 +57,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -70,6 +71,7 @@
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 #include "utils/rel.h"
+#include "executor/spi.h"
 
 
 /* Hook for plugins to get control in ProcessUtility() */
@@ -922,6 +924,61 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				break;
 			}
 
+		case T_WaitStmt:
+			{
+				WaitStmt *stmt = (WaitStmt *) parsetree;
+				float8 time_val;
+
+				if (stmt->time)
+				{
+					Const *time = (Const *) stmt->time;
+					int ret;
+
+					Oid		types[] = { time->consttype };
+					Datum	values[] = { time->constvalue };
+					char	nulls[] = { " " };
+
+					Datum result;
+					bool isnull;
+
+					SPI_connect();
+
+					if (time->consttype == 1083)
+						ret = SPI_execute_with_args("select extract (epoch from ($1 - now()::time))",
+													1, types, values, nulls, true, 0);
+					else if (time->consttype == 1266)
+						ret = SPI_execute_with_args("select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))",
+													1, types, values, nulls, true, 0);
+					else
+						ret = SPI_execute_with_args("select extract (epoch from ($1 - now()))",
+													1, types, values, nulls, true, 0);
+
+					Assert(ret >= 0);
+					result = SPI_getbinval(SPI_tuptable->vals[0],
+										   SPI_tuptable->tupdesc,
+										   1, &isnull);
+
+					Assert(!isnull);
+					time_val = DatumGetFloat8(result);
+
+					elog(INFO, "time: %f", time_val);
+
+					SPI_finish();
+				}
+
+				if (time_val <= 0)
+					time_val = 0;
+
+				if (!stmt->delay)
+					stmt->delay = (int)(time_val * 1000);
+
+				if (stmt->kind == WAIT_FOR_TIME)
+					WaitTimeUtility(time_val * 1000);
+				else
+					WaitUtility(stmt->lsn, stmt->delay, dest);
+			}
+			break;
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2567,6 +2624,10 @@ CreateCommandTag(Node *parsetree)
 			tag = "NOTIFY";
 			break;
 
+		case T_WaitStmt:
+			tag = "WAIT";
+			break;
+
 		case T_ListenStmt:
 			tag = "LISTEN";
 			break;
@@ -3194,6 +3255,10 @@ GetCommandLogLevel(Node *parsetree)
 			lev = LOGSTMT_ALL;
 			break;
 
+		case T_WaitStmt:
+			lev = LOGSTMT_ALL;
+			break;
+
 		case T_ListenStmt:
 			lev = LOGSTMT_ALL;
 			break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 00000000000..815aaa2f92f
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  wait notification: wait
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern void WaitUtility(const char *lsn, const int delay, DestReceiver *dest);
+extern void WaitTimeUtility(const int delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+
+#endif   /* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 4e2fb39105b..ef94eaf50bf 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -486,6 +486,7 @@ typedef enum NodeTag
 	T_DropReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
+	T_WaitStmt,
 	T_SQLCmd,
 
 	/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index d6b943c898c..4c58bf3737a 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3521,4 +3521,32 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+/* ----------------------
+ *		Wait Statement
+ * ----------------------
+ */
+
+typedef enum WaitForType
+{
+	WAIT_FOR_LSN = 0,
+	WAIT_FOR_TIME,
+	WAIT_FOR_MIX
+} WaitForType;
+
+typedef enum WaitForStrategy
+{
+	WAIT_FOR_ANY = 0,
+	WAIT_FOR_ALL
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+	NodeTag			type;
+	WaitForType		kind;
+	WaitForStrategy	strategy;
+	char		   *lsn;		/* Taraget LSN to wait for */
+	int				delay;		/* Delay to wait for LSN */
+	Node		   *time;		/* Wait for timestamp */
+} WaitStmt;
+
 #endif							/* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 00ace8425e2..51d01e10f56 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -242,6 +242,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -403,6 +404,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -442,6 +444,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/test/recovery/t/018_waitfor.pl b/src/test/recovery/t/018_waitfor.pl
new file mode 100644
index 00000000000..6817431e9c8
--- /dev/null
+++ b/src/test/recovery/t/018_waitfor.pl
@@ -0,0 +1,64 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# And some content
+$node_master->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1, 10) AS a");
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create streaming standby from backup
+my $node_standby = get_new_node('standby');
+my $delay        = 4;
+$node_standby->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf(
+	'postgresql.conf', qq(
+recovery_min_apply_delay = '${delay}s'
+));
+$node_standby->start;
+
+# Make new content on master and check its presence in standby depending
+# on the delay applied above. Before doing the insertion, get the
+# current timestamp that will be used as a comparison base. Even on slow
+# machines, this allows to have a predictable behavior when comparing the
+# delay between data insertion moment on master and replay time on standby.
+my $master_insert_time = time();
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(11, 20))");
+
+# Now wait for replay to complete on standby. We're done waiting when the
+# standby has replayed up to the previously saved master LSN.
+my $until_lsn =
+  $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(21, 30))");
+
+# Check that waitlsn is able to setup infinite waiting loop and exit
+# it without timeouts.
+$node_standby->safe_psql('postgres',
+    "WAIT FOR LSN '$until_lsn'", 't')
+  or die "standby never caught up";
+
+# Check that waitlsn can return result immediately with NOWAIT.
+$node_standby->poll_query_until('postgres',
+    "WAIT FOR LSN '$until_lsn' TIMEOUT 1", 't')
+  or die "standby never caught up";
+
+# This test is successful if and only if the LSN has been applied with at least
+# the configured apply delay.
+my $time_waited = time() - $master_insert_time;
+ok($time_waited >= $delay,"standby applies WAL only after replication delay");

Reply via email to