From 8a153e0b992db14aa602958f2d80794773204178 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Tue, 30 Apr 2019 19:06:02 +0530
Subject: [PATCH] undo page consistency checker

This patch provides a mechanism for masking the cid bytes in undo pages so
that the consistency checker can compare undo pages.  The actual consistency
check should be called from the consistency checker function of the RM that
writes the undo, because undo pages are registered under that RM's WAL.

Dilip Kumar based on initial version from Amit Khandekar and Rafia Sabih and design input from Amit Kapila
---
 src/backend/access/undo/undoinsert.c |   4 +-
 src/backend/access/undo/undorecord.c | 135 ++++++++++++++++++++++++++++++++---
 src/backend/storage/page/bufpage.c   |  33 +++++++++
 src/include/access/undoinsert.h      |   2 +
 src/include/access/undolog.h         |   2 +-
 src/include/access/undorecord.h      |   2 +-
 src/include/storage/bufpage.h        |  34 +++++++++
 7 files changed, 201 insertions(+), 11 deletions(-)

diff --git a/src/backend/access/undo/undoinsert.c b/src/backend/access/undo/undoinsert.c
index 1f3762f..71a3210 100644
--- a/src/backend/access/undo/undoinsert.c
+++ b/src/backend/access/undo/undoinsert.c
@@ -796,7 +796,9 @@ InsertPreparedUndo(void)
 				 * block header.
 				 */
 				if (starting_byte == UndoLogBlockHeaderSize)
-					PageInit(page, BLCKSZ, 0);
+					UndoPageInit(page, BLCKSZ, uur->uur_info,
+								 ucontext.already_processed,
+								 uur->uur_tuple.len, uur->uur_payload.len);
 
 				/*
 				 * Try to insert the record into the current page. If it
diff --git a/src/backend/access/undo/undorecord.c b/src/backend/access/undo/undorecord.c
index 7d7e088..fb0b691 100644
--- a/src/backend/access/undo/undorecord.c
+++ b/src/backend/access/undo/undorecord.c
@@ -12,6 +12,7 @@
 
 #include "postgres.h"
 
+#include "access/bufmask.h"
 #include "access/subtrans.h"
 #include "access/undorecord.h"
 #include "catalog/pg_tablespace.h"
@@ -26,31 +27,70 @@ static bool ReadUndoBytes(char *destptr, int readlen,
 			  int *total_bytes_read, int *partial_read);
 
 /*
- * Compute and return the expected size of an undo record.
+ * Compute the size of an undo record excluding its payload and tuple data.
  */
-Size
-UndoRecordExpectedSize(UnpackedUndoRecord *uur)
+static inline Size
+UndoRecordHeaderSize(uint8 uur_info)
 {
 	Size		size;
 
 	size = SizeOfUndoRecordHeader + sizeof(uint16);
-	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0)
+	if ((uur_info & UREC_INFO_RELATION_DETAILS) != 0)
 		size += SizeOfUndoRecordRelationDetails;
-	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+	if ((uur_info & UREC_INFO_BLOCK) != 0)
 		size += SizeOfUndoRecordBlock;
-	if ((uur->uur_info & UREC_INFO_BLOCKPREV) != 0)
+	if ((uur_info & UREC_INFO_BLOCKPREV) != 0)
 		size += SizeOfUndoRecordBlockPrev;
-	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+	if ((uur_info & UREC_INFO_TRANSACTION) != 0)
 		size += SizeOfUndoRecordTransaction;
+	if ((uur_info & UREC_INFO_PAYLOAD) != 0)
+		size += SizeOfUndoRecordPayload;
+
+	return size;
+}
+/*
+ * Compute the expected size of an undo record: headers plus payload/tuple data.
+ */
+Size
+UndoRecordExpectedSize(UnpackedUndoRecord *uur)
+{
+	Size		size;
+
+	/* Everything except the payload and tuple data. */
+	size = UndoRecordHeaderSize(uur->uur_info);
+
+	/* Payload and tuple data are counted only if UREC_INFO_PAYLOAD is set. */
 	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
 	{
-		size += SizeOfUndoRecordPayload;
 		size += uur->uur_payload.len;
 		size += uur->uur_tuple.len;
 	}
 
 	return size;
 }
+/*
+ * Calculate the total size of the undo record whose header is at page_ptr.
+ */
+static inline Size
+UndoRecordSizeOnPage(char *page_ptr)
+{
+	uint8		uur_info = ((UndoRecordHeader *) page_ptr)->urec_info;
+	Size		size = UndoRecordHeaderSize(uur_info);
+
+	if ((uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		/*
+		 * Step back to the payload header, which "size" already counts.
+		 * NOTE(review): presumes the counted uint16 trails the record; confirm.
+		 */
+		UndoRecordPayload *payload = (UndoRecordPayload *)
+			(page_ptr + size - SizeOfUndoRecordPayload - sizeof(uint16));
+
+		size += payload->urec_payload_len + payload->urec_tuple_len;
+	}
+
+	return size;
+}
 
 /*
  * Compute size of the Unpacked undo record in memory
@@ -72,6 +112,85 @@ UnpackedUndoRecordSize(UnpackedUndoRecord *uur)
 	return size;
 }
 
+
+/*
+ * Mask an undo page before performing consistency checks on it.
+ */
+void
+mask_undo_page(char *pagedata)
+{
+	Page		page = (Page) pagedata;
+	char	   *page_end = pagedata + PageGetPageSize(page);
+	char	   *next_record;
+	int			cid_offset = SizeOfUndoRecordHeader - sizeof(CommandId);
+	UndoPageHeader	phdr = (UndoPageHeader) page;
+
+	next_record = (char *) page + SizeOfUndoPageHeaderData;
+
+	/*
+	 * A non-zero record_offset in the page header means that the page begins
+	 * with a partial record, record_offset bytes into that record.
+	 */
+	if (phdr->record_offset != 0)
+	{
+		Size	partial_rec_size;
+
+		/* Calculate the size of the partial record. */
+		partial_rec_size = UndoRecordHeaderSize(phdr->uur_info) +
+						   phdr->tuple_len + phdr->payload_len -
+						   phdr->record_offset;
+
+		/*
+		 * We just want to mask the cid in the undo record header, so there is
+		 * something to mask only if part of that header is on this page.  The
+		 * cid occupies the last bytes of the header (see cid_offset), so what
+		 * is left of it ends where the header ends and never starts before
+		 * next_record.
+		 */
+		if (phdr->record_offset < SizeOfUndoRecordHeader)
+		{
+			Size	header_left = SizeOfUndoRecordHeader - phdr->record_offset;
+			Size	mask_size = Min(header_left, sizeof(CommandId));
+
+			/* Mask the page bytes themselves, not the local pointer. */
+			memset(next_record + header_left - mask_size, MASK_MARKER,
+				   mask_size);
+		}
+
+		next_record += partial_rec_size;
+	}
+
+	/* Process the undo records of the page and mask their cid field. */
+	while (next_record < page_end)
+	{
+		UndoRecordHeader *header = (UndoRecordHeader *) next_record;
+
+		/*
+		 * If this is not a complete header then mask whatever part of the
+		 * cid is on this page (possibly nothing).  Either way this is the
+		 * last record on the page.
+		 */
+		if (page_end - next_record < SizeOfUndoRecordHeader)
+		{
+			int		mask_size = page_end - next_record - cid_offset;
+
+			if (mask_size > 0)
+				memset(&header->urec_cid, MASK_MARKER, mask_size);
+			break;
+		}
+
+		/* The complete header is on this page, so mask the whole cid. */
+		memset(&header->urec_cid, MASK_MARKER, sizeof(header->urec_cid));
+
+		/* Record spills to the next page; don't read past this one. */
+		if (page_end - next_record < UndoRecordHeaderSize(header->urec_info))
+			break;
+
+		/* Go to next record. */
+		next_record += UndoRecordSizeOnPage(next_record);
+	}
+}
+
 /*
  * BeginUnpackUndo - Initiate unpacking a single one record.
  */
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 14bc61b..8519fd7 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -59,6 +59,39 @@ PageInit(Page page, Size pageSize, Size specialSize)
 	/* p->pd_prune_xid = InvalidTransactionId;		done by above MemSet */
 }
 
+/*
+ * UndoPageInit
+ *		Zeroes an undo page and fills in its header.  No initial checksum
+ *		is calculated here; that's not done until it's time to write.
+ */
+void
+UndoPageInit(Page page, Size pageSize, uint8 uur_info, uint16 record_offset,
+			 uint16 tuple_len, uint16 payload_len)
+{
+	UndoPageHeader	phdr = (UndoPageHeader) page;
+
+	Assert(pageSize == BLCKSZ);
+
+	/* Start from all zeroes: header fields and unused space alike. */
+	MemSet(phdr, 0, pageSize);
+
+	/* Partial-record details supplied by the caller. */
+	phdr->uur_info = uur_info;
+	phdr->record_offset = record_offset;
+	phdr->tuple_len = tuple_len;
+	phdr->payload_len = payload_len;
+
+	/*
+	 * TODO: pd_lower could be advanced whenever a record is inserted, so
+	 * that readers can skip the part of the page that holds no records.
+	 */
+	phdr->pd_flags = 0;
+	phdr->pd_lower = SizeOfUndoPageHeaderData;
+	phdr->pd_upper = pageSize;
+	phdr->pd_special = pageSize;
+	PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
+}
+
 
 /*
  * PageIsVerified
diff --git a/src/include/access/undoinsert.h b/src/include/access/undoinsert.h
index d50085e..daec742 100644
--- a/src/include/access/undoinsert.h
+++ b/src/include/access/undoinsert.h
@@ -44,6 +44,7 @@ extern void InsertPreparedUndo(void);
 extern void RegisterUndoLogBuffers(uint8 first_block_id);
 extern void UndoLogBuffersSetLSN(XLogRecPtr recptr);
 extern void UnlockReleaseUndoBuffers(void);
+extern void mask_undo_page(char *pagedata);
 
 extern UnpackedUndoRecord *UndoFetchRecord(UndoRecPtr urp,
 				BlockNumber blkno, OffsetNumber offset,
@@ -60,5 +61,6 @@ extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, UndoRecPtr prevurp,
 					  Buffer buffer,
 					  UndoPersistence upersistence);
 extern void AtAbort_ResetUndoBuffers(void);
+extern void mask_undo_page(char *pagedata);
 
 #endif							/* UNDOINSERT_H */
diff --git a/src/include/access/undolog.h b/src/include/access/undolog.h
index 2cfce8c..abbaf47 100644
--- a/src/include/access/undolog.h
+++ b/src/include/access/undolog.h
@@ -128,7 +128,7 @@ typedef int UndoLogNumber;
 	(((uint64) (logno) << UndoLogOffsetBits) | (offset))
 
 /* The number of unusable bytes in the header of each block. */
-#define UndoLogBlockHeaderSize SizeOfPageHeaderData
+#define UndoLogBlockHeaderSize SizeOfUndoPageHeaderData
 
 /* The number of usable bytes we can store per block. */
 #define UndoLogUsableBytesPerPage (BLCKSZ - UndoLogBlockHeaderSize)
diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h
index c607ba9..adbe3bb 100644
--- a/src/include/access/undorecord.h
+++ b/src/include/access/undorecord.h
@@ -20,7 +20,6 @@
 #include "storage/buf.h"
 #include "storage/off.h"
 
-
 /*
  * Every undo record begins with an UndoRecordHeader structure, which is
  * followed by the additional structures indicated by the contents of
@@ -251,5 +250,6 @@ extern void InsertUndoData(UndoPackContext *ucontext, Page page,
 			   int starting_byte);
 extern void SkipInsertingUndoData(UndoPackContext *ucontext,
 					  int starting_byte);
+extern void mask_undo_page(char *pagedata);
 
 #endif							/* UNDORECORD_H */
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 85cd0ab..5be7d1e 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -212,6 +212,37 @@ typedef PageHeaderData *PageHeader;
 #define SizeOfPageHeaderData (offsetof(PageHeaderData, pd_linp))
 
 /*
+ * FIXME:  It should be declared in undolog.h ?
+ *
+ * Same as PageHeaderData plus some additional information to detect a partial
+ * undo record on an undo page.
+ */
+typedef struct UndoPageHeaderData
+{
+	/* XXX LSN is member of *any* block, not only page-organized ones */
+	PageXLogRecPtr pd_lsn;		/* LSN: next byte after last byte of xlog
+								 * record for last change to this page */
+	uint16		pd_checksum;	/* checksum */
+	uint16		pd_flags;		/* flag bits; UndoPageInit sets this to 0 */
+	LocationIndex pd_lower;		/* offset to start of free space */
+	LocationIndex pd_upper;		/* offset to end of free space */
+	LocationIndex pd_special;	/* offset to start of special space */
+	uint16		pd_pagesize_version;	/* page size and layout version */
+	/* Fields required for undolog consistency checker */
+	uint8		uur_info;		/* uur_info field of the partial record. */
+	uint16		record_offset;	/* record bytes before this page, 0 = none */
+	uint16		tuple_len;		/* Length of the tuple data in the partial
+								 * record. */
+	uint16		payload_len;	/* Length of the payload data in the partial
+								 * record. */
+} UndoPageHeaderData;
+
+typedef UndoPageHeaderData *UndoPageHeader;
+
+#define SizeOfUndoPageHeaderData (offsetof(UndoPageHeaderData, payload_len) + \
+								  sizeof(uint16))
+
+/*
  * PageIsEmpty
  *		returns true iff no itemid has been allocated on the page
  */
@@ -415,6 +446,9 @@ do { \
 						((is_heap) ? PAI_IS_HEAP : 0))
 
 extern void PageInit(Page page, Size pageSize, Size specialSize);
+extern void UndoPageInit(Page page, Size pageSize, uint8 uur_info,
+						 uint16 record_offset, uint16 tuple_len,
+						 uint16 payload_len);
 extern bool PageIsVerified(Page page, BlockNumber blkno);
 extern OffsetNumber PageAddItemExtended(Page page, Item item, Size size,
 					OffsetNumber offsetNumber, int flags);
-- 
1.8.3.1

