On 2020-04-07 12:58, Amit Kapila wrote:

Review comments:
1.
+static void
+DeleteEvent(void)
I don't see how this is implemented or called to handle any errors.

2.
+ if (InterruptPending)
+ {
+ DeleteEvent();
+ ProcessInterrupts();
+ }
We generally do this type of handling via CHECK_FOR_INTERRUPTS.

3.
+int
+WaitUtility(XLogRecPtr target_lsn, const float8 secs)
Isn't it better to have a return value as bool?

4.
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
This same define is used elsewhere in the code as well; maybe we can
define it in some central place and use it.

Thank you for your review!
Ivan and I have worked on the patch and tried to address your comments:

0. Now we wake up at least every 100ms to check for interrupts.
1. Now we call DeleteWaitedLSN() from ProcessInterrupts()=>LockErrorCleanup(). It seems that an interrupt is the only way to leave the WAIT loop abnormally, so this should be enough (?)
2. Now we use CHECK_FOR_INTERRUPTS() instead of ProcessInterrupts()
3. Now WaitUtility() returns bool rather than int
4. Now GetNowFloat() is only defined at one place in the code

What we changed additionally:
- Prohibited using WAIT FOR LSN on master
- Added more tests
- Checked the code with pgindent and adjusted pgindent/typedefs.list
- Changed min_lsn's type to pg_atomic_uint64 + fixed how we work with the mutex
- Code cleanup in wait.[c|h]: cleaned up #include-s, gave better names to functions, changed elog() to ereport()

--
Anna Akenteva
Postgres Professional:
The Russian Postgres Company
http://www.postgrespro.com
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index c23bbfb4e71..19a33d7d8fb 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [ TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
@@ -63,6 +63,17 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
    <xref linkend="sql-set-transaction"/>
    was executed.
   </para>
+
+  <para>
+   The <literal>WAIT FOR</literal> clause allows waiting for the target log
+   sequence number (<acronym>LSN</acronym>) to be replayed on a standby before
+   starting the transaction in <productname>PostgreSQL</productname> databases
+   with master-standby asynchronous replication. The wait time can be limited by
+   specifying a timeout, which is measured in milliseconds and must be a positive
+   integer. If the <acronym>LSN</acronym> is not reached before the timeout
+   expires, the transaction does not begin. Waiting can be interrupted using
+   <literal>Ctrl+C</literal>, or by shutting down the <literal>postgres</literal> server.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -146,6 +157,10 @@ BEGIN;
    different purpose in embedded SQL. You are advised to be careful
    about the transaction semantics when porting database applications.
   </para>
+
+  <para>
+   There is no <command>WAIT FOR</command> clause in the SQL standard.
+  </para>
  </refsect1>
 
  <refsect1>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index d6cd1d41779..c9f70d2709a 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [ TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
@@ -40,6 +40,17 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
    characteristics, as if <xref linkend="sql-set-transaction"/> was executed. This is the same
    as the <xref linkend="sql-begin"/> command.
   </para>
+
+  <para>
+   The <literal>WAIT FOR</literal> clause allows waiting for the target log
+   sequence number (<acronym>LSN</acronym>) to be replayed on a standby before
+   starting the transaction in <productname>PostgreSQL</productname> databases
+   with master-standby asynchronous replication. The wait time can be limited by
+   specifying a timeout, which is measured in milliseconds and must be a positive
+   integer. If the <acronym>LSN</acronym> is not reached before the timeout
+   expires, the transaction does not begin. Waiting can be interrupted using
+   <literal>Ctrl+C</literal>, or by shutting down the <literal>postgres</literal> server.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -78,6 +89,10 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
    omitted.
   </para>
 
+  <para>
+   There is no <command>WAIT FOR</command> clause in the SQL standard.
+  </para>
+
   <para>
    See also the compatibility section of <xref linkend="sql-set-transaction"/>.
   </para>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index abf954ba392..4c7eb0cb219 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
 #include "catalog/pg_database.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/controldata_utils.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
@@ -7332,6 +7333,15 @@ StartupXLOG(void)
 					break;
 				}
 
+				/*
+				 * If we replayed an LSN that someone was waiting for,
+				 * set latches in the shared memory array to notify the waiters.
+				 */
+				if (XLogCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN())
+				{
+					WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+				}
+
 				/* Else, try to fetch the next WAL record */
 				record = ReadRecord(xlogreader, LOG, false);
 			} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3ce65..9b310926c12 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -57,6 +57,7 @@ OBJS = \
 	user.o \
 	vacuum.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..e1123df321e
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,282 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements WAIT FOR, which allows waiting for events such as
+ *	  an LSN having been replayed on a replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add to shared memory array */
+static void AddWaitedLSN(XLogRecPtr lsn_to_wait);
+
+/* Shared memory structure */
+typedef struct
+{
+	int			backend_maxid;
+	pg_atomic_uint64 min_lsn; /* XLogRecPtr of minimal waited for LSN */
+	slock_t		mutex;
+	/* LSNs that different backends are waiting for */
+	XLogRecPtr	lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static WaitState *state;
+
+/*
+ * Add the wait event of the current backend to shared memory array
+ */
+static void
+AddWaitedLSN(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+
+	state->lsn[MyBackendId] = lsn_to_wait;
+
+	if (lsn_to_wait < pg_atomic_read_u64(&state->min_lsn))
+		pg_atomic_write_u64(&state->min_lsn, lsn_to_wait);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array.
+ */
+void
+DeleteWaitedLSN(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete;
+
+	SpinLockAcquire(&state->mutex);
+
+	lsn_to_delete = state->lsn[MyBackendId];
+	state->lsn[MyBackendId] = InvalidXLogRecPtr;
+
+	/* If we are deleting the minimal LSN, then choose the next min_lsn */
+	if (lsn_to_delete != InvalidXLogRecPtr &&
+		lsn_to_delete == pg_atomic_read_u64(&state->min_lsn))
+	{
+		XLogRecPtr	min_lsn = PG_UINT64_MAX;
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->lsn[i] != InvalidXLogRecPtr && state->lsn[i] < min_lsn)
+				min_lsn = state->lsn[i];
+		pg_atomic_write_u64(&state->min_lsn, min_lsn);
+	}
+
+	/* If deleting from the end of the array, shorten the array's used part */
+	if (state->backend_maxid == MyBackendId)
+		for (i = (MyBackendId); i >= 2; i--)
+			if (state->lsn[i] != InvalidXLogRecPtr)
+			{
+				state->backend_maxid = i;
+				break;
+			}
+
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitState, lsn);
+	size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+	return size;
+}
+
+/*
+ * Initialize an array of events to wait for in shared memory
+ */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	uint32		i;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+
+		for (i = 0; i < (MaxBackends + 1); i++)
+			state->lsn[i] = InvalidXLogRecPtr;
+
+		state->backend_maxid = 0;
+		pg_atomic_init_u64(&state->min_lsn, PG_UINT64_MAX);
+	}
+}
+
+/*
+ * Set the latches of backends waiting for an LSN that has now been replayed
+ */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	uint32		i;
+	int			backend_maxid;
+	PGPROC	   *backend;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+
+	for (i = 2; i <= backend_maxid; i++)
+	{
+		backend = BackendIdGetProc(i);
+
+		if (backend && state->lsn[i] != InvalidXLogRecPtr &&
+			state->lsn[i] <= cur_lsn)
+		{
+			SetLatch(&backend->procLatch);
+		}
+	}
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Get minimal LSN that someone waits for
+ */
+XLogRecPtr
+GetMinWaitedLSN(void)
+{
+	return pg_atomic_read_u64(&state->min_lsn);
+}
+
+/*
+ * Use a latch to wait until the target LSN is replayed, the postmaster dies,
+ * or the timeout expires.  The timeout is specified in milliseconds; zero
+ * means wait forever.  Returns true if the LSN was reached, false otherwise.
+ */
+bool
+WaitUtility(XLogRecPtr target_lsn, const int timeout_ms)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	int			latch_events;
+	float8		endtime;
+	bool		res = false;
+	bool		wait_forever = (timeout_ms <= 0);
+
+	endtime = GetNowFloat() + timeout_ms / 1000.0;
+
+	latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+	/* Check if we already reached the needed LSN */
+	if (cur_lsn >= target_lsn)
+		return true;
+
+	AddWaitedLSN(target_lsn);
+
+	for (;;)
+	{
+		int			rc;
+		float8		time_left = 0;
+		long		time_left_ms = 0;
+
+		time_left = endtime - GetNowFloat();
+
+		/* Use 100 ms as the default timeout to check for interrupts */
+		if (wait_forever || time_left < 0 || time_left > 0.1)
+			time_left_ms = 100;
+		else
+			time_left_ms = (long) ceil(time_left * 1000.0);
+
+		/* If interrupt, LockErrorCleanup() will do DeleteWaitedLSN() for us */
+		CHECK_FOR_INTERRUPTS();
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, time_left_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		ResetLatch(MyLatch);
+
+		if (rc & WL_LATCH_SET)
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+
+		if (rc & WL_TIMEOUT)
+		{
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+			/* If the time specified by user has passed, stop waiting */
+			time_left = endtime - GetNowFloat();
+			if (!wait_forever && time_left <= 0.0)
+				break;
+		}
+
+		/* If LSN has been replayed */
+		if (target_lsn <= cur_lsn)
+			break;
+	}
+
+	DeleteWaitedLSN();
+
+	if (cur_lsn < target_lsn)
+		ereport(WARNING,
+				(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
+				 errmsg("didn't start transaction because LSN was not reached"),
+				 errhint("Try increasing the wait time.")));
+	else
+		res = true;
+
+	return res;
+}
+
+/*
+ * Implementation of WAIT FOR
+ */
+int
+WaitMain(WaitClause *stmt, DestReceiver *dest)
+{
+	TupleDesc	tupdesc;
+	TupOutputState *tstate;
+	XLogRecPtr	target_lsn;
+	bool		res = false;
+
+	target_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+												 CStringGetDatum(stmt->lsn)));
+	res = WaitUtility(target_lsn, stmt->timeout);
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+	/* Send the result */
+	do_text_output_oneline(tstate, res ? "t" : "f");
+	end_tup_output(tstate);
+	return res;
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index f9d86859ee7..b00c772b71e 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3741,10 +3741,22 @@ _copyTransactionStmt(const TransactionStmt *from)
 	COPY_STRING_FIELD(savepoint_name);
 	COPY_STRING_FIELD(gid);
 	COPY_SCALAR_FIELD(chain);
+	COPY_NODE_FIELD(wait);
 
 	return newnode;
 }
 
+static WaitClause *
+_copyWaitClause(const WaitClause *from)
+{
+	WaitClause *newnode = makeNode(WaitClause);
+
+	COPY_STRING_FIELD(lsn);
+	COPY_SCALAR_FIELD(timeout);
+
+	return newnode;
+}
+
 static CompositeTypeStmt *
 _copyCompositeTypeStmt(const CompositeTypeStmt *from)
 {
@@ -5332,6 +5344,9 @@ copyObjectImpl(const void *from)
 		case T_TransactionStmt:
 			retval = _copyTransactionStmt(from);
 			break;
+		case T_WaitClause:
+			retval = _copyWaitClause(from);
+			break;
 		case T_CompositeTypeStmt:
 			retval = _copyCompositeTypeStmt(from);
 			break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index e8e781834a5..c1b622ea301 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1539,6 +1539,16 @@ _equalTransactionStmt(const TransactionStmt *a, const TransactionStmt *b)
 	COMPARE_STRING_FIELD(savepoint_name);
 	COMPARE_STRING_FIELD(gid);
 	COMPARE_SCALAR_FIELD(chain);
+	COMPARE_NODE_FIELD(wait);
+
+	return true;
+}
+
+static bool
+_equalWaitClause(const WaitClause *a, const WaitClause *b)
+{
+	COMPARE_STRING_FIELD(lsn);
+	COMPARE_SCALAR_FIELD(timeout);
 
 	return true;
 }
@@ -3389,6 +3399,9 @@ equal(const void *a, const void *b)
 		case T_TransactionStmt:
 			retval = _equalTransactionStmt(a, b);
 			break;
+		case T_WaitClause:
+			retval = _equalWaitClause(a, b);
+			break;
 		case T_CompositeTypeStmt:
 			retval = _equalCompositeTypeStmt(a, b);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 35ed8c0d538..8eb54d4ab05 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2778,6 +2778,28 @@ _outDefElem(StringInfo str, const DefElem *node)
 	WRITE_LOCATION_FIELD(location);
 }
 
+static void
+_outTransactionStmt(StringInfo str, const TransactionStmt *node)
+{
+	WRITE_NODE_TYPE("TRANSACTIONSTMT");
+
+	WRITE_STRING_FIELD(savepoint_name);
+	WRITE_STRING_FIELD(gid);
+	WRITE_NODE_FIELD(options);
+	WRITE_BOOL_FIELD(chain);
+	WRITE_ENUM_FIELD(kind, TransactionStmtKind);
+	WRITE_NODE_FIELD(wait);
+}
+
+static void
+_outWaitClause(StringInfo str, const WaitClause *node)
+{
+	WRITE_NODE_TYPE("WAITCLAUSE");
+
+	WRITE_STRING_FIELD(lsn);
+	WRITE_INT_FIELD(timeout);
+}
+
 static void
 _outTableLikeClause(StringInfo str, const TableLikeClause *node)
 {
@@ -4327,6 +4349,12 @@ outNode(StringInfo str, const void *obj)
 			case T_PartitionRangeDatum:
 				_outPartitionRangeDatum(str, obj);
 				break;
+			case T_TransactionStmt:
+				_outTransactionStmt(str, obj);
+				break;
+			case T_WaitClause:
+				_outWaitClause(str, obj);
+				break;
 
 			default:
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3449c26bd11..a2f40285c6e 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -592,6 +592,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
+%type <ival>		wait_time
+%type <node>		wait_for
 
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
@@ -661,7 +663,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -692,7 +694,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
 
 	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+	TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
 	TREAT TRIGGER TRIM TRUE_P
 	TRUNCATE TRUSTED TYPE_P TYPES_P
 
@@ -702,7 +704,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAIT WHEN WHERE WHITESPACE_P WINDOW
+	WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -9946,18 +9949,20 @@ TransactionStmt:
 					n->chain = $3;
 					$$ = (Node *)n;
 				}
-			| BEGIN_P opt_transaction transaction_mode_list_or_empty
+			| BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 					n->kind = TRANS_STMT_BEGIN;
 					n->options = $3;
+					n->wait = $4;
 					$$ = (Node *)n;
 				}
-			| START TRANSACTION transaction_mode_list_or_empty
+			| START TRANSACTION transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 					n->kind = TRANS_STMT_START;
 					n->options = $3;
+					n->wait = $4;
 					$$ = (Node *)n;
 				}
 			| COMMIT opt_transaction opt_transaction_chain
@@ -14187,6 +14192,25 @@ xml_passing_mech:
 			| BY VALUE_P
 		;
 
+/*
+ * WAIT FOR clause of BEGIN and START TRANSACTION statements
+ */
+wait_for:
+			WAIT FOR LSN Sconst wait_time
+				{
+					WaitClause *n = makeNode(WaitClause);
+					n->lsn = $4;
+					n->timeout = $5;
+					$$ = (Node *)n;
+				}
+			| /* EMPTY */		{ $$ = NULL; }
+		;
+
+wait_time:
+			TIMEOUT Iconst		{ $$ = $2; }
+			| /* EMPTY */		{ $$ = 0; }
+		;
+
 
 /*
  * Aggregate decoration clauses
@@ -15338,6 +15362,7 @@ unreserved_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LSN
 			| MAPPING
 			| MATCH
 			| MATERIALIZED
@@ -15465,6 +15490,7 @@ unreserved_keyword:
 			| TEMPORARY
 			| TEXT_P
 			| TIES
+			| TIMEOUT
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -15491,6 +15517,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAIT
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cde..2dcfdde5f3f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/twophase.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
 	SyncScanShmemInit();
 	AsyncShmemInit();
 
+	/*
+	 * Init array of events for the WAIT FOR clause in shared memory
+	 */
+	WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 9938cddb570..a7887bd98e2 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -38,6 +38,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -717,6 +718,9 @@ LockErrorCleanup(void)
 
 	AbortStrongLockAcquire();
 
+	/* If BEGIN WAIT FOR LSN was interrupted, then stop waiting for that LSN */
+	DeleteWaitedLSN();
+
 	/* Nothing to do if we weren't waiting for a lock */
 	if (lockAwaited == NULL)
 	{
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index b1f7f6e2d01..59c041d8507 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,6 +57,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -591,6 +592,18 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 					case TRANS_STMT_START:
 						{
 							ListCell   *lc;
+							WaitClause *waitstmt = (WaitClause *) stmt->wait;
+
+							/* WAIT FOR cannot be used on master */
+							if (stmt->wait && !RecoveryInProgress())
+								ereport(ERROR,
+										(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+										 errmsg("WAIT FOR can only be "
+												"used on standby")));
+
+							/* Do not start the transaction if WAIT FOR failed */
+							if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+								break;
 
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index ee340fb0f02..03f997cba70 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -372,8 +372,6 @@ pg_sleep(PG_FUNCTION_ARGS)
 	 * less than the specified time when WaitLatch is terminated early by a
 	 * non-query-canceling signal such as SIGHUP.
 	 */
-#define GetNowFloat()	((float8) GetCurrentTimestamp() / 1000000.0)
-
 	endtime = GetNowFloat() + secs;
 
 	for (;;)
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 00000000000..d08ee574ed3
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+#include "nodes/parsenodes.h"
+
+extern bool WaitUtility(XLogRecPtr lsn, const int timeout_ms);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWaitedLSN(void);
+extern int	WaitMain(WaitClause *stmt, DestReceiver *dest);
+extern void DeleteWaitedLSN(void);
+
+#endif							/* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 50b1ba51863..c37663a28bd 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -492,6 +492,7 @@ typedef enum NodeTag
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
 	T_SQLCmd,
+	T_WaitClause,
 
 	/*
 	 * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index cd6f1be6435..2d0aad8df98 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1430,6 +1430,17 @@ typedef struct OnConflictClause
 	int			location;		/* token location, or -1 if unknown */
 } OnConflictClause;
 
+/*
+ * WaitClause -
+ *		representation of WAIT FOR clause for BEGIN and START TRANSACTION.
+ */
+typedef struct WaitClause
+{
+	NodeTag		type;
+	char	   *lsn;			/* LSN to wait for */
+	int			timeout;		/* Number of milliseconds to limit wait time */
+} WaitClause;
+
 /*
  * CommonTableExpr -
  *	   representation of WITH list element
@@ -3058,6 +3069,7 @@ typedef struct TransactionStmt
 	char	   *savepoint_name; /* for savepoint commands */
 	char	   *gid;			/* for two-phase-commit related commands */
 	bool		chain;			/* AND CHAIN option */
+	Node	   *wait;			/* WAIT FOR clause */
 } TransactionStmt;
 
 /* ----------------------
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 08f22ce211d..6e1848fe4cc 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -410,6 +411,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -450,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 03a1de569f0..eaeeb79c411 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -109,4 +109,6 @@ extern int	date2isoyearday(int year, int mon, int mday);
 
 extern bool TimestampTimestampTzRequiresRewrite(void);
 
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
 #endif							/* TIMESTAMP_H */
diff --git a/src/test/recovery/t/020_begin_wait.pl b/src/test/recovery/t/020_begin_wait.pl
new file mode 100644
index 00000000000..da4bfb4ef32
--- /dev/null
+++ b/src/test/recovery/t/020_begin_wait.pl
@@ -0,0 +1,85 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# Add some content and take a backup
+$node_master->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Using the backup, create a streaming standby with a 1 second delay
+my $node_standby = get_new_node('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Check that timeouts make us wait for the specified time (2s here)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $two_seconds = 2000; # in milliseconds
+my $start_time = time();
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '0/FFFFFFFF' TIMEOUT $two_seconds");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $two_seconds, "WAIT FOR TIMEOUT waits for enough time");
+
+
+# Check that timeouts let us stop waiting right away, before reaching target LSN
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my ($ret, $out, $err) = $node_standby->psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn1' TIMEOUT 1");
+
+ok($ret == 0, "zero return value when failed to WAIT FOR LSN on standby");
+ok($err =~ /WARNING:  didn't start transaction because LSN was not reached/,
+	"correct error message when failed to WAIT FOR LSN on standby");
+ok($out eq "f", "if given too little wait time, WAIT doesn't reach target LSN");
+
+
+# Check that WAIT FOR works fine and reaches target LSN if given no timeout
+
+# Add data on master, memorize master's last LSN
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Wait for it to appear on replica, memorize replica's last LSN
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn2'");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+
+# Make sure that master's and replica's LSNs are the same after WAIT
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)");
+ok($compare_lsns eq 0,
+	"standby reached the same LSN as master before starting transaction");
+
+
+# Make sure that it's not allowed to use WAIT FOR on master
+($ret, $out, $err) = $node_master->psql('postgres',
+	"BEGIN WAIT FOR LSN '0/FFFFFFFF'");
+
+ok($ret != 0, "non-zero return value when trying to WAIT FOR LSN on master");
+ok($err =~ /ERROR:  WAIT FOR can only be used on standby/,
+	"correct error message when trying to WAIT FOR LSN on master");
+ok($out eq '', "empty output when trying to WAIT FOR LSN on master");
+
+
+$node_standby->stop;
+$node_master->stop;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 34623523a70..a2d1b9defc2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2621,6 +2621,7 @@ WSABUF
 WSADATA
 WSANETWORKEVENTS
 WSAPROTOCOL_INFO
+WaitClause
 WaitEvent
 WaitEventActivity
 WaitEventClient
@@ -2629,6 +2630,7 @@ WaitEventIPC
 WaitEventSet
 WaitEventTimeout
 WaitPMResult
+WaitState
 WalCloseMethod
 WalLevel
 WalRcvData
