Hi all

Currently hot standby feedback sends GetOldestXmin()'s result to the
upstream as the required xmin. GetOldestXmin() returns a slot's
catalog_xmin if that's the lowest xmin on the system.

That's fine so long as we don't do logical decoding on standbys, but
if we start allowing logical slots on standbys it'll cause the master
to retain too much bloat since it'll pin the master's xmin (not
catalog_xmin) down based on the catalog_xmin of any slots on the
downstream.

To fix that, add new fields to the hot standby feedback protocol
message to carry a separate catalog_xmin.

This doesn't need any special care for backward compatibility because
the only thing that has any business sending hot standby feedback is a
physical standby and they're required to be the same major version as
the master. If someone tries to connect with a standby of the wrong
version they'll fail long before this, and even if they didn't they'd
just get an error saying there's not enough data in the message.
pg_basebackup, pg_recvlogical and pg_receivexlog don't send hot
standby feedback messages.

I'm posting this now because Petr was interested in it for his work on
logical replication. I'll be following it with a subsequent patch to allow
logical slot creation on physical replicas if they're using a slot to
talk to the master and have hot_standby_feedback enabled, just so you
know the direction this is going in.

Passes 'make check', src/bin/pg_basebackup and src/test/recovery TAP
tests. I haven't added specific tests for this functionality since
there isn't (yet) a way to set catalog_xmin separately on a physical
standby without a dedicated test module.

The logical decoding timeline following patch[1] is also relevant for
this, since it is required for logical decoding on standby to survive
promotion.

Next steps will be:


* Expose information about whether or not a slot is in use from walreceiver.c

* Allow logical slots to be created on replicas if
hot_standby_feedback is enabled and a logical slot is in use. Return
null as the exported snapshot ID when creating over the walsender
protocol, since we can't export a snapshot on a standby due to the
need to allocate an xid. (That can be addressed separately).

* Now that recovery tests are possible, write the recovery test suite
for logical decoding on standby

* Auto-drop replication slots when dropping a database in dbase_redo

* Add a safety mechanism to stop users from disabling hot standby feedback on
the replica, or from ceasing to use a physical slot to the upstream, while
logical slots exist on the replica. Alternatively, mark such logical slots as
unusable using a new persistent field on the slot. This is not trivial because
we must allow crash recovery without a slot to the upstream (obviously),
and should preferably also allow fallback to archive recovery when the
server with the slot is temporarily unreachable. We must also consider
handling of a physical slot whose catalog_xmin is set by a cascading
physical replica with logical slots on it.

* Extend the logical replication patch to add support for following
physical failover using this functionality, likely in 11.0.

[1] https://commitfest.postgresql.org/10/779/


I'll add this to the next CF, but I realise the inability to test it
standalone may mean it can only be committed as part of a series along
with full support for logical decoding from standby.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 041682447587190e1d5b8ebea437f1bb33c33a84 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 13 Jul 2016 15:23:08 +0800
Subject: [PATCH] Send catalog_xmin in hot standby feedback

Add catalog_xmin to the hot standby feedback protocol so a read replica
that has logical slots can use its physical slot to the master to hold down the
master's catalog_xmin. This lets a replica prevent vacuuming of catalog tuples
still required by the replica's logical slots.

This functionality is required, but not sufficient, to allow logical decoding
to work on a replica. It is also useful for handling physical failover of
database instances that serve as upstreams for logical decoding clients.
---
 contrib/pg_visibility/pg_visibility.c     |  4 +-
 contrib/pgstattuple/pgstatapprox.c        |  2 +-
 doc/src/sgml/protocol.sgml                | 33 ++++++++++--
 src/backend/access/transam/xlog.c         | 36 ++++++++++++-
 src/backend/catalog/index.c               |  2 +-
 src/backend/commands/analyze.c            |  2 +-
 src/backend/commands/vacuum.c             |  4 +-
 src/backend/replication/logical/logical.c | 20 +++++--
 src/backend/replication/walreceiver.c     | 54 ++++++++++++++++---
 src/backend/replication/walsender.c       | 90 ++++++++++++++++++++++---------
 src/backend/storage/ipc/procarray.c       | 51 +++++++++---------
 src/include/access/xlog.h                 |  1 +
 src/include/replication/slot.h            | 30 +++++++++--
 src/include/storage/procarray.h           |  2 +-
 14 files changed, 252 insertions(+), 79 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index 7034066..318caab 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -523,7 +523,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 	{
 		/* Don't pass rel; that will fail in recovery. */
-		OldestXmin = GetOldestXmin(NULL, true);
+		OldestXmin = GetOldestXmin(NULL, true, false);
 	}
 
 	rel = relation_open(relid, AccessShareLock);
@@ -646,7 +646,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 				 * a buffer lock. And this shouldn't happen often, so it's
 				 * worth being careful so as to avoid false positives.
 				 */
-				RecomputedOldestXmin = GetOldestXmin(NULL, true);
+				RecomputedOldestXmin = GetOldestXmin(NULL, true, false);
 
 				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
 					record_corrupt_item(items, &tuple.t_data->t_ctid);
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index a49ff54..3674c05 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -67,7 +67,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	TransactionId OldestXmin;
 	uint64		misc_count = 0;
 
-	OldestXmin = GetOldestXmin(rel, true);
+	OldestXmin = GetOldestXmin(rel, true, false);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 68b0941..c4e41ca 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1783,10 +1783,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0, this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1796,7 +1797,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slot on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled. New in 10.0.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby. New in 10.0.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0b991bb..550bb40 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7962,6 +7962,38 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
 }
 
 /*
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * The epoch of nextXid should be the same as the supplied epoch or, if the
+ * counter has wrapped, one greater than it.
+ */
+bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+	TransactionId nextXid;
+	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
  * This must be called ONCE during postmaster or standalone-backend shutdown
  */
 void
@@ -8565,7 +8597,7 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -8904,7 +8936,7 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index b0b43cf..fd86530 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2255,7 +2255,7 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	{
 		snapshot = SnapshotAny;
 		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, true);
+		OldestXmin = GetOldestXmin(heapRelation, true, false);
 	}
 
 	scan = heap_beginscan_strat(heapRelation,	/* relation */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index c617abb..2170566 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -992,7 +992,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
 	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
-	OldestXmin = GetOldestXmin(onerel, true);
+	OldestXmin = GetOldestXmin(onerel, true, false);
 
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows, random());
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 58bbf55..79ec690 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -497,7 +497,7 @@ vacuum_set_xid_limits(Relation rel,
 	 * always an independent transaction.
 	 */
 	*oldestXmin =
-		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true, false), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -909,7 +909,7 @@ vac_update_datfrozenxid(void)
 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
 	 * So we cannot produce a wrong minimum by starting with this.
 	 */
-	newFrozenXid = GetOldestXmin(NULL, true);
+	newFrozenXid = GetOldestXmin(NULL, true, false);
 
 	/*
 	 * Similarly, initialize the MultiXact "min" with the value that would be
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1512be5..85f8f0e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -88,16 +88,28 @@ CheckLogicalDecodingRequirements(void)
 				 errmsg("logical decoding requires a database connection")));
 
 	/* ----
-	 * TODO: We got to change that someday soon...
+	 * To allow logical decoding on a standby we must ensure that:
 	 *
-	 * There's basically three things missing to allow this:
 	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
+	 *	  LSN belongs to so we can follow timeline switches
+	 *
 	 * 2) We need to force hot_standby_feedback to be enabled at all times so
 	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
+	 *
+	 * 3) ensure a replication slot is used to connect to the upstream so
+	 *    we know the catalog_xmin is persistent even over connection loss.
+	 *
+	 * 4) support dropping replication slots referring to a database, in
 	 *	  dbase_redo. There can't be any active ones due to HS recovery
 	 *	  conflicts, so that should be relatively easy.
+	 *
+	 * This means we can't allow logical decoding from a standby that's only
+	 * configured for archive recovery. It would be OK to run temporarily in
+	 * archive recovery during connectivity drops so long as we have a slot
+	 * with a catalog_xmin set; it'd cause extra bloat on the master until we
+	 * can reconnect, but that's unavoidable. We don't currently have any
+	 * book-keeping about whether we have a slot unless it's in active use,
+	 * though, so we have to assume there's no slot.
 	 * ----
 	 */
 	if (RecoveryInProgress())
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 413ee3a..8e682bb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1161,8 +1161,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		nextEpoch, xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin, slot_xmin;
 	static TimestampTz sendTime = 0;
 	static bool master_has_standby_xmin = false;
 
@@ -1203,29 +1203,67 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false);
+	{
+		/*
+		 * Usually GetOldestXmin() would include the catalog_xmin in its
+		 * calculations, but we don't want to hold upstream back from vacuuming
+		 * normal user table tuples just because they're within the
+		 * catalog_xmin horizon of logical replication slots on this standby.
+		 * Instead we report the catalog_xmin to the upstream separately.
+		 *
+		 * (XXX-REVIEW-ATTENTION)
+		 * By doing the GetOldestXmin(...) lookup separately we might produce
+		 * an xmin and catalog xmin slightly later than what was in the
+		 * procarray at the time we previously called GetOldestXmin() for
+		 * session state, since it can be advanced in-between.  That's harmless
+		 * though, since we only move the position backwards here.
+		 *
+		 * The alternative here is to extend GetOldestXmin() to take an
+		 * out-param to report the slot catalog xmin. That really just
+		 * duplicates ProcArrayGetReplicationSlotXmin but means we can grab it
+		 * within a single ProcArray lock. A variant of GetOldestXmin that
+		 * takes an already-locked flag would work too, but would hold the lock
+		 * across parts of GetOldestXmin() that currently don't retain it.
+		 * (XXX-REVIEW-ATTENTION)
+		 */
+		xmin = GetOldestXmin(NULL, false, true);
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+
+		if (TransactionIdIsValid(slot_xmin) &&
+			NormalTransactionIdPrecedes(slot_xmin, xmin))
+			xmin = slot_xmin;
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
 	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	xmin_epoch = nextEpoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch--;
+	catalog_xmin_epoch = nextEpoch;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch--;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ea2a5c..a1ff16f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1531,6 +1531,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * We don't have to do any effective_xmin / effective_catalog_xmin testing
+	 * here either, like for LogicalConfirmReceivedLocation. If we received
+	 * the xmin and catalog_xmin from downstream replication slots we know they
+	 * were already confirmed there.
 	 */
 }
 
@@ -1593,7 +1598,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1614,6 +1619,22 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1629,10 +1650,10 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-	TransactionId nextXid;
-	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
@@ -1641,43 +1662,50 @@ ProcessStandbyHSFeedbackMessage(void)
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	/*
+	 * A 10.0+ standby's walreceiver passes the lowest catalog xmin of any
+	 * replication slot up to the master. There's no need to handle backward
+	 * compatibility here, as only a standby of the same major version has any
+	 * business sending us hot standby feedback. pg_receivexlog, pg_basebackup
+	 * etc. don't use hot standby feedback.
+	 */
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u catalog_xmin %u epoch %u",
 		 feedbackXmin,
+		 feedbackCatalogXmin,
 		 feedbackEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-	if (feedbackXmin <= nextXid)
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
 	{
-		if (feedbackEpoch != nextEpoch)
-			return;
+		return;
 	}
-	else
+
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
 	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
+		return;
 	}
 
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
-
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
 	 * the xmin will be taken into account by GetOldestXmin.  This will hold
@@ -1701,15 +1729,29 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so we store the least
+	 * of the two provided when not using a slot.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+	{
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
+	}
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+		{
+			MyPgXact->xmin = feedbackCatalogXmin;
+		}
+		else
+		{
+			MyPgXact->xmin = feedbackXmin;
+		}
+	}
 }
 
 /*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e5d487d..99535b4 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1298,17 +1298,18 @@ TransactionIdIsActive(TransactionId xid)
  * process can set its xmin based on transactions that are no longer running
  * in the master but are still being replayed on the standby, thus possibly
  * making the GetOldestXmin reading go backwards.  In this case there is a
- * possibility that we lose data that the standby would like to have, but
- * there is little we can do about that --- data is only protected if the
- * walsender runs continuously while queries are executed on the standby.
- * (The Hot Standby code deals with such cases by failing standby queries
- * that needed to access already-removed data, so there's no integrity bug.)
+ * possibility that we lose data that the standby would like to have
+ * unless the standby uses a replication slot to make its xmin persistent
+ * even when it isn't connected. The Hot Standby code deals with such cases by
+ * failing standby queries that needed to access already-removed data, so
+ * there's no integrity bug.
+ *
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
  */
 TransactionId
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreSlots)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1426,25 +1427,27 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 			result = FirstNormalTransactionId;
 	}
 
-	/*
-	 * Check whether there are replication slots requiring an older xmin.
-	 */
-	if (TransactionIdIsValid(replication_slot_xmin) &&
-		NormalTransactionIdPrecedes(replication_slot_xmin, result))
-		result = replication_slot_xmin;
-
-	/*
-	 * After locks have been released and defer_cleanup_age has been applied,
-	 * check whether we need to back up further to make logical decoding
-	 * possible. We need to do so if we're computing the global limit (rel =
-	 * NULL) or if the passed relation is a catalog relation of some kind.
-	 */
-	if ((rel == NULL ||
-		 RelationIsAccessibleInLogicalDecoding(rel)) &&
-		TransactionIdIsValid(replication_slot_catalog_xmin) &&
-		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
-		result = replication_slot_catalog_xmin;
+	if (!ignoreSlots)
+	{
+		/*
+		 * Check whether there are replication slots requiring an older xmin.
+		 */
+		if (TransactionIdIsValid(replication_slot_xmin) &&
+			NormalTransactionIdPrecedes(replication_slot_xmin, result))
+			result = replication_slot_xmin;
 
+		/*
+		 * After locks have been released and defer_cleanup_age has been applied,
+		 * check whether we need to back up further to make logical decoding
+		 * possible. We need to do so if we're computing the global limit (rel =
+		 * NULL) or if the passed relation is a catalog relation of some kind.
+		 */
+		if ((rel == NULL ||
+			 RelationIsAccessibleInLogicalDecoding(rel)) &&
+			TransactionIdIsValid(replication_slot_catalog_xmin) &&
+			NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
+			result = replication_slot_catalog_xmin;
+	}
 	return result;
 }
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 14b7f7f..27990fa 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -262,6 +262,7 @@ extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
 extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
+extern bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 extern void RemovePromoteSignalFiles(void);
 
 extern bool CheckPromoteSignal(void);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index e00562d..d777644 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -59,11 +59,24 @@ typedef struct ReplicationSlotPersistentData
 	 * xmin horizon for catalog tuples
 	 *
 	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
+	 * see notes for effective_catalog_xmin, below.
 	 */
 	TransactionId catalog_xmin;
 
-	/* oldest LSN that might be required by this replication slot */
+	/*
+	 * oldest LSN that might be required by this replication slot
+	 *
+	 * For logical slots this is the location of the most recent
+	 * xl_running_xacts record prior to the start of the oldest xact still
+	 * in-progress as of the confirmed_flush lsn. We must resume decoding at
+	 * this point to ensure we see every change made by every xact that we
+	 * might have to replay to the client.
+	 *
+	 * For physical slots this is the confirmed flush pointer from
+	 * the most recent standby reply message.
+	 *
+	 * WAL older than restart_lsn may not be removed.
+	 */
 	XLogRecPtr	restart_lsn;
 
 	/*
@@ -71,6 +84,14 @@ typedef struct ReplicationSlotPersistentData
 	 * start_lsn point in case the client doesn't specify one, and also as a
 	 * safety measure to jump forwards in case the client specifies a
 	 * start_lsn that's further in the past than this value.
+	 *
+	 * We may skip past the client's requested start point to our
+	 * confirmed_flush point when the client previously sent us feedback
+	 * in response to keepalives, empty transactions that caused no
+	 * client-side writes and no replication identifier update, then
+	 * crashed. On reconnect the client won't know it's further ahead than
+	 * it remembers, but we know it won't need this data to be processed
+	 * since it didn't do anything with it the first time.
 	 */
 	XLogRecPtr	confirmed_flush;
 
@@ -104,8 +125,9 @@ typedef struct ReplicationSlot
 	 * too soon, but the worst consequence we might encounter there is
 	 * unwanted query cancellations on the standby.  Thus, for logical
 	 * decoding, this value represents the latest xmin that has actually been
-	 * written to disk, whereas for streaming replication, it's just the same
-	 * as the persistent value (data.xmin).
+	 * written to disk and can be safely used by vacuum, etc., whereas for
+	 * streaming replication it's just the same as the persistent value
+	 * (data.xmin).
 	 */
 	TransactionId effective_xmin;
 	TransactionId effective_catalog_xmin;
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index dd37c0c..a803268 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -53,7 +53,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreSlots);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
-- 
2.5.5
