 src/backend/access/heap/Makefile      |    1 +
 src/backend/access/heap/heapam.c      | 1321 --------------------------------
 src/backend/access/heap/heapam_xlog.c | 1353 +++++++++++++++++++++++++++++++++
 src/backend/access/heap/meson.build   |    1 +
 src/include/access/heapam.h           |   25 +
 5 files changed, 1380 insertions(+), 1321 deletions(-)

diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile
index af0bd1888e..394534172f 100644
--- a/src/backend/access/heap/Makefile
+++ b/src/backend/access/heap/Makefile
@@ -16,6 +16,7 @@ OBJS = \
 	heapam.o \
 	heapam_handler.o \
 	heapam_visibility.o \
+	heapam_xlog.o \
 	heaptoast.o \
 	hio.o \
 	pruneheap.o \
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 82bb9cb33b..2b6a72043f 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -6809,30 +6809,6 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 	return freeze_xmin || replace_xvac || replace_xmax || freeze_xmax;
 }
 
-/*
- * heap_execute_freeze_tuple
- *		Execute the prepared freezing of a tuple with caller's freeze plan.
- *
- * Caller is responsible for ensuring that no other backend can access the
- * storage underlying this tuple, either by holding an exclusive lock on the
- * buffer containing it (which is what lazy VACUUM does), or by having it be
- * in private storage (which is what CLUSTER and friends do).
- */
-static inline void
-heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz)
-{
-	HeapTupleHeaderSetXmax(tuple, frz->xmax);
-
-	if (frz->frzflags & XLH_FREEZE_XVAC)
-		HeapTupleHeaderSetXvac(tuple, FrozenTransactionId);
-
-	if (frz->frzflags & XLH_INVALID_XVAC)
-		HeapTupleHeaderSetXvac(tuple, InvalidTransactionId);
-
-	tuple->t_infomask = frz->t_infomask;
-	tuple->t_infomask2 = frz->t_infomask2;
-}
-
 /*
  * Perform xmin/xmax XID status sanity checks before actually executing freeze
  * plans.
@@ -8743,1303 +8719,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 	return key_tuple;
 }
 
-/*
- * Replay XLOG_HEAP2_PRUNE_* records.
- */
-static void
-heap_xlog_prune_freeze(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	char	   *maindataptr = XLogRecGetData(record);
-	xl_heap_prune xlrec;
-	Buffer		buffer;
-	RelFileLocator rlocator;
-	BlockNumber blkno;
-	XLogRedoAction action;
-
-	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
-	memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
-	maindataptr += SizeOfHeapPrune;
-
-	/*
-	 * We will take an ordinary exclusive lock or a cleanup lock depending on
-	 * whether the XLHP_CLEANUP_LOCK flag is set.  With an ordinary exclusive
-	 * lock, we better not be doing anything that requires moving existing
-	 * tuple data.
-	 */
-	Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
-		   (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
-
-	/*
-	 * We are about to remove and/or freeze tuples.  In Hot Standby mode,
-	 * ensure that there are no queries running for which the removed tuples
-	 * are still visible or which still consider the frozen xids as running.
-	 * The conflict horizon XID comes after xl_heap_prune.
-	 */
-	if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
-	{
-		TransactionId snapshot_conflict_horizon;
-
-		/* memcpy() because snapshot_conflict_horizon is stored unaligned */
-		memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
-		maindataptr += sizeof(TransactionId);
-
-		if (InHotStandby)
-			ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
-												(xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
-												rlocator);
-	}
-
-	/*
-	 * If we have a full-page image, restore it and we're done.
-	 */
-	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
-										   (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
-										   &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		Page		page = (Page) BufferGetPage(buffer);
-		OffsetNumber *redirected;
-		OffsetNumber *nowdead;
-		OffsetNumber *nowunused;
-		int			nredirected;
-		int			ndead;
-		int			nunused;
-		int			nplans;
-		Size		datalen;
-		xlhp_freeze_plan *plans;
-		OffsetNumber *frz_offsets;
-		char	   *dataptr = XLogRecGetBlockData(record, 0, &datalen);
-
-		heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
-											   &nplans, &plans, &frz_offsets,
-											   &nredirected, &redirected,
-											   &ndead, &nowdead,
-											   &nunused, &nowunused);
-
-		/*
-		 * Update all line pointers per the record, and repair fragmentation
-		 * if needed.
-		 */
-		if (nredirected > 0 || ndead > 0 || nunused > 0)
-			heap_page_prune_execute(buffer,
-									(xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
-									redirected, nredirected,
-									nowdead, ndead,
-									nowunused, nunused);
-
-		/* Freeze tuples */
-		for (int p = 0; p < nplans; p++)
-		{
-			HeapTupleFreeze frz;
-
-			/*
-			 * Convert freeze plan representation from WAL record into
-			 * per-tuple format used by heap_execute_freeze_tuple
-			 */
-			frz.xmax = plans[p].xmax;
-			frz.t_infomask2 = plans[p].t_infomask2;
-			frz.t_infomask = plans[p].t_infomask;
-			frz.frzflags = plans[p].frzflags;
-			frz.offset = InvalidOffsetNumber;	/* unused, but be tidy */
-
-			for (int i = 0; i < plans[p].ntuples; i++)
-			{
-				OffsetNumber offset = *(frz_offsets++);
-				ItemId		lp;
-				HeapTupleHeader tuple;
-
-				lp = PageGetItemId(page, offset);
-				tuple = (HeapTupleHeader) PageGetItem(page, lp);
-				heap_execute_freeze_tuple(tuple, &frz);
-			}
-		}
-
-		/* There should be no more data */
-		Assert((char *) frz_offsets == dataptr + datalen);
-
-		/*
-		 * Note: we don't worry about updating the page's prunability hints.
-		 * At worst this will cause an extra prune cycle to occur soon.
-		 */
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-
-	/*
-	 * If we released any space or line pointers, update the free space map.
-	 *
-	 * Do this regardless of a full-page image being applied, since the FSM
-	 * data is not in the page anyway.
-	 */
-	if (BufferIsValid(buffer))
-	{
-		if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
-						   XLHP_HAS_DEAD_ITEMS |
-						   XLHP_HAS_NOW_UNUSED_ITEMS))
-		{
-			Size		freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
-
-			UnlockReleaseBuffer(buffer);
-
-			XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
-		}
-		else
-			UnlockReleaseBuffer(buffer);
-	}
-}
-
-/*
- * Replay XLOG_HEAP2_VISIBLE record.
- *
- * The critical integrity requirement here is that we must never end up with
- * a situation where the visibility map bit is set, and the page-level
- * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
- * page modification would fail to clear the visibility map bit.
- */
-static void
-heap_xlog_visible(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
-	Buffer		vmbuffer = InvalidBuffer;
-	Buffer		buffer;
-	Page		page;
-	RelFileLocator rlocator;
-	BlockNumber blkno;
-	XLogRedoAction action;
-
-	Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
-
-	XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
-
-	/*
-	 * If there are any Hot Standby transactions running that have an xmin
-	 * horizon old enough that this page isn't all-visible for them, they
-	 * might incorrectly decide that an index-only scan can skip a heap fetch.
-	 *
-	 * NB: It might be better to throw some kind of "soft" conflict here that
-	 * forces any index-only scan that is in flight to perform heap fetches,
-	 * rather than killing the transaction outright.
-	 */
-	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
-											xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
-											rlocator);
-
-	/*
-	 * Read the heap page, if it still exists. If the heap file has been
-	 * dropped or truncated later in recovery, we don't need to update the
-	 * page, but we'd better still update the visibility map.
-	 */
-	action = XLogReadBufferForRedo(record, 1, &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		/*
-		 * We don't bump the LSN of the heap page when setting the visibility
-		 * map bit (unless checksums or wal_log_hints is enabled, in which
-		 * case we must). This exposes us to torn page hazards, but since
-		 * we're not inspecting the existing page contents in any way, we
-		 * don't care.
-		 */
-		page = BufferGetPage(buffer);
-
-		PageSetAllVisible(page);
-
-		if (XLogHintBitIsNeeded())
-			PageSetLSN(page, lsn);
-
-		MarkBufferDirty(buffer);
-	}
-	else if (action == BLK_RESTORED)
-	{
-		/*
-		 * If heap block was backed up, we already restored it and there's
-		 * nothing more to do. (This can only happen with checksums or
-		 * wal_log_hints enabled.)
-		 */
-	}
-
-	if (BufferIsValid(buffer))
-	{
-		Size		space = PageGetFreeSpace(BufferGetPage(buffer));
-
-		UnlockReleaseBuffer(buffer);
-
-		/*
-		 * Since FSM is not WAL-logged and only updated heuristically, it
-		 * easily becomes stale in standbys.  If the standby is later promoted
-		 * and runs VACUUM, it will skip updating individual free space
-		 * figures for pages that became all-visible (or all-frozen, depending
-		 * on the vacuum mode), which is troublesome when FreeSpaceMapVacuum
-		 * propagates overly optimistic free space values to upper FSM layers;
-		 * later inserters try to use such pages only to find out that they
-		 * are unusable.  This can cause long stalls when there are many such
-		 * pages.
-		 *
-		 * Forestall those problems by updating FSM's idea about a page that
-		 * is becoming all-visible or all-frozen.
-		 *
-		 * Do this regardless of a full-page image being applied, since the
-		 * FSM data is not in the page anyway.
-		 */
-		if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
-			XLogRecordPageWithFreeSpace(rlocator, blkno, space);
-	}
-
-	/*
-	 * Even if we skipped the heap page update due to the LSN interlock, it's
-	 * still safe to update the visibility map.  Any WAL record that clears
-	 * the visibility map bit does so before checking the page LSN, so any
-	 * bits that need to be cleared will still be cleared.
-	 */
-	if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
-									  &vmbuffer) == BLK_NEEDS_REDO)
-	{
-		Page		vmpage = BufferGetPage(vmbuffer);
-		Relation	reln;
-		uint8		vmbits;
-
-		/* initialize the page if it was read as zeros */
-		if (PageIsNew(vmpage))
-			PageInit(vmpage, BLCKSZ, 0);
-
-		/* remove VISIBILITYMAP_XLOG_* */
-		vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
-
-		/*
-		 * XLogReadBufferForRedoExtended locked the buffer. But
-		 * visibilitymap_set will handle locking itself.
-		 */
-		LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
-
-		reln = CreateFakeRelcacheEntry(rlocator);
-		visibilitymap_pin(reln, blkno, &vmbuffer);
-
-		visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
-						  xlrec->snapshotConflictHorizon, vmbits);
-
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-	else if (BufferIsValid(vmbuffer))
-		UnlockReleaseBuffer(vmbuffer);
-}
-
-/*
- * Given an "infobits" field from an XLog record, set the correct bits in the
- * given infomask and infomask2 for the tuple touched by the record.
- *
- * (This is the reverse of compute_infobits).
- */
-static void
-fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
-{
-	*infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
-				   HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
-	*infomask2 &= ~HEAP_KEYS_UPDATED;
-
-	if (infobits & XLHL_XMAX_IS_MULTI)
-		*infomask |= HEAP_XMAX_IS_MULTI;
-	if (infobits & XLHL_XMAX_LOCK_ONLY)
-		*infomask |= HEAP_XMAX_LOCK_ONLY;
-	if (infobits & XLHL_XMAX_EXCL_LOCK)
-		*infomask |= HEAP_XMAX_EXCL_LOCK;
-	/* note HEAP_XMAX_SHR_LOCK isn't considered here */
-	if (infobits & XLHL_XMAX_KEYSHR_LOCK)
-		*infomask |= HEAP_XMAX_KEYSHR_LOCK;
-
-	if (infobits & XLHL_KEYS_UPDATED)
-		*infomask2 |= HEAP_KEYS_UPDATED;
-}
-
-static void
-heap_xlog_delete(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-	BlockNumber blkno;
-	RelFileLocator target_locator;
-	ItemPointerData target_tid;
-
-	XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
-	ItemPointerSetBlockNumber(&target_tid, blkno);
-	ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(target_locator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, blkno, &vmbuffer);
-		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		page = BufferGetPage(buffer);
-
-		if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
-			lp = PageGetItemId(page, xlrec->offnum);
-
-		if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-		HeapTupleHeaderClearHotUpdated(htup);
-		fix_infomask_from_infobits(xlrec->infobits_set,
-								   &htup->t_infomask, &htup->t_infomask2);
-		if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
-			HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-		else
-			HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
-		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-
-		/* Mark the page as a candidate for pruning */
-		PageSetPrunable(page, XLogRecGetXid(record));
-
-		if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		/* Make sure t_ctid is set correctly */
-		if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
-			HeapTupleHeaderSetMovedPartitions(htup);
-		else
-			htup->t_ctid = target_tid;
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_insert(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	union
-	{
-		HeapTupleHeaderData hdr;
-		char		data[MaxHeapTupleSize];
-	}			tbuf;
-	HeapTupleHeader htup;
-	xl_heap_header xlhdr;
-	uint32		newlen;
-	Size		freespace = 0;
-	RelFileLocator target_locator;
-	BlockNumber blkno;
-	ItemPointerData target_tid;
-	XLogRedoAction action;
-
-	XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
-	ItemPointerSetBlockNumber(&target_tid, blkno);
-	ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(target_locator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, blkno, &vmbuffer);
-		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	/*
-	 * If we inserted the first and only tuple on the page, re-initialize the
-	 * page from scratch.
-	 */
-	if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
-	{
-		buffer = XLogInitBufferForRedo(record, 0);
-		page = BufferGetPage(buffer);
-		PageInit(page, BufferGetPageSize(buffer), 0);
-		action = BLK_NEEDS_REDO;
-	}
-	else
-		action = XLogReadBufferForRedo(record, 0, &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		Size		datalen;
-		char	   *data;
-
-		page = BufferGetPage(buffer);
-
-		if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
-			elog(PANIC, "invalid max offset number");
-
-		data = XLogRecGetBlockData(record, 0, &datalen);
-
-		newlen = datalen - SizeOfHeapHeader;
-		Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
-		memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
-		data += SizeOfHeapHeader;
-
-		htup = &tbuf.hdr;
-		MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-		memcpy((char *) htup + SizeofHeapTupleHeader,
-			   data,
-			   newlen);
-		newlen += SizeofHeapTupleHeader;
-		htup->t_infomask2 = xlhdr.t_infomask2;
-		htup->t_infomask = xlhdr.t_infomask;
-		htup->t_hoff = xlhdr.t_hoff;
-		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-		HeapTupleHeaderSetCmin(htup, FirstCommandId);
-		htup->t_ctid = target_tid;
-
-		if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
-						true, true) == InvalidOffsetNumber)
-			elog(PANIC, "failed to add tuple");
-
-		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
-		PageSetLSN(page, lsn);
-
-		if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		/* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
-		if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
-			PageSetAllVisible(page);
-
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-
-	/*
-	 * If the page is running low on free space, update the FSM as well.
-	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
-	 * better than that without knowing the fill-factor for the table.
-	 *
-	 * XXX: Don't do this if the page was restored from full page image. We
-	 * don't bother to update the FSM in that case, it doesn't need to be
-	 * totally accurate anyway.
-	 */
-	if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
-		XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
-}
-
-/*
- * Handles MULTI_INSERT record type.
- */
-static void
-heap_xlog_multi_insert(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_multi_insert *xlrec;
-	RelFileLocator rlocator;
-	BlockNumber blkno;
-	Buffer		buffer;
-	Page		page;
-	union
-	{
-		HeapTupleHeaderData hdr;
-		char		data[MaxHeapTupleSize];
-	}			tbuf;
-	HeapTupleHeader htup;
-	uint32		newlen;
-	Size		freespace = 0;
-	int			i;
-	bool		isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
-	XLogRedoAction action;
-
-	/*
-	 * Insertion doesn't overwrite MVCC data, so no conflict processing is
-	 * required.
-	 */
-	xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
-
-	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
-
-	/* check that the mutually exclusive flags are not both set */
-	Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
-			 (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(rlocator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, blkno, &vmbuffer);
-		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	if (isinit)
-	{
-		buffer = XLogInitBufferForRedo(record, 0);
-		page = BufferGetPage(buffer);
-		PageInit(page, BufferGetPageSize(buffer), 0);
-		action = BLK_NEEDS_REDO;
-	}
-	else
-		action = XLogReadBufferForRedo(record, 0, &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		char	   *tupdata;
-		char	   *endptr;
-		Size		len;
-
-		/* Tuples are stored as block data */
-		tupdata = XLogRecGetBlockData(record, 0, &len);
-		endptr = tupdata + len;
-
-		page = (Page) BufferGetPage(buffer);
-
-		for (i = 0; i < xlrec->ntuples; i++)
-		{
-			OffsetNumber offnum;
-			xl_multi_insert_tuple *xlhdr;
-
-			/*
-			 * If we're reinitializing the page, the tuples are stored in
-			 * order from FirstOffsetNumber. Otherwise there's an array of
-			 * offsets in the WAL record, and the tuples come after that.
-			 */
-			if (isinit)
-				offnum = FirstOffsetNumber + i;
-			else
-				offnum = xlrec->offsets[i];
-			if (PageGetMaxOffsetNumber(page) + 1 < offnum)
-				elog(PANIC, "invalid max offset number");
-
-			xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
-			tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
-
-			newlen = xlhdr->datalen;
-			Assert(newlen <= MaxHeapTupleSize);
-			htup = &tbuf.hdr;
-			MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-			/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-			memcpy((char *) htup + SizeofHeapTupleHeader,
-				   (char *) tupdata,
-				   newlen);
-			tupdata += newlen;
-
-			newlen += SizeofHeapTupleHeader;
-			htup->t_infomask2 = xlhdr->t_infomask2;
-			htup->t_infomask = xlhdr->t_infomask;
-			htup->t_hoff = xlhdr->t_hoff;
-			HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-			HeapTupleHeaderSetCmin(htup, FirstCommandId);
-			ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
-			ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
-
-			offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
-			if (offnum == InvalidOffsetNumber)
-				elog(PANIC, "failed to add tuple");
-		}
-		if (tupdata != endptr)
-			elog(PANIC, "total tuple length mismatch");
-
-		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
-		PageSetLSN(page, lsn);
-
-		if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		/* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
-		if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
-			PageSetAllVisible(page);
-
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-
-	/*
-	 * If the page is running low on free space, update the FSM as well.
-	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
-	 * better than that without knowing the fill-factor for the table.
-	 *
-	 * XXX: Don't do this if the page was restored from full page image. We
-	 * don't bother to update the FSM in that case, it doesn't need to be
-	 * totally accurate anyway.
-	 */
-	if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
-		XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
-}
-
-/*
- * Handles UPDATE and HOT_UPDATE
- */
-static void
-heap_xlog_update(XLogReaderState *record, bool hot_update)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
-	RelFileLocator rlocator;
-	BlockNumber oldblk;
-	BlockNumber newblk;
-	ItemPointerData newtid;
-	Buffer		obuffer,
-				nbuffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleData oldtup;
-	HeapTupleHeader htup;
-	uint16		prefixlen = 0,
-				suffixlen = 0;
-	char	   *newp;
-	union
-	{
-		HeapTupleHeaderData hdr;
-		char		data[MaxHeapTupleSize];
-	}			tbuf;
-	xl_heap_header xlhdr;
-	uint32		newlen;
-	Size		freespace = 0;
-	XLogRedoAction oldaction;
-	XLogRedoAction newaction;
-
-	/* initialize to keep the compiler quiet */
-	oldtup.t_data = NULL;
-	oldtup.t_len = 0;
-
-	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
-	if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
-	{
-		/* HOT updates are never done across pages */
-		Assert(!hot_update);
-	}
-	else
-		oldblk = newblk;
-
-	ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(rlocator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, oldblk, &vmbuffer);
-		visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	/*
-	 * In normal operation, it is important to lock the two pages in
-	 * page-number order, to avoid possible deadlocks against other update
-	 * operations going the other way.  However, during WAL replay there can
-	 * be no other update happening, so we don't need to worry about that. But
-	 * we *do* need to worry that we don't expose an inconsistent state to Hot
-	 * Standby queries --- so the original page can't be unlocked before we've
-	 * added the new tuple to the new page.
-	 */
-
-	/* Deal with old tuple version */
-	oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
-									  &obuffer);
-	if (oldaction == BLK_NEEDS_REDO)
-	{
-		page = BufferGetPage(obuffer);
-		offnum = xlrec->old_offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		oldtup.t_data = htup;
-		oldtup.t_len = ItemIdGetLength(lp);
-
-		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-		if (hot_update)
-			HeapTupleHeaderSetHotUpdated(htup);
-		else
-			HeapTupleHeaderClearHotUpdated(htup);
-		fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
-								   &htup->t_infomask2);
-		HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
-		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-		/* Set forward chain link in t_ctid */
-		htup->t_ctid = newtid;
-
-		/* Mark the page as a candidate for pruning */
-		PageSetPrunable(page, XLogRecGetXid(record));
-
-		if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(obuffer);
-	}
-
-	/*
-	 * Read the page the new tuple goes into, if different from old.
-	 */
-	if (oldblk == newblk)
-	{
-		nbuffer = obuffer;
-		newaction = oldaction;
-	}
-	else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
-	{
-		nbuffer = XLogInitBufferForRedo(record, 0);
-		page = (Page) BufferGetPage(nbuffer);
-		PageInit(page, BufferGetPageSize(nbuffer), 0);
-		newaction = BLK_NEEDS_REDO;
-	}
-	else
-		newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(rlocator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, newblk, &vmbuffer);
-		visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	/* Deal with new tuple */
-	if (newaction == BLK_NEEDS_REDO)
-	{
-		char	   *recdata;
-		char	   *recdata_end;
-		Size		datalen;
-		Size		tuplen;
-
-		recdata = XLogRecGetBlockData(record, 0, &datalen);
-		recdata_end = recdata + datalen;
-
-		page = BufferGetPage(nbuffer);
-
-		offnum = xlrec->new_offnum;
-		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
-			elog(PANIC, "invalid max offset number");
-
-		if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
-		{
-			Assert(newblk == oldblk);
-			memcpy(&prefixlen, recdata, sizeof(uint16));
-			recdata += sizeof(uint16);
-		}
-		if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
-		{
-			Assert(newblk == oldblk);
-			memcpy(&suffixlen, recdata, sizeof(uint16));
-			recdata += sizeof(uint16);
-		}
-
-		memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
-		recdata += SizeOfHeapHeader;
-
-		tuplen = recdata_end - recdata;
-		Assert(tuplen <= MaxHeapTupleSize);
-
-		htup = &tbuf.hdr;
-		MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-
-		/*
-		 * Reconstruct the new tuple using the prefix and/or suffix from the
-		 * old tuple, and the data stored in the WAL record.
-		 */
-		newp = (char *) htup + SizeofHeapTupleHeader;
-		if (prefixlen > 0)
-		{
-			int			len;
-
-			/* copy bitmap [+ padding] [+ oid] from WAL record */
-			len = xlhdr.t_hoff - SizeofHeapTupleHeader;
-			memcpy(newp, recdata, len);
-			recdata += len;
-			newp += len;
-
-			/* copy prefix from old tuple */
-			memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
-			newp += prefixlen;
-
-			/* copy new tuple data from WAL record */
-			len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
-			memcpy(newp, recdata, len);
-			recdata += len;
-			newp += len;
-		}
-		else
-		{
-			/*
-			 * copy bitmap [+ padding] [+ oid] + data from record, all in one
-			 * go
-			 */
-			memcpy(newp, recdata, tuplen);
-			recdata += tuplen;
-			newp += tuplen;
-		}
-		Assert(recdata == recdata_end);
-
-		/* copy suffix from old tuple */
-		if (suffixlen > 0)
-			memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
-
-		newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
-		htup->t_infomask2 = xlhdr.t_infomask2;
-		htup->t_infomask = xlhdr.t_infomask;
-		htup->t_hoff = xlhdr.t_hoff;
-
-		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-		HeapTupleHeaderSetCmin(htup, FirstCommandId);
-		HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
-		/* Make sure there is no forward chain link in t_ctid */
-		htup->t_ctid = newtid;
-
-		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
-		if (offnum == InvalidOffsetNumber)
-			elog(PANIC, "failed to add tuple");
-
-		if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(nbuffer);
-	}
-
-	if (BufferIsValid(nbuffer) && nbuffer != obuffer)
-		UnlockReleaseBuffer(nbuffer);
-	if (BufferIsValid(obuffer))
-		UnlockReleaseBuffer(obuffer);
-
-	/*
-	 * If the new page is running low on free space, update the FSM as well.
-	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
-	 * better than that without knowing the fill-factor for the table.
-	 *
-	 * However, don't update the FSM on HOT updates, because after crash
-	 * recovery, either the old or the new tuple will certainly be dead and
-	 * prunable. After pruning, the page will have roughly as much free space
-	 * as it did before the update, assuming the new tuple is about the same
-	 * size as the old one.
-	 *
-	 * XXX: Don't do this if the page was restored from full page image. We
-	 * don't bother to update the FSM in that case, it doesn't need to be
-	 * totally accurate anyway.
-	 */
-	if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
-		XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
-}
-
-static void
-heap_xlog_confirm(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		page = BufferGetPage(buffer);
-
-		offnum = xlrec->offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		/*
-		 * Confirm tuple as actually inserted
-		 */
-		ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_lock(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
-	{
-		RelFileLocator rlocator;
-		Buffer		vmbuffer = InvalidBuffer;
-		BlockNumber block;
-		Relation	reln;
-
-		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
-		reln = CreateFakeRelcacheEntry(rlocator);
-
-		visibilitymap_pin(reln, block, &vmbuffer);
-		visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
-
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		page = (Page) BufferGetPage(buffer);
-
-		offnum = xlrec->offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
-								   &htup->t_infomask2);
-
-		/*
-		 * Clear relevant update flags, but only if the modified infomask says
-		 * there's no update.
-		 */
-		if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
-		{
-			HeapTupleHeaderClearHotUpdated(htup);
-			/* Make sure there is no forward chain link in t_ctid */
-			ItemPointerSet(&htup->t_ctid,
-						   BufferGetBlockNumber(buffer),
-						   offnum);
-		}
-		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_lock_updated(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_lock_updated *xlrec;
-	Buffer		buffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-
-	xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
-	{
-		RelFileLocator rlocator;
-		Buffer		vmbuffer = InvalidBuffer;
-		BlockNumber block;
-		Relation	reln;
-
-		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
-		reln = CreateFakeRelcacheEntry(rlocator);
-
-		visibilitymap_pin(reln, block, &vmbuffer);
-		visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
-
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		page = BufferGetPage(buffer);
-
-		offnum = xlrec->offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
-								   &htup->t_infomask2);
-		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_inplace(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-	uint32		oldlen;
-	Size		newlen;
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		char	   *newtup = XLogRecGetBlockData(record, 0, &newlen);
-
-		page = BufferGetPage(buffer);
-
-		offnum = xlrec->offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		oldlen = ItemIdGetLength(lp) - htup->t_hoff;
-		if (oldlen != newlen)
-			elog(PANIC, "wrong tuple length");
-
-		memcpy((char *) htup + htup->t_hoff, newtup, newlen);
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-void
-heap_redo(XLogReaderState *record)
-{
-	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
-	/*
-	 * These operations don't overwrite MVCC data so no conflict processing is
-	 * required. The ones in heap2 rmgr do.
-	 */
-
-	switch (info & XLOG_HEAP_OPMASK)
-	{
-		case XLOG_HEAP_INSERT:
-			heap_xlog_insert(record);
-			break;
-		case XLOG_HEAP_DELETE:
-			heap_xlog_delete(record);
-			break;
-		case XLOG_HEAP_UPDATE:
-			heap_xlog_update(record, false);
-			break;
-		case XLOG_HEAP_TRUNCATE:
-
-			/*
-			 * TRUNCATE is a no-op because the actions are already logged as
-			 * SMGR WAL records.  TRUNCATE WAL record only exists for logical
-			 * decoding.
-			 */
-			break;
-		case XLOG_HEAP_HOT_UPDATE:
-			heap_xlog_update(record, true);
-			break;
-		case XLOG_HEAP_CONFIRM:
-			heap_xlog_confirm(record);
-			break;
-		case XLOG_HEAP_LOCK:
-			heap_xlog_lock(record);
-			break;
-		case XLOG_HEAP_INPLACE:
-			heap_xlog_inplace(record);
-			break;
-		default:
-			elog(PANIC, "heap_redo: unknown op code %u", info);
-	}
-}
-
-void
-heap2_redo(XLogReaderState *record)
-{
-	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
-	switch (info & XLOG_HEAP_OPMASK)
-	{
-		case XLOG_HEAP2_PRUNE_ON_ACCESS:
-		case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
-		case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
-			heap_xlog_prune_freeze(record);
-			break;
-		case XLOG_HEAP2_VISIBLE:
-			heap_xlog_visible(record);
-			break;
-		case XLOG_HEAP2_MULTI_INSERT:
-			heap_xlog_multi_insert(record);
-			break;
-		case XLOG_HEAP2_LOCK_UPDATED:
-			heap_xlog_lock_updated(record);
-			break;
-		case XLOG_HEAP2_NEW_CID:
-
-			/*
-			 * Nothing to do on a real replay, only used during logical
-			 * decoding.
-			 */
-			break;
-		case XLOG_HEAP2_REWRITE:
-			heap_xlog_logical_rewrite(record);
-			break;
-		default:
-			elog(PANIC, "heap2_redo: unknown op code %u", info);
-	}
-}
-
-/*
- * Mask a heap page before performing consistency checks on it.
- */
-void
-heap_mask(char *pagedata, BlockNumber blkno)
-{
-	Page		page = (Page) pagedata;
-	OffsetNumber off;
-
-	mask_page_lsn_and_checksum(page);
-
-	mask_page_hint_bits(page);
-	mask_unused_space(page);
-
-	for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
-	{
-		ItemId		iid = PageGetItemId(page, off);
-		char	   *page_item;
-
-		page_item = (char *) (page + ItemIdGetOffset(iid));
-
-		if (ItemIdIsNormal(iid))
-		{
-			HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
-
-			/*
-			 * If xmin of a tuple is not yet frozen, we should ignore
-			 * differences in hint bits, since they can be set without
-			 * emitting WAL.
-			 */
-			if (!HeapTupleHeaderXminFrozen(page_htup))
-				page_htup->t_infomask &= ~HEAP_XACT_MASK;
-			else
-			{
-				/* Still we need to mask xmax hint bits. */
-				page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
-				page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
-			}
-
-			/*
-			 * During replay, we set Command Id to FirstCommandId. Hence, mask
-			 * it. See heap_xlog_insert() for details.
-			 */
-			page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;
-
-			/*
-			 * For a speculative tuple, heap_insert() does not set ctid in the
-			 * caller-passed heap tuple itself, leaving the ctid field to
-			 * contain a speculative token value - a per-backend monotonically
-			 * increasing identifier. Besides, it does not WAL-log ctid under
-			 * any circumstances.
-			 *
-			 * During redo, heap_xlog_insert() sets t_ctid to current block
-			 * number and self offset number. It doesn't care about any
-			 * speculative insertions on the primary. Hence, we set t_ctid to
-			 * current block number and self offset number to ignore any
-			 * inconsistency.
-			 */
-			if (HeapTupleHeaderIsSpeculative(page_htup))
-				ItemPointerSet(&page_htup->t_ctid, blkno, off);
-
-			/*
-			 * NB: Not ignoring ctid changes due to the tuple having moved
-			 * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
-			 * important information that needs to be in-sync between primary
-			 * and standby, and thus is WAL logged.
-			 */
-		}
-
-		/*
-		 * Ignore any padding bytes after the tuple, when the length of the
-		 * item is not MAXALIGNed.
-		 */
-		if (ItemIdHasStorage(iid))
-		{
-			int			len = ItemIdGetLength(iid);
-			int			padlen = MAXALIGN(len) - len;
-
-			if (padlen > 0)
-				memset(page_item + len, MASK_MARKER, padlen);
-		}
-	}
-}
-
 /*
  * HeapCheckForSerializableConflictOut
  *		We are reading a tuple.  If it's not visible, there may be a
diff --git a/src/backend/access/heap/heapam_xlog.c b/src/backend/access/heap/heapam_xlog.c
new file mode 100644
index 0000000000..b372f2b4af
--- /dev/null
+++ b/src/backend/access/heap/heapam_xlog.c
@@ -0,0 +1,1326 @@
+/*-------------------------------------------------------------------------
+ *
+ * heapam_xlog.c
+ *	  WAL replay logic for heaps.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/heap/heapam_xlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/heapam.h"
+#include "access/heapam_xlog.h"
+#include "access/transam.h"
+#include "access/visibilitymap.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "port/atomics.h"
+#include "storage/bufmgr.h"
+#include "storage/freespace.h"
+#include "storage/standby.h"
+#include "utils/relcache.h"
+
+
+/*
+ * Replay XLOG_HEAP2_PRUNE_* records.
+ */
+static void
+heap_xlog_prune_freeze(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *maindataptr = XLogRecGetData(record);
+	xl_heap_prune xlrec;
+	Buffer		buffer;
+	RelFileLocator rlocator;
+	BlockNumber blkno;
+	XLogRedoAction action;
+
+	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+	memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
+	maindataptr += SizeOfHeapPrune;
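+
+	/*
+	 * The rest of the record is variable-size: an optional conflict horizon
+	 * XID in the main data, and the redirect/dead/unused/freeze arrays in
+	 * the block 0 data.
+	 */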
+
+	/*
+	 * We will take an ordinary exclusive lock or a cleanup lock depending on
+	 * whether the XLHP_CLEANUP_LOCK flag is set.  With an ordinary exclusive
+	 * lock, we better not be doing anything that requires moving existing
+	 * tuple data.
+	 */
+	Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
+		   (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
+
+	/*
+	 * We are about to remove and/or freeze tuples.  In Hot Standby mode,
+	 * ensure that there are no queries running for which the removed tuples
+	 * are still visible or which still consider the frozen xids as running.
+	 * The conflict horizon XID comes after xl_heap_prune.
+	 */
+	if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
+	{
+		TransactionId snapshot_conflict_horizon;
+
+		/* memcpy() because snapshot_conflict_horizon is stored unaligned */
+		memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
+		maindataptr += sizeof(TransactionId);
+
+		if (InHotStandby)
+			ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
+												(xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
+												rlocator);
+	}
+
+	/*
+	 * If we have a full-page image, restore it and we're done.
+	 */
+	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
+										   (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
+										   &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		Page		page = (Page) BufferGetPage(buffer);
+		OffsetNumber *redirected;
+		OffsetNumber *nowdead;
+		OffsetNumber *nowunused;
+		int			nredirected;
+		int			ndead;
+		int			nunused;
+		int			nplans;
+		Size		datalen;
+		xlhp_freeze_plan *plans;
+		OffsetNumber *frz_offsets;
+		char	   *dataptr = XLogRecGetBlockData(record, 0, &datalen);
+
+		heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
+											   &nplans, &plans, &frz_offsets,
+											   &nredirected, &redirected,
+											   &ndead, &nowdead,
+											   &nunused, &nowunused);
+
+		/*
+		 * Update all line pointers per the record, and repair fragmentation
+		 * if needed.
+		 */
+		if (nredirected > 0 || ndead > 0 || nunused > 0)
+			heap_page_prune_execute(buffer,
+									(xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
+									redirected, nredirected,
+									nowdead, ndead,
+									nowunused, nunused);
+
+		/* Freeze tuples */
+		for (int p = 0; p < nplans; p++)
+		{
+			HeapTupleFreeze frz;
+
+			/*
+			 * Convert freeze plan representation from WAL record into
+			 * per-tuple format used by heap_execute_freeze_tuple
+			 */
+			frz.xmax = plans[p].xmax;
+			frz.t_infomask2 = plans[p].t_infomask2;
+			frz.t_infomask = plans[p].t_infomask;
+			frz.frzflags = plans[p].frzflags;
+			frz.offset = InvalidOffsetNumber;	/* unused, but be tidy */
+
+			for (int i = 0; i < plans[p].ntuples; i++)
+			{
+				OffsetNumber offset = *(frz_offsets++);
+				ItemId		lp;
+				HeapTupleHeader tuple;
+
+				lp = PageGetItemId(page, offset);
+				tuple = (HeapTupleHeader) PageGetItem(page, lp);
+				heap_execute_freeze_tuple(tuple, &frz);
+			}
+		}
+
+		/* There should be no more data */
+		Assert((char *) frz_offsets == dataptr + datalen);
+
+		/*
+		 * Note: we don't worry about updating the page's prunability hints.
+		 * At worst this will cause an extra prune cycle to occur soon.
+		 */
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+
+	/*
+	 * If we released any space or line pointers, update the free space map.
+	 *
+	 * Do this regardless of a full-page image being applied, since the FSM
+	 * data is not in the page anyway.
+	 */
+	if (BufferIsValid(buffer))
+	{
+		if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
+						   XLHP_HAS_DEAD_ITEMS |
+						   XLHP_HAS_NOW_UNUSED_ITEMS))
+		{
+			Size		freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
+
+			UnlockReleaseBuffer(buffer);
+
+			XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+		}
+		else
+			UnlockReleaseBuffer(buffer);
+	}
+}
+
+/*
+ * Replay XLOG_HEAP2_VISIBLE record.
+ *
+ * The critical integrity requirement here is that we must never end up with
+ * a situation where the visibility map bit is set, and the page-level
+ * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
+ * page modification would fail to clear the visibility map bit.
+ */
+static void
+heap_xlog_visible(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
+	Buffer		vmbuffer = InvalidBuffer;
+	Buffer		buffer;
+	Page		page;
+	RelFileLocator rlocator;
+	BlockNumber blkno;
+	XLogRedoAction action;
+
+	Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
+
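+	/*
+	 * Block 0 of this record holds the visibility map page; block 1 holds
+	 * the heap page.
+	 */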
+	XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
+
+	/*
+	 * If there are any Hot Standby transactions running that have an xmin
+	 * horizon old enough that this page isn't all-visible for them, they
+	 * might incorrectly decide that an index-only scan can skip a heap fetch.
+	 *
+	 * NB: It might be better to throw some kind of "soft" conflict here that
+	 * forces any index-only scan that is in flight to perform heap fetches,
+	 * rather than killing the transaction outright.
+	 */
+	if (InHotStandby)
+		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
+											rlocator);
+
+	/*
+	 * Read the heap page, if it still exists. If the heap file has been
+	 * dropped or truncated later in recovery, we don't need to update the
+	 * page, but we'd better still update the visibility map.
+	 */
+	action = XLogReadBufferForRedo(record, 1, &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		/*
+		 * We don't bump the LSN of the heap page when setting the visibility
+		 * map bit (unless checksums or wal_log_hints is enabled, in which
+		 * case we must). This exposes us to torn page hazards, but since
+		 * we're not inspecting the existing page contents in any way, we
+		 * don't care.
+		 */
+		page = BufferGetPage(buffer);
+
+		PageSetAllVisible(page);
+
+		if (XLogHintBitIsNeeded())
+			PageSetLSN(page, lsn);
+
+		MarkBufferDirty(buffer);
+	}
+	else if (action == BLK_RESTORED)
+	{
+		/*
+		 * If heap block was backed up, we already restored it and there's
+		 * nothing more to do. (This can only happen with checksums or
+		 * wal_log_hints enabled.)
+		 */
+	}
+
+	if (BufferIsValid(buffer))
+	{
+		Size		space = PageGetFreeSpace(BufferGetPage(buffer));
+
+		UnlockReleaseBuffer(buffer);
+
+		/*
+		 * Since FSM is not WAL-logged and only updated heuristically, it
+		 * easily becomes stale in standbys.  If the standby is later promoted
+		 * and runs VACUUM, it will skip updating individual free space
+		 * figures for pages that became all-visible (or all-frozen, depending
+		 * on the vacuum mode), which is troublesome when FreeSpaceMapVacuum
+		 * propagates overly optimistic free space values to upper FSM layers;
+		 * later inserters try to use such pages only to find out that they
+		 * are unusable.  This can cause long stalls when there are many such
+		 * pages.
+		 *
+		 * Forestall those problems by updating FSM's idea about a page that
+		 * is becoming all-visible or all-frozen.
+		 *
+		 * Do this regardless of a full-page image being applied, since the
+		 * FSM data is not in the page anyway.
+		 */
+		if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
+			XLogRecordPageWithFreeSpace(rlocator, blkno, space);
+	}
+
+	/*
+	 * Even if we skipped the heap page update due to the LSN interlock, it's
+	 * still safe to update the visibility map.  Any WAL record that clears
+	 * the visibility map bit does so before checking the page LSN, so any
+	 * bits that need to be cleared will still be cleared.
+	 */
+	if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
+									  &vmbuffer) == BLK_NEEDS_REDO)
+	{
+		Page		vmpage = BufferGetPage(vmbuffer);
+		Relation	reln;
+		uint8		vmbits;
+
+		/* initialize the page if it was read as zeros */
+		if (PageIsNew(vmpage))
+			PageInit(vmpage, BLCKSZ, 0);
+
+		/* remove VISIBILITYMAP_XLOG_* */
+		vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
+
+		/*
+		 * XLogReadBufferForRedoExtended locked the buffer. But
+		 * visibilitymap_set will handle locking itself.
+		 */
+		LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
+
+		reln = CreateFakeRelcacheEntry(rlocator);
+		visibilitymap_pin(reln, blkno, &vmbuffer);
+
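+		/*
+		 * Pass InvalidBuffer for the heap buffer: the heap page has already
+		 * been dealt with above, so only the map bits are set here.
+		 */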
+		visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
+						  xlrec->snapshotConflictHorizon, vmbits);
+
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+	else if (BufferIsValid(vmbuffer))
+		UnlockReleaseBuffer(vmbuffer);
+}
+
+/*
+ * Given an "infobits" field from an XLog record, set the correct bits in the
+ * given infomask and infomask2 for the tuple touched by the record.
+ *
+ * (This is the reverse of compute_infobits).
+ */
+static void
+fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
+{
+	*infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
+				   HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
+	*infomask2 &= ~HEAP_KEYS_UPDATED;
+
+	if (infobits & XLHL_XMAX_IS_MULTI)
+		*infomask |= HEAP_XMAX_IS_MULTI;
+	if (infobits & XLHL_XMAX_LOCK_ONLY)
+		*infomask |= HEAP_XMAX_LOCK_ONLY;
+	if (infobits & XLHL_XMAX_EXCL_LOCK)
+		*infomask |= HEAP_XMAX_EXCL_LOCK;
+	/* note HEAP_XMAX_SHR_LOCK isn't considered here */
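+	/* (it equals EXCL | KEYSHR, so setting both bits reconstructs it) */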
+	if (infobits & XLHL_XMAX_KEYSHR_LOCK)
+		*infomask |= HEAP_XMAX_KEYSHR_LOCK;
+
+	if (infobits & XLHL_KEYS_UPDATED)
+		*infomask2 |= HEAP_KEYS_UPDATED;
+}
+
+static void
+heap_xlog_delete(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+	BlockNumber blkno;
+	RelFileLocator target_locator;
+	ItemPointerData target_tid;
+
+	XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
+	ItemPointerSetBlockNumber(&target_tid, blkno);
+	ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(target_locator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, blkno, &vmbuffer);
+		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
+			lp = PageGetItemId(page, xlrec->offnum);
+
+		if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		HeapTupleHeaderClearHotUpdated(htup);
+		fix_infomask_from_infobits(xlrec->infobits_set,
+								   &htup->t_infomask, &htup->t_infomask2);
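+
+		/*
+		 * A normal deletion stamps the tuple's xmax; a super-deletion (the
+		 * abort of a speculative insertion) instead invalidates xmin, so the
+		 * tuple is never visible to anyone.
+		 */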
+		if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
+			HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+		else
+			HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
+		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+
+		/* Mark the page as a candidate for pruning */
+		PageSetPrunable(page, XLogRecGetXid(record));
+
+		if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		/* Make sure t_ctid is set correctly */
+		if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
+			HeapTupleHeaderSetMovedPartitions(htup);
+		else
+			htup->t_ctid = target_tid;
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+static void
+heap_xlog_insert(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
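+
+	/*
+	 * Scratch space for the reconstructed tuple; the union guarantees
+	 * alignment suitable for a tuple header.
+	 */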
+	union
+	{
+		HeapTupleHeaderData hdr;
+		char		data[MaxHeapTupleSize];
+	}			tbuf;
+	HeapTupleHeader htup;
+	xl_heap_header xlhdr;
+	uint32		newlen;
+	Size		freespace = 0;
+	RelFileLocator target_locator;
+	BlockNumber blkno;
+	ItemPointerData target_tid;
+	XLogRedoAction action;
+
+	XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
+	ItemPointerSetBlockNumber(&target_tid, blkno);
+	ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(target_locator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, blkno, &vmbuffer);
+		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	/*
+	 * If we inserted the first and only tuple on the page, re-initialize the
+	 * page from scratch.
+	 */
+	if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
+	{
+		buffer = XLogInitBufferForRedo(record, 0);
+		page = BufferGetPage(buffer);
+		PageInit(page, BufferGetPageSize(buffer), 0);
+		action = BLK_NEEDS_REDO;
+	}
+	else
+		action = XLogReadBufferForRedo(record, 0, &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		Size		datalen;
+		char	   *data;
+
+		page = BufferGetPage(buffer);
+
+		if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
+			elog(PANIC, "invalid max offset number");
+
+		data = XLogRecGetBlockData(record, 0, &datalen);
+
+		newlen = datalen - SizeOfHeapHeader;
+		Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
+		memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
+		data += SizeOfHeapHeader;
+
+		htup = &tbuf.hdr;
+		MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+		memcpy((char *) htup + SizeofHeapTupleHeader,
+			   data,
+			   newlen);
+		newlen += SizeofHeapTupleHeader;
+		htup->t_infomask2 = xlhdr.t_infomask2;
+		htup->t_infomask = xlhdr.t_infomask;
+		htup->t_hoff = xlhdr.t_hoff;
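+
+		/*
+		 * Command ids are not WAL-logged: they matter only to the inserting
+		 * transaction, which cannot have survived the crash, so replay can
+		 * safely stamp FirstCommandId.
+		 */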
+		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+		htup->t_ctid = target_tid;
+
+		if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
+						true, true) == InvalidOffsetNumber)
+			elog(PANIC, "failed to add tuple");
+
+		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+		PageSetLSN(page, lsn);
+
+		if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		/* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
+		if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
+			PageSetAllVisible(page);
+
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/*
+	 * If the page is running low on free space, update the FSM as well.
+	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+	 * better than that without knowing the fill-factor for the table.
+	 *
+	 * XXX: Don't do this if the page was restored from full page image. We
+	 * don't bother to update the FSM in that case, it doesn't need to be
+	 * totally accurate anyway.
+	 */
+	if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
+		XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
+}
+
+/*
+ * Handles MULTI_INSERT record type.
+ */
+static void
+heap_xlog_multi_insert(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_multi_insert *xlrec;
+	RelFileLocator rlocator;
+	BlockNumber blkno;
+	Buffer		buffer;
+	Page		page;
+	union
+	{
+		HeapTupleHeaderData hdr;
+		char		data[MaxHeapTupleSize];
+	}			tbuf;
+	HeapTupleHeader htup;
+	uint32		newlen;
+	Size		freespace = 0;
+	int			i;
+	bool		isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
+	XLogRedoAction action;
+
+	/*
+	 * Insertion doesn't overwrite MVCC data, so no conflict processing is
+	 * required.
+	 */
+	xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
+
+	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+
+	/* check that the mutually exclusive flags are not both set */
+	Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
+			 (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(rlocator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, blkno, &vmbuffer);
+		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	if (isinit)
+	{
+		buffer = XLogInitBufferForRedo(record, 0);
+		page = BufferGetPage(buffer);
+		PageInit(page, BufferGetPageSize(buffer), 0);
+		action = BLK_NEEDS_REDO;
+	}
+	else
+		action = XLogReadBufferForRedo(record, 0, &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		char	   *tupdata;
+		char	   *endptr;
+		Size		len;
+
+		/* Tuples are stored as block data */
+		tupdata = XLogRecGetBlockData(record, 0, &len);
+		endptr = tupdata + len;
+
+		page = (Page) BufferGetPage(buffer);
+
+		for (i = 0; i < xlrec->ntuples; i++)
+		{
+			OffsetNumber offnum;
+			xl_multi_insert_tuple *xlhdr;
+
+			/*
+			 * If we're reinitializing the page, the tuples are stored in
+			 * order from FirstOffsetNumber. Otherwise there's an array of
+			 * offsets in the WAL record, and the tuples come after that.
+			 */
+			if (isinit)
+				offnum = FirstOffsetNumber + i;
+			else
+				offnum = xlrec->offsets[i];
+			if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+				elog(PANIC, "invalid max offset number");
+
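+			/* tuple headers are stored SHORTALIGNed in the block data */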
+			xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
+			tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+
+			newlen = xlhdr->datalen;
+			Assert(newlen <= MaxHeapTupleSize);
+			htup = &tbuf.hdr;
+			MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+			/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+			memcpy((char *) htup + SizeofHeapTupleHeader,
+				   (char *) tupdata,
+				   newlen);
+			tupdata += newlen;
+
+			newlen += SizeofHeapTupleHeader;
+			htup->t_infomask2 = xlhdr->t_infomask2;
+			htup->t_infomask = xlhdr->t_infomask;
+			htup->t_hoff = xlhdr->t_hoff;
+			HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+			HeapTupleHeaderSetCmin(htup, FirstCommandId);
+			ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+			ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+			offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+			if (offnum == InvalidOffsetNumber)
+				elog(PANIC, "failed to add tuple");
+		}
+		if (tupdata != endptr)
+			elog(PANIC, "total tuple length mismatch");
+
+		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+		PageSetLSN(page, lsn);
+
+		if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		/* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
+		if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
+			PageSetAllVisible(page);
+
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/*
+	 * If the page is running low on free space, update the FSM as well.
+	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+	 * better than that without knowing the fill-factor for the table.
+	 *
+	 * XXX: Don't do this if the page was restored from full page image. We
+	 * don't bother to update the FSM in that case, it doesn't need to be
+	 * totally accurate anyway.
+	 */
+	if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
+		XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+}
+
+/*
+ * Handles the UPDATE and HOT_UPDATE record types.
+ */
+static void
+heap_xlog_update(XLogReaderState *record, bool hot_update)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
+	RelFileLocator rlocator;
+	BlockNumber oldblk;
+	BlockNumber newblk;
+	ItemPointerData newtid;
+	Buffer		obuffer,
+				nbuffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleData oldtup;
+	HeapTupleHeader htup;
+	uint16		prefixlen = 0,
+				suffixlen = 0;
+	char	   *newp;
+	union
+	{
+		HeapTupleHeaderData hdr;
+		char		data[MaxHeapTupleSize];
+	}			tbuf;
+	xl_heap_header xlhdr;
+	uint32		newlen;
+	Size		freespace = 0;
+	XLogRedoAction oldaction;
+	XLogRedoAction newaction;
+
+	/* initialize to keep the compiler quiet */
+	oldtup.t_data = NULL;
+	oldtup.t_len = 0;
+
+	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
+	if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
+	{
+		/* HOT updates are never done across pages */
+		Assert(!hot_update);
+	}
+	else
+		oldblk = newblk;
+
+	ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(rlocator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, oldblk, &vmbuffer);
+		visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	/*
+	 * In normal operation, it is important to lock the two pages in
+	 * page-number order, to avoid possible deadlocks against other update
+	 * operations going the other way.  However, during WAL replay there can
+	 * be no other update happening, so we don't need to worry about that. But
+	 * we *do* need to worry that we don't expose an inconsistent state to Hot
+	 * Standby queries --- so the original page can't be unlocked before we've
+	 * added the new tuple to the new page.
+	 */
+
+	/* Deal with old tuple version */
+	oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
+									  &obuffer);
+	if (oldaction == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(obuffer);
+		offnum = xlrec->old_offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		oldtup.t_data = htup;
+		oldtup.t_len = ItemIdGetLength(lp);
+
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		if (hot_update)
+			HeapTupleHeaderSetHotUpdated(htup);
+		else
+			HeapTupleHeaderClearHotUpdated(htup);
+		fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
+								   &htup->t_infomask2);
+		HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
+		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		/* Set forward chain link in t_ctid */
+		htup->t_ctid = newtid;
+
+		/* Mark the page as a candidate for pruning */
+		PageSetPrunable(page, XLogRecGetXid(record));
+
+		if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(obuffer);
+	}
+
+	/*
+	 * Read the page the new tuple goes into, if different from old.
+	 */
+	if (oldblk == newblk)
+	{
+		nbuffer = obuffer;
+		newaction = oldaction;
+	}
+	else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
+	{
+		nbuffer = XLogInitBufferForRedo(record, 0);
+		page = (Page) BufferGetPage(nbuffer);
+		PageInit(page, BufferGetPageSize(nbuffer), 0);
+		newaction = BLK_NEEDS_REDO;
+	}
+	else
+		newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(rlocator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, newblk, &vmbuffer);
+		visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	/* Deal with new tuple */
+	if (newaction == BLK_NEEDS_REDO)
+	{
+		char	   *recdata;
+		char	   *recdata_end;
+		Size		datalen;
+		Size		tuplen;
+
+		recdata = XLogRecGetBlockData(record, 0, &datalen);
+		recdata_end = recdata + datalen;
+
+		page = BufferGetPage(nbuffer);
+
+		offnum = xlrec->new_offnum;
+		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+			elog(PANIC, "invalid max offset number");
+
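+		/*
+		 * If the new tuple shares a prefix and/or suffix with the old one,
+		 * only the changed middle part was WAL-logged; the lengths of the
+		 * shared parts, when present, precede the tuple header in the block
+		 * data.
+		 */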
+		if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
+		{
+			Assert(newblk == oldblk);
+			memcpy(&prefixlen, recdata, sizeof(uint16));
+			recdata += sizeof(uint16);
+		}
+		if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
+		{
+			Assert(newblk == oldblk);
+			memcpy(&suffixlen, recdata, sizeof(uint16));
+			recdata += sizeof(uint16);
+		}
+
+		memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
+		recdata += SizeOfHeapHeader;
+
+		tuplen = recdata_end - recdata;
+		Assert(tuplen <= MaxHeapTupleSize);
+
+		htup = &tbuf.hdr;
+		MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+
+		/*
+		 * Reconstruct the new tuple using the prefix and/or suffix from the
+		 * old tuple, and the data stored in the WAL record.
+		 */
+		newp = (char *) htup + SizeofHeapTupleHeader;
+		if (prefixlen > 0)
+		{
+			int			len;
+
+			/* copy bitmap [+ padding] [+ oid] from WAL record */
+			len = xlhdr.t_hoff - SizeofHeapTupleHeader;
+			memcpy(newp, recdata, len);
+			recdata += len;
+			newp += len;
+
+			/* copy prefix from old tuple */
+			memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
+			newp += prefixlen;
+
+			/* copy new tuple data from WAL record */
+			len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
+			memcpy(newp, recdata, len);
+			recdata += len;
+			newp += len;
+		}
+		else
+		{
+			/*
+			 * copy bitmap [+ padding] [+ oid] + data from record, all in one
+			 * go
+			 */
+			memcpy(newp, recdata, tuplen);
+			recdata += tuplen;
+			newp += tuplen;
+		}
+		Assert(recdata == recdata_end);
+
+		/* copy suffix from old tuple */
+		if (suffixlen > 0)
+			memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
+
+		newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
+		htup->t_infomask2 = xlhdr.t_infomask2;
+		htup->t_infomask = xlhdr.t_infomask;
+		htup->t_hoff = xlhdr.t_hoff;
+
+		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+		HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
+		/* Make sure there is no forward chain link in t_ctid */
+		htup->t_ctid = newtid;
+
+		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+		if (offnum == InvalidOffsetNumber)
+			elog(PANIC, "failed to add tuple");
+
+		if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(nbuffer);
+	}
+
+	if (BufferIsValid(nbuffer) && nbuffer != obuffer)
+		UnlockReleaseBuffer(nbuffer);
+	if (BufferIsValid(obuffer))
+		UnlockReleaseBuffer(obuffer);
+
+	/*
+	 * If the new page is running low on free space, update the FSM as well.
+	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+	 * better than that without knowing the fill-factor for the table.
+	 *
+	 * However, don't update the FSM on HOT updates, because after crash
+	 * recovery, either the old or the new tuple will certainly be dead and
+	 * prunable. After pruning, the page will have roughly as much free space
+	 * as it did before the update, assuming the new tuple is about the same
+	 * size as the old one.
+	 *
+	 * XXX: Don't do this if the page was restored from full page image. We
+	 * don't bother to update the FSM in that case, it doesn't need to be
+	 * totally accurate anyway.
+	 */
+	if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
+		XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
+}
+
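+/*
+ * Replay XLOG_HEAP_CONFIRM, which finishes a speculative insertion by
+ * replacing the speculative token in t_ctid with the tuple's real TID.
+ */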
+static void
+heap_xlog_confirm(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		offnum = xlrec->offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		/*
+		 * Confirm tuple as actually inserted
+		 */
+		ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
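+/*
+ * Replay XLOG_HEAP_LOCK: set xmax, and the infomask bits describing the
+ * tuple-level lock, on the target tuple.
+ */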
+static void
+heap_xlog_lock(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
+	{
+		RelFileLocator rlocator;
+		Buffer		vmbuffer = InvalidBuffer;
+		BlockNumber block;
+		Relation	reln;
+
+		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
+		reln = CreateFakeRelcacheEntry(rlocator);
+
+		visibilitymap_pin(reln, block, &vmbuffer);
+		visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
+
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = (Page) BufferGetPage(buffer);
+
+		offnum = xlrec->offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+								   &htup->t_infomask2);
+
+		/*
+		 * Clear relevant update flags, but only if the modified infomask says
+		 * there's no update.
+		 */
+		if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
+		{
+			HeapTupleHeaderClearHotUpdated(htup);
+			/* Make sure there is no forward chain link in t_ctid */
+			ItemPointerSet(&htup->t_ctid,
+						   BufferGetBlockNumber(buffer),
+						   offnum);
+		}
+		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
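+/*
+ * Replay XLOG_HEAP2_LOCK_UPDATED: like XLOG_HEAP_LOCK, but applied to a
+ * later version of the tuple, reached by following its update chain.
+ */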
+static void
+heap_xlog_lock_updated(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_lock_updated *xlrec;
+	Buffer		buffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+
+	xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
+	{
+		RelFileLocator rlocator;
+		Buffer		vmbuffer = InvalidBuffer;
+		BlockNumber block;
+		Relation	reln;
+
+		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
+		reln = CreateFakeRelcacheEntry(rlocator);
+
+		visibilitymap_pin(reln, block, &vmbuffer);
+		visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
+
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		offnum = xlrec->offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+								   &htup->t_infomask2);
+		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
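+/*
+ * Replay XLOG_HEAP_INPLACE, which overwrites a tuple's data portion in
+ * place without changing its length; used for non-transactional updates
+ * of system catalog tuples.
+ */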
+static void
+heap_xlog_inplace(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+	uint32		oldlen;
+	Size		newlen;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		char	   *newtup = XLogRecGetBlockData(record, 0, &newlen);
+
+		page = BufferGetPage(buffer);
+
+		offnum = xlrec->offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		oldlen = ItemIdGetLength(lp) - htup->t_hoff;
+		if (oldlen != newlen)
+			elog(PANIC, "wrong tuple length");
+
+		memcpy((char *) htup + htup->t_hoff, newtup, newlen);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+void
+heap_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	/*
+	 * These operations don't overwrite MVCC data, so no conflict processing
+	 * is required.  The ones in the heap2 rmgr do.
+	 */
+
+	switch (info & XLOG_HEAP_OPMASK)
+	{
+		case XLOG_HEAP_INSERT:
+			heap_xlog_insert(record);
+			break;
+		case XLOG_HEAP_DELETE:
+			heap_xlog_delete(record);
+			break;
+		case XLOG_HEAP_UPDATE:
+			heap_xlog_update(record, false);
+			break;
+		case XLOG_HEAP_TRUNCATE:
+
+			/*
+			 * TRUNCATE is a no-op because the actions are already logged as
+			 * SMGR WAL records.  The TRUNCATE WAL record exists only for
+			 * logical decoding.
+			 */
+			break;
+		case XLOG_HEAP_HOT_UPDATE:
+			heap_xlog_update(record, true);
+			break;
+		case XLOG_HEAP_CONFIRM:
+			heap_xlog_confirm(record);
+			break;
+		case XLOG_HEAP_LOCK:
+			heap_xlog_lock(record);
+			break;
+		case XLOG_HEAP_INPLACE:
+			heap_xlog_inplace(record);
+			break;
+		default:
+			elog(PANIC, "heap_redo: unknown op code %u", info);
+	}
+}
+
+void
+heap2_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info & XLOG_HEAP_OPMASK)
+	{
+		case XLOG_HEAP2_PRUNE_ON_ACCESS:
+		case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
+		case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
+			heap_xlog_prune_freeze(record);
+			break;
+		case XLOG_HEAP2_VISIBLE:
+			heap_xlog_visible(record);
+			break;
+		case XLOG_HEAP2_MULTI_INSERT:
+			heap_xlog_multi_insert(record);
+			break;
+		case XLOG_HEAP2_LOCK_UPDATED:
+			heap_xlog_lock_updated(record);
+			break;
+		case XLOG_HEAP2_NEW_CID:
+
+			/*
+			 * Nothing to do on a real replay, only used during logical
+			 * decoding.
+			 */
+			break;
+		case XLOG_HEAP2_REWRITE:
+			heap_xlog_logical_rewrite(record);
+			break;
+		default:
+			elog(PANIC, "heap2_redo: unknown op code %u", info);
+	}
+}
+
+/*
+ * Mask a heap page before performing consistency checks on it.
+ */
+void
+heap_mask(char *pagedata, BlockNumber blkno)
+{
+	Page		page = (Page) pagedata;
+	OffsetNumber off;
+
+	mask_page_lsn_and_checksum(page);
+
+	mask_page_hint_bits(page);
+	mask_unused_space(page);
+
+	for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
+	{
+		ItemId		iid = PageGetItemId(page, off);
+		char	   *page_item;
+
+		page_item = (char *) (page + ItemIdGetOffset(iid));
+
+		if (ItemIdIsNormal(iid))
+		{
+			HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
+
+			/*
+			 * If xmin of a tuple is not yet frozen, we should ignore
+			 * differences in hint bits, since they can be set without
+			 * emitting WAL.
+			 */
+			if (!HeapTupleHeaderXminFrozen(page_htup))
+				page_htup->t_infomask &= ~HEAP_XACT_MASK;
+			else
+			{
+				/* Still, we need to mask the xmax hint bits. */
+				page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
+				page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
+			}
+
+			/*
+			 * During replay, we set Command Id to FirstCommandId. Hence, mask
+			 * it. See heap_xlog_insert() for details.
+			 */
+			page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;
+
+			/*
+			 * For a speculative tuple, heap_insert() does not set ctid in the
+			 * caller-passed heap tuple itself, leaving the ctid field to
+			 * contain a speculative token value - a per-backend monotonically
+			 * increasing identifier.  Nor does it WAL-log the ctid under any
+			 * circumstances.
+			 *
+			 * During redo, heap_xlog_insert() sets t_ctid to the current
+			 * block number and self offset number, without regard to any
+			 * speculative insertions that happened on the primary.  Hence, we
+			 * set t_ctid the same way here, to mask the inconsistency.
+			 */
+			if (HeapTupleHeaderIsSpeculative(page_htup))
+				ItemPointerSet(&page_htup->t_ctid, blkno, off);
+
+			/*
+			 * NB: Not ignoring ctid changes due to the tuple having moved
+			 * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
+			 * important information that needs to be in-sync between primary
+			 * and standby, and thus is WAL logged.
+			 */
+		}
+
+		/*
+		 * Ignore any padding bytes after the tuple, when the length of the
+		 * item is not MAXALIGNed.
+		 */
+		if (ItemIdHasStorage(iid))
+		{
+			int			len = ItemIdGetLength(iid);
+			int			padlen = MAXALIGN(len) - len;
+
+			if (padlen > 0)
+				memset(page_item + len, MASK_MARKER, padlen);
+		}
+	}
+}
diff --git a/src/backend/access/heap/meson.build b/src/backend/access/heap/meson.build
index e00d5b4f0d..19a990208e 100644
--- a/src/backend/access/heap/meson.build
+++ b/src/backend/access/heap/meson.build
@@ -4,6 +4,7 @@ backend_sources += files(
   'heapam.c',
   'heapam_handler.c',
   'heapam_visibility.c',
+  'heapam_xlog.c',
   'heaptoast.c',
   'hio.c',
   'pruneheap.c',
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec88a6..b92eb506ec 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -14,6 +14,7 @@
 #ifndef HEAPAM_H
 #define HEAPAM_H
 
+#include "access/heapam_xlog.h"
 #include "access/relation.h"	/* for backward compatibility */
 #include "access/relscan.h"
 #include "access/sdir.h"
@@ -422,4 +423,28 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data,
 extern void HeapCheckForSerializableConflictOut(bool visible, Relation relation, HeapTuple tuple,
 												Buffer buffer, Snapshot snapshot);
 
+/*
+ * heap_execute_freeze_tuple
+ *		Execute the prepared freezing of a tuple with caller's freeze plan.
+ *
+ * Caller is responsible for ensuring that no other backend can access the
+ * storage underlying this tuple, either by holding an exclusive lock on the
+ * buffer containing it (which is what lazy VACUUM does), or by having it be
+ * in private storage (which is what CLUSTER and friends do).
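+ *
+ * It is defined here, as a static inline, so that both heapam.c and the
+ * WAL replay code in heapam_xlog.c can use it.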
+ */
+static inline void
+heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz)
+{
+	HeapTupleHeaderSetXmax(tuple, frz->xmax);
+
+	if (frz->frzflags & XLH_FREEZE_XVAC)
+		HeapTupleHeaderSetXvac(tuple, FrozenTransactionId);
+
+	if (frz->frzflags & XLH_INVALID_XVAC)
+		HeapTupleHeaderSetXvac(tuple, InvalidTransactionId);
+
+	tuple->t_infomask = frz->t_infomask;
+	tuple->t_infomask2 = frz->t_infomask2;
+}
+
 #endif							/* HEAPAM_H */
