Intro
==========
The main purpose of the feature is to achieve read-your-writes
consistency while using asynchronous replicas for reads and the
primary for writes. In that setup, the LSN of the last modification
is stored inside the application. We cannot store this LSN inside the
database, since reads are distributed across all replicas and the
primary.
https://www.postgresql.org/message-id/195e2d07ead315b1620f1a053313f490%40postgrespro.ru
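To make the read-your-writes check concrete, here is a minimal C sketch. It assumes the application saved the primary's LSN in its usual textual "hi/lo" hexadecimal form (as in '16/B374D848'). A replica may serve the follow-up read only once its replay position has reached that LSN. The type `lsn_t` and the helpers `parse_lsn` and `replica_caught_up` are hypothetical illustrations, not PostgreSQL functions.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for PostgreSQL's XLogRecPtr: a 64-bit WAL position. */
typedef uint64_t lsn_t;

/*
 * Parse the textual "hi/lo" form of an LSN (e.g. "16/B374D848"),
 * where both halves are hexadecimal, into a 64-bit value.
 * Returns false if the text is not in that form.
 */
static bool
parse_lsn(const char *text, lsn_t *out)
{
    unsigned int hi, lo;
    int          consumed = 0;

    if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2 ||
        text[consumed] != '\0')
        return false;
    *out = ((lsn_t) hi << 32) | lo;
    return true;
}

/*
 * Read-your-writes check: the replica may serve the read only once it
 * has replayed WAL at least up to the LSN saved after the last write.
 */
static bool
replica_caught_up(lsn_t replayed, lsn_t last_write)
{
    return replayed >= last_write;
}
```

The comparison is ">=" rather than "==": a replica that has replayed past the saved LSN has also seen the write.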
Suggestions
==========
Many proposals have been made for what this feature should look like.
I have grouped them into the following four types.
1) Classic (wait_classic_v1.patch)
https://www.postgresql.org/message-id/3cc883048264c2e9af022033925ff8db%40postgrespro.ru
==========
advantages: multiple events, standalone WAIT
disadvantages: new keywords in the grammar
WAIT FOR [ANY | ALL] event [, ...]
BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
[ WAIT FOR [ANY | ALL] event [, ...]]
where event is one of:
LSN value
TIMEOUT number_of_milliseconds
timestamp
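The post does not spell out what ANY and ALL mean. This is a minimal C sketch under an assumption: ANY finishes as soon as one listed event has fired, and ALL finishes once every listed event has fired. It also assumes TIMEOUT has already been converted to an absolute deadline when the wait began. The types and the `wait_satisfied` helper are hypothetical and not taken from wait_classic_v1.patch.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical event kinds mirroring the synopsis above. */
typedef enum { EV_LSN, EV_TIMEOUT, EV_TIMESTAMP } event_kind;

typedef struct
{
    event_kind kind;
    uint64_t   target;   /* LSN for EV_LSN, absolute deadline in ms otherwise */
} wait_event;

/* Has one event fired, given the current replay LSN and current time (ms)? */
static bool
event_fired(const wait_event *ev, uint64_t replayed_lsn, uint64_t now_ms)
{
    switch (ev->kind)
    {
        case EV_LSN:
            return replayed_lsn >= ev->target;
        case EV_TIMEOUT:
        case EV_TIMESTAMP:
            return now_ms >= ev->target;
    }
    return false;
}

/* ANY: satisfied when one event fired; ALL: satisfied when every event fired. */
static bool
wait_satisfied(const wait_event *evs, size_t n, bool want_all,
               uint64_t replayed_lsn, uint64_t now_ms)
{
    for (size_t i = 0; i < n; i++)
    {
        bool fired = event_fired(&evs[i], replayed_lsn, now_ms);

        if (fired && !want_all)
            return true;        /* ANY: first fired event is enough */
        if (!fired && want_all)
            return false;       /* ALL: one pending event blocks completion */
    }
    return want_all;
}
```

Under these assumptions, `WAIT FOR ANY LSN ..., TIMEOUT ...` behaves like "LSN or give up", while ALL would wait for both the LSN and the deadline.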
2) After style: Kyotaro and Freund (wait_after_within_v1.patch)
https://www.postgresql.org/message-id/d3ff2e363af60b345f82396992595a03%40postgrespro.ru
==========
advantages: no new keywords in the grammar, standalone AFTER
disadvantages: a little harder to understand
AFTER lsn_event [ WITHIN delay_milliseconds ] [, ...]
BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
[ AFTER lsn_event [ WITHIN delay_milliseconds ]]
START [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
[ AFTER lsn_event [ WITHIN delay_milliseconds ]]
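The decision made on each wake-up by `AFTER lsn [ WITHIN delay_milliseconds ]` can be sketched as a pure function. In the attached patch's grammar, an omitted WITHIN becomes 0, which WaitUtility() treats as "no limit", and the standalone command returns "t" or "f". The sketch below mirrors that. Its names (`after_step`, `simulate_after`) and the simulated replay rate are hypothetical; a real backend sleeps on its latch instead of looping.

```c
#include <stdint.h>

typedef enum { WAIT_REACHED, WAIT_TIMED_OUT, WAIT_CONTINUE } wait_result;

/*
 * One wake-up of "AFTER lsn [WITHIN ms]". within_ms == 0 stands for an
 * omitted WITHIN clause, i.e. wait without a time limit.
 */
static wait_result
after_step(uint64_t target_lsn, uint64_t replayed_lsn,
           uint64_t within_ms, uint64_t elapsed_ms)
{
    if (replayed_lsn >= target_lsn)
        return WAIT_REACHED;            /* reported to the client as "t" */
    if (within_ms != 0 && elapsed_ms >= within_ms)
        return WAIT_TIMED_OUT;          /* reported to the client as "f" */
    return WAIT_CONTINUE;               /* go back to sleep and retry */
}

/*
 * Simulate a replica that replays `rate` bytes of WAL per millisecond,
 * checking once per millisecond. Never returns if rate == 0 and
 * within_ms == 0, just like an unbounded wait on a stalled replica.
 */
static wait_result
simulate_after(uint64_t start_lsn, uint64_t rate,
               uint64_t target_lsn, uint64_t within_ms)
{
    for (uint64_t t = 0;; t++)
    {
        wait_result r = after_step(target_lsn, start_lsn + rate * t,
                                   within_ms, t);

        if (r != WAIT_CONTINUE)
            return r;
    }
}
```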
3) Procedure style: Tom Lane and Kyotaro (wait_proc_v1.patch)
https://www.postgresql.org/message-id/27171.1586439221%40sss.pgh.pa.us
https://www.postgresql.org/message-id/20210121.173009.235021120161403875.horikyota.ntt%40gmail.com
==========
advantages: no new keywords in the grammar, in the same style as
pg_last_wal_replay_lsn, no snapshot needed
disadvantages: the function names are a little harder to remember
SELECT pg_waitlsn('LSN', timeout);
SELECT pg_waitlsn_infinite('LSN');
SELECT pg_waitlsn_no_wait('LSN');
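One way to see the procedure style is that the three SQL functions can share one wait routine and differ only in the timeout they pass. This C sketch borrows the convention of WaitUtility() in the attached AFTER patch: negative means "check once", zero means "no limit", positive bounds the wait. Time is counted in abstract polls rather than real seconds. The wrappers `waitlsn_timeout`, `waitlsn_infinite` and `waitlsn_no_wait`, the `replay_lsn_fn` hook and the fake replica are hypothetical, not code from wait_proc_v1.patch.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical hook returning the replica's current replay LSN. */
typedef uint64_t (*replay_lsn_fn)(void *ctx);

/*
 * Shared helper. polls < 0: check once; polls == 0: no limit;
 * polls > 0: give up after that many checks. A real backend would
 * sleep on its latch between checks.
 */
static bool
wait_lsn_core(uint64_t target, long polls, replay_lsn_fn replay, void *ctx)
{
    for (long done = 0;; done++)
    {
        if (replay(ctx) >= target)
            return true;
        if (polls < 0 || (polls > 0 && done + 1 >= polls))
            return false;
    }
}

/* Thin wrappers mirroring pg_waitlsn, pg_waitlsn_infinite, pg_waitlsn_no_wait. */
static bool
waitlsn_timeout(uint64_t lsn, long polls, replay_lsn_fn f, void *c)
{
    return wait_lsn_core(lsn, polls > 0 ? polls : -1, f, c);
}

static bool
waitlsn_infinite(uint64_t lsn, replay_lsn_fn f, void *c)
{
    return wait_lsn_core(lsn, 0, f, c);
}

static bool
waitlsn_no_wait(uint64_t lsn, replay_lsn_fn f, void *c)
{
    return wait_lsn_core(lsn, -1, f, c);
}

/* Fake replica: each check observes `step` more bytes of replayed WAL. */
typedef struct { uint64_t lsn; uint64_t step; } fake_replica;

static uint64_t
fake_replay(void *ctx)
{
    fake_replica *r = ctx;
    uint64_t      now = r->lsn;

    r->lsn += r->step;
    return now;
}
```

With this split, the "harder to remember names" cost is only at the SQL level. Internally all three functions go through one code path.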
4) Brackets style: Kondratov
https://www.postgresql.org/message-id/a8bff0350a27e0a87a6eaf0905d6737f%40postgrespro.ru
==========
advantages: only one new keyword in the grammar, in the same style as
VACUUM and REINDEX; parameters can be extended without grammar changes
disadvantages:
WAIT (LSN '16/B374D848', TIMEOUT 100);
BEGIN WAIT (LSN '16/B374D848' [, etc_options]);
...
COMMIT;
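The "extend parameters without grammar fixes" advantage comes from the generic option-list pattern used by VACUUM: the grammar only collects name/value pairs, and the command code interprets them. This is a minimal C sketch of that pattern for WAIT (LSN ..., TIMEOUT ...). The `wait_option` and `wait_params` types are hypothetical simplifications of PostgreSQL's DefElem handling. Option names are assumed to arrive lowercased, as the parser does for unquoted identifiers.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical name/value pair, a simplified stand-in for a DefElem. */
typedef struct { const char *name; const char *value; } wait_option;

typedef struct
{
    const char *lsn;         /* required */
    long        timeout_ms;  /* 0 = no limit */
} wait_params;

/*
 * Interpret the collected options. Supporting a new option means adding
 * one branch here; the grammar rule for WAIT (...) stays unchanged.
 */
static bool
apply_wait_options(const wait_option *opts, size_t n, wait_params *out)
{
    out->lsn = NULL;
    out->timeout_ms = 0;

    for (size_t i = 0; i < n; i++)
    {
        if (strcmp(opts[i].name, "lsn") == 0)
            out->lsn = opts[i].value;
        else if (strcmp(opts[i].name, "timeout") == 0)
            out->timeout_ms = strtol(opts[i].value, NULL, 10);
        else
            return false;   /* unrecognized option: the command would error out */
    }
    return out->lsn != NULL;
}
```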
Conclusion
==========
Below I provide patches implementing the first three types.
I propose that we discuss this feature again.
Regards
--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 54b5f22d6e..18695e013e 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY wait SYSTEM "wait.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..a2794763b1 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+ AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN number_of_milliseconds ]
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..46a3bcf1a8 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+ AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN number_of_milliseconds ]
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index e11b4b6130..a83ff4551e 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -216,6 +216,7 @@
&update;
&vacuum;
&values;
+ &wait;
</reference>
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..e4d2d8d0a1 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1752,6 +1753,15 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone is waiting for,
+ * set the latches of the waiting backends to wake them up.
+ */
+ if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWait())
+ {
+ WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..9a3b8192a6
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,329 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements AFTER, which waits until the given LSN has been
+ * replayed on a replica, optionally bounded by a timeout.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <float.h>
+#include <math.h>
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure */
+typedef struct
+{
+ int backend_maxid;
+ XLogRecPtr min_lsn;
+ slock_t mutex;
+ XLogRecPtr waited_lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/* Add the event of the current backend to the shared memory array */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->backend_maxid < MyBackendId)
+ state->backend_maxid = MyBackendId;
+
+ state->waited_lsn[MyBackendId] = lsn_to_wait;
+
+ if (lsn_to_wait < state->min_lsn)
+ state->min_lsn = lsn_to_wait;
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ * Check:
+ * 1) normal|smart|fast|immediate stop
+ * 2) SIGKILL and SIGTERM
+ */
+static void
+DeleteEvent(void)
+{
+ int i;
+ XLogRecPtr lsn_to_delete;
+
+ SpinLockAcquire(&state->mutex);
+ lsn_to_delete = state->waited_lsn[MyBackendId];
+ state->waited_lsn[MyBackendId] = InvalidXLogRecPtr;
+
+ /* If we need to choose the next min_lsn, update state->min_lsn */
+ if (state->min_lsn == lsn_to_delete)
+ {
+ state->min_lsn = PG_UINT64_MAX;
+ for (i = 1; i <= state->backend_maxid; i++)
+ if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+ state->waited_lsn[i] < state->min_lsn)
+ state->min_lsn = state->waited_lsn[i];
+ }
+
+ if (state->backend_maxid == MyBackendId)
+ for (i = (MyBackendId); i >= 1; i--)
+ if (state->waited_lsn[i] != InvalidXLogRecPtr)
+ {
+ state->backend_maxid = i;
+ break;
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitState, waited_lsn);
+ size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+ return size;
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+ WaitShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends + 1); i++)
+ state->waited_lsn[i] = InvalidXLogRecPtr;
+
+ state->backend_maxid = 0;
+ state->min_lsn = PG_UINT64_MAX;
+ }
+}
+
+/* Set the latches of all backends waiting for an LSN that has now been replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int backend_maxid;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ backend_maxid = state->backend_maxid;
+ SpinLockRelease(&state->mutex);
+
+ for (i = 1; i <= backend_maxid; i++)
+ {
+ backend = BackendIdGetProc(i);
+ if (state->waited_lsn[i] != InvalidXLogRecPtr)
+ {
+ if (backend && state->waited_lsn[i] <= cur_lsn)
+ SetLatch(&backend->procLatch);
+ }
+ }
+}
+
+/* Return the smallest LSN that any backend is currently waiting for */
+XLogRecPtr
+GetMinWait(void)
+{
+ return state->min_lsn;
+}
+
+/*
+ * Wait on MyLatch until the LSN is replayed, the postmaster dies or the
+ * timeout expires. Returns 1 if the LSN has been reached, 0 otherwise.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+ XLogRecPtr cur_lsn = GetXLogReplayRecPtr(NULL);
+ int latch_events;
+ float8 endtime;
+ int res = 0;
+
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+ endtime = GetNowFloat() + secs;
+
+ latch_events = WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ if (lsn != InvalidXLogRecPtr)
+ {
+ /* Already replayed, or no-wait mode: just report the result */
+ if (lsn <= cur_lsn || secs < 0)
+ return (lsn <= cur_lsn);
+
+ latch_events |= WL_LATCH_SET;
+ AddEvent(lsn);
+ }
+ else if (!secs)
+ return 1;
+
+ for (;;)
+ {
+ int rc;
+ float8 delay = 0;
+ long delay_ms;
+
+ if (secs > 0)
+ delay = endtime - GetNowFloat();
+ else if (secs == 0)
+ /*
+ * If we wait forever, wake up every minute to check
+ * for interrupts.
+ */
+ delay = 60;
+
+ if (delay > 0.0)
+ delay_ms = (long) ceil(delay * 1000.0);
+ else
+ break;
+
+ /*
+ * If received an interruption from CHECK_FOR_INTERRUPTS,
+ * then delete the current event from array.
+ */
+ if (InterruptPending)
+ {
+ if (lsn != InvalidXLogRecPtr)
+ DeleteEvent();
+ ProcessInterrupts();
+ }
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ rc = WaitLatch(MyLatch, latch_events, delay_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+
+ if (lsn != InvalidXLogRecPtr)
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ /* If LSN has been replayed */
+ if (lsn != InvalidXLogRecPtr && lsn <= cur_lsn)
+ break;
+ }
+
+ if (lsn != InvalidXLogRecPtr)
+ DeleteEvent();
+
+ if (lsn != InvalidXLogRecPtr && lsn > cur_lsn)
+ ereport(NOTICE, (errmsg("target LSN was not reached"), errhint("Try increasing the wait time.")));
+ else
+ res = 1;
+
+ return res;
+}
+
+/*
+ * Get the amount of seconds left till the specified time.
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+ int ret;
+ float8 val;
+ Oid types[] = { time->consttype };
+ Datum values[] = { time->constvalue };
+ char nulls[] = { " " };
+ Datum result;
+ bool isnull;
+
+ SPI_connect();
+
+ if (time->consttype == TIMEOID)
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()::time))::float8",
+ 1, types, values, nulls, true, 0);
+ else if (time->consttype == TIMETZOID)
+ ret = SPI_execute_with_args("select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))::float8",
+ 1, types, values, nulls, true, 0);
+ else
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()))::float8",
+ 1, types, values, nulls, true, 0);
+
+ Assert(ret >= 0);
+ result = SPI_getbinval(SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc,
+ 1, &isnull);
+
+ Assert(!isnull);
+ val = DatumGetFloat8(result);
+
+ elog(DEBUG1, "time: %f", val);
+
+ SPI_finish();
+ return val;
+}
+
+/* Implementation of WAIT FOR */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+ TupleDesc tupdesc;
+ TupOutputState *tstate;
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ int res = 0;
+
+ lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(stmt->lsn)));
+ res = WaitUtility(lsn, stmt->delay / 1000.0); /* WITHIN is given in milliseconds */
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+ /* Prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+ /* Send it */
+ do_text_output_oneline(tstate, res?"t":"f");
+ end_tup_output(tstate);
+ return res;
+}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index e892df9819..390bf9e27c 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -399,7 +399,6 @@ transformStmt(ParseState *pstate, Node *parseTree)
result = transformCallStmt(pstate,
(CallStmt *) parseTree);
break;
-
default:
/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a0138382a1..24105eca4e 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -310,7 +310,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -642,6 +642,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partboundspec> PartitionBoundSpec
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
+%type <ival> wait_time
+%type <node> wait_for
/*
@@ -1064,6 +1066,7 @@ stmt:
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -10838,12 +10841,13 @@ TransactionStmt:
n->chain = $3;
$$ = (Node *) n;
}
- | START TRANSACTION transaction_mode_list_or_empty
+ | START TRANSACTION transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_START;
n->options = $3;
+ n->wait = $4;
$$ = (Node *) n;
}
| COMMIT opt_transaction opt_transaction_chain
@@ -10931,12 +10935,13 @@ TransactionStmt:
;
TransactionStmtLegacy:
- BEGIN_P opt_transaction transaction_mode_list_or_empty
+ BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_BEGIN;
n->options = $3;
+ n->wait = $4;
$$ = (Node *) n;
}
| END_P opt_transaction opt_transaction_chain
@@ -15622,6 +15627,37 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * QUERY:
+ * AFTER lsn_value [WITHIN delay_milliseconds]
+ *
+ *****************************************************************************/
+WaitStmt:
+ AFTER Sconst wait_time
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = $2;
+ n->delay = $3;
+ $$ = (Node *)n;
+ }
+ ;
+wait_for:
+ AFTER Sconst wait_time
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = $2;
+ n->delay = $3;
+ $$ = (Node *)n;
+ }
+ | /* EMPTY */ { $$ = NULL; }
+ ;
+
+wait_time:
+ WITHIN Iconst { $$ = $2; }
+ | /* EMPTY */ { $$ = 0; }
+ ;
+
/*
* Aggregate decoration clauses
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 8f1ded7338..b7963ca9a2 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -142,6 +143,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, StatsShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -295,6 +297,11 @@ CreateSharedMemoryAndSemaphores(void)
AsyncShmemInit();
StatsShmemInit();
+ /*
+ * Initialize the shared-memory array of LSNs that backends wait for
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index c7d9d96b45..b2ed7a8b21 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <float.h>
#include "access/htup_details.h"
#include "access/reloptions.h"
@@ -59,6 +60,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -72,6 +74,9 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
/* Hook for plugins to get control in ProcessUtility() */
ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -272,6 +277,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_LoadStmt:
case T_PrepareStmt:
case T_UnlistenStmt:
+ case T_WaitStmt:
case T_VariableSetStmt:
{
/*
@@ -612,6 +618,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case TRANS_STMT_START:
{
ListCell *lc;
+ WaitStmt *waitstmt = (WaitStmt *) stmt->wait;
+
+ /* If a wait clause was given but the LSN was not reached, do not begin the transaction */
+ if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+ break;
BeginTransactionBlock();
foreach(lc, stmt->options)
@@ -1069,6 +1080,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ WaitStmt *stmt = (WaitStmt *) parsetree;
+ WaitMain(stmt, dest);
+ break;
+ }
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2833,6 +2851,10 @@ CreateCommandTag(Node *parsetree)
tag = CMDTAG_NOTIFY;
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
case T_ListenStmt:
tag = CMDTAG_LISTEN;
break;
@@ -3481,6 +3503,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..0270160d44
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+
+#endif /* WAIT_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index f7d7f10f7d..5e4cc547c5 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3393,6 +3393,7 @@ typedef struct TransactionStmt
char *savepoint_name; /* for savepoint commands */
char *gid; /* for two-phase-commit related commands */
bool chain; /* AND CHAIN option */
+ Node *wait; /* wait lsn clause */
} TransactionStmt;
/* ----------------------
@@ -3938,4 +3939,16 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/* ----------------------
+ * AFTER Statement + AFTER clause of BEGIN statement
+ * ----------------------
+ */
+
+typedef struct WaitStmt
+{
+ NodeTag type;
+ char *lsn; /* LSN */
+ int delay; /* TIMEOUT */
+} WaitStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index e738ac1c09..a95657a32f 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -216,3 +216,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "AFTER", false, false, false)
diff --git a/src/test/recovery/t/034_begin_after.pl b/src/test/recovery/t/034_begin_after.pl
new file mode 100644
index 0000000000..b22e63603c
--- /dev/null
+++ b/src/test/recovery/t/034_begin_after.pl
@@ -0,0 +1,86 @@
+# Checks waiting for lsn on standby AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that AFTER works: add new content to the primary and remember
+# the primary's new LSN, then wait for that LSN on the standby. Prove that AFTER
+# can set up an infinite waiting loop and exit it when given no wait timeout.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN AFTER '$lsn1'");
+
+# Get the current LSN on the standby and make sure it has reached the primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns >= 0, "standby reached the primary's LSN after AFTER");
+
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "AFTER" command
+#===============================================================================
+# We need to check that AFTER works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting the transaction, wait for the LSN that guarantees a max value of 40.
+# Inside the transaction, wait for the LSN that guarantees a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+ 'postgres', qq[
+ BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ AFTER '$lsn3';
+ SELECT max(a) FROM wait_test;
+ BEGIN AFTER '$lsn4';
+ SELECT pg_last_wal_replay_lsn();
+ SELECT max(a) FROM wait_test;
+ COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "AFTER works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after AFTER.
+# For that, make sure that the older max value is repeated twice in the results.
+my $count = () = $standby_results =~ /^40$/mg;
+ok($count == 2, "transaction isolation level doesn't get broken due to AFTER");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/test/recovery/t/035_after.pl b/src/test/recovery/t/035_after.pl
new file mode 100644
index 0000000000..480ba6b33b
--- /dev/null
+++ b/src/test/recovery/t/035_after.pl
@@ -0,0 +1,99 @@
+# Checks waiting for lsn on standby AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 2 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 2;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that AFTER works: add new content to the primary and remember
+# the primary's new LSN, then wait for that LSN on the standby. Prove that AFTER
+# can set up an infinite waiting loop and exit it when given no wait timeout.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "AFTER '$lsn1'");
+
+# Get the current LSN on the standby and make sure it has reached the primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns >= 0, "standby reached the primary's LSN after AFTER");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+ "AFTER '$lsn2' WITHIN 1");
+ok($reached_lsn eq "f", "AFTER doesn't reach LSN if given too little wait time");
+
+
+
+# We need to check that AFTER works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting the transaction, wait for the LSN that guarantees a max value of 40.
+# Inside the transaction, wait for the LSN that guarantees a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+ 'postgres', qq[
+ AFTER '$lsn3';
+ BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+ SELECT max(a) FROM wait_test;
+ AFTER '$lsn4';
+ SELECT pg_last_wal_replay_lsn();
+ SELECT max(a) FROM wait_test;
+ COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "AFTER works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after AFTER.
+# For that, make sure that the older max value is repeated twice in the results.
+my $count = () = $standby_results =~ /^40$/mg;
+ok($count == 2, "transaction isolation level doesn't get broken due to AFTER");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 54b5f22d6e..18695e013e 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY wait SYSTEM "wait.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..b3af16c09f 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+ WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN lsn_value
+ TIMEOUT number_of_milliseconds
+ timestamp
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..1b54ed2084 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+ WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN lsn_value
+ TIMEOUT number_of_milliseconds
+ timestamp
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/wait.sgml b/doc/src/sgml/ref/wait.sgml
new file mode 100644
index 0000000000..26cae3ad85
--- /dev/null
+++ b/doc/src/sgml/ref/wait.sgml
@@ -0,0 +1,146 @@
+<!--
+doc/src/sgml/ref/wait.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-wait">
+ <indexterm zone="sql-wait">
+ <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+ <refentrytitle>WAIT FOR</refentrytitle>
+ <manvolnum>7</manvolnum>
+ <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>WAIT FOR</refname>
+ <refpurpose>wait for the target <acronym>LSN</acronym> to be replayed or for the specified time to pass</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>where <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN value
+ TIMEOUT number_of_milliseconds
+ timestamp
+
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>'
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ALL LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>, TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ANY LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+
+ <para>
+ <command>WAIT FOR</command> provides a simple interprocess communication
+ mechanism for waiting until a timestamp is reached, a timeout expires, or a
+ target log sequence number (<acronym>LSN</acronym>) is replayed on a standby
+ in <productname>PostgreSQL</productname> databases with primary-standby
+ asynchronous replication. When run with the
+ <literal>LSN</literal> option, the <command>WAIT FOR</command>
+ command waits for the specified <acronym>LSN</acronym> to be replayed.
+ If no timestamp or timeout is specified, the wait time is unlimited.
+ When several events of the same kind are listed, <literal>ALL</literal>
+ (the default) uses the largest <acronym>LSN</acronym> and the longest wait
+ time, while <literal>ANY</literal> uses the smallest of each.
+ Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+ by shutting down the <literal>postgres</literal> server.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Parameters</title>
+
+ <variablelist>
+ <varlistentry>
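+ <term>LSN '<replaceable class="parameter">lsn_number</replaceable>'</term>
+ <listitem>
+ <para>
+ Specifies the target log sequence number to wait for, written as a string
+ literal in the usual <type>pg_lsn</type> format, for example
+ <literal>'0/3F07A6B1'</literal>.
+ </para>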
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+ <listitem>
+ <para>
+ Limits how long to wait for the LSN to be replayed.
+ The specified <replaceable>wait_timeout</replaceable> must be an integer
+ and is measured in milliseconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+ <listitem>
+ <para>
+ Limits the time to wait for the LSN to be replayed: waiting stops once
+ <replaceable>wait_time</replaceable> is reached. The value must be a
+ <type>timestamp</type> or <type>time</type> literal.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ Run <literal>WAIT FOR</literal> from <application>psql</application>,
+ limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE: LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Limit <acronym>LSN</acronym> wait time to 500000 milliseconds,
+ and then cancel the command if <acronym>LSN</acronym> was not reached:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+ERROR: canceling statement due to user request
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+ <title>Compatibility</title>
+
+ <para>
+ There is no <command>WAIT FOR</command> statement in the SQL
+ standard.
+ </para>
+ </refsect1>
+</refentry>
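The <literal>LSN</literal> parameter above takes the textual pg_lsn form: two hexadecimal numbers separated by a slash, where the 64-bit WAL position is `(hi << 32) | lo`, and "reached" means the replayed position is at or past the target. The sketch below illustrates that in plain C, assuming a `uint64_t` stands in for `XLogRecPtr`; `parse_lsn()` and `lsn_reached()` are hypothetical helpers, not server functions, and `sscanf` is more lenient than the server's pg_lsn input routine.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;    /* 64-bit, like PostgreSQL's XLogRecPtr */

/*
 * Parse the textual "hi/lo" LSN form, e.g. "0/3F07A6B1", into a 64-bit value.
 * Looser than the server's pg_lsn input function (sscanf also accepts
 * leading blanks and a "0x" prefix), so treat it as illustration only.
 */
static bool
parse_lsn(const char *text, XLogRecPtr *out)
{
    unsigned int hi;
    unsigned int lo;
    char        trailing;

    assert(text != NULL && out != NULL);
    /* Exactly two hex fields must match; any trailing character is rejected. */
    if (sscanf(text, "%X/%X%c", &hi, &lo, &trailing) != 2)
        return false;
    *out = ((XLogRecPtr) hi << 32) | lo;
    return true;
}

/* An LSN counts as reached once the replayed position is at or past it. */
static bool
lsn_reached(XLogRecPtr replayed, XLogRecPtr target)
{
    return replayed >= target;
}
```

Comparing the two parsed values as plain integers is what makes the "LSN reached" column meaningful: `0/3F07A611` is below `0/3F07A6B1`, so a standby at the former has not yet reached the latter.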
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index e11b4b6130..a83ff4551e 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -216,6 +216,7 @@
&update;
&vacuum;
&values;
+ &wait;
</reference>
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..5b3e3bab0d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1752,6 +1753,13 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set latches in shared memory array to notify the waiter.
+ */
+ if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWait())
+ WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
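The hook added to PerformWalRecovery() only scans the waiter array when the replayed LSN has passed the smallest LSN anyone is waiting for. A single-process sketch of that decision follows, assuming plain arrays stand in for the shared WaitState and that recording a backend index stands in for SetLatch(); `wake_waiters()` is a hypothetical name, and unlike the patch it scans every slot rather than starting at index 2.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/*
 * Single-process model of the notification step: waited[i] holds the LSN
 * backend i waits for (0 = not waiting), min_waited is the smallest of them
 * (UINT64_MAX when nobody waits). Instead of SetLatch() we record which
 * backends would be woken. Returns the number of backends woken.
 */
static size_t
wake_waiters(const XLogRecPtr *waited, size_t nslots, XLogRecPtr min_waited,
             XLogRecPtr replayed, size_t *woken, size_t max_woken)
{
    size_t      n = 0;

    /* Fast path, like the GetMinWait() check in PerformWalRecovery(). */
    if (replayed < min_waited)
        return 0;

    for (size_t i = 0; i < nslots; i++)
    {
        /* Wake only slots that are waiting and whose target is replayed. */
        if (waited[i] != InvalidXLogRecPtr && waited[i] <= replayed)
        {
            assert(n < max_woken);
            woken[n++] = i;
        }
    }
    return n;
}
```

The cached minimum keeps the common case (no waiter satisfied yet) to a single comparison per replayed record.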
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..0bb43feb71
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,394 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR, which allows waiting for events such as
+ * time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <float.h>
+#include <math.h>
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure */
+typedef struct
+{
+ int backend_maxid;
+ XLogRecPtr min_lsn;
+ slock_t mutex;
+ XLogRecPtr waited_lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/* Add the event of the current backend to the shared memory array */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->backend_maxid < MyBackendId)
+ state->backend_maxid = MyBackendId;
+
+ state->waited_lsn[MyBackendId] = lsn_to_wait;
+
+ if (lsn_to_wait < state->min_lsn)
+ state->min_lsn = lsn_to_wait;
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ * Check:
+ * 1) normal|smart|fast|immediate stop
+ * 2) SIGKILL and SIGTERM
+ */
+static void
+DeleteEvent(void)
+{
+ int i;
+ XLogRecPtr lsn_to_delete;
+
+ SpinLockAcquire(&state->mutex);
+
+ /* Read and clear our slot under the lock, as AddEvent() writes it */
+ lsn_to_delete = state->waited_lsn[MyBackendId];
+ state->waited_lsn[MyBackendId] = InvalidXLogRecPtr;
+
+ /* If we need to choose the next min_lsn, update state->min_lsn */
+ if (state->min_lsn == lsn_to_delete)
+ {
+ state->min_lsn = PG_UINT64_MAX;
+ for (i = 2; i <= state->backend_maxid; i++)
+ if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+ state->waited_lsn[i] < state->min_lsn)
+ state->min_lsn = state->waited_lsn[i];
+ }
+
+ if (state->backend_maxid == MyBackendId)
+ for (i = (MyBackendId); i >= 2; i--)
+ if (state->waited_lsn[i] != InvalidXLogRecPtr)
+ {
+ state->backend_maxid = i;
+ break;
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitState, waited_lsn);
+ size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+ return size;
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+ WaitShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends + 1); i++)
+ state->waited_lsn[i] = InvalidXLogRecPtr;
+
+ state->backend_maxid = 0;
+ state->min_lsn = PG_UINT64_MAX;
+ }
+}
+
+/* Set all latches in shared memory to signal that new LSN has been replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int backend_maxid;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ backend_maxid = state->backend_maxid;
+ SpinLockRelease(&state->mutex);
+
+ for (i = 2; i <= backend_maxid; i++)
+ {
+ backend = BackendIdGetProc(i);
+ if (state->waited_lsn[i] != 0)
+ {
+ if (backend && state->waited_lsn[i] <= cur_lsn)
+ SetLatch(&backend->procLatch);
+ }
+ }
+}
+
+/* Return the smallest LSN that any backend is currently waiting for */
+XLogRecPtr
+GetMinWait(void)
+{
+ return state->min_lsn;
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed,
+ * postmaster dies or timeout happens.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+ XLogRecPtr cur_lsn = GetXLogReplayRecPtr(NULL);
+ int latch_events;
+ float8 endtime;
+ int res = 0;
+
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+ endtime = GetNowFloat() + secs;
+
+ latch_events = WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ if (lsn != InvalidXLogRecPtr)
+ {
+ /* Already replayed, or no waiting requested (secs < 0): just report */
+ if (lsn <= cur_lsn || secs < 0)
+ return (lsn <= cur_lsn);
+
+ latch_events |= WL_LATCH_SET;
+ AddEvent(lsn);
+ }
+ else if (!secs)
+ return 1;
+
+ for (;;)
+ {
+ int rc;
+ float8 delay = 0;
+ long delay_ms;
+
+ if (secs > 0)
+ delay = endtime - GetNowFloat();
+ else if (secs == 0)
+ /*
+ * If we wait forever, wake up once a minute to check
+ * for interrupts.
+ */
+ delay = 60;
+
+ if (delay > 0.0)
+ delay_ms = (long) ceil(delay * 1000.0);
+ else
+ break;
+
+ /*
+ * If an interrupt is pending, remove our entry from the shared array
+ * first, since ProcessInterrupts() may throw an error and not return.
+ */
+ if (InterruptPending)
+ {
+ if (lsn != InvalidXLogRecPtr)
+ DeleteEvent();
+ ProcessInterrupts();
+ }
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ rc = WaitLatch(MyLatch, latch_events, delay_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+
+ if (lsn != InvalidXLogRecPtr && (rc & WL_LATCH_SET))
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ /* If LSN has been replayed */
+ if (lsn != InvalidXLogRecPtr && lsn <= cur_lsn)
+ break;
+ }
+
+ if (lsn != InvalidXLogRecPtr)
+ DeleteEvent();
+
+ if (lsn != InvalidXLogRecPtr && lsn > cur_lsn)
+ elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+ else
+ res = 1;
+
+ return res;
+}
+
+/*
+ * Get the amount of seconds left till the specified time.
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+ int ret;
+ float8 val;
+ Oid types[] = { time->consttype };
+ Datum values[] = { time->constvalue };
+ char nulls[] = { " " };
+ Datum result;
+ bool isnull;
+
+ SPI_connect();
+
+ if (time->consttype == TIMEOID)
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()::time))",
+ 1, types, values, nulls, true, 0);
+ else if (time->consttype == TIMETZOID)
+ ret = SPI_execute_with_args("select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))",
+ 1, types, values, nulls, true, 0);
+ else
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()))",
+ 1, types, values, nulls, true, 0);
+
+ Assert(ret >= 0);
+ result = SPI_getbinval(SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc,
+ 1, &isnull);
+
+ Assert(!isnull);
+ val = DatumGetFloat8(result);
+
+ elog(DEBUG1, "WAIT FOR: %f seconds until the target time", val);
+
+ SPI_finish();
+ return val;
+}
+
+/* Implementation of WAIT FOR */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+ ListCell *events;
+ TupleDesc tupdesc;
+ TupOutputState *tstate;
+ float8 delay = 0;
+ float8 final_delay = 0;
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ XLogRecPtr final_lsn = InvalidXLogRecPtr;
+ bool has_lsn = false;
+ bool wait_forever = true;
+ int res = 0;
+
+ if (stmt->strategy == WAIT_FOR_ANY)
+ {
+ /* Prepare to find minimum LSN and delay */
+ final_delay = DBL_MAX;
+ final_lsn = PG_UINT64_MAX;
+ }
+
+ /* Extract options from the statement node tree */
+ foreach(events, stmt->events)
+ {
+ WaitStmt *event = (WaitStmt *) lfirst(events);
+
+ /* LSN to wait for */
+ if (event->lsn)
+ {
+ has_lsn = true;
+ lsn = DatumGetLSN(
+ DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(event->lsn)));
+
+ /*
+ * When waiting for ALL, select max LSN to wait for.
+ * When waiting for ANY, select min LSN to wait for.
+ */
+ if ((stmt->strategy == WAIT_FOR_ALL && final_lsn <= lsn) ||
+ (stmt->strategy == WAIT_FOR_ANY && final_lsn > lsn))
+ {
+ final_lsn = lsn;
+ }
+ }
+
+ /* Time delay to wait for */
+ if (event->time || event->delay)
+ {
+ wait_forever = false;
+
+ if (event->delay)
+ delay = event->delay / 1000.0;
+
+ if (event->time)
+ {
+ Const *time = (Const *) event->time;
+ delay = WaitTimeResolve(time);
+ }
+
+ /*
+ * When waiting for ALL, select max delay to wait for.
+ * When waiting for ANY, select min delay to wait for.
+ */
+ if ((stmt->strategy == WAIT_FOR_ALL && final_delay <= delay) ||
+ (stmt->strategy == WAIT_FOR_ANY && final_delay > delay))
+ {
+ final_delay = delay;
+ }
+ }
+ }
+ if (wait_forever)
+ final_delay = 0;
+ if (!has_lsn)
+ final_lsn = InvalidXLogRecPtr;
+
+ res = WaitUtility(final_lsn, final_delay);
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+ /* Prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+ /* Send it */
+ do_text_output_oneline(tstate, res ? "t" : "f");
+ end_tup_output(tstate);
+ return res;
+}
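WaitMain() reduces the event list to a single LSN and a single delay before calling WaitUtility(). The sketch below restates that reduction as a standalone C function, assuming events are already resolved to either an LSN or a number of seconds (the patch resolves TIMESTAMP and TIME values through WaitTimeResolve() first); `WaitEventSketch` and `reduce_events()` are hypothetical names.

```c
#include <assert.h>
#include <float.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

typedef enum { WAIT_FOR_ANY = 0, WAIT_FOR_ALL } WaitForStrategy;

/* Simplified event: either an LSN target or a delay in seconds. */
typedef struct
{
    bool        is_lsn;
    XLogRecPtr  lsn;
    double      delay_secs;
} WaitEventSketch;

/*
 * Collapse a list of events into one LSN and one delay, mirroring the loop in
 * WaitMain(): ALL keeps the largest value of each kind, ANY the smallest.
 * *out_lsn is InvalidXLogRecPtr if no LSN event is given, and *out_delay is
 * 0 ("no time limit") if no time event is given.
 */
static void
reduce_events(WaitForStrategy strategy, const WaitEventSketch *events, size_t n,
              XLogRecPtr *out_lsn, double *out_delay)
{
    bool        any_lsn = false;
    bool        any_time = false;
    XLogRecPtr  lsn = (strategy == WAIT_FOR_ANY) ? UINT64_MAX : InvalidXLogRecPtr;
    double      delay = (strategy == WAIT_FOR_ANY) ? DBL_MAX : 0.0;

    assert(events != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
    {
        if (events[i].is_lsn)
        {
            any_lsn = true;
            if ((strategy == WAIT_FOR_ALL && events[i].lsn > lsn) ||
                (strategy == WAIT_FOR_ANY && events[i].lsn < lsn))
                lsn = events[i].lsn;
        }
        else
        {
            any_time = true;
            if ((strategy == WAIT_FOR_ALL && events[i].delay_secs > delay) ||
                (strategy == WAIT_FOR_ANY && events[i].delay_secs < delay))
                delay = events[i].delay_secs;
        }
    }
    *out_lsn = any_lsn ? lsn : InvalidXLogRecPtr;
    *out_delay = any_time ? delay : 0.0;
}
```

Note that, as written, WaitUtility() returns as soon as either the LSN is replayed or the delay runs out, so under ALL an LSN combined with a TIMEOUT behaves as an LSN wait bounded by that timeout rather than a wait for both.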
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index e892df9819..c4a303748e 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -84,6 +84,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
CreateTableAsStmt *stmt);
static Query *transformCallStmt(ParseState *pstate,
CallStmt *stmt);
+static void transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
static void transformLockingClause(ParseState *pstate, Query *qry,
LockingClause *lc, bool pushedDown);
#ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -399,7 +400,20 @@ transformStmt(ParseState *pstate, Node *parseTree)
result = transformCallStmt(pstate,
(CallStmt *) parseTree);
break;
-
+ case T_WaitStmt:
+ transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+ result = makeNode(Query);
+ result->commandType = CMD_UTILITY;
+ result->utilityStmt = (Node *) parseTree;
+ break;
+ case T_TransactionStmt:
+ {
+ TransactionStmt *stmt = (TransactionStmt *) parseTree;
+ if ((stmt->kind == TRANS_STMT_BEGIN ||
+ stmt->kind == TRANS_STMT_START) && stmt->wait)
+ transformWaitForStmt(pstate, (WaitStmt *) stmt->wait);
+ }
+ /* after transforming the WAIT FOR clause, handle it like the default case */
+ /* FALLTHROUGH */
default:
/*
@@ -3492,6 +3506,23 @@ applyLockingClause(Query *qry, Index rtindex,
qry->rowMarks = lappend(qry->rowMarks, rc);
}
+/*
+ * transformWaitForStmt -
+ * transform the WAIT FOR clause of the BEGIN statement
+ * transform the WAIT FOR statement (TODO: remove this line if we don't keep it)
+ */
+static void
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+ ListCell *events;
+
+ foreach(events, stmt->events)
+ {
+ WaitStmt *event = (WaitStmt *) lfirst(events);
+ event->time = transformExpr(pstate, event->time, EXPR_KIND_OTHER);
+ }
+}
+
/*
* Coverage testing for raw_expression_tree_walker().
*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a0138382a1..03708b5221 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -310,7 +310,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -534,7 +534,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <node> case_expr case_arg when_clause case_default
%type <list> when_clause_list
%type <node> opt_search_clause opt_cycle_clause
-%type <ival> sub_type opt_materialized
+%type <ival> sub_type wait_strategy opt_materialized
%type <node> NumericOnly
%type <list> NumericOnly_list
%type <alias> alias_clause opt_alias_clause opt_alias_clause_for_join_using
@@ -642,6 +642,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partboundspec> PartitionBoundSpec
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
+%type <list> wait_list
+%type <node> WaitEvent wait_for
/*
@@ -712,7 +714,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE METHOD
MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -745,7 +747,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P SYSTEM_USER
TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
- TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+ TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
TREAT TRIGGER TRIM TRUE_P
TRUNCATE TRUSTED TYPE_P TYPES_P
@@ -755,7 +757,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW
+ WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -1064,6 +1067,7 @@ stmt:
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -10838,12 +10842,13 @@ TransactionStmt:
n->chain = $3;
$$ = (Node *) n;
}
- | START TRANSACTION transaction_mode_list_or_empty
+ | START TRANSACTION transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_START;
n->options = $3;
+ n->wait = $4;
$$ = (Node *) n;
}
| COMMIT opt_transaction opt_transaction_chain
@@ -10931,12 +10936,13 @@ TransactionStmt:
;
TransactionStmtLegacy:
- BEGIN_P opt_transaction transaction_mode_list_or_empty
+ BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_BEGIN;
n->options = $3;
+ n->wait = $4;
$$ = (Node *) n;
}
| END_P opt_transaction opt_transaction_chain
@@ -15622,6 +15628,74 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * QUERY:
+ * WAIT FOR [ANY | ALL] <event> [, <event> ...]
+ * event is one of:
+ * LSN 'value'
+ * TIMEOUT delay_in_milliseconds
+ * TIMESTAMP 'value' (or TIME 'value')
+ *
+ *****************************************************************************/
+WaitStmt:
+ WAIT FOR wait_strategy wait_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->strategy = $3;
+ n->events = $4;
+ $$ = (Node *)n;
+ }
+ ;
+wait_for:
+ WAIT FOR wait_strategy wait_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->strategy = $3;
+ n->events = $4;
+ $$ = (Node *)n;
+ }
+ | /* EMPTY */ { $$ = NULL; };
+
+wait_strategy:
+ ALL { $$ = WAIT_FOR_ALL; }
+ | ANY { $$ = WAIT_FOR_ANY; }
+ | /* EMPTY */ { $$ = WAIT_FOR_ALL; }
+ ;
+
+wait_list:
+ WaitEvent { $$ = list_make1($1); }
+ | wait_list ',' WaitEvent { $$ = lappend($1, $3); }
+ | wait_list WaitEvent { $$ = lappend($1, $2); }
+ ;
+
+WaitEvent:
+ LSN Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = $2;
+ n->delay = 0;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }
+ | TIMEOUT Iconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = NULL;
+ n->delay = $2;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }
+ | ConstDatetime Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = NULL;
+ n->delay = 0;
+ n->time = makeStringConstCast($2, @2, $1);
+ $$ = (Node *)n;
+ }
+ ;
+
/*
* Aggregate decoration clauses
@@ -16853,6 +16927,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN
| MAPPING
| MATCH
| MATCHED
@@ -16984,6 +17059,7 @@ unreserved_keyword:
| TEMPORARY
| TEXT_P
| TIES
+ | TIMEOUT
| TRANSACTION
| TRANSFORM
| TRIGGER
@@ -17010,6 +17086,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
@@ -17424,6 +17501,7 @@ bare_label_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN
| MAPPING
| MATCH
| MATCHED
@@ -17585,6 +17663,7 @@ bare_label_keyword:
| THEN
| TIES
| TIME
+ | TIMEOUT
| TIMESTAMP
| TRAILING
| TRANSACTION
@@ -17622,6 +17701,7 @@ bare_label_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAIT
| WHEN
| WHITESPACE_P
| WORK
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 8f1ded7338..106c6ac594 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -142,6 +143,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, StatsShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -295,6 +297,11 @@ CreateSharedMemoryAndSemaphores(void)
AsyncShmemInit();
StatsShmemInit();
+ /*
+ * Init the shared array of LSNs that backends wait for in WAIT FOR
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index c7d9d96b45..b2ed7a8b21 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <float.h>
#include "access/htup_details.h"
#include "access/reloptions.h"
@@ -59,6 +60,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -72,6 +74,9 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
/* Hook for plugins to get control in ProcessUtility() */
ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -272,6 +277,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_LoadStmt:
case T_PrepareStmt:
case T_UnlistenStmt:
+ case T_WaitStmt:
case T_VariableSetStmt:
{
/*
@@ -612,6 +618,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case TRANS_STMT_START:
{
ListCell *lc;
+ WaitStmt *waitstmt = (WaitStmt *) stmt->wait;
+
+ /* If WAIT FOR was given and its condition was not met, don't begin the block */
+ if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+ break;
BeginTransactionBlock();
foreach(lc, stmt->options)
@@ -1069,6 +1080,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ WaitStmt *stmt = (WaitStmt *) parsetree;
+ WaitMain(stmt, dest);
+ break;
+ }
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2833,6 +2851,10 @@ CreateCommandTag(Node *parsetree)
tag = CMDTAG_NOTIFY;
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
case T_ListenStmt:
tag = CMDTAG_LISTEN;
break;
@@ -3481,6 +3503,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
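Both the standalone WAIT FOR statement and the BEGIN / START TRANSACTION clause dispatched in utility.c end up in WaitUtility(), which sleeps in bounded steps rather than in one long WaitLatch() call. The sketch below models how long one iteration would sleep, assuming times are plain seconds as doubles; `next_sleep_ms()` and its (secs, deadline, now) split are hypothetical, whereas the patch derives the deadline from GetCurrentTimestamp().

```c
#include <assert.h>

/*
 * Hypothetical helper modelling one iteration of the loop in WaitUtility():
 * how many milliseconds the next WaitLatch() call would sleep.
 *   secs > 0  : sleep until the deadline (now and deadline in seconds)
 *   secs == 0 : no time limit, so wake every 60 s to check for interrupts
 *   secs < 0  : do not sleep at all
 * Returns 0 when the loop should stop waiting.
 */
static long
next_sleep_ms(double secs, double deadline, double now)
{
    double      delay = 0.0;
    long        ms;

    if (secs > 0)
        delay = deadline - now;
    else if (secs == 0)
        delay = 60.0;

    if (delay <= 0.0)
        return 0;

    /* Round up to whole milliseconds, as ceil(delay * 1000.0) does. */
    ms = (long) (delay * 1000.0);
    if ((double) ms < delay * 1000.0)
        ms++;
    assert(ms > 0);
    return ms;
}
```

Rounding up keeps a sub-millisecond remainder from turning into a zero timeout, and the 60 s cap on unbounded waits is what lets the loop notice pending interrupts.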
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..0270160d44
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "access/xlogdefs.h"
+#include "nodes/parsenodes.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 secs);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+
+#endif /* WAIT_H */
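The shared state set up through WaitShmemInit() keeps one LSN slot per backend plus a cached minimum: registering a wait only ever lowers the minimum, and removing one rescans the array only when the removed entry held it. A single-process sketch of that AddEvent()/DeleteEvent() bookkeeping follows, assuming a fixed `NSLOTS` array in place of the MaxBackends + 1 shared slots and no spinlock (the real code must hold `state->mutex`); all names here are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)
#define NSLOTS 8                /* stands in for MaxBackends + 1 */

/* Single-process model of WaitState without the spinlock. */
typedef struct
{
    XLogRecPtr  min_lsn;            /* UINT64_MAX when nobody waits */
    XLogRecPtr  waited_lsn[NSLOTS]; /* 0 = slot not waiting */
} WaitStateSketch;

static void
state_init(WaitStateSketch *s)
{
    s->min_lsn = UINT64_MAX;
    for (int i = 0; i < NSLOTS; i++)
        s->waited_lsn[i] = InvalidXLogRecPtr;
}

/* Register a wait in O(1); only lowers min_lsn when needed (cf. AddEvent). */
static void
state_add(WaitStateSketch *s, int slot, XLogRecPtr lsn)
{
    assert(slot >= 0 && slot < NSLOTS && lsn != InvalidXLogRecPtr);
    s->waited_lsn[slot] = lsn;
    if (lsn < s->min_lsn)
        s->min_lsn = lsn;
}

/* Remove a wait; rescan only if it held the minimum (cf. DeleteEvent). */
static void
state_delete(WaitStateSketch *s, int slot)
{
    XLogRecPtr  removed;

    assert(slot >= 0 && slot < NSLOTS);
    removed = s->waited_lsn[slot];
    s->waited_lsn[slot] = InvalidXLogRecPtr;
    if (removed != InvalidXLogRecPtr && removed == s->min_lsn)
    {
        s->min_lsn = UINT64_MAX;
        for (int i = 0; i < NSLOTS; i++)
            if (s->waited_lsn[i] != InvalidXLogRecPtr &&
                s->waited_lsn[i] < s->min_lsn)
                s->min_lsn = s->waited_lsn[i];
    }
}
```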
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index f7d7f10f7d..0a1ac70991 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3393,6 +3393,7 @@ typedef struct TransactionStmt
char *savepoint_name; /* for savepoint commands */
char *gid; /* for two-phase-commit related commands */
bool chain; /* AND CHAIN option */
+ Node *wait; /* WAIT FOR clause (a WaitStmt), or NULL */
} TransactionStmt;
/* ----------------------
@@ -3938,4 +3939,26 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/* ----------------------
+ * WAIT FOR Statement + WAIT FOR clause of BEGIN statement
+ * TODO: if we only pick one, remove the other
+ * ----------------------
+ */
+
+typedef enum WaitForStrategy
+{
+ WAIT_FOR_ANY = 0,
+ WAIT_FOR_ALL
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+ NodeTag type;
+ WaitForStrategy strategy;
+ List *events; /* list of WaitStmt nodes, one per event */
+ char *lsn; /* WAIT FOR LSN */
+ int delay; /* WAIT FOR TIMEOUT */
+ Node *time; /* WAIT FOR TIMESTAMP or TIME */
+} WaitStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index bb36213e6f..13b16c4d94 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -249,6 +249,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD, BARE_LABEL)
@@ -421,6 +422,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("then", THEN, RESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("time", TIME, COL_NAME_KEYWORD, BARE_LABEL)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD, BARE_LABEL)
PG_KEYWORD("to", TO, RESERVED_KEYWORD, AS_LABEL)
PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD, BARE_LABEL)
@@ -461,6 +463,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index e738ac1c09..c699cbb175 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -216,3 +216,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT FOR", false, false, false)
diff --git a/src/test/recovery/t/034_begin_wait.pl b/src/test/recovery/t/034_begin_wait.pl
new file mode 100644
index 0000000000..f1e5b5b23d
--- /dev/null
+++ b/src/test/recovery/t/034_begin_wait.pl
@@ -0,0 +1,146 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that WAIT is
+# able to set up an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns eq 0, "standby reached the same LSN as primary after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+ "WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+ "BEGIN WAIT FOR LSN '$lsn2' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "WAIT FOR" command
+#===============================================================================
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+ 'postgres', qq[
+ BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ WAIT FOR LSN '$lsn3';
+ SELECT max(a) FROM wait_test;
+ BEGIN WAIT FOR LSN '$lsn4';
+ SELECT pg_last_wal_replay_lsn();
+ SELECT max(a) FROM wait_test;
+ COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# For that, make sure that the older max value appears twice in the results.
+my $count = () = $standby_results =~ /^40$/mg;
+ok($count eq 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+ "BEGIN WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+ok($compare_lsns ge 0,
+ "WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+ok($compare_lsns lt 0,
+ "WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+ "BEGIN WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+ok($compare_lsns eq 0,
+ "WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
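The test above compares LSNs with pg_lsn_cmp. The textual form 'X/Y' printed by pg_current_wal_lsn() is the high and low 32 bits of a 64-bit XLogRecPtr written in hex, so ordering LSNs is plain unsigned integer ordering. A minimal C sketch of that idea; parse_lsn() and lsn_cmp() are hypothetical helpers, not the server's pg_lsn input function:

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: parse the "X/Y" text form of an LSN (two hex
 * halves) into a 64-bit value. Returns false on malformed input.
 */
static bool
parse_lsn(const char *text, uint64_t *out)
{
	unsigned int hi;
	unsigned int lo;
	int			consumed = 0;

	/* %n records how many characters were consumed, to reject trailing junk */
	if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2 ||
		text[consumed] != '\0')
		return false;

	*out = ((uint64_t) hi << 32) | lo;
	return true;
}

/* Hypothetical helper with the same -1/0/1 convention as pg_lsn_cmp() */
static int
lsn_cmp(uint64_t a, uint64_t b)
{
	return (a > b) - (a < b);
}
```

Because the result is -1, 0 or 1, the Perl tests can check it against 0 to decide whether the standby is behind, level with, or ahead of a given LSN.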
diff --git a/src/test/recovery/t/035_wait.pl b/src/test/recovery/t/035_wait.pl
new file mode 100644
index 0000000000..6f9d549416
--- /dev/null
+++ b/src/test/recovery/t/035_wait.pl
@@ -0,0 +1,145 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that WAIT is
+# able to set up an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns eq 0, "standby reached the same LSN as primary after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+ "WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+ "WAIT FOR LSN '$lsn2' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '$lsn3';
+ BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+ SELECT max(a) FROM wait_test;
+ WAIT FOR LSN '$lsn4';
+ SELECT pg_last_wal_replay_lsn();
+ SELECT max(a) FROM wait_test;
+ COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# For that, make sure that the older max value appears twice in the results.
+my $count = () = $standby_results =~ /^40$/mg;
+ok($count eq 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+ "WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+ok($compare_lsns ge 0,
+ "WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+ok($compare_lsns lt 0,
+ "WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+ "WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+ok($compare_lsns eq 0,
+ "WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..422bb1ed82 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1752,6 +1753,15 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set latches in shared memory array to notify the waiter.
+ */
+ if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN())
+ {
+ WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..3f94ef4ec3
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,291 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements pg_waitlsn(), which lets a backend wait until a given
+ * LSN has been replayed on a replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xact.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogdefs.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add to shared memory array */
+static void AddWaitedLSN(XLogRecPtr lsn_to_wait);
+
+/* Shared memory structure */
+typedef struct
+{
+ int backend_maxid;
+ pg_atomic_uint64 min_lsn; /* XLogRecPtr of minimal waited for LSN */
+ slock_t mutex;
+ /* LSNs that each backend is waiting for, indexed by BackendId */
+ XLogRecPtr lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static WaitState *state;
+
+/*
+ * Add the wait event of the current backend to shared memory array
+ */
+static void
+AddWaitedLSN(XLogRecPtr lsn_to_wait)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->backend_maxid < MyBackendId)
+ state->backend_maxid = MyBackendId;
+
+ state->lsn[MyBackendId] = lsn_to_wait;
+
+ if (lsn_to_wait < state->min_lsn.value)
+ state->min_lsn.value = lsn_to_wait;
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array.
+ */
+void
+DeleteWaitedLSN(void)
+{
+ int i;
+ XLogRecPtr lsn_to_delete;
+
+ SpinLockAcquire(&state->mutex);
+
+ lsn_to_delete = state->lsn[MyBackendId];
+ state->lsn[MyBackendId] = InvalidXLogRecPtr;
+
+ /* If we are deleting the minimal LSN, then choose the next min_lsn */
+ if (lsn_to_delete != InvalidXLogRecPtr &&
+ lsn_to_delete == state->min_lsn.value)
+ {
+ state->min_lsn.value = PG_UINT64_MAX;
+ for (i = 2; i <= state->backend_maxid; i++)
+ if (state->lsn[i] != InvalidXLogRecPtr &&
+ state->lsn[i] < state->min_lsn.value)
+ state->min_lsn.value = state->lsn[i];
+ }
+
+ /* If deleting from the end of the array, shorten the array's used part */
+ if (state->backend_maxid == MyBackendId)
+ for (i = (MyBackendId); i >= 2; i--)
+ if (state->lsn[i] != InvalidXLogRecPtr)
+ {
+ state->backend_maxid = i;
+ break;
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitState, lsn);
+ size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+ return size;
+}
+
+/*
+ * Initialize an array of events to wait for in shared memory
+ */
+void
+WaitShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+ WaitShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends + 1); i++)
+ state->lsn[i] = InvalidXLogRecPtr;
+
+ state->backend_maxid = 0;
+ state->min_lsn.value = PG_UINT64_MAX;
+ }
+}
+
+/*
+ * Set latches in shared memory to signal that new LSN has been replayed
+ */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int backend_maxid;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ backend_maxid = state->backend_maxid;
+
+ for (i = 2; i <= backend_maxid; i++)
+ {
+ backend = BackendIdGetProc(i);
+
+ if (backend && state->lsn[i] != InvalidXLogRecPtr &&
+ state->lsn[i] <= cur_lsn)
+ {
+ SetLatch(&backend->procLatch);
+ }
+ }
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Get minimal LSN that someone waits for
+ */
+XLogRecPtr
+GetMinWaitedLSN(void)
+{
+ return state->min_lsn.value;
+}
+
+/*
+ * Use a latch to wait until target_lsn is replayed, the postmaster dies, or
+ * the timeout expires. timeout_ms is in milliseconds; <= 0 means wait forever.
+ * Returns true if the LSN was reached and false otherwise.
+ */
+bool
+WaitUtility(XLogRecPtr target_lsn, const int timeout_ms)
+{
+ XLogRecPtr cur_lsn = GetXLogReplayRecPtr(NULL);
+ int latch_events;
+ float8 endtime;
+ bool res = false;
+ bool wait_forever = (timeout_ms <= 0);
+
+ /*
+ * In a transaction block with isolation level REPEATABLE READ or higher,
+ * calling pg_waitlsn as the first statement takes the transaction snapshot
+ * before the LSN is replayed, so later queries won't see the new data.
+ */
+ if (IsTransactionBlock() && XactIsoLevel != XACT_READ_COMMITTED)
+ {
+ ereport(WARNING,
+ errmsg("pg_waitlsn may not work as expected at this isolation level"),
+ errhint("Call pg_waitlsn before starting the transaction."));
+ }
+
+ endtime = GetNowFloat() + timeout_ms / 1000.0;
+
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ /* Check if we already reached the needed LSN */
+ if (cur_lsn >= target_lsn)
+ return true;
+
+ AddWaitedLSN(target_lsn);
+
+ for (;;)
+ {
+ int rc;
+ float8 time_left = 0;
+ long time_left_ms = 0;
+
+ time_left = endtime - GetNowFloat();
+
+ /* Use 100 ms as the default timeout to check for interrupts */
+ if (wait_forever || time_left < 0 || time_left > 0.1)
+ time_left_ms = 100;
+ else
+ time_left_ms = (long) ceil(time_left * 1000.0);
+
+ /* If interrupt, LockErrorCleanup() will do DeleteWaitedLSN() for us */
+ CHECK_FOR_INTERRUPTS();
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ rc = WaitLatch(MyLatch, latch_events, time_left_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ ResetLatch(MyLatch);
+
+ if (rc & WL_LATCH_SET)
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ if (rc & WL_TIMEOUT)
+ {
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+ /* If the time specified by user has passed, stop waiting */
+ time_left = endtime - GetNowFloat();
+ if (!wait_forever && time_left <= 0.0)
+ break;
+ }
+
+ /* If LSN has been replayed */
+ if (target_lsn <= cur_lsn)
+ break;
+ }
+
+ DeleteWaitedLSN();
+
+ if (cur_lsn < target_lsn)
+ ereport(WARNING,
+ errmsg("LSN was not reached"),
+ errhint("Try to increase wait time."));
+ else
+ res = true;
+
+ return res;
+}
+
+Datum
+pg_waitlsn(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr trg_lsn = PG_GETARG_LSN(0);
+ int64 delay = PG_GETARG_INT64(1); /* declared as int8 in pg_proc.dat */
+
+ PG_RETURN_BOOL(WaitUtility(trg_lsn, (int) Min(Max(delay, 0), PG_INT32_MAX)));
+}
+
+Datum
+pg_waitlsn_infinite(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr trg_lsn = PG_GETARG_LSN(0);
+
+ PG_RETURN_BOOL(WaitUtility(trg_lsn, 0));
+}
+
+Datum
+pg_waitlsn_no_wait(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr trg_lsn = PG_GETARG_LSN(0);
+
+ PG_RETURN_BOOL(WaitUtility(trg_lsn, 1));
+}
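The shared-memory bookkeeping in wait.c can be modelled in a few lines of single-threaded C: one slot per backend id holding the awaited LSN, a cached minimum that lets the startup process skip the scan with a single comparison, and a rescan of the array only when the current minimum is removed. This is a sketch under stated assumptions: no spinlock, no latches (a wakeup is modelled by returning the backend id), a fixed MAX_BACKENDS, ids starting at 1, and all names (ToyWaitState, toy_add, toy_delete, toy_wake) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_BACKENDS 8
#define INVALID_LSN UINT64_C(0)

/* Toy model of WaitState: lsn[id] == INVALID_LSN means "not waiting" */
typedef struct
{
	int			backend_maxid;	/* highest slot that may be in use */
	uint64_t	min_lsn;		/* UINT64_MAX when nobody waits */
	uint64_t	lsn[MAX_BACKENDS + 1];	/* slot 0 unused */
} ToyWaitState;

static void
toy_init(ToyWaitState *s)
{
	s->backend_maxid = 0;
	s->min_lsn = UINT64_MAX;
	for (int i = 0; i <= MAX_BACKENDS; i++)
		s->lsn[i] = INVALID_LSN;
}

/* Register backend id as waiting for lsn, keeping the cached minimum */
static void
toy_add(ToyWaitState *s, int id, uint64_t lsn)
{
	assert(id >= 1 && id <= MAX_BACKENDS);
	if (s->backend_maxid < id)
		s->backend_maxid = id;
	s->lsn[id] = lsn;
	if (lsn < s->min_lsn)
		s->min_lsn = lsn;
}

/* Unregister backend id; rescan only if it held the current minimum */
static void
toy_delete(ToyWaitState *s, int id)
{
	uint64_t	old = s->lsn[id];

	s->lsn[id] = INVALID_LSN;
	if (old != INVALID_LSN && old == s->min_lsn)
	{
		s->min_lsn = UINT64_MAX;
		for (int i = 1; i <= s->backend_maxid; i++)
			if (s->lsn[i] != INVALID_LSN && s->lsn[i] < s->min_lsn)
				s->min_lsn = s->lsn[i];
	}
	/* If the last used slot was freed, shrink the range to scan */
	if (s->backend_maxid == id)
	{
		int			i = id;

		while (i > 0 && s->lsn[i] == INVALID_LSN)
			i--;
		s->backend_maxid = i;
	}
}

/*
 * Startup-process side after replaying up to replayed_lsn: one cheap
 * comparison against the cached minimum, then collect every backend whose
 * target was reached. Returns how many backends were "woken".
 */
static size_t
toy_wake(const ToyWaitState *s, uint64_t replayed_lsn, int *woken)
{
	size_t		n = 0;

	if (replayed_lsn < s->min_lsn)
		return 0;
	for (int i = 1; i <= s->backend_maxid; i++)
		if (s->lsn[i] != INVALID_LSN && s->lsn[i] <= replayed_lsn)
			woken[n++] = i;
	return n;
}
```

The point of the cached minimum is that, in the common case where nobody waits or every awaited LSN is still ahead, the startup process pays one comparison per replayed record instead of a scan of all backend slots.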
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index f6446da2d6..e148c0ea24 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -59,6 +59,7 @@
#include "access/xlogrecovery.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
+#include "commands/wait.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 8f1ded7338..760d760356 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -142,6 +143,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, StatsShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -295,6 +297,11 @@ CreateSharedMemoryAndSemaphores(void)
AsyncShmemInit();
StatsShmemInit();
+ /*
+ * Initialize the shared-memory array of LSNs that backends are waiting for
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 22b4278610..313da0b1fc 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -704,6 +705,9 @@ LockErrorCleanup(void)
AbortStrongLockAcquire();
+ /* If waitlsn was interrupted, then stop waiting for that LSN */
+ DeleteWaitedLSN();
+
/* Nothing to do if we weren't waiting for a lock */
if (lockAwaited == NULL)
{
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 220ddb8c01..fd731595b2 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -384,8 +384,6 @@ pg_sleep(PG_FUNCTION_ARGS)
* less than the specified time when WaitLatch is terminated early by a
* non-query-canceling signal such as SIGHUP.
*/
-#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
-
endtime = GetNowFloat() + secs;
for (;;)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 66b73c3900..99b0c02c34 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11927,4 +11927,19 @@
prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
prosrc => 'brin_minmax_multi_summary_send' },
+{ oid => '16387', descr => 'wait for LSN until timeout',
+ proname => 'pg_waitlsn', prorettype => 'bool', proargtypes => 'pg_lsn int8',
+ proargnames => '{trg_lsn,delay}',
+ prosrc => 'pg_waitlsn' },
+
+{ oid => '16388', descr => 'wait for LSN for an infinite time',
+ proname => 'pg_waitlsn_infinite', prorettype => 'bool', proargtypes => 'pg_lsn',
+ proargnames => '{trg_lsn}',
+ prosrc => 'pg_waitlsn_infinite' },
+
+{ oid => '16389', descr => 'check if LSN has been replayed, waiting at most 1 ms',
+ proname => 'pg_waitlsn_no_wait', prorettype => 'bool', proargtypes => 'pg_lsn',
+ proargnames => '{trg_lsn}',
+ prosrc => 'pg_waitlsn_no_wait' },
+
]
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..fd21e43416
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "access/xlogdefs.h"
+#include "nodes/parsenodes.h"
+#include "tcop/dest.h"
+
+extern bool WaitUtility(XLogRecPtr lsn, const int timeout_ms);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWaitedLSN(void);
+extern void DeleteWaitedLSN(void);
+
+#endif /* WAIT_H */
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index edd59dc432..db2926f965 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -140,4 +140,6 @@ extern int date2isoyearday(int year, int mon, int mday);
extern bool TimestampTimestampTzRequiresRewrite(void);
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
#endif /* TIMESTAMP_H */
diff --git a/src/test/recovery/t/034_waitlsn.pl b/src/test/recovery/t/034_waitlsn.pl
new file mode 100644
index 0000000000..54a20e089c
--- /dev/null
+++ b/src/test/recovery/t/034_waitlsn.pl
@@ -0,0 +1,74 @@
+# Checks waitlsn
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Using the backup, create a streaming standby with a 1 second delay
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# Check that timeouts make us wait for the specified time (2s here)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $two_seconds = 2000; # in milliseconds
+my $start_time = time();
+$node_standby->safe_psql('postgres',
+ "SELECT pg_waitlsn('0/FFFFFFFF', $two_seconds)");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $two_seconds, "waitlsn waits for enough time");
+
+# Check that timeouts let us stop waiting right away, before reaching target LSN
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my ($ret, $out, $err) = $node_standby->psql('postgres',
+ "SELECT pg_waitlsn('$lsn1', 1)");
+
+ok($ret == 0, "psql exits with 0 when pg_waitlsn times out on standby");
+ok($err =~ /WARNING:\s+LSN was not reached/,
+ "correct warning when pg_waitlsn times out on standby");
+ok($out eq "f", "if given too little wait time, pg_waitlsn doesn't reach target LSN");
+
+
+# Check that waitlsn works fine and reaches target LSN if given no timeout
+
+# Add data on primary, memorize primary's last LSN
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Wait for it to appear on replica, memorize replica's last LSN
+$node_standby->safe_psql('postgres',
+ "SELECT pg_waitlsn_infinite('$lsn2')");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+
+# Make sure that primary's and replica's LSNs are the same after WAIT
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)");
+ok($compare_lsns eq 0,
+ "standby reached the same LSN as primary after pg_waitlsn_infinite");
+
+$node_standby->stop;
+$node_primary->stop;
+
+done_testing();
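The timeout checks above depend on how WaitUtility() in wait.c sleeps: it never blocks in one long WaitLatch() call, but sleeps in slices of at most 100 ms so it can run CHECK_FOR_INTERRUPTS(), and shortens the last slice to the time remaining. pg_waitlsn passes its delay through, pg_waitlsn_infinite passes 0 (wait forever) and pg_waitlsn_no_wait passes 1 ms. A sketch of just the slice computation as a pure function; next_slice_ms() is a hypothetical name, and the rounding is written without libm so it links without -lm:

```c
#include <stdbool.h>

/*
 * Sketch of the sleep-slice computation in WaitUtility(). time_left is the
 * number of seconds until the deadline. When waiting forever, when the
 * deadline has already passed, or when more than 100 ms remain, sleep for
 * a 100 ms slice; otherwise sleep for the remaining time rounded up to
 * whole milliseconds.
 */
static long
next_slice_ms(bool wait_forever, double time_left)
{
	double		ms;
	long		whole;

	if (wait_forever || time_left < 0 || time_left > 0.1)
		return 100;

	/* Same as (long) ceil(time_left * 1000.0) for non-negative input */
	ms = time_left * 1000.0;
	whole = (long) ms;
	if ((double) whole < ms)
		whole++;
	return whole;
}
```

With pg_waitlsn_infinite every slice is 100 ms, and on WL_TIMEOUT the loop only re-reads the replay position instead of giving up, because the deadline check is skipped when wait_forever is set.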
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 22ea42c16b..6413a661cf 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2964,6 +2964,7 @@ WaitEventIPC
WaitEventSet
WaitEventTimeout
WaitPMResult
+WaitState
WalCloseMethod
WalCompression
WalLevel