Fix build.meson troubles
--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..422bb1ed82 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1752,6 +1753,15 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set latches in shared memory array to notify the waiter.
+ */
+ if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN())
+ {
+ WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 42cced9ebe..ec6ab7722a 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
+ 'wait.c',
)
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..3465673f0a
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,275 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements wait lsn, which allows waiting for events such as
+ * LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xact.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogdefs.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add to shared memory array */
+static void AddWaitedLSN(XLogRecPtr lsn_to_wait);
+
+/* Shared memory structure */
+typedef struct
+{
+ int backend_maxid;
+ pg_atomic_uint64 min_lsn; /* XLogRecPtr of minimal waited for LSN */
+ slock_t mutex;
+ /* LSNs that different backends are waiting */
+ XLogRecPtr lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static WaitState *state;
+
+/*
+ * Add the wait event of the current backend to shared memory array
+ */
+static void
+AddWaitedLSN(XLogRecPtr lsn_to_wait)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->backend_maxid < MyBackendId)
+ state->backend_maxid = MyBackendId;
+
+ state->lsn[MyBackendId] = lsn_to_wait;
+
+ if (lsn_to_wait < state->min_lsn.value)
+ state->min_lsn.value = lsn_to_wait;
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array.
+ */
+void
+DeleteWaitedLSN(void)
+{
+ int i;
+ XLogRecPtr lsn_to_delete;
+
+ SpinLockAcquire(&state->mutex);
+
+ lsn_to_delete = state->lsn[MyBackendId];
+ state->lsn[MyBackendId] = InvalidXLogRecPtr;
+
+ /* If we are deleting the minimal LSN, then choose the next min_lsn */
+ if (lsn_to_delete != InvalidXLogRecPtr &&
+ lsn_to_delete == state->min_lsn.value)
+ {
+ state->min_lsn.value = PG_UINT64_MAX;
+ for (i = 2; i <= state->backend_maxid; i++)
+ if (state->lsn[i] != InvalidXLogRecPtr &&
+ state->lsn[i] < state->min_lsn.value)
+ state->min_lsn.value = state->lsn[i];
+ }
+
+ /* If deleting from the end of the array, shorten the array's used part */
+ if (state->backend_maxid == MyBackendId)
+ for (i = (MyBackendId); i >= 2; i--)
+ if (state->lsn[i] != InvalidXLogRecPtr)
+ {
+ state->backend_maxid = i;
+ break;
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitState, lsn);
+ size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+ return size;
+}
+
+/*
+ * Initialize an array of events to wait for in shared memory
+ */
+void
+WaitShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+ WaitShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends + 1); i++)
+ state->lsn[i] = InvalidXLogRecPtr;
+
+ state->backend_maxid = 0;
+ state->min_lsn.value = PG_UINT64_MAX;
+ }
+}
+
+/*
+ * Set latches in shared memory to signal that new LSN has been replayed
+ */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int backend_maxid;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ backend_maxid = state->backend_maxid;
+
+ for (i = 2; i <= backend_maxid; i++)
+ {
+ backend = BackendIdGetProc(i);
+
+ if (backend && state->lsn[i] != 0 &&
+ state->lsn[i] <= cur_lsn)
+ {
+ SetLatch(&backend->procLatch);
+ }
+ }
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Get minimal LSN that someone waits for
+ */
+XLogRecPtr
+GetMinWaitedLSN(void)
+{
+ return state->min_lsn.value;
+}
+
+/*
+ * On WAIT use a latch to wait till LSN is replayed,
+ * postmaster dies or timeout happens. Timeout is specified in milliseconds.
+ * Returns true if LSN was reached and false otherwise.
+ */
+bool
+WaitUtility(XLogRecPtr target_lsn, const int timeout_ms)
+{
+ XLogRecPtr cur_lsn = GetXLogReplayRecPtr(NULL);
+ int latch_events;
+ float8 endtime;
+ bool res = false;
+ bool wait_forever = (timeout_ms <= 0);
+
+ /*
+ * In transactions, that have isolation level repeatable read or higher
+ * wait lsn creates a snapshot if called first in a block, which can
+ * lead the transaction to working incorrectly
+ */
+
+ if (IsTransactionBlock() && XactIsoLevel != XACT_READ_COMMITTED) {
+ ereport(WARNING,
+ errmsg("Waitlsn may work incorrectly in this isolation level"),
+ errhint("Call wait lsn before starting the transaction"));
+ }
+
+ endtime = GetNowFloat() + timeout_ms / 1000.0;
+
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ /* Check if we already reached the needed LSN */
+ if (cur_lsn >= target_lsn)
+ return true;
+
+ AddWaitedLSN(target_lsn);
+
+ for (;;)
+ {
+ int rc;
+ float8 time_left = 0;
+ long time_left_ms = 0;
+
+ time_left = endtime - GetNowFloat();
+
+ /* Use 100 ms as the default timeout to check for interrupts */
+ if (wait_forever || time_left < 0 || time_left > 0.1)
+ time_left_ms = 100;
+ else
+ time_left_ms = (long) ceil(time_left * 1000.0);
+
+ /* If interrupt, LockErrorCleanup() will do DeleteWaitedLSN() for us */
+ CHECK_FOR_INTERRUPTS();
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ rc = WaitLatch(MyLatch, latch_events, time_left_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ ResetLatch(MyLatch);
+
+ if (rc & WL_LATCH_SET)
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ if (rc & WL_TIMEOUT)
+ {
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+ /* If the time specified by user has passed, stop waiting */
+ time_left = endtime - GetNowFloat();
+ if (!wait_forever && time_left <= 0.0)
+ break;
+ }
+
+ /* If LSN has been replayed */
+ if (target_lsn <= cur_lsn)
+ break;
+ }
+
+ DeleteWaitedLSN();
+
+ if (cur_lsn < target_lsn)
+ ereport(WARNING,
+ errmsg("LSN was not reached"),
+ errhint("Try to increase wait time."));
+ else
+ res = true;
+
+ return res;
+}
+
+Datum
+pg_wait_lsn(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr trg_lsn = PG_GETARG_LSN(0);
+ uint64_t delay = PG_GETARG_INT32(1);
+
+ PG_RETURN_BOOL(WaitUtility(trg_lsn, delay));
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 8f1ded7338..760d760356 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -142,6 +143,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, StatsShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -295,6 +297,11 @@ CreateSharedMemoryAndSemaphores(void)
AsyncShmemInit();
StatsShmemInit();
+ /*
+ * Init array of events for the wait clause in shared memory
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 22b4278610..1dec418992 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -704,6 +705,9 @@ LockErrorCleanup(void)
AbortStrongLockAcquire();
+ /* If wait lsn was interrupted, then stop waiting for that LSN */
+ DeleteWaitedLSN();
+
/* Nothing to do if we weren't waiting for a lock */
if (lockAwaited == NULL)
{
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 5d78d6dc06..fbc96db198 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -384,8 +384,6 @@ pg_sleep(PG_FUNCTION_ARGS)
* less than the specified time when WaitLatch is terminated early by a
* non-query-canceling signal such as SIGHUP.
*/
-#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
-
endtime = GetNowFloat() + secs;
for (;;)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 505595620e..0979c25217 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11931,6 +11931,11 @@
prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
prosrc => 'brin_minmax_multi_summary_send' },
+{ oid => '16387', descr => 'wait for LSN until timeout',
+ proname => 'pg_wait_lsn', prorettype => 'bool', proargtypes => 'pg_lsn int8',
+ proargnames => '{trg_lsn,delay}',
+ prosrc => 'pg_wait_lsn' },
+
{ oid => '8981', descr => 'arbitrary value from among input values',
proname => 'any_value', prokind => 'a', proisstrict => 'f',
prorettype => 'anyelement', proargtypes => 'anyelement',
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..fd21e43416
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+#include "nodes/parsenodes.h"
+
+extern bool WaitUtility(XLogRecPtr lsn, const int timeout_ms);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWaitedLSN(void);
+extern void DeleteWaitedLSN(void);
+
+#endif /* WAIT_H */
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index edd59dc432..db2926f965 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -140,4 +140,6 @@ extern int date2isoyearday(int year, int mon, int mday);
extern bool TimestampTimestampTzRequiresRewrite(void);
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
#endif /* TIMESTAMP_H */
diff --git a/src/test/recovery/t/034_waitlsn.pl b/src/test/recovery/t/034_waitlsn.pl
new file mode 100644
index 0000000000..dc9e899671
--- /dev/null
+++ b/src/test/recovery/t/034_waitlsn.pl
@@ -0,0 +1,76 @@
+# Checks wait lsn
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Using the backup, create a streaming standby with a 1 second delay
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# Check that timeouts make us wait for the specified time (1s here)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $two_seconds = 2000; # in milliseconds
+my $start_time = time();
+$node_standby->safe_psql('postgres',
+ "SELECT pg_wait_lsn('0/FFFFFFFF', $two_seconds)");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $two_seconds, "wait lsn waits for enough time");
+
+# Check that timeouts let us stop waiting right away, before reaching target LSN
+# Wait for no wait
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my ($ret, $out, $err) = $node_standby->psql('postgres',
+ "SELECT pg_wait_lsn('$lsn1', 1)");
+
+ok($ret == 0, "zero return value when failed to wait lsn on standby");
+ok($err =~ /WARNING: LSN was not reached/,
+ "correct error message when failed to wait lsn on standby");
+ok($out eq "f", "if given too little wait time, WAIT doesn't reach target LSN");
+
+
+# Check that wait lsn works fine and reaches target LSN if given no timeout
+# Wait for infinite
+
+# Add data on primary, memorize primary's last LSN
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Wait for it to appear on replica, memorize replica's last LSN
+$node_standby->safe_psql('postgres',
+ "SELECT pg_wait_lsn('$lsn2', 0)");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+
+# Make sure that primary's and replica's LSNs are the same after WAIT
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)");
+ok($compare_lsns eq 0,
+ "standby reached the same LSN as primary before starting transaction");
+
+$node_standby->stop;
+$node_primary->stop;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 86a9303bf5..374f1bc267 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2968,6 +2968,7 @@ WaitEventIPC
WaitEventSet
WaitEventTimeout
WaitPMResult
+WaitState
WalCloseMethod
WalCompression
WalLevel