From be0d3e08151de723a4a104bd172d8e1d772303e5 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Thu, 8 Sep 2022 16:24:52 +0200
Subject: [PATCH v1] Add cid to heap xlog records that insert/update/delete
 tuples

This allows physical replicas to participate in remote transactions and
statements, and physical replicas to serve pages to the primary without
loss of transaction guarantees.

Authors: Anastasia Lubennikova, Matthias van de Meent
---
 src/backend/access/heap/heapam.c | 21 +++++++++++++++------
 src/include/access/heapam_xlog.h |  5 +++++
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index aab8d6fa4e..9ddb262808 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2454,6 +2454,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 				xlrec->flags = XLH_INSERT_ALL_FROZEN_SET;
 
 			xlrec->ntuples = nthispage;
+			xlrec->cid = cid;
 
 			/*
 			 * Write out an xl_multi_insert_tuple and the tuple data itself
@@ -2977,6 +2978,7 @@ l1:
 											  tp.t_data->t_infomask2);
 		xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
 		xlrec.xmax = new_xmax;
+		xlrec.cid = HeapTupleHeaderGetRawCommandId(tp.t_data);
 
 		if (old_key_tuple != NULL)
 		{
@@ -2999,6 +3001,7 @@ l1:
 			xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
 			xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
 			xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+			xlhdr.t_cid = HeapTupleHeaderGetRawCommandId(old_key_tuple->t_data);
 
 			XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
 			XLogRegisterData((char *) old_key_tuple->t_data
@@ -3705,6 +3708,7 @@ l2:
 												  oldtup.t_data->t_infomask2);
 			xlrec.flags =
 				cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;
+			xlrec.cid = HeapTupleHeaderGetRawCommandId(oldtup.t_data);
 			XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
 			recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
 			PageSetLSN(page, recptr);
@@ -4892,6 +4896,7 @@ failed:
 		xlrec.infobits_set = compute_infobits(new_infomask,
 											  tuple->t_data->t_infomask2);
 		xlrec.flags = cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;
+		xlrec.cid = HeapTupleHeaderGetRawCommandId(tuple->t_data);
 		XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
 
 		/* we don't decode row locks atm, so no need to log the origin */
@@ -5941,6 +5946,7 @@ heap_abort_speculative(Relation relation, ItemPointer tid)
 											  tp.t_data->t_infomask2);
 		xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
 		xlrec.xmax = xid;
+		xlrec.cid = HeapTupleHeaderGetRawCommandId(tp.t_data);
 
 		XLogBeginInsert();
 		XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
@@ -8342,6 +8348,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	/* Prepare WAL data for the new page */
 	xlrec.new_offnum = ItemPointerGetOffsetNumber(&newtup->t_self);
 	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
+	xlrec.cid = HeapTupleHeaderGetRawCommandId(newtup->t_data);
 
 	bufflags = REGBUF_STANDARD;
 	if (init)
@@ -8379,6 +8386,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
 	xlhdr.t_infomask = newtup->t_data->t_infomask;
 	xlhdr.t_hoff = newtup->t_data->t_hoff;
+	xlhdr.t_cid = HeapTupleHeaderGetRawCommandId(newtup->t_data);
 	Assert(SizeofHeapTupleHeader + prefixlen + suffixlen <= newtup->t_len);
 
 	/*
@@ -8420,6 +8428,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
 		xlhdr_idx.t_infomask2 = old_key_tuple->t_data->t_infomask2;
 		xlhdr_idx.t_infomask = old_key_tuple->t_data->t_infomask;
 		xlhdr_idx.t_hoff = old_key_tuple->t_data->t_hoff;
+		xlhdr_idx.t_cid = HeapTupleHeaderGetRawCommandId(old_key_tuple->t_data);
 
 		XLogRegisterData((char *) &xlhdr_idx, SizeOfHeapHeader);
 
@@ -9050,7 +9059,7 @@ heap_xlog_delete(XLogReaderState *record)
 			HeapTupleHeaderSetXmax(htup, xlrec->xmax);
 		else
 			HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
-		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		HeapTupleHeaderSetCmax(htup, xlrec->cid, false);
 
 		/* Mark the page as a candidate for pruning */
 		PageSetPrunable(page, XLogRecGetXid(record));
@@ -9151,7 +9160,7 @@ heap_xlog_insert(XLogReaderState *record)
 		htup->t_infomask = xlhdr.t_infomask;
 		htup->t_hoff = xlhdr.t_hoff;
 		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+		HeapTupleHeaderSetCmin(htup, xlhdr.t_cid);
 		htup->t_ctid = target_tid;
 
 		if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
@@ -9294,7 +9303,7 @@ heap_xlog_multi_insert(XLogReaderState *record)
 			htup->t_infomask = xlhdr->t_infomask;
 			htup->t_hoff = xlhdr->t_hoff;
 			HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-			HeapTupleHeaderSetCmin(htup, FirstCommandId);
+			HeapTupleHeaderSetCmin(htup, xlrec->cid);
 			ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
 			ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
 
@@ -9434,7 +9443,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
 		fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
 								   &htup->t_infomask2);
 		HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
-		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		HeapTupleHeaderSetCmax(htup, xlrec->cid, false);
 		/* Set forward chain link in t_ctid */
 		htup->t_ctid = newtid;
 
@@ -9567,7 +9576,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
 		htup->t_hoff = xlhdr.t_hoff;
 
 		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+		HeapTupleHeaderSetCmin(htup, xlhdr.t_cid);
 		HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
 		/* Make sure there is no forward chain link in t_ctid */
 		htup->t_ctid = newtid;
@@ -9708,7 +9717,7 @@ heap_xlog_lock(XLogReaderState *record)
 						   offnum);
 		}
 		HeapTupleHeaderSetXmax(htup, xlrec->locking_xid);
-		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		HeapTupleHeaderSetCmax(htup, xlrec->cid, false);
 		PageSetLSN(page, lsn);
 		MarkBufferDirty(buffer);
 	}
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 1705e736be..ab42e0c83f 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -107,6 +107,7 @@
 typedef struct xl_heap_delete
 {
 	TransactionId xmax;			/* xmax of the deleted tuple */
+	CommandId	cid;			/* CommandId of the deletion operation */
 	OffsetNumber offnum;		/* deleted tuple's offset */
 	uint8		infobits_set;	/* infomask bits */
 	uint8		flags;
@@ -145,6 +146,7 @@ typedef struct xl_heap_header
 {
 	uint16		t_infomask2;
 	uint16		t_infomask;
+	CommandId	t_cid;
 	uint8		t_hoff;
 } xl_heap_header;
 
@@ -176,6 +178,7 @@ typedef struct xl_heap_multi_insert
 {
 	uint8		flags;
 	uint16		ntuples;
+	CommandId	cid;
 	OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
 } xl_heap_multi_insert;
 
@@ -216,6 +219,7 @@ typedef struct xl_heap_update
 	uint8		old_infobits_set;	/* infomask bits to set on old tuple */
 	uint8		flags;
 	TransactionId new_xmax;		/* xmax of the new tuple */
+	CommandId	cid;			/* CommandId of the update operation */
 	OffsetNumber new_offnum;	/* new tuple's offset */
 
 	/*
@@ -278,6 +282,7 @@ typedef struct xl_heap_vacuum
 typedef struct xl_heap_lock
 {
 	TransactionId locking_xid;	/* might be a MultiXactId not xid */
+	CommandId	cid;			/* CommandId of the locking command */
 	OffsetNumber offnum;		/* locked tuple's offset on page */
 	int8		infobits_set;	/* infomask and infomask2 bits to set */
 	uint8		flags;			/* XLH_LOCK_* flag bits */
-- 
2.30.2

