Rebase done.

Meanwhile I made some more changes.

Changes
=======
1) WAITLSN is now implemented as an extension called "pg_waitlsn"

2) Call new hook "lsn_updated_hook" right after xact_redo_commit (xlog.c)

3) Corresponding functions:
pg_waitlsn('0/693FF800', 10000) - wait 10 seconds
pg_waitlsn_infinite('0/693FF800') - for infinite wait
pg_waitlsn_no_wait('0/693FF800') - check once whether the LSN has been replayed, without waiting.

4) Add two GUCs that help tune the overhead imposed on StartupXLOG:
count_waitlsn (divisor: waiters are checked only on every count_waitlsn-th replayed record, not on each one)
int count_waitlsn    = 10;

interval_waitlsn (interval in milliseconds after which an additional LSN check is made regardless of the count)
int interval_waitlsn = 100;

5) Optimized the loop that sets the latches.

How to use it
==========
Master:
1) Set "wal_level = replica"
Slave:
2) Add  shared_preload_libraries = 'pg_waitlsn'
        hot_standby = on (in postgresql.conf)
3) Create extension pg_waitlsn;
4) On the hot standby you can now wait for an LSN (much like pg_sleep): pg_waitlsn returns as soon as the LSN has been replayed on the slave.

select pg_waitlsn('LSN' [, timeout in ms]);
select pg_waitlsn_infinite('LSN');
select pg_waitlsn_no_wait('LSN');

#Wait until LSN 0/303EC60 has been replayed, or until 10 seconds have passed.
select pg_waitlsn('0/303EC60', 10000);

#Or the same without a timeout.
select pg_waitlsn('0/303EC60');
select pg_waitlsn_infinite('0/693FF800');

#To check whether an LSN has already been replayed, use:
select pg_waitlsn_no_wait('0/693FF800');

Notice: pg_waitlsn also returns on postmaster death or on an interrupt, if either happens earlier than the target LSN is replayed or the timeout expires.

--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/contrib/pg_waitlsn/Makefile b/contrib/pg_waitlsn/Makefile
new file mode 100644
index 0000000..49a326c
--- /dev/null
+++ b/contrib/pg_waitlsn/Makefile
@@ -0,0 +1,21 @@
+# contrib/pg_waitlsn/Makefile
+
+MODULE_big = pg_waitlsn
+OBJS = pg_waitlsn.o
+EXTENSION = pg_waitlsn
+DATA = pg_waitlsn--1.0.sql
+
+# Directives below are deliberately not indented: once a rule exists, GNU make
+# treats a tab-indented line as a recipe line instead of a directive.
+ifdef USE_PGXS
+
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+
+subdir = contrib/pg_waitlsn
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_waitlsn/pg_waitlsn--1.0.sql b/contrib/pg_waitlsn/pg_waitlsn--1.0.sql
new file mode 100644
index 0000000..8b251f3
--- /dev/null
+++ b/contrib/pg_waitlsn/pg_waitlsn--1.0.sql
@@ -0,0 +1,19 @@
+/* contrib/pg_waitlsn/pg_waitlsn--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_waitlsn" to wait target LSN to been replayed, delay for waiting in miliseconds (default infinity) \quit
+-- All functions are VOLATILE: the result depends on replay progress, not only on the arguments.
+CREATE FUNCTION pg_waitlsn(lsn pg_lsn, delay int default 0)
+RETURNS bool
+AS 'MODULE_PATHNAME', 'pg_waitlsn'
+LANGUAGE C VOLATILE STRICT ;
+-- Wait for lsn without a timeout.
+CREATE FUNCTION pg_waitlsn_infinite(lsn pg_lsn)
+RETURNS bool
+AS 'MODULE_PATHNAME', 'pg_waitlsn_infinite'
+LANGUAGE C VOLATILE STRICT ;
+-- Report whether lsn has been replayed, using only a 1 ms timeout.
+CREATE FUNCTION pg_waitlsn_no_wait(lsn pg_lsn)
+RETURNS bool
+AS 'MODULE_PATHNAME', 'pg_waitlsn_no_wait'
+LANGUAGE C VOLATILE STRICT ;
diff --git a/contrib/pg_waitlsn/pg_waitlsn.c b/contrib/pg_waitlsn/pg_waitlsn.c
new file mode 100644
index 0000000..d210678
--- /dev/null
+++ b/contrib/pg_waitlsn/pg_waitlsn.c
@@ -0,0 +1,299 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_waitlsn
+ *
+ * Portions Copyright (c) 2012-2017, PostgresPro Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/pg_waitlsn/pg_waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "pgstat.h"
+#include "access/xlog.h"
+#include "utils/pg_lsn.h"
+#include "storage/latch.h"
+#include "miscadmin.h"
+#include "storage/spin.h"
+#include "storage/backendid.h"
+#include "access/xact.h"
+#include "storage/shmem.h"
+#include "storage/ipc.h"
+#include "utils/timestamp.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "access/transam.h"
+#include "utils/guc.h"
+
+PG_MODULE_MAGIC;
+
+static bool pg_waitlsn_internal(XLogRecPtr lsn, uint64_t delay);
+
+/* Previously installed hook values, saved when _PG_init installs this module's hooks */
+static lsn_updated_hook_type prev_lsn_updated_hook = NULL;
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+static void wl_lsn_updated_hook(void);
+static uint32 estimate_shmem_size(void);
+
+/* Latch ownership helpers (OwnLatch/DisownLatch wrappers) and the transaction-abort callback */
+static void disown_latches_on_abort(XactEvent event, void *arg);
+static void wl_own_latch(void);
+static void wl_disown_latch(void);
+
+/* GUC variables (registered in _PG_init) */
+int				count_waitlsn = 10;		/* waiters are woken when the hook-call counter is a multiple of this */
+int				interval_waitlsn = 100;	/* ... or once this many ms have passed since the last wake-up */
+
+/* Globals */
+TimestampTz		time_waitlsn = 0;		/* time of the last wake-up pass */
+int				counter_waitlsn = 0;	/* hook-call counter */
+
+void			_PG_init(void);
+
+/* Shared memory structures: BIDLatch is one waiter slot, indexed by backend id */
+typedef struct
+{
+	int					pid;	/* PID of the backend using the slot; 0 when the slot is free */
+	volatile slock_t	slock;	/* taken around changes to pid and around the wake-up of this slot */
+	Latch				latch;	/* set by the replay hook to wake the waiting backend */
+} BIDLatch;
+
+typedef struct
+{
+	char		dummy;			/* not referenced anywhere in this file */
+	int			backend_maxid;	/* highest backend id with a claimed slot; bounds the wake-up scan */
+	BIDLatch	l_arr[FLEXIBLE_ARRAY_MEMBER];	/* waiter slots, indexed by MyBackendId */
+} GlobState;
+
+static volatile GlobState  *state;		/* shared state; assigned in qs_shmem_startup() */
+bool						is_latch_owned = false;	/* true while this backend owns its slot's latch */
+
+static uint32
+estimate_shmem_size(void)
+{	/* Bytes for the GlobState header plus one BIDLatch for each slot 0..MaxConnections. */
+	return offsetof(GlobState, l_arr) + (MaxConnections + 1) * sizeof(BIDLatch);
+}
+
+static void
+wl_own_latch(void)
+{
+	if (state == NULL || MyBackendId < 0 || MyBackendId > MaxConnections)	/* l_arr has slots 0..MaxConnections */
+		elog(ERROR, "pg_waitlsn: no latch slot for backend id %d (is the library in shared_preload_libraries?)", MyBackendId);
+	OwnLatch(&state->l_arr[MyBackendId].latch);	/* may elog(ERROR), so call it before taking the spinlock */
+	is_latch_owned = true;
+	SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+	state->l_arr[MyBackendId].pid = MyProcPid;
+	SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+static void
+wl_disown_latch(void)
+{
+	int		i;
+	SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+	DisownLatch(&state->l_arr[MyBackendId].latch);
+	is_latch_owned = false;
+	state->l_arr[MyBackendId].pid = 0;
+	/* If this was the highest slot in use, rescan downward from the last valid index (MaxConnections) */
+	if (state->backend_maxid == MyBackendId)
+		for (i = MaxConnections; i >= 0; i--)
+			if (state->l_arr[i].pid != 0 || i == 0)	/* i == 0: no slot in use, reset to 0 */
+			{
+				state->backend_maxid = i;
+				break;
+			}
+
+	SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+/* Transaction callback: give the latch slot back if an abort happens while it is owned. */
+static void
+disown_latches_on_abort(XactEvent event, void *arg)
+{
+	bool	aborting = (event == XACT_EVENT_ABORT ||
+						event == XACT_EVENT_PARALLEL_ABORT);
+
+	if (aborting && is_latch_owned)
+		wl_disown_latch();
+}
+
+/*
+ * Allocate the shared state and initialize every slot's pid, spinlock and latch.
+ */
+static void
+qs_shmem_startup(void)
+{
+	bool	found;
+	uint	i;
+
+	state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+										  estimate_shmem_size(), &found);
+	if (!found)
+	{
+		state->backend_maxid = 0;	/* no slot is in use yet */
+		for (i = 0; i < (MaxConnections+1); i++)
+		{
+			state->l_arr[i].pid = 0;
+			SpinLockInit(&state->l_arr[i].slock);
+			InitSharedLatch(&state->l_arr[i].latch);
+		}
+	}
+	if (prev_shmem_startup_hook)
+		prev_shmem_startup_hook();
+}
+
+/* Module load callback: does nothing unless loaded via shared_preload_libraries */
+void
+_PG_init(void)
+{
+	if (!process_shared_preload_libraries_in_progress)
+		return;
+
+	time_waitlsn = GetCurrentTimestamp();
+	/* Reserve the size qs_shmem_startup() really allocates, not just the struct header */
+	RequestAddinShmemSpace(estimate_shmem_size());
+
+	/* Define interval_waitlsn */
+	DefineCustomIntVariable(
+		"interval_waitlsn",
+
+		"Set interval of time (ms) how often LSN will be checked.",
+
+		"Set interval of time (ms) how often LSN will be checked to "
+		"make less influence on StartupXLOG() process.",
+		&interval_waitlsn,
+		100, 0, INT_MAX,
+		PGC_SUSET,
+		GUC_UNIT_MS,
+		NULL, NULL, NULL);
+
+	/* Define count_waitlsn */
+	DefineCustomIntVariable(
+		"count_waitlsn",
+
+		"How often LSN will be checked.",
+
+		"Set count of LSNs that will be passed befor LSN check to "
+		"make less influence on StartupXLOG() process.",
+		&count_waitlsn,
+		10, 1, INT_MAX,
+		PGC_SUSET,
+		GUC_NOT_IN_SAMPLE,
+		NULL, NULL, NULL);
+
+	/* Install our hooks, remembering the previously installed ones */
+	prev_lsn_updated_hook = lsn_updated_hook;
+	lsn_updated_hook = wl_lsn_updated_hook;
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = qs_shmem_startup;
+
+	/* Register in every process that runs this; EXEC_BACKEND children re-run _PG_init */
+	RegisterXactCallback(disown_latches_on_abort, NULL);
+}
+
+/* Replay hook: periodically set the latch of every backend that has claimed a slot. */
+static void
+wl_lsn_updated_hook(void)
+{
+	uint i;
+	/* Wake waiters when the counter is a multiple of count_waitlsn, or after interval_waitlsn ms */
+	if (counter_waitlsn % count_waitlsn == 0
+		|| TimestampDifferenceExceeds(time_waitlsn,GetCurrentTimestamp(),interval_waitlsn))
+	{
+		for (i = 0; i <= state->backend_maxid; i++)
+		{
+			SpinLockAcquire(&state->l_arr[i].slock);
+			if (state->l_arr[i].pid != 0)
+				SetLatch(&state->l_arr[i].latch);
+			SpinLockRelease(&state->l_arr[i].slock);
+		}
+		elog(DEBUG2,"WAITLSN  - %d / %s", counter_waitlsn, timestamptz_to_str(GetCurrentTimestamp()));
+		time_waitlsn = GetCurrentTimestamp();
+	}
+	counter_waitlsn = (counter_waitlsn + 1) % count_waitlsn;	/* wrap instead of overflowing a signed int */
+	if (prev_lsn_updated_hook)	/* keep the hook chain intact */
+		prev_lsn_updated_hook();
+}
+
+PG_FUNCTION_INFO_V1( pg_waitlsn );
+PG_FUNCTION_INFO_V1( pg_waitlsn_infinite );
+PG_FUNCTION_INFO_V1( pg_waitlsn_no_wait );
+
+
+Datum
+pg_waitlsn(PG_FUNCTION_ARGS)
+{
+	/* arg 0: target LSN; arg 1: timeout in ms, where 0 means no timeout */
+	uint64_t		timeout_ms = PG_GETARG_INT32(1);
+	XLogRecPtr		target = PG_GETARG_LSN(0);
+	PG_RETURN_BOOL(pg_waitlsn_internal(target, timeout_ms));
+}
+
+Datum
+pg_waitlsn_infinite(PG_FUNCTION_ARGS)
+{
+	/* A delay of 0 makes pg_waitlsn_internal() wait without a timeout. */
+	XLogRecPtr		target = PG_GETARG_LSN(0);
+	PG_RETURN_BOOL(pg_waitlsn_internal(target, 0));
+}
+
+Datum
+pg_waitlsn_no_wait(PG_FUNCTION_ARGS)
+{
+	/* Passes a delay of 1 ms, the smallest timeout pg_waitlsn_internal() accepts. */
+	XLogRecPtr		target = PG_GETARG_LSN(0);
+	PG_RETURN_BOOL(pg_waitlsn_internal(target, 1));
+}
+
+static bool
+pg_waitlsn_internal(XLogRecPtr trg_lsn, uint64_t delay)
+{	/* Wait until trg_lsn is replayed; delay is a timeout in ms (0 = none). Returns true if replayed. */
+	XLogRecPtr		cur_lsn;
+	int				latch_events;
+	int64			tdelay = (int64) delay;	/* ms still to wait; signed so an overrun shows up as <= 0 */
+	long			secs;
+	int				microsecs;
+	TimestampTz		timer = GetCurrentTimestamp();
+
+	if (delay > 0)
+		latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+	else
+		latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	wl_own_latch();
+	for (;;)
+	{
+		ResetLatch(&state->l_arr[MyBackendId].latch);
+		cur_lsn = GetXLogReplayRecPtr(NULL);
+
+		/* The target LSN has been replayed */
+		if (trg_lsn <= cur_lsn)
+			break;
+
+		/* Stop if the postmaster died, or if recovery is over and nothing more will be replayed */
+		if (!PostmasterIsAlive() || !RecoveryInProgress())
+			break;
+
+		/* Timeout: read the clock once so the expiry test and the remaining time agree */
+		if (latch_events & WL_TIMEOUT)
+		{
+			TimestampDifference(timer, GetCurrentTimestamp(), &secs, &microsecs);
+			tdelay = (int64) delay - ((int64) secs * 1000 + microsecs / 1000);
+			if (tdelay <= 0)
+				break;
+		}
+
+		elog(DEBUG2,"WAITLSN  %x", MyPgXact->xmin);
+		MyPgXact->xmin = InvalidTransactionId;	/* NOTE(review): presumably avoids recovery conflicts while waiting - confirm */
+		WaitLatch(&state->l_arr[MyBackendId].latch, latch_events, (long) tdelay, WAIT_EVENT_CLIENT_READ);
+		CHECK_FOR_INTERRUPTS();
+	}
+	wl_disown_latch();
+
+	return trg_lsn <= GetXLogReplayRecPtr(NULL);
+}
diff --git a/contrib/pg_waitlsn/pg_waitlsn.control b/contrib/pg_waitlsn/pg_waitlsn.control
new file mode 100644
index 0000000..7be85d6
--- /dev/null
+++ b/contrib/pg_waitlsn/pg_waitlsn.control
@@ -0,0 +1,5 @@
+# pg_waitlsn extension
+comment = 'target LSN waiter for slave replica'
+default_version = '1.0'
+module_pathname = '$libdir/pg_waitlsn'
+relocatable = true
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2dcff7f..c6018e5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -832,6 +832,9 @@ static bool holdingAllLocks = false;
 static MemoryContext walDebugCxt = NULL;
 #endif
 
+/* Hook after xlogreader replayed lsn */
+lsn_updated_hook_type lsn_updated_hook = NULL;
+
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog);
 static bool recoveryStopsBefore(XLogReaderState *record);
@@ -7174,6 +7177,12 @@ StartupXLOG(void)
 					break;
 				}
 
+				/*
+				 * Hook after update lastReplayedEndRecPtr
+				 */
+				if (lsn_updated_hook != NULL)
+					lsn_updated_hook();
+
 				/* Else, try to fetch the next WAL record */
 				record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
 			} while (record != NULL);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 9f036c7..175023c 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -287,6 +287,12 @@ extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
 
 /*
+ * Hook after xlogreader replayed lsn
+ */
+typedef void (*lsn_updated_hook_type) (void);
+extern PGDLLIMPORT lsn_updated_hook_type lsn_updated_hook;
+
+/*
  * Starting/stopping a base backup
  */
 extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to