Sorry, I have been having some trouble sending email.
On 2020-03-06 08:54, Kyotaro Horiguchi wrote:
The syntax seems confused. What happens if we type in the
command "WAIT FOR TIMESTAMP '...' UNTIL TIMESTAMP '....'"? It seems
to me the options are useless. Couldn't the TIMEOUT option be a part of
the event? I know gram.y doesn't accept that syntax, but that is not
apparent from the description above.
I'll fix the doc file.
Synopsis
==========
WAIT FOR [ANY | SOME | ALL] event [, event ...]
and event is:
LSN value [options]
TIMESTAMP value
and options is:
TIMEOUT delay
UNTIL TIMESTAMP timestamp
As I read through the previous thread, one of the reasons this
feature was implemented as syntax is that it was intended to be combined
with the BEGIN statement. If there is no use case for the feature in the
middle of a transaction, why not turn it into a part of the BEGIN command?
It seems to have some limitations on hot standbys. I'll take a few days
to make a prototype.
Description
==========
WAIT FOR - make the statements that follow sleep until an
event happens (don't process new queries until an event happens).
...
Notice: WAIT FOR will return on PostmasterDeath or Interruption
events if they come earlier than the LSN or the timeout.
I think interrupts ought to result in ERROR.
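The release conditions above can be modelled as a small pure function. This is a hypothetical sketch: the names `WaitExitReason` and `wait_exit_reason` are illustrative and do not appear in the patch. It only mirrors the order in which the loop in WaitUtility() checks replay progress, postmaster death, the timeout (only when a positive delay was given) and pending interrupts, without any latches or shared memory.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* same width as PostgreSQL's XLogRecPtr */

/* Hypothetical model: why one iteration of the WAIT FOR loop would stop. */
typedef enum
{
    WAIT_KEEP_WAITING,
    WAIT_LSN_REACHED,
    WAIT_POSTMASTER_DIED,
    WAIT_TIMED_OUT,
    WAIT_INTERRUPTED
} WaitExitReason;

/*
 * Checks are evaluated in the same order as in the patch's WaitUtility():
 * replayed position, postmaster liveness, timeout (ignored when
 * timeout_ms <= 0, i.e. unlimited wait), and finally pending interrupts,
 * which the patch looks at after WaitLatch() returns.
 */
static WaitExitReason
wait_exit_reason(XLogRecPtr target, XLogRecPtr replayed,
                 bool postmaster_alive, int timeout_ms, int64_t elapsed_ms,
                 bool interrupt_pending)
{
    if (target <= replayed)
        return WAIT_LSN_REACHED;
    if (!postmaster_alive)
        return WAIT_POSTMASTER_DIED;
    if (timeout_ms > 0 && elapsed_ms >= timeout_ms)
        return WAIT_TIMED_OUT;
    if (interrupt_pending)
        return WAIT_INTERRUPTED;
    return WAIT_KEEP_WAITING;
}
```

With this ordering an interrupt only matters while the target LSN is still unreached. If interrupts should raise an ERROR, the WAIT_INTERRUPTED branch is the case that would do so.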
wait.c adds a fair amount of code and uses a proc-array based
approach. But Thomas suggested a queue-based approach and I also think
it is better. We already have a queue-based mechanism that behaves
almost the same as this feature in the commit code on the master side. It
avoids spurious backend wakeups. Couldn't we extend SyncRepWaitForLSN
or share a part of its code/infrastructure so that this feature can
reuse it?
I'll take a look at it.
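For comparison, a queue-based approach could keep waiters ordered by target LSN, so that the replay side only touches the waiters whose LSN has been reached. This is a single-process sketch under stated assumptions. The `Waiter` struct and the `queue_*` helpers are hypothetical, and `woken` stands in for SetLatch() on the waiting backend's latch. In the server, the queue would have to live in shared memory under a lock. My understanding is that SyncRepWaitForLSN() keeps a similar LSN-ordered queue in syncrep.c.

```c
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* same width as PostgreSQL's XLogRecPtr */

/* One waiting backend (hypothetical); "woken" stands in for SetLatch(). */
typedef struct Waiter
{
    XLogRecPtr     target_lsn;
    int            woken;
    struct Waiter *next;
} Waiter;

/* Insert w so that the queue stays sorted by ascending target LSN. */
static void
queue_insert(Waiter **head, Waiter *w)
{
    Waiter **pos = head;

    while (*pos != NULL && (*pos)->target_lsn <= w->target_lsn)
        pos = &(*pos)->next;
    w->next = *pos;
    *pos = w;
}

/* Smallest target LSN anyone waits for, or UINT64_MAX if nobody waits. */
static XLogRecPtr
queue_min_lsn(const Waiter *head)
{
    return head != NULL ? head->target_lsn : UINT64_MAX;
}

/*
 * Called after replay has advanced to replayed_lsn.  Pops and wakes only the
 * waiters whose target has been reached, stopping at the first one that must
 * keep waiting, so no waiter is woken spuriously.  Returns how many were woken.
 */
static int
queue_wake_up_to(Waiter **head, XLogRecPtr replayed_lsn)
{
    int woken = 0;

    while (*head != NULL && (*head)->target_lsn <= replayed_lsn)
    {
        Waiter *w = *head;

        *head = w->next;
        w->next = NULL;
        w->woken = 1;
        woken++;
    }
    return woken;
}
```

The head of the queue also yields the minimum target LSN directly. The current patch obtains that value through GetMinWait() and recomputes it by rescanning the array in DeleteEvent().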
Thank you for your review.
Rebased patch is attached.
--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 8d91f3529e..1a2bc7dfa9 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -187,6 +187,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY waitfor SYSTEM "waitfor.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/waitfor.sgml b/doc/src/sgml/ref/waitfor.sgml
new file mode 100644
index 0000000000..8fa2ddb449
--- /dev/null
+++ b/doc/src/sgml/ref/waitfor.sgml
@@ -0,0 +1,136 @@
+<!--
+doc/src/sgml/ref/waitfor.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-waitlsn">
+ <indexterm zone="sql-waitlsn">
+ <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+ <refentrytitle>WAIT FOR</refentrytitle>
+ <manvolnum>7</manvolnum>
+ <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>WAIT FOR</refname>
+ <refpurpose>wait for the target <acronym>LSN</acronym> to be replayed or for a given time</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>'
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' UNTIL TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+
+ <para>
+ <command>WAIT FOR</command> provides a simple interprocess communication mechanism
+ to wait for a timestamp, or for the target log sequence number (<acronym>LSN</acronym>)
+ to be replayed on a standby, in <productname>PostgreSQL</productname> databases with
+ master-standby asynchronous replication. With the <literal>LSN</literal> option,
+ <command>WAIT FOR</command> waits for the specified <acronym>LSN</acronym> to be
+ replayed; with the <literal>TIMESTAMP</literal> option, it sleeps until the given time.
+ By default, the wait for an <acronym>LSN</acronym> is unlimited. Waiting can be interrupted using
+ <literal>Ctrl+C</literal>, or by shutting down the <literal>postgres</literal> server.
+ You can also limit the wait time using the <option>TIMEOUT</option> option.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Parameters</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><replaceable class="parameter">LSN</replaceable></term>
+ <listitem>
+ <para>
+ Specify the target log sequence number to wait for.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time interval to wait for the LSN to be replayed.
+ The specified <replaceable>wait_timeout</replaceable> must be an integer
+ and is measured in milliseconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>UNTIL TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time to wait for the LSN to be replayed.
+ The specified <replaceable>wait_time</replaceable> must be a timestamp.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ Run <literal>WAIT FOR</literal> from <application>psql</application>,
+ limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE: LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Limit <acronym>LSN</acronym> wait time to 500000 milliseconds, and then cancel the command:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+NOTICE: LSN is not reached. Try to increase wait time.
+ERROR: canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+ <title>Compatibility</title>
+
+ <para>
+ There is no <command>WAIT FOR</command> statement in the SQL
+ standard.
+ </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index cef09dd38b..bfe62ee351 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -215,6 +215,7 @@
&update;
&vacuum;
&values;
+ &waitfor;
</reference>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4361568882..d1963b1728 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -41,6 +41,7 @@
#include "catalog/pg_database.h"
#include "commands/progress.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/controldata_utils.h"
#include "miscadmin.h"
#include "pg_trace.h"
@@ -7285,6 +7286,15 @@ StartupXLOG(void)
break;
}
+ /*
+ * After updating lastReplayedEndRecPtr, wake up the backends in the
+ * shared array that wait for an LSN we have now replayed.
+ */
+ if (XLogCtl->lastReplayedEndRecPtr >= GetMinWait())
+ {
+ WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3ce6..9b310926c1 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -57,6 +57,7 @@ OBJS = \
user.o \
vacuum.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..bceffeb600
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,265 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * wait statement: wait
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2019, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * -------------------------------------------------------------------------
+ * Wait for an LSN to be replayed on a replica
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlog.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add/delete entries in the shmem array */
+static void AddEvent(XLogRecPtr trg_lsn);
+static void DeleteEvent(void);
+
+/* Shared memory structures */
+typedef struct
+{
+ XLogRecPtr trg_lsn;
+ /* BIDState is kept as a struct for future extensibility */
+} BIDState;
+
+typedef struct
+{
+ int backend_maxid;
+ XLogRecPtr min_lsn;
+ slock_t mutex;
+ BIDState event_arr[FLEXIBLE_ARRAY_MEMBER];
+} GlobState;
+
+static volatile GlobState *state;
+
+/* Add event for current backend to shmem array */
+static void
+AddEvent(XLogRecPtr trg_lsn)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->backend_maxid < MyBackendId)
+ state->backend_maxid = MyBackendId;
+
+ state->event_arr[MyBackendId].trg_lsn = trg_lsn;
+
+ if (trg_lsn < state->min_lsn)
+ state->min_lsn = trg_lsn;
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete the event of the current backend from the shared array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ */
+static void
+DeleteEvent(void)
+{
+ int i;
+ XLogRecPtr trg_lsn;
+
+ SpinLockAcquire(&state->mutex);
+ trg_lsn = state->event_arr[MyBackendId].trg_lsn;
+ state->event_arr[MyBackendId].trg_lsn = InvalidXLogRecPtr;
+ /* Recompute state->min_lsn only if our entry was the minimum */
+ if (state->min_lsn == trg_lsn)
+ {
+ state->min_lsn = PG_UINT64_MAX;
+ for (i = 2; i <= state->backend_maxid; i++)
+ if (state->event_arr[i].trg_lsn != InvalidXLogRecPtr &&
+ state->event_arr[i].trg_lsn < state->min_lsn)
+ state->min_lsn = state->event_arr[i].trg_lsn;
+ }
+
+ if (state->backend_maxid == MyBackendId)
+ for (i = (MyBackendId); i >=2; i--)
+ if (state->event_arr[i].trg_lsn != InvalidXLogRecPtr)
+ {
+ state->backend_maxid = i;
+ break;
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/* Get size of shared memory for GlobState */
+Size
+WaitShmemSize(void)
+{
+ return offsetof(GlobState, event_arr) + sizeof(BIDState) * (MaxBackends+1);
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+ WaitShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends+1); i++)
+ state->event_arr[i].trg_lsn = InvalidXLogRecPtr;
+
+ state->backend_maxid = 0;
+ state->min_lsn = PG_UINT64_MAX;
+ }
+}
+
+/* Set the latches of all backends whose target LSN has been replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int backend_maxid;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ backend_maxid = state->backend_maxid;
+ SpinLockRelease(&state->mutex);
+
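+ for (i = 2; i <= backend_maxid; i++)
+ {
+ XLogRecPtr trg_lsn = state->event_arr[i].trg_lsn;
+
+ /* Only look up and wake backends whose target LSN is already replayed */
+ if (trg_lsn != InvalidXLogRecPtr && trg_lsn <= cur_lsn &&
+ (backend = BackendIdGetProc(i)) != NULL)
+ SetLatch(&backend->procLatch);
+ }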
+}
+
+/* Get the minimal target LSN that any backend is waiting for */
+XLogRecPtr
+GetMinWait(void)
+{
+ return state->min_lsn;
+}
+
+/*
+ * Wait on MyLatch until the target LSN is replayed, the postmaster dies,
+ * an interrupt arrives or the timeout expires.
+ */
+void
+WaitUtility(const char *lsn, const int delay, DestReceiver *dest)
+{
+ XLogRecPtr trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn)));
+ XLogRecPtr cur_lsn;
+ int latch_events;
+ uint64 tdelay = delay;
+ long secs;
+ int microsecs;
+ TimestampTz timer = GetCurrentTimestamp();
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ char *value = "f";
+
+ if (delay > 0)
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+ else
+ latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+ AddEvent(trg_lsn);
+
+ for (;;)
+ {
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ /* If LSN has been Replayed */
+ if (trg_lsn <= cur_lsn)
+ break;
+
+ /* If the postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ /* If delay time is over */
+ if (latch_events & WL_TIMEOUT)
+ {
+ if (TimestampDifferenceExceeds(timer,GetCurrentTimestamp(),delay))
+ break;
+ TimestampDifference(timer, GetCurrentTimestamp(), &secs, &microsecs);
+ tdelay = delay - (secs*1000 + microsecs/1000);
+ }
+
+ /* Little hack, like SnapshotResetXmin(): stop advertising our xmin while waiting */
+ MyPgXact->xmin = InvalidTransactionId;
+ WaitLatch(MyLatch, latch_events, tdelay, WAIT_EVENT_CLIENT_READ);
+ ResetLatch(MyLatch);
+
+ /* Remove our event from the array before ProcessInterrupts() can ERROR out */
+ if (InterruptPending)
+ {
+ DeleteEvent();
+ ProcessInterrupts();
+ }
+
+ }
+
+ DeleteEvent();
+
+ if (trg_lsn > cur_lsn)
+ elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+ else
+ value = "t";
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+ /* Prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+ /* Send it */
+ do_text_output_oneline(tstate, value);
+ end_tup_output(tstate);
+}
+
+void
+WaitTimeUtility(const int delay)
+{
+ int latch_events;
+
+ if (delay < 0)
+ return ;
+
+ latch_events = WL_TIMEOUT | WL_POSTMASTER_DEATH;
+
+ MyPgXact->xmin = InvalidTransactionId;
+ WaitLatch(MyLatch, latch_events, delay, WAIT_EVENT_CLIENT_READ);
+ ResetLatch(MyLatch);
+}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 6676412842..56d2f15f99 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -78,6 +78,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
CreateTableAsStmt *stmt);
static Query *transformCallStmt(ParseState *pstate,
CallStmt *stmt);
+static Query *transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
static void transformLockingClause(ParseState *pstate, Query *qry,
LockingClause *lc, bool pushedDown);
#ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -326,6 +327,9 @@ transformStmt(ParseState *pstate, Node *parseTree)
result = transformCallStmt(pstate,
(CallStmt *) parseTree);
break;
+ case T_WaitStmt:
+ result = transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+ break;
default:
@@ -2981,6 +2985,20 @@ applyLockingClause(Query *qry, Index rtindex,
qry->rowMarks = lappend(qry->rowMarks, rc);
}
+static Query *
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+ Query *result;
+
+ stmt->time = transformExpr(pstate, stmt->time, EXPR_KIND_OTHER);
+
+ result = makeNode(Query);
+ result->commandType = CMD_UTILITY;
+ result->utilityStmt = (Node *) stmt;
+
+ return result;
+}
+
/*
* Coverage testing for raw_expression_tree_walker().
*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 96e7fdbcfe..b80195a85f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -276,7 +276,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -487,7 +487,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> row explicit_row implicit_row type_list array_expr_list
%type <node> case_expr case_arg when_clause case_default
%type <list> when_clause_list
-%type <ival> sub_type opt_materialized
+%type <ival> sub_type wait_strategy opt_materialized
%type <value> NumericOnly
%type <list> NumericOnly_list
%type <alias> alias_clause opt_alias_clause
@@ -660,7 +660,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -690,7 +690,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
- TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+ TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
TREAT TRIGGER TRIM TRUE_P
TRUNCATE TRUSTED TYPE_P TYPES_P
@@ -700,7 +700,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW
+ WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -953,6 +954,7 @@ stmt :
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -14128,6 +14130,88 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * QUERY:
+ * WAIT FOR [ANY | SOME | ALL] <event> [option]
+ * event:
+ * LSN value
+ * TIMESTAMP timestamp
+ * option (LSN events only):
+ * TIMEOUT delay
+ * UNTIL TIMESTAMP timestamp
+ * Multiple comma-separated events are not supported yet.
+ *
+ *****************************************************************************/
+
+WaitStmt:
+ WAIT FOR wait_strategy LSN Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->kind = WAIT_FOR_LSN;
+ n->lsn = $5;
+ n->delay = 0;
+ n->strategy = $3;
+ $$ = (Node *)n;
+ }
+ | WAIT FOR wait_strategy LSN Sconst TIMEOUT Iconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->kind = WAIT_FOR_MIX;
+ n->lsn = $5;
+ n->delay = $7;
+ n->strategy = $3;
+ $$ = (Node *)n;
+ }
+ | WAIT FOR wait_strategy LSN Sconst UNTIL ConstDatetime Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->kind = WAIT_FOR_MIX;
+ n->lsn = $5;
+ n->strategy = $3;
+ n->time = makeStringConstCast($8, @8, $7);
+ $$ = (Node *)n;
+ }
+ | WAIT FOR wait_strategy ConstDatetime Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->kind = WAIT_FOR_TIME;
+ n->lsn = NULL;
+ n->strategy = $3;
+ n->time = makeStringConstCast($5, @5, $4);
+ $$ = (Node *)n;
+ }
+ ;
+
+wait_strategy:
+ ALL { $$ = WAIT_FOR_ALL; }
+ | ANY { $$ = WAIT_FOR_ANY; }
+ | SOME { $$ = WAIT_FOR_ANY; }
+ | /* EMPTY */ { $$ = WAIT_FOR_ANY; }
+ ;
+/*
+WaitEvent:
+ LSN Sconst WaitOption
+ {
+ TypeName *t = makeTypeNameFromNameList("pg_lsn");
+ t->location = @1;
+ $$ = makeStringConstCast($2, @2, t);
+ }
+ | WaitOption
+ {
+ $$ = $1;
+ }
+ ;
+
+WaitOption:
+ ConstDatetime Sconst
+ {
+ $$ = makeStringConstCast($2, @2, $1);
+ }
+ |
+ { $$ = NIL; }
+ ;
+*/
/*
* Aggregate decoration clauses
@@ -15272,6 +15356,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN
| MAPPING
| MATCH
| MATERIALIZED
@@ -15394,6 +15479,7 @@ unreserved_keyword:
| TEMPORARY
| TEXT_P
| TIES
+ | TIMEOUT
| TRANSACTION
| TRANSFORM
| TRIGGER
@@ -15420,6 +15506,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..8c3d196a9a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
SyncScanShmemInit();
AsyncShmemInit();
+ /*
+ * Init the shared array of WAIT FOR events
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 1b460a2612..b8061863d7 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,6 +57,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -70,6 +71,7 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
+#include "executor/spi.h"
/* Hook for plugins to get control in ProcessUtility() */
ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -267,6 +269,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_LoadStmt:
case T_PrepareStmt:
case T_UnlistenStmt:
+ case T_WaitStmt:
case T_VariableSetStmt:
{
/*
@@ -1061,6 +1064,61 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ WaitStmt *stmt = (WaitStmt *) parsetree;
+ float8 time_val = 0; /* seconds until the target time, 0 if none */
+
+ if (stmt->time)
+ {
+ Const *time = (Const *) stmt->time;
+ int ret;
+
+ Oid types[] = { time->consttype };
+ Datum values[] = { time->constvalue };
+ char nulls[] = { " " };
+
+ Datum result;
+ bool isnull;
+
+ SPI_connect();
+
+ if (time->consttype == 1083) /* TIMEOID */
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()::time))",
+ 1, types, values, nulls, true, 0);
+ else if (time->consttype == 1266) /* TIMETZOID */
+ ret = SPI_execute_with_args("select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))",
+ 1, types, values, nulls, true, 0);
+ else
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()))",
+ 1, types, values, nulls, true, 0);
+ if (ret != SPI_OK_SELECT || SPI_processed != 1)
+ elog(ERROR, "could not compute the wait time for WAIT FOR");
+ result = SPI_getbinval(SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc,
+ 1, &isnull);
+
+ Assert(!isnull);
+ time_val = DatumGetFloat8(result);
+
+ elog(DEBUG1, "WAIT FOR: time to wait: %f s", time_val);
+
+ SPI_finish();
+ }
+
+ if (time_val <= 0)
+ time_val = 0;
+
+ if (!stmt->delay)
+ stmt->delay = (int)(time_val * 1000);
+
+ if (stmt->kind == WAIT_FOR_TIME)
+ WaitTimeUtility(time_val * 1000);
+ else
+ WaitUtility(stmt->lsn, stmt->delay, dest);
+ }
+ break;
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2713,6 +2771,10 @@ CreateCommandTag(Node *parsetree)
tag = CMDTAG_NOTIFY;
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
case T_ListenStmt:
tag = CMDTAG_LISTEN;
break;
@@ -3344,6 +3406,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..815aaa2f92
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * wait notification: wait
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "access/xlogdefs.h"
+#include "tcop/dest.h"
+
+extern void WaitUtility(const char *lsn, const int delay, DestReceiver *dest);
+extern void WaitTimeUtility(const int delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+
+#endif /* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index baced7eec0..a3cfa92ab2 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -487,6 +487,7 @@ typedef enum NodeTag
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
+ T_WaitStmt,
T_SQLCmd,
/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index da0706add5..463afdc817 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3554,4 +3554,32 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/* ----------------------
+ * Wait Statement
+ * ----------------------
+ */
+
+typedef enum WaitForType
+{
+ WAIT_FOR_LSN = 0,
+ WAIT_FOR_TIME,
+ WAIT_FOR_MIX
+} WaitForType;
+
+typedef enum WaitForStrategy
+{
+ WAIT_FOR_ANY = 0,
+ WAIT_FOR_ALL
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+ NodeTag type;
+ WaitForType kind;
+ WaitForStrategy strategy;
+ char *lsn; /* Target LSN to wait for */
+ int delay; /* Timeout in milliseconds, 0 means no limit */
+ Node *time; /* Wait for timestamp */
+} WaitStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index b1184c2d15..dd22e358b9 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -404,6 +405,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
PG_KEYWORD("to", TO, RESERVED_KEYWORD)
PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -444,6 +446,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index d28145a50d..28f5a2ff67 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -216,3 +216,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT", false, false, false)
diff --git a/src/test/recovery/t/018_waitfor.pl b/src/test/recovery/t/018_waitfor.pl
new file mode 100644
index 0000000000..6817431e9c
--- /dev/null
+++ b/src/test/recovery/t/018_waitfor.pl
@@ -0,0 +1,64 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# And some content
+$node_master->safe_psql('postgres',
+ "CREATE TABLE tab_int AS SELECT generate_series(1, 10) AS a");
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create streaming standby from backup
+my $node_standby = get_new_node('standby');
+my $delay = 4;
+$node_standby->init_from_backup($node_master, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf(
+ 'postgresql.conf', qq(
+recovery_min_apply_delay = '${delay}s'
+));
+$node_standby->start;
+
+# Make new content on master and check its presence in standby depending
+# on the delay applied above. Before doing the insertion, get the
+# current timestamp that will be used as a comparison base. Even on slow
+# machines, this allows to have a predictable behavior when comparing the
+# delay between data insertion moment on master and replay time on standby.
+my $master_insert_time = time();
+$node_master->safe_psql('postgres',
+ "INSERT INTO tab_int VALUES (generate_series(11, 20))");
+
+# Now wait for replay to complete on standby. We're done waiting when the
+# standby has replayed up to the previously saved master LSN.
+my $until_lsn =
+ $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+$node_master->safe_psql('postgres',
+ "INSERT INTO tab_int VALUES (generate_series(21, 30))");
+
+# Check that WAIT FOR LSN without TIMEOUT waits indefinitely and
+# returns true once the LSN has been replayed.
+$node_standby->safe_psql('postgres',
+ "WAIT FOR LSN '$until_lsn'") eq 't'
+ or die "standby never caught up";
+
+# Check that WAIT FOR with a short TIMEOUT returns true once the LSN is replayed.
+$node_standby->poll_query_until('postgres',
+ "WAIT FOR LSN '$until_lsn' TIMEOUT 1", 't')
+ or die "standby never caught up";
+
+# This test is successful if and only if the LSN has been applied with at least
+# the configured apply delay.
+my $time_waited = time() - $master_insert_time;
+ok($time_waited >= $delay,"standby applies WAL only after replication delay");