On 22 March 2017 at 10:51, Craig Ringer <cr...@2ndquadrant.com> wrote:
> Hi all
>
> Updated timeline following patch attached.
>
> There's a change in read_local_xlog_page to ensure we maintain
> ThisTimeLineID properly, otherwise it's just comment changes.

OK, so we're looking OK with the TL following.

I'm splitting up the rest of the decoding on standby patch set with
the goal of getting minimal functionality for creating and managing
slots on standbys in, so we can maintain slots on standbys and use
them when the standby is promoted to master.

The first piece, which sends catalog_xmin separately from the global
xmin in hot standby feedback and stores it in the upstream physical
slot's catalog_xmin, is attached.
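
As a rough illustration of the message change (a standalone sketch, not
code from the patch), the extended feedback message is a 'h' type byte,
a 64-bit send time, then xmin, its epoch, catalog_xmin and its epoch as
32-bit big-endian integers. The names HSFeedback, hs_feedback_pack and
hs_feedback_unpack are invented for this example and do not exist in
the tree; the real code uses pq_sendint() and pq_getmsgint() on a
StringInfo, and the walsender's caller consumes the type byte first.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Illustrative only: none of these names are part of PostgreSQL. */
#define HS_FEEDBACK_LEN (1 + 8 + 4 * 4) /* type byte, send time, 4 x uint32 */

typedef struct HSFeedback
{
    int64_t  send_time;
    uint32_t xmin;
    uint32_t xmin_epoch;
    uint32_t catalog_xmin;       /* new field in patch 2 */
    uint32_t catalog_xmin_epoch; /* new field in patch 2 */
} HSFeedback;

/* Write v in network byte order; returns the position after it. */
static unsigned char *
put_be32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char) (v >> 24);
    p[1] = (unsigned char) (v >> 16);
    p[2] = (unsigned char) (v >> 8);
    p[3] = (unsigned char) v;
    return p + 4;
}

/* Read a 32-bit value stored in network byte order. */
static uint32_t
get_be32(const unsigned char *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* Serialize into buf (at least HS_FEEDBACK_LEN bytes); returns bytes written. */
static size_t
hs_feedback_pack(const HSFeedback *fb, unsigned char *buf)
{
    unsigned char *p = buf;
    uint64_t       t;

    assert(fb != NULL && buf != NULL);
    t = (uint64_t) fb->send_time;

    *p++ = 'h';
    p = put_be32(p, (uint32_t) (t >> 32)); /* high half of send time */
    p = put_be32(p, (uint32_t) t);         /* low half of send time */
    p = put_be32(p, fb->xmin);
    p = put_be32(p, fb->xmin_epoch);
    p = put_be32(p, fb->catalog_xmin);
    p = put_be32(p, fb->catalog_xmin_epoch);
    return (size_t) (p - buf);
}

/* Parse a message; returns 1 on success, 0 on wrong length or type byte. */
static int
hs_feedback_unpack(const unsigned char *buf, size_t len, HSFeedback *fb)
{
    uint64_t t;

    assert(buf != NULL && fb != NULL);
    if (len != HS_FEEDBACK_LEN || buf[0] != 'h')
        return 0;

    t = ((uint64_t) get_be32(buf + 1) << 32) | get_be32(buf + 5);
    fb->send_time = (int64_t) t;
    fb->xmin = get_be32(buf + 9);
    fb->xmin_epoch = get_be32(buf + 13);
    fb->catalog_xmin = get_be32(buf + 17);
    fb->catalog_xmin_epoch = get_be32(buf + 21);
    return 1;
}
```

In this sketch a 17-byte message in the old layout fails the length
check, which is why the format change only works between a standby and
an upstream that both carry the patch.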

These are extracted directly from the logical decoding on standby
patch, with comments by Petr and Andres made re the relevant code
addressed.
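
For anyone skimming the walsender side of the second patch, the two
decisions it makes can be sketched standalone as below: the xmin/epoch
sanity check that the patch factors out so it can run on both values,
and the fallback used when the upstream has no slot and can only hold
one xmin. This is a simplified model under my own assumptions: the
function names are invented, next_xid/next_epoch are passed in rather
than fetched with GetNextXidAndEpoch(), and xid_precedes() only mimics
the modulo-2^32 comparison that TransactionIdPrecedes() does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FirstNormalTransactionId ((TransactionId) 3)

static bool
xid_is_normal(TransactionId xid)
{
    return xid >= FirstNormalTransactionId;
}

/* Wraparound-aware "a is older than b"; plain compare for special xids. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    if (!xid_is_normal(a) || !xid_is_normal(b))
        return a < b;
    return (int32_t) (a - b) < 0;
}

/*
 * Is (xid, epoch) neither in the future nor already wrapped around,
 * relative to the upstream's next xid and epoch?
 */
static bool
xid_in_recent_past(TransactionId xid, uint32_t epoch,
                   TransactionId next_xid, uint32_t next_epoch)
{
    assert(xid_is_normal(next_xid));

    if (xid <= next_xid)
    {
        if (epoch != next_epoch)
            return false;
    }
    else if (epoch + 1 != next_epoch)
        return false;

    /* Epoch is plausible; reject xids that are not behind next_xid mod 2^32. */
    return xid == next_xid || xid_precedes(xid, next_xid);
}

/*
 * Without a slot there is only one xmin to set, so hold back to the
 * older of the two values reported by the standby.
 */
static TransactionId
proc_xmin_without_slot(TransactionId feedback_xmin,
                       TransactionId feedback_catalog_xmin)
{
    if (xid_is_normal(feedback_catalog_xmin) &&
        xid_precedes(feedback_catalog_xmin, feedback_xmin))
        return feedback_catalog_xmin;
    return feedback_xmin;
}
```

With a physical slot in use neither value needs to be folded into the
other, which is the whole point of the change: xmin goes to the slot's
xmin and catalog_xmin to its catalog_xmin.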

I will next be working on a bare-minimum facility for creating and
advancing logical slots on a replica without support for buffering
changes, creating historic snapshots or invoking output plugins. The
slots will become usable after the replica is promoted. They'll track
their own restart_lsn, etc, and will keep track of xids so they can
manage their catalog_xmin, so there'll be no need for dodgy slot
syncing from the master, but they'll avoid most of the complex and
messy bits. The application will be expected to make sure a slot on
the master exists and is advanced before the corresponding slot on the
replica to protect required catalogs.

Then if there's agreement that it's the right way forward I can add
the catalog_xmin xlog'ing stuff as the next patch.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From b719c0b556a6823c9b48c0f4042aaf77a8d5f69e Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 12:11:17 +0800
Subject: [PATCH 1/2] Allow GetOldestXmin to ignore replication slot xmin

For walsender to report replication slots' catalog_xmin separately, it's
necessary to be able to ask GetOldestXmin to ignore replication slots.
---
 contrib/pg_visibility/pg_visibility.c |  4 ++--
 contrib/pgstattuple/pgstatapprox.c    |  2 +-
 src/backend/access/transam/xlog.c     |  4 ++--
 src/backend/catalog/index.c           |  2 +-
 src/backend/commands/analyze.c        |  2 +-
 src/backend/commands/vacuum.c         |  4 ++--
 src/backend/replication/walreceiver.c |  2 +-
 src/backend/storage/ipc/procarray.c   | 36 +++++++++++++++++++++++++++--------
 src/include/storage/procarray.h       |  2 +-
 9 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index d0f7618..6261e68 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -557,7 +557,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 	{
 		/* Don't pass rel; that will fail in recovery. */
-		OldestXmin = GetOldestXmin(NULL, true);
+		OldestXmin = GetOldestXmin(NULL, true, false);
 	}
 
 	rel = relation_open(relid, AccessShareLock);
@@ -674,7 +674,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 				 * a buffer lock. And this shouldn't happen often, so it's
 				 * worth being careful so as to avoid false positives.
 				 */
-				RecomputedOldestXmin = GetOldestXmin(NULL, true);
+				RecomputedOldestXmin = GetOldestXmin(NULL, true, false);
 
 				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
 					record_corrupt_item(items, &tuple.t_self);
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index 8db1e20..743cbee 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -70,7 +70,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	TransactionId OldestXmin;
 	uint64		misc_count = 0;
 
-	OldestXmin = GetOldestXmin(rel, true);
+	OldestXmin = GetOldestXmin(rel, true, false);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9480377..c2b4f2c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8895,7 +8895,7 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -9258,7 +9258,7 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 8d42a34..7ce7c8f 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2270,7 +2270,7 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	{
 		snapshot = SnapshotAny;
 		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, true);
+		OldestXmin = GetOldestXmin(heapRelation, true, false);
 	}
 
 	scan = heap_beginscan_strat(heapRelation,	/* relation */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index b91df98..0f166a0 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1000,7 +1000,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
 	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
-	OldestXmin = GetOldestXmin(onerel, true);
+	OldestXmin = GetOldestXmin(onerel, true, false);
 
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows, random());
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ff633fa..bdc7e16 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -527,7 +527,7 @@ vacuum_set_xid_limits(Relation rel,
 	 * always an independent transaction.
 	 */
 	*oldestXmin =
-		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true, false), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -939,7 +939,7 @@ vac_update_datfrozenxid(void)
 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
 	 * So we cannot produce a wrong minimum by starting with this.
 	 */
-	newFrozenXid = GetOldestXmin(NULL, true);
+	newFrozenXid = GetOldestXmin(NULL, true, false);
 
 	/*
 	 * Similarly, initialize the MultiXact "min" with the value that would be
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 18d9d7e..b1ab8e0 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1221,7 +1221,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false);
+		xmin = GetOldestXmin(NULL, false, false);
 	else
 		xmin = InvalidTransactionId;
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 0f8f435..63083c9 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1292,17 +1292,22 @@ TransactionIdIsActive(TransactionId xid)
  * process can set its xmin based on transactions that are no longer running
  * in the master but are still being replayed on the standby, thus possibly
  * making the GetOldestXmin reading go backwards.  In this case there is a
- * possibility that we lose data that the standby would like to have, but
- * there is little we can do about that --- data is only protected if the
- * walsender runs continuously while queries are executed on the standby.
- * (The Hot Standby code deals with such cases by failing standby queries
- * that needed to access already-removed data, so there's no integrity bug.)
+ * possibility that we lose data that the standby would like to have
+ * unless the standby uses a replication slot to make its xmin persistent
+ * even when it isn't connected. The Hot Standby code deals with such cases by
+ * failing standby queries that needed to access already-removed data, so
+ * there's no integrity bug.
+ *
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * The caller may request that replication slots' catalog_xmin values be
+ * disregarded when calculating the global xmin. The caller must account
+ * for catalog_xmin separately.
  */
 TransactionId
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1376,7 +1381,9 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 		}
 	}
 
-	/* fetch into volatile var while ProcArrayLock is held */
+	/*
+	 * Fetch slot xmins into volatile var while ProcArrayLock is held.
+	 */
 	replication_slot_xmin = procArray->replication_slot_xmin;
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
@@ -1430,11 +1437,12 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 	/*
 	 * After locks have been released and defer_cleanup_age has been applied,
 	 * check whether we need to back up further to make logical decoding
-	 * possible. We need to do so if we're computing the global limit (rel =
-	 * NULL) or if the passed relation is a catalog relation of some kind.
+	 * safe. We need to do so if we're computing the global limit (rel =
+	 * NULL) or if the passed relation is a catalog relation of some kind,
+	 * unless the caller asked us not to.
 	 */
-	if ((rel == NULL ||
-		 RelationIsAccessibleInLogicalDecoding(rel)) &&
+	if (!ignoreCatalogXmin &&
+		(rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)) &&
 		TransactionIdIsValid(replication_slot_catalog_xmin) &&
 		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
 		result = replication_slot_catalog_xmin;
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 9d5a13e..21d022f 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -53,7 +53,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
-- 
2.5.5

From e303c2f8706c7a54460ab66fd2d1d0196361a99a Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 12:29:13 +0800
Subject: [PATCH 2/2] Report catalog_xmin separately to xmin in hot standby
 feedback

The catalog_xmin of slots on a standby was reported as part of the standby's
xmin, causing the master's xmin to be held down. This could cause considerable
unnecessary bloat on the master.

Instead, report catalog_xmin as a separate field in hot_standby_feedback. If
the upstream walsender is using a physical replication slot, store the
catalog_xmin in the slot's catalog_xmin field. If the upstream doesn't use a
slot and has only a PGPROC entry, behaviour doesn't change, as we store the
combined xmin and catalog_xmin in the PGPROC entry.

There's no backward compatibility concern here, as nothing except another
postgres instance of the same major version has any business sending hot
standby feedback and it's only used on the physical replication protocol.

---
 doc/src/sgml/protocol.sgml                         |  33 ++++++-
 src/backend/replication/walreceiver.c              |  43 ++++++--
 src/backend/replication/walsender.c                | 110 +++++++++++++++------
 .../recovery/t/010_logical_decoding_timelines.pl   |  38 ++++++-
 4 files changed, 175 insertions(+), 49 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 244e381..d8786f0 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1911,10 +1911,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0 this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1924,7 +1925,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slots on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b1ab8e0..60c1aba 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1175,8 +1175,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	/* initially true so we always send at least one feedback message */
 	static bool master_has_standby_xmin = true;
@@ -1221,29 +1221,54 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false, false);
+	{
+		/*
+		 * Usually GetOldestXmin() would include the catalog_xmin in its
+		 * calculations, but we don't want to hold upstream back from vacuuming
+		 * normal user table tuples just because they're within the
+		 * catalog_xmin horizon of logical replication slots on this standby.
+		 * Instead we report the catalog_xmin to the upstream separately.
+		 */
+		xmin = GetOldestXmin(NULL,
+							 false, /* don't ignore vacuum */
+							 true /* ignore catalog xmin */);
+
+		/*
+		 * Obtain catalog_xmin to send separately, so the walsender can store
+		 * it on a physical slot's catalog_xmin if one is in use.
+		 */
+		ProcArrayGetReplicationSlotXmin(NULL, &catalog_xmin);
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	GetNextXidAndEpoch(&nextXid, &xmin_epoch);
+	catalog_xmin_epoch = xmin_epoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch--;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch--;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(wrconn, reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7561770..05b51a0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -221,6 +221,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1605,7 +1606,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1626,6 +1627,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1636,59 +1645,92 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 }
 
 /*
- * Hot Standby feedback
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ *
+ * This check doesn't care about whether clog exists for these xids
+ * at all.
  */
-static void
-ProcessStandbyHSFeedbackMessage(void)
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
 {
 	TransactionId nextXid;
 	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
-	 * byte.
+	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
+	 * of this message.
 	 */
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
 		 feedbackXmin,
-		 feedbackEpoch);
+		 feedbackEpoch,
+		 feedbackCatalogXmin,
+		 feedbackCatalogEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
+		return;
 
-	if (feedbackXmin <= nextXid)
-	{
-		if (feedbackEpoch != nextEpoch)
-			return;
-	}
-	else
-	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
-	}
-
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
+		return;
 
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
@@ -1713,15 +1755,23 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so we store the least
+	 * of the two provided when not using a slot.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+			MyPgXact->xmin = feedbackCatalogXmin;
+		else
+			MyPgXact->xmin = feedbackXmin;
+	}
 }
 
 /*
diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl
index 09830dc..4561a06 100644
--- a/src/test/recovery/t/010_logical_decoding_timelines.pl
+++ b/src/test/recovery/t/010_logical_decoding_timelines.pl
@@ -20,7 +20,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 10;
 use RecursiveCopy;
 use File::Copy;
 use IPC::Run ();
@@ -31,10 +31,14 @@ my ($stdout, $stderr, $ret);
 # Initialize master node
 my $node_master = get_new_node('master');
 $node_master->init(allows_streaming => 1, has_archiving => 1);
-$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
-$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
-$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
-$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->append_conf('postgresql.conf', q[
+wal_level = 'logical'
+max_replication_slots = 3
+max_wal_senders = 2
+log_min_messages = 'debug2'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+]);
 $node_master->dump_info;
 $node_master->start;
 
@@ -51,11 +55,17 @@ $node_master->safe_psql('postgres', 'CHECKPOINT;');
 my $backup_name = 'b1';
 $node_master->backup_fs_hot($backup_name);
 
+$node_master->safe_psql('postgres',
+	q[SELECT pg_create_physical_replication_slot('phys_slot');]);
+
 my $node_replica = get_new_node('replica');
 $node_replica->init_from_backup(
 	$node_master, $backup_name,
 	has_streaming => 1,
 	has_restoring => 1);
+$node_replica->append_conf(
+	'recovery.conf', q[primary_slot_name = 'phys_slot']);
+
 $node_replica->start;
 
 $node_master->safe_psql('postgres',
@@ -71,6 +81,24 @@ $stdout = $node_replica->safe_psql('postgres',
 is($stdout, 'before_basebackup',
 	'Expected to find only slot before_basebackup on replica');
 
+# Examine the physical slot the replica uses to stream changes
+# from the master to make sure its hot_standby_feedback
+# has locked in a catalog_xmin on the physical slot, and that
+# any xmin is >= the catalog_xmin
+$node_master->poll_query_until('postgres', q[
+	SELECT catalog_xmin IS NOT NULL
+	FROM pg_replication_slots
+	WHERE slot_name = 'phys_slot'
+	]);
+my $phys_slot = $node_master->slot('phys_slot');
+isnt($phys_slot->{'xmin'}, '',
+	'xmin assigned on physical slot of master');
+isnt($phys_slot->{'catalog_xmin'}, '',
+	'catalog_xmin assigned on physical slot of master');
+# Ignore wrap-around here, we're on a new cluster:
+cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
+	   'xmin on physical slot must not be lower than catalog_xmin');
+
 # Boom, crash
 $node_master->stop('immediate');
 
-- 
2.5.5
