On 23 March 2017 at 00:13, Simon Riggs <simon.ri...@2ndquadrant.com> wrote:
> On 22 March 2017 at 08:53, Craig Ringer <cr...@2ndquadrant.com> wrote:
>
>> I'm splitting up the rest of the decoding on standby patch set with
>> the goal of getting minimal functionality for creating and managing
>> slots on standbys in, so we can maintain slots on standbys and use
>> them when the standby is promoted to master.
>>
>> The first, to send catalog_xmin separately to the global xmin on
>> hot_standby_feedback and store it in the upstream physical slot's
>> catalog_xmin, is attached.
>>
>> These are extracted directly from the logical decoding on standby
>> patch, with comments by Petr and Andres made re the relevant code
>> addressed.
>
> I've reduced your two patches back to one with a smaller blast radius.
>
> I'll commit this tomorrow morning, barring objections.

This needs rebasing on top of

commit af4b1a0869bd3bb52e5f662e4491554b7f611489
Author: Simon Riggs <si...@2ndquadrant.com>
Date:   Wed Mar 22 16:51:01 2017 +0000

    Refactor GetOldestXmin() to use flags

    Replace ignoreVacuum parameter with more flexible flags.

    Author: Eiji Seki
    Review: Haribabu Kommi


That patch ended up using PROCARRAY flags directly as flags to
GetOldestXmin, so it doesn't make much sense to add a flag like
PROCARRAY_REPLICATION_SLOTS. There won't be any corresponding PROC_
flag for PGXACT->vacuumFlags, because replication slot xmin and
catalog_xmin are global state not tracked in individual proc entries.

Rather than add some kind of "PROC_RESERVED" flag in proc.h that would
never be used and would exist only to reserve a bit for
PROCARRAY_REPLICATION_SLOTS, which we'd use as a flag to GetOldestXmin, I
added a new argument to GetOldestXmin like the prior patch did.

If preferred I can instead add

proc.h:

#define PROC_RESERVED 0x20

procarray.h:

#define PROCARRAY_REPLICATION_SLOTS 0x20

and then test for (flags & PROCARRAY_REPLICATION_SLOTS)

but that's kind of ugly, to say the least; I'd rather just add another argument.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From ffa43fae35857dbff0efe83ef199df165d887d97 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 12:29:13 +0800
Subject: [PATCH] Report catalog_xmin separately to xmin in hot standby
 feedback

The catalog_xmin of slots on a standby was reported as part of the standby's
xmin, causing the master's xmin to be held down. This could cause considerable
unnecessary bloat on the master.

Instead, report catalog_xmin as a separate field in hot_standby_feedback. If
the upstream walsender is using a physical replication slot, store the
catalog_xmin in the slot's catalog_xmin field. If the upstream doesn't use a
slot and has only a PGPROC entry, behaviour doesn't change, as we store the
lower of xmin and catalog_xmin in the PGPROC entry.

There's no backward compatibility concern here, as nothing except another
postgres instance of the same major version has any business sending hot
standby feedback and it's only used on the physical replication protocol.
---
 contrib/pg_visibility/pg_visibility.c              |   6 +-
 contrib/pgstattuple/pgstatapprox.c                 |   2 +-
 doc/src/sgml/protocol.sgml                         |  33 ++++++-
 src/backend/access/transam/xlog.c                  |   4 +-
 src/backend/catalog/index.c                        |   3 +-
 src/backend/commands/analyze.c                     |   2 +-
 src/backend/commands/vacuum.c                      |   5 +-
 src/backend/replication/walreceiver.c              |  44 +++++++--
 src/backend/replication/walsender.c                | 110 +++++++++++++++------
 src/backend/storage/ipc/procarray.c                |  11 ++-
 src/include/storage/procarray.h                    |   2 +-
 .../recovery/t/010_logical_decoding_timelines.pl   |  38 ++++++-
 12 files changed, 198 insertions(+), 62 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index ee3936e..2db5762 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -557,7 +557,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 	{
 		/* Don't pass rel; that will fail in recovery. */
-		OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);
+		OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM, false);
 	}
 
 	rel = relation_open(relid, AccessShareLock);
@@ -674,7 +674,9 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 				 * a buffer lock. And this shouldn't happen often, so it's
 				 * worth being careful so as to avoid false positives.
 				 */
-				RecomputedOldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);
+				RecomputedOldestXmin = GetOldestXmin(NULL,
+													 PROCARRAY_FLAGS_VACUUM,
+													 false);
 
 				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
 					record_corrupt_item(items, &tuple.t_self);
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index 46c167a..5d0eda3 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -70,7 +70,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	TransactionId OldestXmin;
 	uint64		misc_count = 0;
 
-	OldestXmin = GetOldestXmin(rel, PROCARRAY_FLAGS_VACUUM);
+	OldestXmin = GetOldestXmin(rel, PROCARRAY_FLAGS_VACUUM, false);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 244e381..d8786f0 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1911,10 +1911,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0, this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1924,7 +1925,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slots on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ff4cf3a..ed5ee90 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8895,7 +8895,7 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
-		TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+		TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT, false));
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -9258,7 +9258,7 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
-		TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+		TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT, false));
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 7924c30..69ce21c 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2270,7 +2270,8 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	{
 		snapshot = SnapshotAny;
 		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
+		OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM,
+								   false);
 	}
 
 	scan = heap_beginscan_strat(heapRelation,	/* relation */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 055338f..b005dc1 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1000,7 +1000,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
 	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
-	OldestXmin = GetOldestXmin(onerel, PROCARRAY_FLAGS_VACUUM);
+	OldestXmin = GetOldestXmin(onerel, PROCARRAY_FLAGS_VACUUM, false);
 
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows, random());
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index d8ae3e1..d8722c7 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -527,7 +527,8 @@ vacuum_set_xid_limits(Relation rel,
 	 * always an independent transaction.
 	 */
 	*oldestXmin =
-		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, PROCARRAY_FLAGS_VACUUM), rel);
+		TransactionIdLimitedForOldSnapshots(
+			GetOldestXmin(rel, PROCARRAY_FLAGS_VACUUM, false), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -939,7 +940,7 @@ vac_update_datfrozenxid(void)
 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
 	 * So we cannot produce a wrong minimum by starting with this.
 	 */
-	newFrozenXid = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);
+	newFrozenXid = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM, false);
 
 	/*
 	 * Similarly, initialize the MultiXact "min" with the value that would be
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 31c567b..06199ad 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1175,8 +1175,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	/* initially true so we always send at least one feedback message */
 	static bool master_has_standby_xmin = true;
@@ -1221,29 +1221,55 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT);
+	{
+		TransactionId slot_xmin;
+
+		/*
+		 * Usually GetOldestXmin() would include the global replication slot
+		 * xmin and catalog_xmin in its calculations, but we don't want to hold
+		 * upstream back from vacuuming normal user table tuples just because
+		 * they're within the catalog_xmin horizon of logical replication slots
+		 * on this standby, so we ignore slot xmin and catalog_xmin in
+		 * GetOldestXmin, then deal with them ourselves.
+		 */
+		xmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT, true);
+
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+
+		if (TransactionIdIsValid(slot_xmin) &&
+			TransactionIdPrecedes(slot_xmin, xmin))
+			xmin = slot_xmin;
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	GetNextXidAndEpoch(&nextXid, &xmin_epoch);
+	catalog_xmin_epoch = xmin_epoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch--;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch--;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(wrconn, reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7561770..05b51a0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -221,6 +221,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1605,7 +1606,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1626,6 +1627,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1636,59 +1645,92 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 }
 
 /*
- * Hot Standby feedback
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ *
+ * This check doesn't care about whether clog exists for these xids
+ * at all.
  */
-static void
-ProcessStandbyHSFeedbackMessage(void)
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
 {
 	TransactionId nextXid;
 	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
-	 * byte.
+	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
+	 * of this message.
 	 */
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
 		 feedbackXmin,
-		 feedbackEpoch);
+		 feedbackEpoch,
+		 feedbackCatalogXmin,
+		 feedbackCatalogEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
+		return;
 
-	if (feedbackXmin <= nextXid)
-	{
-		if (feedbackEpoch != nextEpoch)
-			return;
-	}
-	else
-	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
-	}
-
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
+		return;
 
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
@@ -1713,15 +1755,23 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so we store the least
+	 * of the two provided when not using a slot.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+			MyPgXact->xmin = feedbackCatalogXmin;
+		else
+			MyPgXact->xmin = feedbackXmin;
+	}
 }
 
 /*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 40c3247..1401efe 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1264,6 +1264,9 @@ TransactionIdIsActive(TransactionId xid)
  * corresponding flags is set. Typically, if you want to ignore ones with
  * PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM.
  *
+ * The ignore_slots option may be set to true to ignore replication slot
+ * xmin and catalog_xmin when calculating the oldest xmin.
+ *
  * This is used by VACUUM to decide which deleted tuples must be preserved in
  * the passed in table. For shared relations backends in all databases must be
  * considered, but for non-shared relations that's not required, since only
@@ -1304,7 +1307,7 @@ TransactionIdIsActive(TransactionId xid)
  * GetOldestXmin() move backwards, with no consequences for data integrity.
  */
 TransactionId
-GetOldestXmin(Relation rel, int flags)
+GetOldestXmin(Relation rel, int flags, bool ignore_slots)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1418,7 +1421,8 @@ GetOldestXmin(Relation rel, int flags)
 	/*
 	 * Check whether there are replication slots requiring an older xmin.
 	 */
-	if (TransactionIdIsValid(replication_slot_xmin) &&
+	if (!ignore_slots &&
+		TransactionIdIsValid(replication_slot_xmin) &&
 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
 		result = replication_slot_xmin;
 
@@ -1428,7 +1432,8 @@ GetOldestXmin(Relation rel, int flags)
 	 * possible. We need to do so if we're computing the global limit (rel =
 	 * NULL) or if the passed relation is a catalog relation of some kind.
 	 */
-	if ((rel == NULL ||
+	if (!ignore_slots &&
+		(rel == NULL ||
 		 RelationIsAccessibleInLogicalDecoding(rel)) &&
 		TransactionIdIsValid(replication_slot_catalog_xmin) &&
 		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index c8e1ae5..0e8f53e 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -75,7 +75,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(Relation rel, int flags);
+extern TransactionId GetOldestXmin(Relation rel, int flags, bool ignore_slots);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl
index 09830dc..4561a06 100644
--- a/src/test/recovery/t/010_logical_decoding_timelines.pl
+++ b/src/test/recovery/t/010_logical_decoding_timelines.pl
@@ -20,7 +20,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 10;
 use RecursiveCopy;
 use File::Copy;
 use IPC::Run ();
@@ -31,10 +31,14 @@ my ($stdout, $stderr, $ret);
 # Initialize master node
 my $node_master = get_new_node('master');
 $node_master->init(allows_streaming => 1, has_archiving => 1);
-$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
-$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
-$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
-$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->append_conf('postgresql.conf', q[
+wal_level = 'logical'
+max_replication_slots = 3
+max_wal_senders = 2
+log_min_messages = 'debug2'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+]);
 $node_master->dump_info;
 $node_master->start;
 
@@ -51,11 +55,17 @@ $node_master->safe_psql('postgres', 'CHECKPOINT;');
 my $backup_name = 'b1';
 $node_master->backup_fs_hot($backup_name);
 
+$node_master->safe_psql('postgres',
+	q[SELECT pg_create_physical_replication_slot('phys_slot');]);
+
 my $node_replica = get_new_node('replica');
 $node_replica->init_from_backup(
 	$node_master, $backup_name,
 	has_streaming => 1,
 	has_restoring => 1);
+$node_replica->append_conf(
+	'recovery.conf', q[primary_slot_name = 'phys_slot']);
+
 $node_replica->start;
 
 $node_master->safe_psql('postgres',
@@ -71,6 +81,24 @@ $stdout = $node_replica->safe_psql('postgres',
 is($stdout, 'before_basebackup',
 	'Expected to find only slot before_basebackup on replica');
 
+# Examine the physical slot the replica uses to stream changes
+# from the master to make sure its hot_standby_feedback
+# has locked in a catalog_xmin on the physical slot, and that
+# any xmin is >= the catalog_xmin
+$node_master->poll_query_until('postgres', q[
+	SELECT catalog_xmin IS NOT NULL
+	FROM pg_replication_slots
+	WHERE slot_name = 'phys_slot'
+	]);
+my $phys_slot = $node_master->slot('phys_slot');
+isnt($phys_slot->{'xmin'}, '',
+	'xmin assigned on physical slot of master');
+isnt($phys_slot->{'catalog_xmin'}, '',
+	'catalog_xmin assigned on physical slot of master');
+# Ignore wrap-around here, we're on a new cluster:
+cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
+	   'xmin on physical slot must not be lower than catalog_xmin');
+
 # Boom, crash
 $node_master->stop('immediate');
 
-- 
2.5.5
