From 3ccc7220a18c028e535d1e7617b8997a17e586e4 Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <pg@bowt.ie>
Date: Sun, 12 Jun 2022 15:46:08 -0700
Subject: [PATCH v1 1/4] Add page-level freezing to VACUUM.

Teach VACUUM to decide on whether or not to trigger freezing at the
level of whole heap pages, not individual tuple fields.  OldestXmin is
now treated as the cutoff for freezing eligibility in all cases, while
FreezeLimit is used to trigger freezing at the level of each page (we
now freeze all eligible XIDs on a page when freezing is triggered for
the page).

This approach decouples the question of _how_ VACUUM could/will freeze a
given heap page (which of its XIDs are eligible to be frozen) from the
question of whether it actually makes sense to do so right now.

Just adding page-level freezing does not change all that much on its
own: VACUUM will still typically freeze very lazily, since we're only
forcing freezing of all of a page's eligible tuples when we decide to
freeze at least one (on the basis of XID age and FreezeLimit).  For now
VACUUM still freezes everything almost as lazily as it always has.
Later work will teach VACUUM to apply an alternative eager freezing
strategy that triggers page-level freezing earlier, based on additional
criteria.
---
 src/include/access/heapam.h          |   4 +-
 src/include/access/heapam_xlog.h     |  37 +++++-
 src/backend/access/heap/heapam.c     | 171 ++++++++++++++++-----------
 src/backend/access/heap/vacuumlazy.c | 152 ++++++++++++++----------
 4 files changed, 230 insertions(+), 134 deletions(-)

diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index abf62d9df..c201f8ae6 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -167,8 +167,8 @@ extern void heap_inplace_update(Relation relation, HeapTuple tuple);
 extern bool heap_freeze_tuple(HeapTupleHeader tuple,
 							  TransactionId relfrozenxid, TransactionId relminmxid,
 							  TransactionId cutoff_xid, TransactionId cutoff_multi);
-extern bool heap_tuple_would_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
-									MultiXactId cutoff_multi,
+extern bool heap_tuple_would_freeze(HeapTupleHeader tuple,
+									TransactionId limit_xid, MultiXactId limit_multi,
 									TransactionId *relfrozenxid_out,
 									MultiXactId *relminmxid_out);
 extern bool heap_tuple_needs_eventual_freeze(HeapTupleHeader tuple);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 1705e736b..40556271d 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -330,6 +330,38 @@ typedef struct xl_heap_freeze_tuple
 	uint8		frzflags;
 } xl_heap_freeze_tuple;
 
+/*
+ * State used by VACUUM to track what the oldest extant XID/MXID will become
+ * when determining whether and how to freeze a page's heap tuples via calls
+ * heap_prepare_freeze_tuple.
+ *
+ * The relfrozenxid_out and relminmxid_out fields are the current target
+ * relfrozenxid and relminmxid for VACUUM caller's heap rel.  Any and all
+ * unfrozen XIDs or MXIDs that remain in caller's rel after VACUUM finishes
+ * _must_ have values >= the final relfrozenxid/relminmxid values in pg_class.
+ * This includes XIDs that remain as MultiXact members from any tuple's xmax.
+ * Each heap_prepare_freeze_tuple call pushes back relfrozenxid_out and/or
+ * relminmxid_out as needed to avoid unsafe values in rel's authoritative
+ * pg_class tuple.
+ *
+ * Alternative "no freeze" variants (relfrozenxid_nofreeze_out and
+ * relminmxid_nofreeze_out) must also be maintained for !freeze pages.
+ */
+typedef struct page_frozenxid_tracker
+{
+	/* Is heap_prepare_freeze_tuple caller required to freeze page? */
+	bool		freeze;
+
+	/* Values used when page is to be frozen based on freeze plans */
+	TransactionId relfrozenxid_out;
+	MultiXactId relminmxid_out;
+
+	/* Used by caller for '!freeze' pages */
+	TransactionId relfrozenxid_nofreeze_out;
+	MultiXactId relminmxid_nofreeze_out;
+
+} page_frozenxid_tracker;
+
 /*
  * This is what we need to know about a block being frozen during vacuum
  *
@@ -409,10 +441,11 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 									  TransactionId relminmxid,
 									  TransactionId cutoff_xid,
 									  TransactionId cutoff_multi,
+									  TransactionId limit_xid,
+									  MultiXactId limit_multi,
 									  xl_heap_freeze_tuple *frz,
 									  bool *totally_frozen,
-									  TransactionId *relfrozenxid_out,
-									  MultiXactId *relminmxid_out);
+									  page_frozenxid_tracker *xtrack);
 extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
 									  xl_heap_freeze_tuple *xlrec_tp);
 extern XLogRecPtr log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer,
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index aab8d6fa4..d6aea370f 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -6431,26 +6431,15 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
  * will be totally frozen after these operations are performed and false if
  * more freezing will eventually be required.
  *
- * Caller must set frz->offset itself, before heap_execute_freeze_tuple call.
+ * Caller must initialize xtrack fields for page as a whole before calling
+ * here with first tuple for the page.  See page_frozenxid_tracker comments.
+ *
+ * Caller must set frz->offset itself if heap_execute_freeze_tuple is called.
  *
  * It is assumed that the caller has checked the tuple with
  * HeapTupleSatisfiesVacuum() and determined that it is not HEAPTUPLE_DEAD
  * (else we should be removing the tuple, not freezing it).
  *
- * The *relfrozenxid_out and *relminmxid_out arguments are the current target
- * relfrozenxid and relminmxid for VACUUM caller's heap rel.  Any and all
- * unfrozen XIDs or MXIDs that remain in caller's rel after VACUUM finishes
- * _must_ have values >= the final relfrozenxid/relminmxid values in pg_class.
- * This includes XIDs that remain as MultiXact members from any tuple's xmax.
- * Each call here pushes back *relfrozenxid_out and/or *relminmxid_out as
- * needed to avoid unsafe final values in rel's authoritative pg_class tuple.
- *
- * NB: cutoff_xid *must* be <= VACUUM's OldestXmin, to ensure that any
- * XID older than it could neither be running nor seen as running by any
- * open transaction.  This ensures that the replacement will not change
- * anyone's idea of the tuple state.
- * Similarly, cutoff_multi must be <= VACUUM's OldestMxact.
- *
  * NB: This function has side effects: it might allocate a new MultiXactId.
  * It will be set as tuple's new xmax when our *frz output is processed within
  * heap_execute_freeze_tuple later on.  If the tuple is in a shared buffer
@@ -6463,34 +6452,46 @@ bool
 heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 						  TransactionId relfrozenxid, TransactionId relminmxid,
 						  TransactionId cutoff_xid, TransactionId cutoff_multi,
+						  TransactionId limit_xid, MultiXactId limit_multi,
 						  xl_heap_freeze_tuple *frz, bool *totally_frozen,
-						  TransactionId *relfrozenxid_out,
-						  MultiXactId *relminmxid_out)
+						  page_frozenxid_tracker *xtrack)
 {
 	bool		changed = false;
-	bool		xmax_already_frozen = false;
-	bool		xmin_frozen;
-	bool		freeze_xmax;
+	bool		xmin_already_frozen = false,
+				xmax_already_frozen = false;
+	bool		freeze_xmin,
+				freeze_xmax;
 	TransactionId xid;
 
+	/*
+	 * limit_xid *must* be <= cutoff_xid, to ensure that any XID older than it
+	 * can neither be running nor seen as running by any open transaction.
+	 * This ensures that we only freeze XIDs that are safe to freeze -- those
+	 * that are already unambiguously visible to everybody.
+	 *
+	 * VACUUM calls limit_xid "FreezeLimit", and cutoff_xid "OldestXmin".
+	 * (limit_multi is "MultiXactCutoff", and cutoff_multi "OldestMxact".)
+	 */
+	Assert(TransactionIdPrecedesOrEquals(limit_xid, cutoff_xid));
+	Assert(MultiXactIdPrecedesOrEquals(limit_multi, cutoff_multi));
+
 	frz->frzflags = 0;
 	frz->t_infomask2 = tuple->t_infomask2;
 	frz->t_infomask = tuple->t_infomask;
 	frz->xmax = HeapTupleHeaderGetRawXmax(tuple);
 
 	/*
-	 * Process xmin.  xmin_frozen has two slightly different meanings: in the
-	 * !XidIsNormal case, it means "the xmin doesn't need any freezing" (it's
-	 * already a permanent value), while in the block below it is set true to
-	 * mean "xmin won't need freezing after what we do to it here" (false
-	 * otherwise).  In both cases we're allowed to set totally_frozen, as far
-	 * as xmin is concerned.  Both cases also don't require relfrozenxid_out
-	 * handling, since either way the tuple's xmin will be a permanent value
-	 * once we're done with it.
+	 * Process xmin, while keeping track of whether it's already frozen, or
+	 * will become frozen iff our freeze plan is executed by caller (could be
+	 * neither).
 	 */
 	xid = HeapTupleHeaderGetXmin(tuple);
 	if (!TransactionIdIsNormal(xid))
-		xmin_frozen = true;
+	{
+		freeze_xmin = false;
+		xmin_already_frozen = true;
+		/* No need for relfrozenxid_out handling for already-frozen xmin */
+	}
 	else
 	{
 		if (TransactionIdPrecedes(xid, relfrozenxid))
@@ -6499,8 +6500,8 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 					 errmsg_internal("found xmin %u from before relfrozenxid %u",
 									 xid, relfrozenxid)));
 
-		xmin_frozen = TransactionIdPrecedes(xid, cutoff_xid);
-		if (xmin_frozen)
+		freeze_xmin = TransactionIdPrecedes(xid, cutoff_xid);
+		if (freeze_xmin)
 		{
 			if (!TransactionIdDidCommit(xid))
 				ereport(ERROR,
@@ -6514,8 +6515,8 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 		else
 		{
 			/* xmin to remain unfrozen.  Could push back relfrozenxid_out. */
-			if (TransactionIdPrecedes(xid, *relfrozenxid_out))
-				*relfrozenxid_out = xid;
+			if (TransactionIdPrecedes(xid, xtrack->relfrozenxid_out))
+				xtrack->relfrozenxid_out = xid;
 		}
 	}
 
@@ -6526,7 +6527,8 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 	 * freezing, too.  Also, if a multi needs freezing, we cannot simply take
 	 * it out --- if there's a live updater Xid, it needs to be kept.
 	 *
-	 * Make sure to keep heap_tuple_would_freeze in sync with this.
+	 * Make sure to keep heap_tuple_would_freeze in sync with this.  It needs
+	 * to return true for any tuple that we would force to be frozen here.
 	 */
 	xid = HeapTupleHeaderGetRawXmax(tuple);
 
@@ -6534,7 +6536,7 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 	{
 		TransactionId newxmax;
 		uint16		flags;
-		TransactionId mxid_oldest_xid_out = *relfrozenxid_out;
+		TransactionId mxid_oldest_xid_out = xtrack->relfrozenxid_out;
 
 		newxmax = FreezeMultiXactId(xid, tuple->t_infomask,
 									relfrozenxid, relminmxid,
@@ -6553,8 +6555,8 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 			 */
 			Assert(!freeze_xmax);
 			Assert(TransactionIdIsValid(newxmax));
-			if (TransactionIdPrecedes(newxmax, *relfrozenxid_out))
-				*relfrozenxid_out = newxmax;
+			if (TransactionIdPrecedes(newxmax, xtrack->relfrozenxid_out))
+				xtrack->relfrozenxid_out = newxmax;
 
 			/*
 			 * NB -- some of these transformations are only valid because we
@@ -6582,10 +6584,10 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 			 */
 			Assert(!freeze_xmax);
 			Assert(MultiXactIdIsValid(newxmax));
-			Assert(!MultiXactIdPrecedes(newxmax, *relminmxid_out));
+			Assert(!MultiXactIdPrecedes(newxmax, xtrack->relminmxid_out));
 			Assert(TransactionIdPrecedesOrEquals(mxid_oldest_xid_out,
-												 *relfrozenxid_out));
-			*relfrozenxid_out = mxid_oldest_xid_out;
+												 xtrack->relfrozenxid_out));
+			xtrack->relfrozenxid_out = mxid_oldest_xid_out;
 
 			/*
 			 * We can't use GetMultiXactIdHintBits directly on the new multi
@@ -6613,10 +6615,10 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 			Assert(!freeze_xmax);
 			Assert(MultiXactIdIsValid(newxmax) && xid == newxmax);
 			Assert(TransactionIdPrecedesOrEquals(mxid_oldest_xid_out,
-												 *relfrozenxid_out));
-			if (MultiXactIdPrecedes(xid, *relminmxid_out))
-				*relminmxid_out = xid;
-			*relfrozenxid_out = mxid_oldest_xid_out;
+												 xtrack->relfrozenxid_out));
+			if (MultiXactIdPrecedes(xid, xtrack->relminmxid_out))
+				xtrack->relminmxid_out = xid;
+			xtrack->relfrozenxid_out = mxid_oldest_xid_out;
 		}
 		else
 		{
@@ -6656,8 +6658,8 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 		else
 		{
 			freeze_xmax = false;
-			if (TransactionIdPrecedes(xid, *relfrozenxid_out))
-				*relfrozenxid_out = xid;
+			if (TransactionIdPrecedes(xid, xtrack->relfrozenxid_out))
+				xtrack->relfrozenxid_out = xid;
 		}
 	}
 	else if ((tuple->t_infomask & HEAP_XMAX_INVALID) ||
@@ -6673,6 +6675,11 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 				 errmsg_internal("found xmax %u (infomask 0x%04x) not frozen, not multi, not normal",
 								 xid, tuple->t_infomask)));
 
+	if (freeze_xmin)
+	{
+		Assert(!xmin_already_frozen);
+		Assert(changed);
+	}
 	if (freeze_xmax)
 	{
 		Assert(!xmax_already_frozen);
@@ -6703,11 +6710,7 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 		 * For Xvac, we ignore the cutoff_xid and just always perform the
 		 * freeze operation.  The oldest release in which such a value can
 		 * actually be set is PostgreSQL 8.4, because old-style VACUUM FULL
-		 * was removed in PostgreSQL 9.0.  Note that if we were to respect
-		 * cutoff_xid here, we'd need to make surely to clear totally_frozen
-		 * when we skipped freezing on that basis.
-		 *
-		 * No need for relfrozenxid_out handling, since we always freeze xvac.
+		 * was removed in PostgreSQL 9.0.
 		 */
 		if (TransactionIdIsNormal(xid))
 		{
@@ -6721,18 +6724,36 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 			else
 				frz->frzflags |= XLH_FREEZE_XVAC;
 
-			/*
-			 * Might as well fix the hint bits too; usually XMIN_COMMITTED
-			 * will already be set here, but there's a small chance not.
-			 */
+			/* Set XMIN_COMMITTED defensively */
 			Assert(!(tuple->t_infomask & HEAP_XMIN_INVALID));
 			frz->t_infomask |= HEAP_XMIN_COMMITTED;
+
+			/*
+			 * Force freezing any page with an xvac to keep things simple.
+			 * This allows totally_frozen tracking to ignore xvac.
+			 */
 			changed = true;
+			xtrack->freeze = true;
 		}
 	}
 
-	*totally_frozen = (xmin_frozen &&
+	/*
+	 * Determine if this tuple is already totally frozen, or will become
+	 * totally frozen (provided caller executes freeze plan for the page)
+	 */
+	*totally_frozen = ((freeze_xmin || xmin_already_frozen) &&
 					   (freeze_xmax || xmax_already_frozen));
+
+	/*
+	 * Force vacuumlazy.c to freeze page when avoiding it would violate the
+	 * rule that XIDs < limit_xid (and MXIDs < limit_multi) must never remain
+	 * rule that XIDs < limit_xid (and MXIDs < limit_multi) must never remain.
+	if (!xtrack->freeze && !(xmin_already_frozen && xmax_already_frozen))
+		xtrack->freeze =
+			heap_tuple_would_freeze(tuple, limit_xid, limit_multi,
+									&xtrack->relfrozenxid_nofreeze_out,
+									&xtrack->relminmxid_nofreeze_out);
+
 	return changed;
 }
 
@@ -6785,14 +6806,20 @@ heap_freeze_tuple(HeapTupleHeader tuple,
 	xl_heap_freeze_tuple frz;
 	bool		do_freeze;
 	bool		tuple_totally_frozen;
-	TransactionId relfrozenxid_out = cutoff_xid;
-	MultiXactId relminmxid_out = cutoff_multi;
+	page_frozenxid_tracker dummy;
+
+	dummy.freeze = true;
+	dummy.relfrozenxid_out = cutoff_xid;
+	dummy.relminmxid_out = cutoff_multi;
+	dummy.relfrozenxid_nofreeze_out = cutoff_xid;
+	dummy.relminmxid_nofreeze_out = cutoff_multi;
 
 	do_freeze = heap_prepare_freeze_tuple(tuple,
 										  relfrozenxid, relminmxid,
 										  cutoff_xid, cutoff_multi,
+										  cutoff_xid, cutoff_multi,
 										  &frz, &tuple_totally_frozen,
-										  &relfrozenxid_out, &relminmxid_out);
+										  &dummy);
 
 	/*
 	 * Note that because this is not a WAL-logged operation, we don't need to
@@ -7218,17 +7245,23 @@ heap_tuple_needs_eventual_freeze(HeapTupleHeader tuple)
  * heap_tuple_would_freeze
  *
  * Return value indicates if heap_prepare_freeze_tuple sibling function would
- * freeze any of the XID/XMID fields from the tuple, given the same cutoffs.
- * We must also deal with dead tuples here, since (xmin, xmax, xvac) fields
- * could be processed by pruning away the whole tuple instead of freezing.
+ * force freezing of any of the XID/MXID fields from the tuple, given the same
+ * limits.  We must also deal with dead tuples here, since (xmin, xmax, xvac)
+ * fields could be processed by pruning away the whole tuple instead of
+ * freezing.
+ *
+ * Note: VACUUM refers to limit_xid and limit_multi as "FreezeLimit" and
+ * "MultiXactCutoff" respectively.  These should not be confused with the
+ * absolute cutoffs for freezing.  We just determine whether caller's tuple
+ * and limits trigger heap_prepare_freeze_tuple to force freezing.
  *
  * The *relfrozenxid_out and *relminmxid_out input/output arguments work just
  * like the heap_prepare_freeze_tuple arguments that they're based on.  We
  * never freeze here, which makes tracking the oldest extant XID/MXID simple.
  */
 bool
-heap_tuple_would_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
-						MultiXactId cutoff_multi,
+heap_tuple_would_freeze(HeapTupleHeader tuple,
+						TransactionId limit_xid, MultiXactId limit_multi,
 						TransactionId *relfrozenxid_out,
 						MultiXactId *relminmxid_out)
 {
@@ -7242,7 +7275,7 @@ heap_tuple_would_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
 	{
 		if (TransactionIdPrecedes(xid, *relfrozenxid_out))
 			*relfrozenxid_out = xid;
-		if (TransactionIdPrecedes(xid, cutoff_xid))
+		if (TransactionIdPrecedes(xid, limit_xid))
 			would_freeze = true;
 	}
 
@@ -7259,7 +7292,7 @@ heap_tuple_would_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
 		/* xmax is a non-permanent XID */
 		if (TransactionIdPrecedes(xid, *relfrozenxid_out))
 			*relfrozenxid_out = xid;
-		if (TransactionIdPrecedes(xid, cutoff_xid))
+		if (TransactionIdPrecedes(xid, limit_xid))
 			would_freeze = true;
 	}
 	else if (!MultiXactIdIsValid(multi))
@@ -7282,7 +7315,7 @@ heap_tuple_would_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
 
 		if (MultiXactIdPrecedes(multi, *relminmxid_out))
 			*relminmxid_out = multi;
-		if (MultiXactIdPrecedes(multi, cutoff_multi))
+		if (MultiXactIdPrecedes(multi, limit_multi))
 			would_freeze = true;
 
 		/* need to check whether any member of the mxact is old */
@@ -7295,7 +7328,7 @@ heap_tuple_would_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
 			Assert(TransactionIdIsNormal(xid));
 			if (TransactionIdPrecedes(xid, *relfrozenxid_out))
 				*relfrozenxid_out = xid;
-			if (TransactionIdPrecedes(xid, cutoff_xid))
+			if (TransactionIdPrecedes(xid, limit_xid))
 				would_freeze = true;
 		}
 		if (nmembers > 0)
@@ -7309,7 +7342,7 @@ heap_tuple_would_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
 		{
 			if (TransactionIdPrecedes(xid, *relfrozenxid_out))
 				*relfrozenxid_out = xid;
-			/* heap_prepare_freeze_tuple always freezes xvac */
+			/* heap_prepare_freeze_tuple forces xvac freezing */
 			would_freeze = true;
 		}
 	}
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index b802ed247..75cb31e75 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -169,8 +169,9 @@ typedef struct LVRelState
 
 	/* VACUUM operation's cutoffs for freezing and pruning */
 	TransactionId OldestXmin;
+	MultiXactId OldestMxact;
 	GlobalVisState *vistest;
-	/* VACUUM operation's target cutoffs for freezing XIDs and MultiXactIds */
+	/* Limits on the age of the oldest unfrozen XID and MXID */
 	TransactionId FreezeLimit;
 	MultiXactId MultiXactCutoff;
 	/* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid */
@@ -507,6 +508,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	 */
 	vacrel->rel_pages = orig_rel_pages = RelationGetNumberOfBlocks(rel);
 	vacrel->OldestXmin = OldestXmin;
+	vacrel->OldestMxact = OldestMxact;
 	vacrel->vistest = GlobalVisTestFor(rel);
 	/* FreezeLimit controls XID freezing (always <= OldestXmin) */
 	vacrel->FreezeLimit = FreezeLimit;
@@ -1554,8 +1556,8 @@ lazy_scan_prune(LVRelState *vacrel,
 				recently_dead_tuples;
 	int			nnewlpdead;
 	int			nfrozen;
-	TransactionId NewRelfrozenXid;
-	MultiXactId NewRelminMxid;
+	page_frozenxid_tracker xtrack;
+	bool		freeze_all_eligible PG_USED_FOR_ASSERTS_ONLY;
 	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 	xl_heap_freeze_tuple frozen[MaxHeapTuplesPerPage];
 
@@ -1571,8 +1573,11 @@ lazy_scan_prune(LVRelState *vacrel,
 retry:
 
 	/* Initialize (or reset) page-level state */
-	NewRelfrozenXid = vacrel->NewRelfrozenXid;
-	NewRelminMxid = vacrel->NewRelminMxid;
+	xtrack.freeze = false;
+	xtrack.relfrozenxid_out = vacrel->NewRelfrozenXid;
+	xtrack.relminmxid_out = vacrel->NewRelminMxid;
+	xtrack.relfrozenxid_nofreeze_out = vacrel->NewRelfrozenXid;
+	xtrack.relminmxid_nofreeze_out = vacrel->NewRelminMxid;
 	tuples_deleted = 0;
 	lpdead_items = 0;
 	live_tuples = 0;
@@ -1625,27 +1630,23 @@ retry:
 			continue;
 		}
 
-		/*
-		 * LP_DEAD items are processed outside of the loop.
-		 *
-		 * Note that we deliberately don't set hastup=true in the case of an
-		 * LP_DEAD item here, which is not how count_nondeletable_pages() does
-		 * it -- it only considers pages empty/truncatable when they have no
-		 * items at all (except LP_UNUSED items).
-		 *
-		 * Our assumption is that any LP_DEAD items we encounter here will
-		 * become LP_UNUSED inside lazy_vacuum_heap_page() before we actually
-		 * call count_nondeletable_pages().  In any case our opinion of
-		 * whether or not a page 'hastup' (which is how our caller sets its
-		 * vacrel->nonempty_pages value) is inherently race-prone.  It must be
-		 * treated as advisory/unreliable, so we might as well be slightly
-		 * optimistic.
-		 */
 		if (ItemIdIsDead(itemid))
 		{
+			/*
+			 * Delay unsetting all_visible until after we have decided on
+			 * whether this page should be frozen.  We need to test "is this
+			 * page all_visible, assuming any LP_DEAD items are set LP_UNUSED
+			 * in final heap pass?" to reach a decision.  all_visible will be
+			 * unset before we return, as required by lazy_scan_heap caller.
+			 *
+			 * Deliberately don't set hastup for LP_DEAD items.  We make the
+			 * soft assumption that any LP_DEAD items encountered here will
+			 * become LP_UNUSED later on, before count_nondeletable_pages is
+			 * reached.  Whether or not the page 'hastup' is inherently
+			 * race-prone.  It must be treated as unreliable by caller
+			 * anyway, so we might as well be slightly optimistic about it.
+			 */
 			deadoffsets[lpdead_items++] = offnum;
-			prunestate->all_visible = false;
-			prunestate->has_lpdead_items = true;
 			continue;
 		}
 
@@ -1777,10 +1778,12 @@ retry:
 		if (heap_prepare_freeze_tuple(tuple.t_data,
 									  vacrel->relfrozenxid,
 									  vacrel->relminmxid,
+									  vacrel->OldestXmin,
+									  vacrel->OldestMxact,
 									  vacrel->FreezeLimit,
 									  vacrel->MultiXactCutoff,
 									  &frozen[nfrozen], &tuple_totally_frozen,
-									  &NewRelfrozenXid, &NewRelminMxid))
+									  &xtrack))
 		{
 			/* Will execute freeze below */
 			frozen[nfrozen++].offset = offnum;
@@ -1801,9 +1804,33 @@ retry:
 	 * that will need to be vacuumed in indexes later, or a LP_NORMAL tuple
 	 * that remains and needs to be considered for freezing now (LP_UNUSED and
 	 * LP_REDIRECT items also remain, but are of no further interest to us).
+	 *
+	 * Freeze the page when heap_prepare_freeze_tuple indicates that at least
+	 * one XID/MXID from before FreezeLimit/MultiXactCutoff is present.
 	 */
-	vacrel->NewRelfrozenXid = NewRelfrozenXid;
-	vacrel->NewRelminMxid = NewRelminMxid;
+	if (xtrack.freeze || nfrozen == 0)
+	{
+		/*
+		 * We're freezing the page.  Our final NewRelfrozenXid doesn't need to
+		 * be affected by the XIDs that are just about to be frozen anyway.
+		 *
+		 * Note: although we're freezing all eligible tuples on this page, we
+		 * might not need to freeze anything (might be zero eligible tuples).
+		 */
+		vacrel->NewRelfrozenXid = xtrack.relfrozenxid_out;
+		vacrel->NewRelminMxid = xtrack.relminmxid_out;
+		freeze_all_eligible = true;
+	}
+	else
+	{
+		/* Not freezing this page, so use alternative cutoffs */
+		vacrel->NewRelfrozenXid = xtrack.relfrozenxid_nofreeze_out;
+		vacrel->NewRelminMxid = xtrack.relminmxid_nofreeze_out;
+
+		/* Might still set page all-visible, but never all-frozen */
+		nfrozen = 0;
+		freeze_all_eligible = prunestate->all_frozen = false;
+	}
 
 	/*
 	 * Consider the need to freeze any items with tuple storage from the page
@@ -1811,7 +1838,7 @@ retry:
 	 */
 	if (nfrozen > 0)
 	{
-		Assert(prunestate->hastup);
+		Assert(prunestate->hastup && freeze_all_eligible);
 
 		/*
 		 * At least one tuple with storage needs to be frozen -- execute that
@@ -1841,7 +1868,7 @@ retry:
 		{
 			XLogRecPtr	recptr;
 
-			recptr = log_heap_freeze(vacrel->rel, buf, vacrel->FreezeLimit,
+			recptr = log_heap_freeze(rel, buf, vacrel->NewRelfrozenXid,
 									 frozen, nfrozen);
 			PageSetLSN(page, recptr);
 		}
@@ -1850,6 +1877,41 @@ retry:
 	}
 
 	/*
+	 * Now save details of the LP_DEAD items from the page in vacrel
+	 */
+	if (lpdead_items > 0)
+	{
+		VacDeadItems *dead_items = vacrel->dead_items;
+		ItemPointerData tmp;
+
+		vacrel->lpdead_item_pages++;
+
+		ItemPointerSetBlockNumber(&tmp, blkno);
+
+		for (int i = 0; i < lpdead_items; i++)
+		{
+			ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
+			dead_items->items[dead_items->num_items++] = tmp;
+		}
+
+		Assert(dead_items->num_items <= dead_items->max_items);
+		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
+									 dead_items->num_items);
+
+		/* lazy_scan_heap caller expects LP_DEAD item to unset all_visible */
+		prunestate->has_lpdead_items = true;
+		prunestate->all_visible = false;
+	}
+
+	/* Finally, add page-local counts to whole-VACUUM counts */
+	vacrel->tuples_deleted += tuples_deleted;
+	vacrel->lpdead_items += lpdead_items;
+	vacrel->live_tuples += live_tuples;
+	vacrel->recently_dead_tuples += recently_dead_tuples;
+
+	/*
+	 * We're done, but assert that some postconditions hold before returning.
+	 *
 	 * The second pass over the heap can also set visibility map bits, using
 	 * the same approach.  This is important when the table frequently has a
 	 * few old LP_DEAD items on each page by the time we get to it (typically
@@ -1873,7 +1935,7 @@ retry:
 			Assert(false);
 
 		Assert(lpdead_items == 0);
-		Assert(prunestate->all_frozen == all_frozen);
+		Assert(prunestate->all_frozen == all_frozen || !freeze_all_eligible);
 
 		/*
 		 * It's possible that we froze tuples and made the page's XID cutoff
@@ -1885,38 +1947,6 @@ retry:
 			   cutoff == prunestate->visibility_cutoff_xid);
 	}
 #endif
-
-	/*
-	 * Now save details of the LP_DEAD items from the page in vacrel
-	 */
-	if (lpdead_items > 0)
-	{
-		VacDeadItems *dead_items = vacrel->dead_items;
-		ItemPointerData tmp;
-
-		Assert(!prunestate->all_visible);
-		Assert(prunestate->has_lpdead_items);
-
-		vacrel->lpdead_item_pages++;
-
-		ItemPointerSetBlockNumber(&tmp, blkno);
-
-		for (int i = 0; i < lpdead_items; i++)
-		{
-			ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
-			dead_items->items[dead_items->num_items++] = tmp;
-		}
-
-		Assert(dead_items->num_items <= dead_items->max_items);
-		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-									 dead_items->num_items);
-	}
-
-	/* Finally, add page-local counts to whole-VACUUM counts */
-	vacrel->tuples_deleted += tuples_deleted;
-	vacrel->lpdead_items += lpdead_items;
-	vacrel->live_tuples += live_tuples;
-	vacrel->recently_dead_tuples += recently_dead_tuples;
 }
 
 /*
-- 
2.34.1

