Alexander Korotkov писал 2017-10-23 13:19:
Despite code cleanup, you still have some random empty lines removals
in your patch.
I reconfigured my IDE to avoid this in the future.
--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 01acc2e..6792eb0 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -181,6 +181,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY waitlsn SYSTEM "waitlsn.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/waitlsn.sgml b/doc/src/sgml/ref/waitlsn.sgml
new file mode 100644
index 0000000..6f389ca
--- /dev/null
+++ b/doc/src/sgml/ref/waitlsn.sgml
@@ -0,0 +1,144 @@
+<!--
+doc/src/sgml/ref/waitlsn.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="SQL-WAITLSN">
+ <indexterm zone="sql-waitlsn">
+ <primary>WAITLSN</primary>
+ </indexterm>
+
+ <refmeta>
+ <refentrytitle>WAITLSN</refentrytitle>
+ <manvolnum>7</manvolnum>
+ <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>WAITLSN</refname>
+ <refpurpose>wait for the target <acronym>LSN</> to be replayed</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAITLSN '<replaceable class="PARAMETER">LSN</replaceable>' [ INFINITELY ]
+WAITLSN '<replaceable class="PARAMETER">LSN</replaceable>' TIMEOUT <replaceable class="PARAMETER">wait_time</replaceable>
+WAITLSN '<replaceable class="PARAMETER">LSN</replaceable>' NOWAIT
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+
+ <para>
+ <command>WAITLSN</command> provides an interprocess communication mechanism
+ to wait for the target log sequence
+ number (<acronym>LSN</>) on standby in <productname>&productname;</productname>
+ databases with master-standby asynchronous replication. When run with the
+ <replaceable>LSN</replaceable> option, the <command>WAITLSN</command> command
+ waits for the specified <acronym>LSN</> to be replayed. By default, wait
+ time is unlimited. Waiting can be interrupted using <literal>Ctrl+C</>, or
+ by shutting down the <literal>postgres</> server. You can also limit the wait
+ time using the <option>TIMEOUT</> option, or check the target <acronym>LSN</>
+ status immediately using the <option>NOWAIT</> option.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Parameters</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><replaceable class="PARAMETER">LSN</replaceable></term>
+ <listitem>
+ <para>
+ Specify the target log sequence number to wait for.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>INFINITELY</term>
+ <listitem>
+ <para>
+ Wait until the target <acronym>LSN</> is replayed on standby.
+ This is an optional parameter reinforcing the default behavior.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMEOUT <replaceable class="PARAMETER">wait_time</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time to wait for the LSN to be replayed.
+ The specified <replaceable>wait_time</replaceable> must be an integer
+ and is measured in milliseconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>NOWAIT</term>
+ <listitem>
+ <para>
+ Report whether the target <acronym>LSN</> has been replayed already,
+ without any waiting.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ Run <literal>WAITLSN</> from <application>psql</application>,
+ limiting wait time to 10000 milliseconds:
+
+<screen>
+WAITLSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE: LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Wait until the specified <acronym>LSN</> is replayed:
+<screen>
+WAITLSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Limit <acronym>LSN</> wait time to 500000 milliseconds, and then cancel the command:
+<screen>
+WAITLSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+NOTICE: LSN is not reached. Try to increase wait time.
+ERROR: canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+ <title>Compatibility</title>
+
+ <para>
+ There is no <command>WAITLSN</command> statement in the SQL
+ standard.
+ </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index 9000b3a..0c5951a 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -209,6 +209,7 @@
&update;
&vacuum;
&values;
+ &waitlsn;
</reference>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index dd028a1..117cc9b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -40,6 +40,7 @@
#include "catalog/pg_control.h"
#include "catalog/pg_database.h"
#include "commands/tablespace.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
@@ -7312,6 +7312,15 @@ StartupXLOG(void)
break;
}
+ /*
+ * After lastReplayedEndRecPtr has been updated, set the latches in the
+ * SHMEM array of the backends whose target LSN has now been replayed
+ */
+ if (XLogCtl->lastReplayedEndRecPtr >= GetMinWaitLSN())
+ {
+
+ WaitLSNSetLatch(XLogCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 4a6c99e..0d10117 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \
policy.o portalcmds.o prepare.o proclang.o publicationcmds.o \
schemacmds.o seclabel.o sequence.o statscmds.o subscriptioncmds.o \
tablecmds.o tablespace.o trigger.o tsearchcmds.o typecmds.o user.o \
- vacuum.o vacuumlazy.o variable.o view.o
+ vacuum.o vacuumlazy.o variable.o view.o waitlsn.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 0000000..db2f549
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,273 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ * WaitLSN statement: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2017, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * -------------------------------------------------------------------------
+ * Wait for an LSN to be replayed on the standby
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "pgstat.h"
+#include "utils/pg_lsn.h"
+#include "storage/latch.h"
+#include "miscadmin.h"
+#include "storage/spin.h"
+#include "storage/backendid.h"
+#include "access/xact.h"
+#include "storage/shmem.h"
+#include "storage/ipc.h"
+#include "utils/timestamp.h"
+#include "storage/pmsignal.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "commands/waitlsn.h"
+#include "storage/proc.h"
+#include "access/transam.h"
+#include "funcapi.h"
+#include "catalog/pg_type.h"
+#include "utils/builtins.h"
+
+/* Shared memory sizing, latch own/disown helpers and the abort callback */
+static uint32 WaitLSNShmemSize(void);
+static void WLDisownLatchAbort(XactEvent event, void *arg);
+static void WLOwnLatch(XLogRecPtr trg_lsn);
+static void WLDisownLatch(void);
+
+void _PG_init(void);
+
+/* Shared memory structures */
+/*
+ * One wait slot.  Slots live in GlobState.l_arr and are indexed by backend
+ * ID (see the uses of MyBackendId in WLOwnLatch and WLDisownLatch).
+ */
+typedef struct
+{
+ int pid; /* PID of the waiting backend, 0 while the slot is unused */
+ volatile slock_t slock; /* spinlock taken around updates of this slot */
+ Latch latch; /* shared latch, set by WaitLSNSetLatch() to wake the waiter */
+ XLogRecPtr trg_lsn; /* LSN being waited for, InvalidXLogRecPtr while unused */
+} BIDLatch;
+
+
+/*
+ * Shared-memory state of WAITLSN: two bookkeeping fields followed by the
+ * array of wait slots (MaxConnections+1 entries, see WaitLSNShmemSize()).
+ */
+typedef struct
+{
+ int backend_maxid; /* upper bound of the slot range scanned for waiters */
+ XLogRecPtr min_lsn; /* smallest LSN waited for, PG_UINT64_MAX when reset */
+ BIDLatch l_arr[FLEXIBLE_ARRAY_MEMBER]; /* wait slots, indexed by backend ID */
+} GlobState;
+
+/* Shared WAITLSN state; assigned in WaitLSNShmemInit() */
+static volatile GlobState *state;
+/* True between WLOwnLatch() and WLDisownLatch() in this backend */
+bool is_latch_owned = false;
+
+/*
+ * WLOwnLatch
+ *      Claim this backend's wait slot at the beginning of WAITLSN.
+ *
+ * Under the slot's spinlock: take ownership of the slot's latch, raise
+ * backend_maxid if this backend's ID is above it, and record our PID and the
+ * target LSN in the slot.  Once the spinlock is released, the shared min_lsn
+ * is lowered if trg_lsn is smaller than it.
+ */
+static void
+WLOwnLatch(XLogRecPtr trg_lsn)
+{
+    volatile BIDLatch *slot = &state->l_arr[MyBackendId];
+
+    SpinLockAcquire(&slot->slock);
+
+    OwnLatch(&slot->latch);
+    is_latch_owned = true;
+
+    if (MyBackendId > state->backend_maxid)
+        state->backend_maxid = MyBackendId;
+
+    slot->pid = MyProcPid;
+    slot->trg_lsn = trg_lsn;
+
+    SpinLockRelease(&slot->slock);
+
+    /* min_lsn is maintained outside the slot spinlock */
+    if (state->min_lsn > trg_lsn)
+        state->min_lsn = trg_lsn;
+}
+
+/*
+ * WLDisownLatch
+ *      Release this backend's wait slot at the end of WAITLSN.
+ *
+ * Disowns the slot's latch, clears the slot's pid and trg_lsn, and then
+ * refreshes the shared bookkeeping:
+ *  - if this backend's target was the shared min_lsn, min_lsn is recomputed
+ *    from the remaining slots (PG_UINT64_MAX when no waiter is found);
+ *  - if this backend's ID was backend_maxid, backend_maxid is lowered to the
+ *    highest slot that still has a pid; when there is none it is left as is.
+ *
+ * NOTE(review): both scans start at slot 2, like the one in
+ * WaitLSNSetLatch(); presumably lower slots never belong to a WAITLSN
+ * caller -- confirm.
+ */
+static void
+WLDisownLatch(void)
+{
+    int         i;
+    XLogRecPtr  trg_lsn = state->l_arr[MyBackendId].trg_lsn;
+
+    SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+    DisownLatch(&state->l_arr[MyBackendId].latch);
+    is_latch_owned = false;
+    state->l_arr[MyBackendId].pid = 0;
+    state->l_arr[MyBackendId].trg_lsn = InvalidXLogRecPtr;
+
+    /* Update state->min_lsn only if necessary, choosing the next min_lsn */
+    if (state->min_lsn == trg_lsn)
+    {
+        state->min_lsn = PG_UINT64_MAX;
+        for (i = 2; i <= state->backend_maxid; i++)
+            if (state->l_arr[i].trg_lsn != InvalidXLogRecPtr &&
+                state->l_arr[i].trg_lsn < state->min_lsn)
+                state->min_lsn = state->l_arr[i].trg_lsn;
+    }
+
+    if (state->backend_maxid == MyBackendId)
+    {
+        /*
+         * l_arr holds MaxConnections+1 entries (see WaitLSNShmemSize()), so
+         * the last valid index is MaxConnections.  Starting this scan at
+         * MaxConnections+1 would read one element past the end of the array.
+         */
+        for (i = MaxConnections; i >= 2; i--)
+            if (state->l_arr[i].pid != 0)
+            {
+                state->backend_maxid = i;
+                break;
+            }
+    }
+
+    SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+/*
+ * WLDisownLatchAbort
+ *      Transaction callback that gives the wait latch back on abort.
+ *
+ * Does nothing unless this backend currently owns its latch and the event is
+ * XACT_EVENT_ABORT or XACT_EVENT_PARALLEL_ABORT.  The arg pointer is unused.
+ */
+static void
+WLDisownLatchAbort(XactEvent event, void *arg)
+{
+    if (!is_latch_owned)
+        return;
+
+    switch (event)
+    {
+        case XACT_EVENT_ABORT:
+        case XACT_EVENT_PARALLEL_ABORT:
+            WLDisownLatch();
+            break;
+        default:
+            break;
+    }
+}
+
+/*
+ * _PG_init
+ *      Register WLDisownLatchAbort as a transaction callback, but only when
+ *      IsUnderPostmaster is false.
+ *
+ * NOTE(review): nothing in this patch calls _PG_init(); confirm that the
+ * callback really gets registered in regular backends.
+ */
+void
+_PG_init(void)
+{
+    if (IsUnderPostmaster)
+        return;
+
+    RegisterXactCallback(WLDisownLatchAbort, NULL);
+}
+
+/*
+ * WaitLSNShmemSize
+ *      Number of bytes of shared memory needed for GlobState: the fixed
+ *      header plus MaxConnections+1 BIDLatch slots.
+ */
+static uint32
+WaitLSNShmemSize(void)
+{
+    size_t      header_bytes = offsetof(GlobState, l_arr);
+    size_t      slot_bytes = sizeof(BIDLatch) * (MaxConnections+1);
+
+    return header_bytes + slot_bytes;
+}
+
+/*
+ * WaitLSNShmemInit
+ *      Attach to the "pg_wait_lsn" shared memory area, creating it if needed.
+ *
+ * When the area did not exist yet, every slot is reset (pid 0, invalid
+ * target LSN, fresh spinlock and shared latch), backend_maxid is set to 0
+ * and min_lsn to PG_UINT64_MAX.  An already existing area is left untouched.
+ */
+void
+WaitLSNShmemInit(void)
+{
+    bool        already_exists;
+    uint32      slot_no;
+
+    state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+                                          WaitLSNShmemSize(),
+                                          &already_exists);
+    if (already_exists)
+        return;
+
+    for (slot_no = 0; slot_no < (MaxConnections+1); slot_no++)
+    {
+        volatile BIDLatch *slot = &state->l_arr[slot_no];
+
+        slot->pid = 0;
+        slot->trg_lsn = InvalidXLogRecPtr;
+        SpinLockInit(&slot->slock);
+        InitSharedLatch(&slot->latch);
+    }
+
+    state->backend_maxid = 0;
+    state->min_lsn = PG_UINT64_MAX;
+}
+
+/*
+ * WaitLSNSetLatch
+ *      Wake the waiters whose target LSN is covered by cur_lsn.
+ *
+ * Walks slots 2 .. backend_maxid.  Slots whose trg_lsn is 0 are skipped
+ * without locking; for the others the latch is set, under the slot's
+ * spinlock, when trg_lsn <= cur_lsn.
+ */
+void
+WaitLSNSetLatch(XLogRecPtr cur_lsn)
+{
+    uint32      slot_no;
+
+    for (slot_no = 2; slot_no <= state->backend_maxid; slot_no++)
+    {
+        volatile BIDLatch *slot = &state->l_arr[slot_no];
+
+        /* unlocked peek: nobody is waiting in this slot */
+        if (slot->trg_lsn == 0)
+            continue;
+
+        SpinLockAcquire(&slot->slock);
+        if (cur_lsn >= slot->trg_lsn)
+            SetLatch(&slot->latch);
+        SpinLockRelease(&slot->slock);
+    }
+}
+
+/*
+ * GetMinWaitLSN
+ *      Return the shared min_lsn: the smallest target LSN recorded by
+ *      WLOwnLatch()/WLDisownLatch(), or PG_UINT64_MAX when it was reset and
+ *      no waiter was found.  The value is read without taking any lock.
+ */
+XLogRecPtr
+GetMinWaitLSN(void)
+{
+ return state->min_lsn;
+}
+
+/*
+ * WaitLSNUtility
+ *      Execute WAITLSN: own this backend's latch and wait until the target
+ *      LSN is replayed, the timeout expires or the postmaster dies, then
+ *      report the outcome.
+ *
+ * lsn    - target LSN in text form; parsed with pg_lsn_in.
+ * delay  - timeout in milliseconds; a value <= 0 means wait without timeout.
+ * dest   - receiver of the one-row, one-column ("LSN reached") result, which
+ *          is "t" when the LSN has been replayed and "f" otherwise.
+ *
+ * A NOTICE is raised when the wait ends before the target LSN is reached.
+ */
+void
+WaitLSNUtility(const char *lsn, const int delay, DestReceiver *dest)
+{
+    XLogRecPtr  trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn)));
+    XLogRecPtr  cur_lsn;
+    int         latch_events;
+    uint64      tdelay = delay;
+    long        secs;
+    int         microsecs;
+    TimestampTz timer = GetCurrentTimestamp();
+    TupOutputState *tstate;
+    TupleDesc   tupdesc;
+    char       *value = "f";
+
+    if (delay > 0)
+        latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+    else
+        latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+    WLOwnLatch(trg_lsn);
+
+    for (;;)
+    {
+        cur_lsn = GetXLogReplayRecPtr(NULL);
+
+        /* If the LSN has been replayed */
+        if (trg_lsn <= cur_lsn)
+            break;
+
+        /* If the postmaster dies, finish immediately */
+        if (!PostmasterIsAlive())
+            break;
+
+        /* If the delay time is over */
+        if (latch_events & WL_TIMEOUT)
+        {
+            /*
+             * Sample the clock once, so that the remaining time computed
+             * below is consistent with the expiry check and cannot become
+             * negative.
+             */
+            TimestampTz now = GetCurrentTimestamp();
+
+            if (TimestampDifferenceExceeds(timer, now, delay))
+                break;
+            TimestampDifference(timer, now, &secs, &microsecs);
+            tdelay = delay - (secs*1000 + microsecs/1000);
+        }
+
+        /* NOTE(review): presumably keeps the waiter from holding back xmin -- confirm */
+        MyPgXact->xmin = InvalidTransactionId;
+        WaitLatch(&state->l_arr[MyBackendId].latch, latch_events, tdelay, WAIT_EVENT_CLIENT_READ);
+        ResetLatch(&state->l_arr[MyBackendId].latch);
+
+        /* Equivalent of CHECK_FOR_INTERRUPTS(), with the latch released first */
+        if (InterruptPending)
+        {
+            /* ProcessInterrupts() may not return, so give the latch up now */
+            WLDisownLatch();
+            ProcessInterrupts();
+
+            /*
+             * Still here, so the command goes on.  Take the latch back: the
+             * next WaitLatch() and the WLDisownLatch() after the loop both
+             * require this backend to own it.
+             */
+            WLOwnLatch(trg_lsn);
+        }
+    }
+
+    WLDisownLatch();
+
+    if (trg_lsn > cur_lsn)
+        elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+    else
+        value = "t";
+
+    /* need a tuple descriptor representing a single TEXT column */
+    tupdesc = CreateTemplateTupleDesc(1, false);
+    TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+    /* prepare for projection of tuples */
+    tstate = begin_tup_output_tupdesc(dest, tupdesc);
+    /* Send it */
+    do_text_output_oneline(tstate, value);
+    end_tup_output(tstate);
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4c83a63..a149b54 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -275,7 +275,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitLSNStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -322,7 +322,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> OptSchemaEltList
%type <boolean> TriggerForSpec TriggerForType
-%type <ival> TriggerActionTime
+%type <ival> TriggerActionTime WaitDelay
%type <list> TriggerEvents TriggerOneEvent
%type <value> TriggerFuncArg
%type <node> TriggerWhen
@@ -636,7 +636,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
HANDLER HAVING HEADER_P HOLD HOUR_P
IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P
- INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
+ INCLUDING INCREMENT INDEX INDEXES INFINITELY INHERIT INHERITS INITIALLY INLINE_P
INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
@@ -675,7 +675,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P
TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
- TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM TREAT TRIGGER TRIM TRUE_P
+ TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM TREAT TRIGGER TRIM TRUE_P
TRUNCATE TRUSTED TYPE_P TYPES_P
UNBOUNDED UNCOMMITTED UNENCRYPTED UNION UNIQUE UNKNOWN UNLISTEN UNLOGGED
@@ -684,7 +684,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAITLSN WHEN WHERE WHITESPACE_P WINDOW
+ WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -935,6 +936,7 @@ stmt :
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitLSNStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -13831,6 +13833,44 @@ frame_bound:
}
;
+/*****************************************************************************
+ *
+ * QUERY:
+ * WAITLSN <LSN> can appear as a query-level command
+ *
+ *
+ *****************************************************************************/
+
+WaitLSNStmt:
+ /* WAITLSN 'lsn': delay 0, i.e. no timeout (see WaitLSNUtility) */
+ WAITLSN Sconst
+ {
+ WaitLSNStmt *n = makeNode(WaitLSNStmt);
+ n->lsn = $2;
+ n->delay = 0;
+ $$ = (Node *)n;
+ }
+ /* WAITLSN 'lsn' TIMEOUT n: delay is the given integer */
+ | WAITLSN Sconst TIMEOUT Iconst
+ {
+ WaitLSNStmt *n = makeNode(WaitLSNStmt);
+ n->lsn = $2;
+ n->delay = $4;
+ $$ = (Node *)n;
+ }
+ /* WAITLSN 'lsn' INFINITELY: same node as the form without options */
+ | WAITLSN Sconst INFINITELY
+ {
+ WaitLSNStmt *n = makeNode(WaitLSNStmt);
+ n->lsn = $2;
+ n->delay = 0;
+ $$ = (Node *)n;
+ }
+ /* WAITLSN 'lsn' NOWAIT: encoded as delay 1; n->nowait is not set here */
+ | WAITLSN Sconst NOWAIT
+ {
+ WaitLSNStmt *n = makeNode(WaitLSNStmt);
+ n->lsn = $2;
+ n->delay = 1;
+ $$ = (Node *)n;
+ }
+ ;
/*
* Supporting nonterminals for expressions.
@@ -14705,6 +14745,7 @@ unreserved_keyword:
| INCREMENT
| INDEX
| INDEXES
+ | INFINITELY
| INHERIT
| INHERITS
| INLINE_P
@@ -14843,6 +14884,7 @@ unreserved_keyword:
| TEMPLATE
| TEMPORARY
| TEXT_P
+ | TIMEOUT
| TRANSACTION
| TRANSFORM
| TRIGGER
@@ -14868,6 +14910,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAITLSN
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2d1ed14..932136f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -271,6 +272,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
AsyncShmemInit();
BackendRandomShmemInit();
+ /*
+ * Init array of Latches in SHMEM for WAITLSN
+ */
+ WaitLSNShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 82a707a..544baeb 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -56,6 +56,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -923,6 +924,20 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitLSNStmt:
+ {
+ WaitLSNStmt *stmt = (WaitLSNStmt *) parsetree;
+ if (!RecoveryInProgress())
+ {
+ ereport(ERROR,(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
+ errmsg("cannot execute %s not during recovery",
+ "WaitLSN")));
+ }
+ else
+ WaitLSNUtility(stmt->lsn, stmt->delay, dest);
+ }
+ break;
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2481,6 +2496,10 @@ CreateCommandTag(Node *parsetree)
tag = "NOTIFY";
break;
+ case T_WaitLSNStmt:
+ tag = "WAITLSN";
+ break;
+
case T_ListenStmt:
tag = "LISTEN";
break;
@@ -3104,6 +3123,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitLSNStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 0000000..49cf9e8
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ * WaitLSN notification: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAITLSN_H
+#define WAITLSN_H
+#include "tcop/dest.h"
+
+/* Execute WAITLSN for the textual lsn with the given delay; the result row is sent to dest */
+extern void WaitLSNUtility(const char *lsn, const int delay, DestReceiver *dest);
+/* Create or attach to the shared memory array of wait slots */
+extern void WaitLSNShmemInit(void);
+/* Set the latches of the waiters whose target LSN is <= cur_lsn */
+extern void WaitLSNSetLatch(XLogRecPtr cur_lsn);
+/* Return the shared minimum of the LSNs being waited for */
+extern XLogRecPtr GetMinWaitLSN(void);
+
+#endif /* WAITLSN_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index ffeeb49..201677b 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -479,6 +479,7 @@ typedef enum NodeTag
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
+ T_WaitLSNStmt,
T_SQLCmd,
/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 732e5d6..55ffda8 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3446,4 +3446,16 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/* ----------------------
+ * WaitLSN Statement
+ * ----------------------
+ */
+typedef struct WaitLSNStmt
+{
+ NodeTag type;
+ char *lsn; /* Target LSN to wait for, in text form */
+ int delay; /* Timeout passed to WaitLSNUtility; 0 means no timeout */
+ bool nowait; /* Do not wait, just report the result; NOTE(review): not
+ * set by the grammar rules in this patch, which encode
+ * NOWAIT as delay = 1 */
+} WaitLSNStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index f50e45e..618cdb2 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -198,6 +198,7 @@ PG_KEYWORD("including", INCLUDING, UNRESERVED_KEYWORD)
PG_KEYWORD("increment", INCREMENT, UNRESERVED_KEYWORD)
PG_KEYWORD("index", INDEX, UNRESERVED_KEYWORD)
PG_KEYWORD("indexes", INDEXES, UNRESERVED_KEYWORD)
+PG_KEYWORD("infinitely", INFINITELY, UNRESERVED_KEYWORD)
PG_KEYWORD("inherit", INHERIT, UNRESERVED_KEYWORD)
PG_KEYWORD("inherits", INHERITS, UNRESERVED_KEYWORD)
PG_KEYWORD("initially", INITIALLY, RESERVED_KEYWORD)
@@ -394,6 +395,7 @@ PG_KEYWORD("temporary", TEMPORARY, UNRESERVED_KEYWORD)
PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
PG_KEYWORD("to", TO, RESERVED_KEYWORD)
PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -433,6 +435,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("waitlsn", WAITLSN, UNRESERVED_KEYWORD)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers