On 25 October 2016 at 00:19, Petr Jelinek <p...@2ndquadrant.com> wrote:

>> Now formatted a series:
>>
>> 1.      Send catalog_xmin in hot standby feedback protocol
>
>> +     xmin_epoch = nextEpoch;
>>       if (nextXid < xmin)
>> -             nextEpoch--;
>> +             xmin_epoch --;
>> +     catalog_xmin_epoch = nextEpoch;
>> +     if (nextXid < catalog_xmin)
>> +             catalog_xmin_epoch --;
>
> Don't understand why you keep the nextEpoch here, it's not used anywhere
> that I can see, you could just as well use the xmin_epoch directly if
> that's how you want to name it.

Fixed, thanks.
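
To illustrate the epoch rule in the quoted hunk: each xid's epoch is derived from the current nextXid and its epoch. Below is a minimal standalone sketch of that rule. The helper name `epoch_for_xid` and the `TransactionId` typedef are hypothetical stand-ins, not PostgreSQL APIs.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;	/* stand-in for PostgreSQL's 32-bit xid */

/*
 * Hypothetical helper: return the epoch to report for xid, given nextXid and
 * its epoch.  An xid numerically greater than nextXid was assigned before the
 * 32-bit counter last wrapped, so it belongs to the previous epoch.  An
 * invalid xid (0) is never greater than nextXid and keeps the current epoch.
 */
static uint32_t
epoch_for_xid(TransactionId xid, TransactionId nextXid, uint32_t nextEpoch)
{
	/* epoch 0 has no predecessor, so an xid "ahead" of nextXid is bogus there */
	assert(nextEpoch > 0 || xid <= nextXid);

	if (nextXid < xid)
		return nextEpoch - 1;
	return nextEpoch;
}
```

The helper can be called once for xmin and once for catalog_xmin against the same `GetNextXidAndEpoch()` result. That yields both epochs without keeping a separate nextEpoch variable around.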

>> +     /*
>> +      * A 10.0+ standby's walsender passes the lowest catalog xmin of any
>> +      * replication slot up to the master.
>> +      */
>> +     feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
>> +     feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
>>
>
> In this comment I'd be more interested to know why this is sent, rather
> than that it's sent since version 10+.

Removed. It's explained in a comment inside the if
(hot_standby_feedback) block in walreceiver anyway.
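
For reference, the extended hot standby feedback message carries four 32-bit fields after the send timestamp. The sketch below shows that layout, assuming the network byte order used by `pq_sendint()`/`pq_getmsgint()`. The `HSFeedback` struct and the encode/decode helpers are hypothetical. In the real walsender, the caller has already consumed the 'h' type byte before `ProcessStandbyHSFeedbackMessage()` runs.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical in-memory form of an extended hot standby feedback message. */
typedef struct HSFeedback
{
	int64_t		sendTime;
	uint32_t	xmin;
	uint32_t	xmin_epoch;
	uint32_t	catalog_xmin;
	uint32_t	catalog_xmin_epoch;
} HSFeedback;

/* 'h' type byte + int64 sendTime + four int32 fields = 25 bytes */
#define HS_FEEDBACK_LEN (1 + 8 + 4 * 4)

static void
put_be32(unsigned char *p, uint32_t v)
{
	p[0] = (unsigned char) (v >> 24);
	p[1] = (unsigned char) (v >> 16);
	p[2] = (unsigned char) (v >> 8);
	p[3] = (unsigned char) v;
}

static uint32_t
get_be32(const unsigned char *p)
{
	return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
		((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* Write the message into buf (at least HS_FEEDBACK_LEN bytes), big-endian. */
static size_t
hs_feedback_encode(const HSFeedback *fb, unsigned char *buf)
{
	uint64_t	t;

	assert(fb != NULL && buf != NULL);
	t = (uint64_t) fb->sendTime;
	buf[0] = 'h';
	put_be32(buf + 1, (uint32_t) (t >> 32));	/* high half first */
	put_be32(buf + 5, (uint32_t) t);
	put_be32(buf + 9, fb->xmin);
	put_be32(buf + 13, fb->xmin_epoch);
	put_be32(buf + 17, fb->catalog_xmin);
	put_be32(buf + 21, fb->catalog_xmin_epoch);
	return HS_FEEDBACK_LEN;
}

/* Parse buf; return 0 on success, -1 if too short or not an 'h' message. */
static int
hs_feedback_decode(const unsigned char *buf, size_t len, HSFeedback *fb)
{
	if (len < HS_FEEDBACK_LEN || buf[0] != 'h')
		return -1;
	fb->sendTime = (int64_t) (((uint64_t) get_be32(buf + 1) << 32) |
							  get_be32(buf + 5));
	fb->xmin = get_be32(buf + 9);
	fb->xmin_epoch = get_be32(buf + 13);
	fb->catalog_xmin = get_be32(buf + 17);
	fb->catalog_xmin_epoch = get_be32(buf + 21);
	return 0;
}
```

The first 17 bytes match the pre-existing layout. The two new fields are simply appended at offsets 17 and 21.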

>> 2.      Make walsender respect catalog_xmin in hot standby feedback messages
>
>> +             if (TransactionIdIsNormal(feedbackCatalogXmin)
>> +                     && TransactionIdPrecedes(feedbackCatalogXmin, 
>> feedbackXmin))
>> +             {
>> +                     MyPgXact->xmin = feedbackCatalogXmin;
>> +             }
>> +             else
>> +             {
>> +                     MyPgXact->xmin = feedbackXmin;
>> +             }
>
> This is not how we usually use the {} brackets (there are some more
> instances of using them for one line of code in this particular commit).

Whoops. Thanks. I find the Pg convention pretty ghastly when dealing
with multi-line 'if' conditions followed by a single-line statement,
but it's still the convention whether I like it or not.
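
Without a replication slot, the walsender can hold back only a single xmin. It must therefore pick the older of the two feedback values, using a wraparound-aware comparison. Below is a sketch of that choice. `xid_precedes()` is modelled on PostgreSQL's `TransactionIdPrecedes()`; `xmin_without_slot()` and the standalone macros are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId		((TransactionId) 0)
#define FirstNormalTransactionId	((TransactionId) 3)
#define TransactionIdIsNormal(xid)	((xid) >= FirstNormalTransactionId)

/*
 * Modulo-2^32 "is id1 older than id2?", modelled on TransactionIdPrecedes():
 * special (non-normal) xids compare numerically, normal ones circularly.
 */
static bool
xid_precedes(TransactionId id1, TransactionId id2)
{
	int32_t		diff;

	if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
		return id1 < id2;
	diff = (int32_t) (id1 - id2);
	return diff < 0;
}

/*
 * Hypothetical: the single xmin a slot-less walsender should advertise,
 * i.e. the older of the two feedback values, ignoring an invalid one.
 */
static TransactionId
xmin_without_slot(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
{
	if (!TransactionIdIsNormal(feedbackXmin))
		return feedbackCatalogXmin;
	if (!TransactionIdIsNormal(feedbackCatalogXmin))
		return feedbackXmin;
	return xid_precedes(feedbackCatalogXmin, feedbackXmin) ?
		feedbackCatalogXmin : feedbackXmin;
}
```

This sketch differs from the two-branch form in the patch in one edge case. When xmin is 0 and catalog_xmin is valid, the sketch returns catalog_xmin. The patch's form would instead store an invalid xmin, because `TransactionIdPrecedes()` compares non-normal xids numerically.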

>> 3.      Allow GetOldestXmin(...) to optionally disregard the catalog_xmin
>> By ignoring the slot catalog_xmin in the GetOldestXmin() call and then
>> separately calling ProcArrayGetReplicationSlotXmin() to get the
>> catalog_xmin, we might produce a catalog xmin slightly later than
>> what was in the procarray at the time we previously called
>> GetOldestXmin() to examine backend/session state. ProcArrayLock is
>> released so it can be advanced in-between the calls. That's harmless -
>> it isn't necessary for the reported catalog_xmin to be exactly
>> consistent with backend state. If it advances it's safe to report the
>> new position since we know the confirmed positions are on-disk
>> locally.
>>
>> The alternative here is to extend GetOldestXmin() to take an out-param
>> to report the slot catalog xmin. That really just duplicates the
>> functionality of ProcArrayGetReplicationSlotXmin but means we can grab
>> it within a single ProcArray lock. Variants of GetOldestXmin and
>> ProcArrayGetReplicationSlotXmin that take an already-locked flag would
>> work too, but would hold the lock across parts of GetOldestXmin() that
>> currently don't retain it. I could also convert the current boolean
>> param ignoreVacuum into a flags argument instead of adding another
>> boolean. No real preference from me.
>>
>
> I would honestly prefer the change to GetOldestXmin to return the
> catalog_xmin. It seems both cleaner and does less locking.

Fair enough. Done.
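
The agreed GetOldestXmin() change can be modelled in isolation: the slot catalog_xmin is either merged into the result or handed back through the new out-parameter. This is a toy sketch, not the procarray code. `oldest_xmin_model()` is hypothetical, and so is its `catalog_relevant` flag, which stands in for `rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId		((TransactionId) 0)
#define TransactionIdIsNormal(xid)	((xid) >= (TransactionId) 3)

/* Circular comparison of two normal xids (cf. NormalTransactionIdPrecedes). */
static bool
normal_xid_precedes(TransactionId id1, TransactionId id2)
{
	return (int32_t) (id1 - id2) < 0;
}

/*
 * Toy model of the revised GetOldestXmin() contract.  "result" is the xmin
 * already computed from backends and slot xmins; slot_catalog_xmin is the
 * oldest slot catalog_xmin; catalog_relevant stands in for
 * "rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)".
 */
static TransactionId
oldest_xmin_model(TransactionId result, TransactionId slot_catalog_xmin,
				  bool catalog_relevant, TransactionId *catalog_xmin)
{
	if (!catalog_relevant)
		slot_catalog_xmin = InvalidTransactionId;

	if (catalog_xmin != NULL)
		*catalog_xmin = slot_catalog_xmin;	/* report separately, don't merge */
	else if (TransactionIdIsNormal(slot_catalog_xmin) &&
			 normal_xid_precedes(slot_catalog_xmin, result))
		result = slot_catalog_xmin;			/* back up to protect catalog tuples */

	return result;
}
```

In the series, only the walreceiver uses the out-parameter form. Every other caller passes NULL and so keeps the old merged behaviour.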

> In general it's a simpler patch than I expected, which is good. But it
> would be good to have some tests.

Agreed. OK, I've added basic tests for physical replication slots and
hot_standby_feedback to t/001_stream_rep.pl. It makes sense to test both
alongside stream_rep.pl's tests of cascading, etc., and I don't think a
separate test is needed.

It's not actually practical to add tests for the catalog_xmin on
standby functionality until the next patch in the series (pending)
which enables logical decoding on standby. Currently you can't create
a slot on a standby so you can't cause the standby to hold down
catalog_xmin. But the tests show things work how they should within
the range of currently exposed functionality.

In the process I noticed how skeletal those tests still are. We have a
great framework now (thanks Michael!) and I'd like to start filling it
out with tests involving unclean shutdowns, promotions, etc. There's a
lot still to write to get solid coverage. Tests aren't hard. Who's
keen to write some? I'll happily help any volunteers out.

New patch series attached. 0001 contains the new tests; the guts are
patches 2-5. I'm not sure whether 2, 3, 4 and 5 should be squashed for
commit or not, but I left them separate for easier review.

For complete functionality this series will want to be coupled with
logical decoding timeline following and a pending patch to enable
logical decoding on standby.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 80964a3b4f6a98f83a123f69924f2627d89c7a5b Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 5 Sep 2016 16:23:57 +0800
Subject: [PATCH 5/7] Send catalog_xmin separately in hot_standby_feedback
 messages

Now that the protocol supports reporting catalog_xmin separately and
GetOldestXmin() allows us to exclude the catalog_xmin from the calculated xmin,
actually send a separate catalog_xmin to the master.

This change is necessary, but not sufficient, to allow logical decoding
on a standby.
---
 src/backend/replication/walreceiver.c | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 80cc482..318d8ce 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1206,11 +1206,21 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false, NULL);
+	{
+		/*
+		 * Usually GetOldestXmin() would include the catalog_xmin in its
+		 * calculations, but we don't want to hold upstream back from vacuuming
+		 * normal user table tuples just because they're within the
+		 * catalog_xmin horizon of logical replication slots on this standby.
+		 * Instead we report the catalog_xmin to the upstream separately.
+		 */
+		xmin = GetOldestXmin(NULL, false, &catalog_xmin);
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
-
-	catalog_xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
@@ -1235,7 +1245,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	pq_sendint(&reply_message, catalog_xmin, 4);
 	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
-- 
2.5.5

From 35cd1cf69f11afece009f4508f8194f61ceb70cc Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 5 Sep 2016 16:13:35 +0800
Subject: [PATCH 4/7] Allow GetOldestXmin(...) to optionally disregard the
 catalog_xmin

Add a new catalog_xmin out-parameter to GetOldestXmin(...), for use when
calculating hot standby feedback xmins. When passed, any needed catalog_xmin is
returned separately instead of being merged with the return value. Adjust
existing call sites.
---
 contrib/pg_visibility/pg_visibility.c |  4 +--
 contrib/pgstattuple/pgstatapprox.c    |  2 +-
 src/backend/access/transam/xlog.c     |  4 +--
 src/backend/catalog/index.c           |  2 +-
 src/backend/commands/analyze.c        |  2 +-
 src/backend/commands/vacuum.c         |  4 +--
 src/backend/replication/walreceiver.c |  2 +-
 src/backend/storage/ipc/procarray.c   | 51 +++++++++++++++++++++++------------
 src/include/storage/procarray.h       |  2 +-
 9 files changed, 45 insertions(+), 28 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index 9985e3e..4fa3ad4 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -538,7 +538,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 	{
 		/* Don't pass rel; that will fail in recovery. */
-		OldestXmin = GetOldestXmin(NULL, true);
+		OldestXmin = GetOldestXmin(NULL, true, NULL);
 	}
 
 	rel = relation_open(relid, AccessShareLock);
@@ -660,7 +660,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 				 * a buffer lock. And this shouldn't happen often, so it's
 				 * worth being careful so as to avoid false positives.
 				 */
-				RecomputedOldestXmin = GetOldestXmin(NULL, true);
+				RecomputedOldestXmin = GetOldestXmin(NULL, true, NULL);
 
 				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
 					record_corrupt_item(items, &tuple.t_self);
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index f524fc4..5b33c97 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -70,7 +70,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	TransactionId OldestXmin;
 	uint64		misc_count = 0;
 
-	OldestXmin = GetOldestXmin(rel, true);
+	OldestXmin = GetOldestXmin(rel, true, NULL);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6cec027..56c672c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8653,7 +8653,7 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, NULL));
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -9016,7 +9016,7 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, NULL));
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 08b646d..b673c06 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2272,7 +2272,7 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	{
 		snapshot = SnapshotAny;
 		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, true);
+		OldestXmin = GetOldestXmin(heapRelation, true, NULL);
 	}
 
 	scan = heap_beginscan_strat(heapRelation,	/* relation */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index c617abb..9b0cc3a 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -992,7 +992,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
 	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
-	OldestXmin = GetOldestXmin(onerel, true);
+	OldestXmin = GetOldestXmin(onerel, true, NULL);
 
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows, random());
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 58bbf55..aaee9a6 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -497,7 +497,7 @@ vacuum_set_xid_limits(Relation rel,
 	 * always an independent transaction.
 	 */
 	*oldestXmin =
-		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true, NULL), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -909,7 +909,7 @@ vac_update_datfrozenxid(void)
 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
 	 * So we cannot produce a wrong minimum by starting with this.
 	 */
-	newFrozenXid = GetOldestXmin(NULL, true);
+	newFrozenXid = GetOldestXmin(NULL, true, NULL);
 
 	/*
 	 * Similarly, initialize the MultiXact "min" with the value that would be
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 06ca9e4..80cc482 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1206,7 +1206,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false);
+		xmin = GetOldestXmin(NULL, false, NULL);
 	else
 		xmin = InvalidTransactionId;
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e5d487d..a4e3549 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1298,17 +1298,22 @@ TransactionIdIsActive(TransactionId xid)
  * process can set its xmin based on transactions that are no longer running
  * in the master but are still being replayed on the standby, thus possibly
  * making the GetOldestXmin reading go backwards.  In this case there is a
- * possibility that we lose data that the standby would like to have, but
- * there is little we can do about that --- data is only protected if the
- * walsender runs continuously while queries are executed on the standby.
- * (The Hot Standby code deals with such cases by failing standby queries
- * that needed to access already-removed data, so there's no integrity bug.)
+ * possibility that we lose data that the standby would like to have
+ * unless the standby uses a replication slot to make its xmin persistent
+ * even when it isn't connected. The Hot Standby code deals with such cases by
+ * failing standby queries that needed to access already-removed data, so
+ * there's no integrity bug.
+ *
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * If catalog_xmin is not NULL, replication slots' catalog_xmin is not merged
+ * into the returned xmin; it is stored in *catalog_xmin instead, and the
+ * caller must account for it separately.
  */
 TransactionId
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+GetOldestXmin(Relation rel, bool ignoreVacuum, TransactionId *catalog_xmin)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1433,17 +1438,29 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
 		result = replication_slot_xmin;
 
-	/*
-	 * After locks have been released and defer_cleanup_age has been applied,
-	 * check whether we need to back up further to make logical decoding
-	 * possible. We need to do so if we're computing the global limit (rel =
-	 * NULL) or if the passed relation is a catalog relation of some kind.
-	 */
-	if ((rel == NULL ||
-		 RelationIsAccessibleInLogicalDecoding(rel)) &&
-		TransactionIdIsValid(replication_slot_catalog_xmin) &&
-		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
-		result = replication_slot_catalog_xmin;
+	if (!(rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)))
+		replication_slot_catalog_xmin = InvalidTransactionId;
+
+	if (catalog_xmin != NULL)
+	{
+		/*
+		 * The caller wants any logical decoding specific xmin reported
+		 * separately, so don't merge it with the xmin we'll return.
+		 */
+		*catalog_xmin = replication_slot_catalog_xmin;
+	}
+	else
+	{
+		/*
+		 * After locks have been released and defer_cleanup_age has been applied,
+		 * check whether we need to back up further to make logical decoding
+		 * possible. We need to do so if we're computing the global limit (rel =
+		 * NULL) or if the passed relation is a catalog relation of some kind.
+		 */
+		if (TransactionIdIsValid(replication_slot_catalog_xmin) &&
+			NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
+			result = replication_slot_catalog_xmin;
+	}
 
 	return result;
 }
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index dd37c0c..f7d1d96 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -53,7 +53,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum, TransactionId *catalog_xmin);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
-- 
2.5.5

From 2b2dea6589916a04b8e40fd7ba6d3ccc8f6e8745 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:38:40 +0800
Subject: [PATCH 3/7] Make walsender respect catalog_xmin in hot standby
 feedback messages

The walsender now respects the new catalog_xmin field in the hot standby
feedback message. It uses it to set the catalog_xmin field on its physical
replication slot if one is in use. Otherwise it sets its process xmin to the
older of the xmin and catalog_xmin, so the outcome is the same as before
the protocol change.

In the process, factor out walsender's sanity check for xid+epoch wraparound
into a separate TransactionIdInRecentPast() function since we're now checking
it in two places.
---
 src/backend/replication/walsender.c | 111 +++++++++++++++++++++++++++---------
 1 file changed, 84 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aca3ca1..ba3b471 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -215,6 +215,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1533,6 +1534,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * Nor do we need the effective_xmin / effective_catalog_xmin handling
+	 * that LogicalConfirmReceivedLocation performs: if we received the xmin
+	 * and catalog_xmin from downstream replication slots, we know they were
+	 * already confirmed there.
 	 */
 }
 
@@ -1595,7 +1601,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1616,6 +1622,22 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica, it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1626,13 +1648,46 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 }
 
 /*
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ *
+ * This check doesn't care about whether clog exists for these xids
+ * at all.
+ */
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+	TransactionId nextXid;
+	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
  * Hot Standby feedback
  */
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-	TransactionId nextXid;
-	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
 	TransactionId feedbackCatalogXmin;
@@ -1640,7 +1695,8 @@ ProcessStandbyHSFeedbackMessage(void)
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
-	 * byte.
+	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
+	 * of this message.
 	 */
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
@@ -1654,37 +1710,30 @@ ProcessStandbyHSFeedbackMessage(void)
 		 feedbackCatalogXmin,
 		 feedbackCatalogEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-	if (feedbackXmin <= nextXid)
-	{
-		if (feedbackEpoch != nextEpoch)
-			return;
-	}
-	else
-	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
-	}
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
+		return;
 
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
+		return;
 
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
@@ -1709,15 +1758,23 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so without one we store
+	 * the older of the two values provided.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+			MyPgXact->xmin = feedbackCatalogXmin;
+		else
+			MyPgXact->xmin = feedbackXmin;
+	}
 }
 
 /*
-- 
2.5.5

From dcd32f60383666715ea8476a708c461fe2a056f6 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:30:53 +0800
Subject: [PATCH 2/7] Send catalog_xmin in hot standby feedback protocol

Add catalog_xmin to the hot standby feedback protocol so a read replica
that has logical slots can use its physical slot to the master to hold down the
master's catalog_xmin. This information will let a replica prevent vacuuming of
catalog tuples still required by the replica's logical slots.

This commit changes only the hot standby feedback protocol; the new value is
always set to zero by the walreceiver and is ignored by the walsender.
---
 doc/src/sgml/protocol.sgml            | 33 ++++++++++++++++++++++++++++-----
 src/backend/replication/walreceiver.c | 21 ++++++++++++++-------
 src/backend/replication/walsender.c   | 10 ++++++++--
 3 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 50cf527..e0fd9aa 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1783,10 +1783,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0, this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1796,7 +1797,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slot on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled. New in 10.0.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby. New in 10.0.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2bb3dce..06ca9e4 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1164,8 +1164,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	static bool master_has_standby_xmin = false;
 
@@ -1210,23 +1210,30 @@ XLogWalRcvSendHSFeedback(bool immed)
 	else
 		xmin = InvalidTransactionId;
 
+	catalog_xmin = InvalidTransactionId;
+
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	GetNextXidAndEpoch(&nextXid, &xmin_epoch);
+	catalog_xmin_epoch = xmin_epoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch--;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch--;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin))
 		master_has_standby_xmin = true;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5e508..aca3ca1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1635,6 +1635,8 @@ ProcessStandbyHSFeedbackMessage(void)
 	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
@@ -1643,10 +1645,14 @@ ProcessStandbyHSFeedbackMessage(void)
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
 		 feedbackXmin,
-		 feedbackEpoch);
+		 feedbackEpoch,
+		 feedbackCatalogXmin,
+		 feedbackCatalogEpoch);
 
 	/* Unset WalSender's xmin if the feedback message value is invalid */
 	if (!TransactionIdIsNormal(feedbackXmin))
-- 
2.5.5

From e0cbf6ff72b14a11e5ab48b442f6b74d0aa9ff4c Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 9 Nov 2016 13:44:04 +0800
Subject: [PATCH 1/7] Expand streaming replication tests to cover hot standby
 feedback and physical replication slots

---
 src/test/recovery/t/001_stream_rep.pl | 112 ++++++++++++++++++++++++++++++----
 1 file changed, 101 insertions(+), 11 deletions(-)

diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 981c00b..89d0e90 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 4;
+use Test::More tests => 18;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -40,16 +40,21 @@ $node_master->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
 
 # Wait for standbys to catch up
-my $applname_1 = $node_standby_1->name;
-my $applname_2 = $node_standby_2->name;
-my $caughtup_query =
-"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
-$node_master->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 1 to catch up";
-$caughtup_query =
-"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 2 to catch up";
+sub wait_for_catchup
+{
+	my $applname_1 = $node_standby_1->name;
+	my $applname_2 = $node_standby_2->name;
+	my $caughtup_query =
+	"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
+	$node_master->poll_query_until('postgres', $caughtup_query)
+	  or die "Timed out while waiting for standby 1 to catch up";
+	$caughtup_query =
+	"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
+	$node_standby_1->poll_query_until('postgres', $caughtup_query)
+	  or die "Timed out while waiting for standby 2 to catch up";
+}
+
+wait_for_catchup();
 
 my $result =
   $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
@@ -66,3 +71,88 @@ is($node_standby_1->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
 	3, 'read-only queries on standby 1');
 is($node_standby_2->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
 	3, 'read-only queries on standby 2');
+
+diag "switching to physical replication slot";
+# Switch to using a physical replication slot. We can do this without a new
+# backup since physical slots can go backwards if needed. Do so on both
+# standbys. Since we're going to be testing things that affect the slot state,
+# also lower wal_receiver_status_interval so that feedback is sent promptly.
+my ($slotname_1, $slotname_2) = ('standby_1', 'standby_2');
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_master->restart;
+is($node_master->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_1');]), 0, 'physical slot created on master');
+$node_standby_1->append_conf('recovery.conf', "primary_slot_name = $slotname_1\n");
+$node_standby_1->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+$node_standby_1->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_standby_1->restart;
+is($node_standby_1->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_2');]), 0, 'physical slot created on intermediate replica');
+$node_standby_2->append_conf('recovery.conf', "primary_slot_name = $slotname_2\n");
+$node_standby_2->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+$node_standby_2->restart;
+
+sub get_slot_xmins
+{
+	my ($node, $slotname) = @_;
+	my ($xmin, $catalog_xmin) = split(qr/\|/, $node->safe_psql('postgres', qq[SELECT xmin, catalog_xmin FROM pg_replication_slots WHERE slot_name = '$slotname';]));
+	return (defined($xmin) ? $xmin : '', defined($catalog_xmin) ? $catalog_xmin : '');
+}
+
+# There's no hot standby feedback and there are no logical slots on either peer
+# so xmin and catalog_xmin should be null on both slots.
+my ($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+is($xmin, '', 'non-cascaded slot xmin null with no hs_feedback');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin null with no hs_feedback');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+is($xmin, '', 'cascaded slot xmin null with no hs_feedback');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin null with no hs_feedback');
+
+# Replication still works?
+$node_master->safe_psql('postgres', 'CREATE TABLE replayed(val integer);');
+
+sub replay_check
+{
+	my $newval = $node_master->safe_psql('postgres', 'INSERT INTO replayed(val) SELECT coalesce(max(val),0) + 1 AS newval FROM replayed RETURNING val');
+	wait_for_catchup();
+	$node_standby_1->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
+		or die "standby_1 didn't replay master value $newval";
+	$node_standby_2->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
+		or die "standby_2 didn't replay standby_1 value $newval";
+}
+
+replay_check();
+
+diag "enabling hot_standby_feedback";
+# Enable hs_feedback. The slot should gain an xmin. wal_receiver_status_interval
+# was lowered above, so we should see the result promptly.
+$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
+$node_standby_1->reload;
+$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
+$node_standby_2->reload;
+replay_check();
+sleep(2);
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+isnt($xmin, '', 'non-cascaded slot xmin non-null with hs_feedback');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin still null with hs_feedback');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+isnt($xmin, '', 'cascaded slot xmin non-null with hs_feedback');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin still null with hs_feedback');
+
+diag "disabling hot_standby_feedback";
+# Disable hs_feedback. Xmin should be cleared.
+$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$node_standby_1->reload;
+$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$node_standby_2->reload;
+replay_check();
+sleep(2);
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+is($xmin, '', 'non-cascaded slot xmin null with hs_feedback reset');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin still null with hs_feedback reset');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+is($xmin, '', 'cascaded slot xmin null with hs_feedback reset');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin still null with hs_feedback reset');
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)