Hi all

Attached is a rebased and updated logical decoding timeline following
patch for 10.0.

This is a pre-requisite for the pending work on logical decoding on
standby servers and simplified failover of logical decoding.

Restating the commit message:
__________

Follow timeline switches in logical decoding

When decoding from a logical slot, it's necessary for xlog reading to
be able to read xlog from historical (i.e. not current) timelines.
Otherwise decoding fails after failover to a physical replica because
the oldest still-needed archives are in the historical timeline.

Supporting logical decoding timeline following is a pre-requisite for
logical decoding on physical standby servers. It also makes it
possible to promote a replica with logical slots to a master and
replay from those slots, allowing logical decoding applications to
follow physical failover.

Logical slots cannot actually be created on a replica without use of
the low-level C slot management APIs so this is mostly foundation work
for subsequent changes to enable logical decoding on standbys.

This commit includes a module in src/test/modules with functions to
manipulate the slots (which is not otherwise possible in SQL code) in
order to enable testing, and a new test in src/test/recovery to ensure
that the behavior is as expected.

Note that an earlier version of logical decoding timeline following
was committed to 9.5 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and
f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.5
feature freeze when issues were discovered too late to safely fix them
in the 9.5 release cycle.

The prior approach failed to consider that a record could be split
across pages that are on different segments, where the new segment
contains the start of a new timeline. In that case the old segment
might be missing or renamed with a .partial suffix.

This patch reworks the logic to be page-based and in the process
simplify how the last timeline for a segment is looked up.

Slot timeline following only works in a backend. Frontend support can
be aded separately, where it could be useful for pg_xlogdump etc once
support for timeline.c, List, etc is added for frontend code.
__________


I'm hoping to find time to refactor timeline following so that we
avoid passing timeline information around the xlogreader using
globals, but that'd be a separate change that can be made after this.

I've omitted the --endpos changes for pg_recvlogical, which again can
be added separately.

The test harness code will become unnecessary when proper support for
logical failover or logical decoding on standby is added, so I'm not
really sure it should be committed.

Prior threads:

* 
https://www.postgresql.org/message-id/camsr+yg_1fu_-l8qwsk6okft4jt8dpory2rhxdymy0b5zfk...@mail.gmail.com

* 
https://www.postgresql.org/message-id/CAMsr+YH-C1-X_+s=2nzapnr0wwqja-rumvhsyyzansn93mu...@mail.gmail.com

* http://www.postgresql.org/message-id/20160503165812.GA29604@alvherre.pgsql


-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 10d5f4652689fbcaf5b14fe6bd991c98dbf60e00 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 1 Sep 2016 10:16:55 +0800
Subject: [PATCH] Follow timeline switches in logical decoding

When decoding from a logical slot, it's necessary for xlog reading to
be able to read xlog from historical (i.e. not current) timelines.
Otherwise decoding fails after failover to a physical replica because
the oldest still-needed archives are in the historical timeline.

Supporting logical decoding timeline following is a pre-requisite for
logical decoding on physical standby servers. It also makes it
possible to promote a replica with logical slots to a master and
replay from those slots, allowing logical decoding applications to
follow physical failover.

Logical slots cannot actually be created on a replica without use of
the low-level C slot management APIs so this is mostly foundation work
for subsequent changes to enable logical decoding on standbys.

This commit includes a module in src/test/modules with functions to
manipulate the slots (which is not otherwise possible in SQL code) in
order to enable testing, and a new test in src/test/recovery to ensure
that the behavior is as expected.

Note that an earlier version of logical decoding timeline following
was committed to 9.5 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and
f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.5
feature freeze when issues were discovered too late to safely fix them
in the 9.5 release cycle.

The prior approach failed to consider that a record could be split
across pages that are on different segments, where the new segment
contains the start of a new timeline. In that case the old segment
might be missing or renamed with a .partial suffix.

This patch reworks the logic to be page-based and in the process
simplify how the last timeline for a segment is looked up.

Slot timeline following only works in a backend. Frontend support can
be aded separately, where it could be useful for pg_xlogdump etc once
support for timeline.c, List, etc is added for frontend code.
---
 src/backend/access/transam/xlogutils.c             | 207 +++++++++++++-
 src/backend/replication/logical/logicalfuncs.c     |  12 +-
 src/include/access/xlogreader.h                    |  11 +
 src/test/modules/Makefile                          |   1 +
 src/test/modules/test_slot_timelines/.gitignore    |   3 +
 src/test/modules/test_slot_timelines/Makefile      |  22 ++
 src/test/modules/test_slot_timelines/README        |  19 ++
 .../expected/load_extension.out                    |  19 ++
 .../expected/load_extension_1.out                  |   7 +
 .../test_slot_timelines/sql/load_extension.sql     |   7 +
 .../test_slot_timelines--1.0.sql                   |  16 ++
 .../test_slot_timelines/test_slot_timelines.c      | 133 +++++++++
 .../test_slot_timelines/test_slot_timelines.conf   |   2 +
 .../test_slot_timelines.control                    |   5 +
 src/test/recovery/Makefile                         |   2 +
 .../recovery/t/006_logical_decoding_timelines.pl   | 307 +++++++++++++++++++++
 16 files changed, 752 insertions(+), 21 deletions(-)
 create mode 100644 src/test/modules/test_slot_timelines/.gitignore
 create mode 100644 src/test/modules/test_slot_timelines/Makefile
 create mode 100644 src/test/modules/test_slot_timelines/README
 create mode 100644 src/test/modules/test_slot_timelines/expected/load_extension.out
 create mode 100644 src/test/modules/test_slot_timelines/expected/load_extension_1.out
 create mode 100644 src/test/modules/test_slot_timelines/sql/load_extension.sql
 create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
 create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines.c
 create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines.conf
 create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines.control
 create mode 100644 src/test/recovery/t/006_logical_decoding_timelines.pl

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 51a8e8d..014978f 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -19,6 +19,7 @@
 
 #include <unistd.h>
 
+#include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
@@ -660,6 +661,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 	/* state maintained across calls */
 	static int	sendFile = -1;
 	static XLogSegNo sendSegNo = 0;
+	static TimeLineID sendTLI = 0;
 	static uint32 sendOff = 0;
 
 	p = buf;
@@ -675,7 +677,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 		startoff = recptr % XLogSegSize;
 
 		/* Do we need to switch to a different xlog segment? */
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+			sendTLI != tli)
 		{
 			char		path[MAXPGPATH];
 
@@ -702,6 +705,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 									path)));
 			}
 			sendOff = 0;
+			sendTLI = tli;
 		}
 
 		/* Need to seek in the file? */
@@ -750,6 +754,142 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 }
 
 /*
+ * Determine which timeline to read an xlog page from and set the
+ * XLogReaderState's state->currTLI to that timeline ID.
+ *
+ * wantPage must be set to the start address of the page to read and
+ * wantLength to the amount of the page that will be read, up to
+ * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch.  The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * We can't just check the timeline when we read a page on a different segment
+ * to the last page. We could've received a timeline switch from a cascading
+ * upstream, so the current segment ends and we have to switch to a new one.
+ * Even in the middle of reading a page we could have to dump the cached page
+ * and switch to a new TLI.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ *
+ * The caller must also make sure it doesn't read past the current redo pointer
+ * so it doesn't fail to notice that the current timeline became historical.
+ */
+static void
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+{
+	const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
+
+	elog(DEBUG4, "Determining timeline for read at %X/%X+%X",
+			(uint32)(wantPage>>32), (uint32)wantPage, wantLength);
+
+	Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
+	Assert(wantLength <= XLOG_BLCKSZ);
+	Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+
+	/*
+	 * If the desired page is currently read in and valid, we have nothing to do.
+	 *
+	 * The caller should've ensured that it didn't previously advance readOff
+	 * past the valid limit of this timeline, so it doesn't matter if the current
+	 * TLI has since become historical.
+	 */
+	if (lastReadPage == wantPage &&
+		state->readLen != 0 &&
+		lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1))
+	{
+		elog(DEBUG4, "Wanted data already valid"); //XXX
+		return;
+	}
+
+	/*
+	 * If we're reading from the current timeline, it hasn't become historical
+	 * and the page we're reading is after the last page read, we can again
+	 * just carry on. (Seeking backwards requires a check to make sure the older
+	 * page isn't on a prior timeline).
+	 */
+	if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+	{
+		Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
+		elog(DEBUG4, "On current timeline");
+		return;
+	}
+
+	/*
+	 * If we're just reading pages from a previously validated historical
+	 * timeline and the timeline we're reading from is valid until the
+	 * end of the current segment we can just keep reading.
+	 */
+	if (state->currTLIValidUntil != InvalidXLogRecPtr &&
+		state->currTLI != ThisTimeLineID &&
+		state->currTLI != 0 &&
+		(wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
+	{
+		elog(DEBUG4, "Still on historical timeline %u until %X/%X",
+				state->currTLI,
+				(uint32)(state->currTLIValidUntil >> 32),
+				(uint32)(state->currTLIValidUntil));
+		return;
+	}
+
+	/*
+	 * If we reach this point we're either looking up a page for random access,
+	 * the current timeline just became historical, or we're reading from a new
+	 * segment containing a timeline switch. In all cases we need to determine
+	 * the newest timeline on the segment.
+	 *
+	 * If it's the current timeline we can just keep reading from here unless
+	 * we detect a timeline switch that makes the current timeline historical.
+	 * If it's a historical timeline we can read all the segment on the newest
+	 * timeline because it contains all the old timelines' data too. So only
+	 * one switch check is required.
+	 */
+	{
+		/*
+		 * We need to re-read the timeline history in case it's been changed
+		 * by a promotion or replay from a cascaded replica.
+		 */
+		List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+		XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
+
+		Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
+
+		/* Find the timeline of the last LSN on the segment containing wantPage. */
+		state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
+		state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory, NULL);
+
+		Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
+				wantPage + wantLength < state->currTLIValidUntil);
+
+		list_free_deep(timelineHistory);
+
+		elog(DEBUG3, "switched to timeline %u valid until %X/%X",
+				state->currTLI,
+				(uint32)(state->currTLIValidUntil >> 32),
+				(uint32)(state->currTLIValidUntil));
+	}
+
+	elog(DEBUG3, "page read ptr %X/%X (for record %X/%X) is on segment with TLI %u valid until %X/%X, server current TLI is %u",
+		 (uint32) (wantPage >> 32),
+		 (uint32) wantPage,
+		 (uint32) (state->currRecPtr >> 32),
+		 (uint32) state->currRecPtr,
+		 state->currTLI,
+		 (uint32) (state->currTLIValidUntil >> 32),
+		 (uint32) (state->currTLIValidUntil),
+		 ThisTimeLineID);
+}
+
+/*
  * read_page callback for reading local xlog files
  *
  * Public because it would likely be very helpful for someone writing another
@@ -770,28 +910,65 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int			count;
 
 	loc = targetPagePtr + reqLen;
+
+	/* Make sure enough xlog is available... */
 	while (1)
 	{
 		/*
-		 * TODO: we're going to have to do something more intelligent about
-		 * timelines on standbys. Use readTimeLineHistory() and
-		 * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-		 * that case earlier, but the code and TODO is left in here for when
-		 * that changes.
+		 * Check which timeline to get the record from.
+		 *
+		 * We have to do it each time through the loop because if we're in
+		 * recovery as a cascading standby, the current timeline might've
+		 * become historical.
 		 */
-		if (!RecoveryInProgress())
+		XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+
+		if (state->currTLI == ThisTimeLineID)
 		{
-			*pageTLI = ThisTimeLineID;
-			read_upto = GetFlushRecPtr();
+			/*
+			 * We're reading from the current timeline so we might have to
+			 * wait for the desired record to be generated (or, for a standby,
+			 * received & replayed)
+			 */
+			if (!RecoveryInProgress())
+			{
+				*pageTLI = ThisTimeLineID;
+				read_upto = GetFlushRecPtr();
+			}
+			else
+				read_upto = GetXLogReplayRecPtr(pageTLI);
+
+			if (loc <= read_upto)
+				break;
+
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(1000L);
 		}
 		else
-			read_upto = GetXLogReplayRecPtr(pageTLI);
-
-		if (loc <= read_upto)
+		{
+			/*
+			 * We're on a historical timeline, so limit reading to the switch
+			 * point where we moved to the next timeline.
+			 *
+			 * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+			 * about the new timeline, so we must've received past the end of
+			 * it.
+			 */
+			read_upto = state->currTLIValidUntil;
+
+			/*
+			 * Setting pageTLI to our wanted record's TLI is slightly wrong;
+			 * the page might begin on an older timeline if it contains a
+			 * timeline switch, since its xlog segment will have been copied
+			 * from the prior timeline. This is pretty harmless though, as
+			 * nothing cares so long as the timeline doesn't go backwards.  We
+			 * should read the page header instead; FIXME someday.
+			 */
+			*pageTLI = state->currTLI;
+
+			/* No need to wait on a historical timeline */
 			break;
-
-		CHECK_FOR_INTERRUPTS();
-		pg_usleep(1000L);
+		}
 	}
 
 	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 4e4c8cd..99112ac 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -234,12 +234,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	rsinfo->setResult = p->tupstore;
 	rsinfo->setDesc = p->tupdesc;
 
-	/* compute the current end-of-wal */
-	if (!RecoveryInProgress())
-		end_of_wal = GetFlushRecPtr();
-	else
-		end_of_wal = GetXLogReplayRecPtr(NULL);
-
 	ReplicationSlotAcquire(NameStr(*name));
 
 	PG_TRY();
@@ -279,6 +273,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();
 
+		if (!RecoveryInProgress())
+			end_of_wal = GetFlushRecPtr();
+		else
+			end_of_wal = GetXLogReplayRecPtr(NULL);
+
+		/* Decode until we run out of records */
 		while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
 			   (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
 		{
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5..caff9a6 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -27,6 +27,10 @@
 
 #include "access/xlogrecord.h"
 
+#ifndef FRONTEND
+#include "nodes/pg_list.h"
+#endif
+
 typedef struct XLogReaderState XLogReaderState;
 
 /* Function type definition for the read_page callback */
@@ -160,6 +164,13 @@ struct XLogReaderState
 
 	/* beginning of the WAL record being read. */
 	XLogRecPtr	currRecPtr;
+	/* timeline to read it from, 0 if a lookup is required */
+	TimeLineID	currTLI;
+	/*
+	 * Safe point to read to in currTLI if current TLI is historical
+	 * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
+	 */
+	XLogRecPtr	currTLIValidUntil;
 
 	/* Buffer for current ReadRecord result (expandable) */
 	char	   *readRecordBuf;
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 3ce9904..106b816 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
 		  test_pg_dump \
 		  test_rls_hooks \
 		  test_shm_mq \
+		  test_slot_timelines \
 		  worker_spi
 
 all: submake-generated-headers
diff --git a/src/test/modules/test_slot_timelines/.gitignore b/src/test/modules/test_slot_timelines/.gitignore
new file mode 100644
index 0000000..543c50d
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/.gitignore
@@ -0,0 +1,3 @@
+results/
+tmp_check/
+log/
diff --git a/src/test/modules/test_slot_timelines/Makefile b/src/test/modules/test_slot_timelines/Makefile
new file mode 100644
index 0000000..21757c5
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/Makefile
@@ -0,0 +1,22 @@
+# src/test/modules/test_slot_timelines/Makefile
+
+MODULES = test_slot_timelines
+PGFILEDESC = "test_slot_timelines - test utility for slot timeline following"
+
+EXTENSION = test_slot_timelines
+DATA = test_slot_timelines--1.0.sql
+
+EXTRA_INSTALL=contrib/test_decoding
+REGRESS=load_extension
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_slot_timelines/test_slot_timelines.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_slot_timelines
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_slot_timelines/README b/src/test/modules/test_slot_timelines/README
new file mode 100644
index 0000000..585f02f
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/README
@@ -0,0 +1,19 @@
+A test module for logical decoding failover and timeline following.
+
+This module provides a minimal way to maintain logical slots on replicas
+that mirror the state on the master. It doesn't make decoding possible,
+just tracking slot state so that a decoding client that's using the master
+can follow a physical failover to the standby. The master doesn't know
+about the slots on the standby, they're synced by a client that connects
+to both.
+
+This is intentionally not part of the test_decoding module because that's meant
+to serve as example code, where this module exercises internal server features
+by unsafely exposing internal state to SQL. It's not the right way to do
+failover, it's just a simple way to test it from the perl TAP framework to
+prove the feature works.
+
+In a practical implementation of this approach a bgworker on the master would
+monitor slot positions and relay them to a bgworker on the standby that applies
+the position updates without exposing slot internals to SQL. That's too complex
+for this test framework though.
diff --git a/src/test/modules/test_slot_timelines/expected/load_extension.out b/src/test/modules/test_slot_timelines/expected/load_extension.out
new file mode 100644
index 0000000..7c2ad9d
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/expected/load_extension.out
@@ -0,0 +1,19 @@
+CREATE EXTENSION test_slot_timelines;
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+ test_slot_timelines_create_logical_slot 
+-----------------------------------------
+ 
+(1 row)
+
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
+ test_slot_timelines_advance_logical_slot 
+------------------------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('test_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/src/test/modules/test_slot_timelines/expected/load_extension_1.out b/src/test/modules/test_slot_timelines/expected/load_extension_1.out
new file mode 100644
index 0000000..0db21e4
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/expected/load_extension_1.out
@@ -0,0 +1,7 @@
+CREATE EXTENSION test_slot_timelines;
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+ERROR:  replication slots can only be used if max_replication_slots > 0
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
+ERROR:  replication slots can only be used if max_replication_slots > 0
+SELECT pg_drop_replication_slot('test_slot');
+ERROR:  replication slots can only be used if max_replication_slots > 0
diff --git a/src/test/modules/test_slot_timelines/sql/load_extension.sql b/src/test/modules/test_slot_timelines/sql/load_extension.sql
new file mode 100644
index 0000000..2440355
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/sql/load_extension.sql
@@ -0,0 +1,7 @@
+CREATE EXTENSION test_slot_timelines;
+
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
+
+SELECT pg_drop_replication_slot('test_slot');
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
new file mode 100644
index 0000000..a1886f7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
@@ -0,0 +1,16 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_slot_timelines" to load this file. \quit
+
+CREATE OR REPLACE FUNCTION test_slot_timelines_create_logical_slot(slot_name text, plugin text)
+RETURNS void
+STRICT LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION test_slot_timelines_create_logical_slot(text, text)
+IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
+
+CREATE OR REPLACE FUNCTION test_slot_timelines_advance_logical_slot(slot_name text, new_xmin xid, new_catalog_xmin xid, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
+RETURNS void
+STRICT LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION test_slot_timelines_advance_logical_slot(text, xid, xid, pg_lsn, pg_lsn)
+IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.c b/src/test/modules/test_slot_timelines/test_slot_timelines.c
new file mode 100644
index 0000000..1f07488
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.c
@@ -0,0 +1,133 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_slot_timelines.c
+ *              Test harness code for slot timeline following
+ *
+ * Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *              src/test/modules/test_slot_timelines/test_slot_timelines.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(test_slot_timelines_create_logical_slot);
+PG_FUNCTION_INFO_V1(test_slot_timelines_advance_logical_slot);
+
+static void clear_slot_transient_state(void);
+
+/*
+ * Create a new logical slot, with invalid LSN and xid, directly. This does not
+ * use the snapshot builder or logical decoding machinery. It's only intended
+ * for creating a slot on a replica that mirrors the state of a slot on an
+ * upstream master.
+ *
+ * Note that this is test harness code. You shouldn't expose slot internals
+ * to SQL like this for any real world usage. See the README.
+ */
+Datum
+test_slot_timelines_create_logical_slot(PG_FUNCTION_ARGS)
+{
+	char	   *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+	char	   *plugin = text_to_cstring(PG_GETARG_TEXT_P(1));
+
+	CheckSlotRequirements();
+
+	ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
+
+	/* register the plugin name with the slot */
+	StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
+
+	/*
+	 * Initialize persistent state to placeholders to be set by
+	 * test_slot_timelines_advance_logical_slot .
+	 */
+	MyReplicationSlot->data.xmin = InvalidTransactionId;
+	MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
+	MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
+
+	clear_slot_transient_state();
+
+	ReplicationSlotRelease();
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Set the state of a slot.
+ *
+ * This doesn't maintain the non-persistent state at all,
+ * but since the slot isn't in use that's OK.
+ *
+ * There's intentionally no check to prevent slots going backwards
+ * because they can actually go backwards if the master crashes when
+ * it hasn't yet flushed slot state to disk then we copy the older
+ * slot state after recovery.
+ *
+ * There's no checking done for xmin or catalog xmin either, since
+ * we can't really do anything useful that accounts for xid wrap-around.
+ *
+ * Note that this is test harness code. You shouldn't expose slot internals
+ * to SQL like this for any real world usage. See the README.
+ */
+Datum
+test_slot_timelines_advance_logical_slot(PG_FUNCTION_ARGS)
+{
+	char	   *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+	TransactionId new_xmin = DatumGetTransactionId(PG_GETARG_DATUM(1));
+	TransactionId new_catalog_xmin = DatumGetTransactionId(PG_GETARG_DATUM(2));
+	XLogRecPtr	restart_lsn = PG_GETARG_LSN(3);
+	XLogRecPtr	confirmed_lsn = PG_GETARG_LSN(4);
+
+	CheckSlotRequirements();
+
+	ReplicationSlotAcquire(slotname);
+
+	if (MyReplicationSlot->data.database != MyDatabaseId)
+		elog(ERROR, "trying to update a slot on a different database");
+
+	MyReplicationSlot->data.xmin = new_xmin;
+	MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
+	MyReplicationSlot->data.restart_lsn = restart_lsn;
+	MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
+
+	clear_slot_transient_state();
+
+	ReplicationSlotMarkDirty();
+	ReplicationSlotSave();
+	ReplicationSlotRelease();
+
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+
+	PG_RETURN_VOID();
+}
+
+static void
+clear_slot_transient_state(void)
+{
+	Assert(MyReplicationSlot != NULL);
+
+	/*
+	 * Make sure the slot state is the same as if it were newly loaded from
+	 * disk on recovery.
+	 */
+	MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin;
+	MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+
+	MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
+	MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
+}
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.conf b/src/test/modules/test_slot_timelines/test_slot_timelines.conf
new file mode 100644
index 0000000..56b46d7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.conf
@@ -0,0 +1,2 @@
+max_replication_slots=2
+wal_level=logical
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.control b/src/test/modules/test_slot_timelines/test_slot_timelines.control
new file mode 100644
index 0000000..dcee1a7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.control
@@ -0,0 +1,5 @@
+# test_slot_timelines extension
+comment = 'Test utility for slot timeline following and logical decoding'
+default_version = '1.0'
+module_pathname = '$libdir/test_slot_timelines'
+relocatable = true
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 9290719..78570dd 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
 #
 #-------------------------------------------------------------------------
 
+EXTRA_INSTALL=contrib/test_decoding src/test/modules/test_slot_timelines
+
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl
new file mode 100644
index 0000000..a3a4b61
--- /dev/null
+++ b/src/test/recovery/t/006_logical_decoding_timelines.pl
@@ -0,0 +1,307 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+#   excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+#   advancing it as we go using the low level APIs. It can't be done
+#   from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 20;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+	'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+	"SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+	$stderr,
+	qr/replication slot "after_basebackup" does not exist/,
+	'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
+
+
+# OK, time to try the same thing again, but this time we'll be using slot
+# mirroring on the standby and a pg_basebackup of the master.
+
+diag "Testing logical timeline following with test_slot_timelines module";
+
+$node_master->start();
+
+# Clean up after the last test
+$node_master->safe_psql('postgres', 'DELETE FROM decoding;');
+is( $node_master->psql(
+		'postgres',
+'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'),
+	0,
+	'dropping slots succeeds via pg_drop_replication_slot');
+
+# Same as before, we'll make one slot before basebackup, one after. This time
+# the basebackup will be with pg_basebackup so it'll omit both slots, then
+# we'll use SQL functions provided by the test_slot_timelines test module to sync
+# them to the replica, do some work, sync them and fail over then test again.
+# This time we should have both the before- and after-basebackup slots working.
+
+is( $node_master->psql(
+		'postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+	),
+	0,
+	'creating slot before_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+$backup_name = 'b2';
+$node_master->backup($backup_name);
+
+is( $node_master->psql(
+		'postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+	),
+	0,
+	'creating slot after_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+
+$node_replica = get_new_node('replica2');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+$node_replica->start;
+
+# Verify the slots are both absent on the replica
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, '', 'No slots exist on the replica');
+
+# Now do our magic to sync the slot states across. Normally
+# this would be being done continuously by a bgworker but
+# we're just doing it by hand for this test. This is exposing
+# postgres innards to SQL so it's unsafe except for testing.
+$node_master->safe_psql('postgres', 'CREATE EXTENSION test_slot_timelines;');
+
+my $slotinfo = $node_master->safe_psql(
+	'postgres',
+	qq{SELECT slot_name, plugin,
+	COALESCE(xmin, '0'), catalog_xmin,
+	restart_lsn, confirmed_flush_lsn
+	FROM pg_replication_slots ORDER BY slot_name}
+);
+diag "Copying slots to replica";
+open my $fh, '<', \$slotinfo or die $!;
+while (<$fh>)
+{
+	print $_;
+	chomp $_;
+	my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn,
+		$confirmed_flush_lsn)
+	  = map { "'$_'" } split qr/\|/, $_;
+
+	print
+"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n";
+	$node_replica->safe_psql('postgres',
+		"SELECT test_slot_timelines_create_logical_slot($slot_name, $plugin);"
+	);
+	$node_replica->safe_psql('postgres',
+"SELECT test_slot_timelines_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);"
+	);
+}
+close $fh or die $!;
+
+# Now both slots are present on the replica and exactly match the master
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is( $stdout,
+	"after_basebackup\nbefore_basebackup",
+	'both slots now exist on replica');
+
+$stdout = $node_replica->safe_psql(
+	'postgres',
+	qq{SELECT slot_name, plugin, COALESCE(xmin, '0'), catalog_xmin,
+			  restart_lsn, confirmed_flush_lsn
+		 FROM pg_replication_slots
+	 ORDER BY slot_name});
+is($stdout, $slotinfo,
+	"slot data read back from replica matches slot data on master");
+
+# We now have to copy some extra WAL to satisfy the requirements of the oldest
+# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots
+# so we have to help out. We know the WAL is still retained on the master
+# because we haven't advanced the slots there.
+#
+# Figure out what the oldest segment we need is by looking at the restart_lsn
+# of the oldest slot.
+#
+# It only makes sense to do this once the slots are created on the replica,
+# otherwise it might just delete the segments again.
+
+my $oldest_needed_segment = $node_master->safe_psql(
+	'postgres',
+	qq{SELECT pg_xlogfile_name((
+      SELECT restart_lsn
+      FROM pg_replication_slots
+      ORDER BY restart_lsn ASC
+      LIMIT 1
+     ));}
+);
+
+diag "oldest needed xlog seg is $oldest_needed_segment ";
+
+# WAL segment names sort lexically so we can just grab everything > than this
+# segment.
+opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!;
+while (my $seg = readdir $pg_xlog)
+{
+	next if $seg eq '.' or $seg eq '..';
+	next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/;
+	diag "copying xlog seg $seg";
+	copy(
+		$node_master->data_dir . "/pg_xlog/" . $seg,
+		$node_replica->data_dir . "/pg_xlog/" . $seg
+	) or die "copy of xlog seg $seg failed: $!";
+}
+closedir $pg_xlog;
+
+# Boom, crash the master
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+	"SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# This time we can read from both slots
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot after_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot after_basebackup');
+is($stderr, '', 'replay from slot after_basebackup produces no stderr');
+
+# Should be able to read from slot created before base backup
+#
+# This would fail with an error about missing WAL segments if we hadn't
+# copied extra WAL earlier.
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+	'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;');
+is($ret,    0,  'dropping slots succeeds via pg_drop_replication_slot');
+is($stderr, '', 'dropping slots produces no stderr output');
+
+1;
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to