On Thu, Nov 10, 2022 at 3:38 AM Andres Freund <and...@anarazel.de> wrote:

>
> > +             }
> > +
> > +             /*
> > +              * Loop over offset and populate predecessor array from
> all entries
> > +              * that are present in successor array.
> > +              */
> > +             ctx.attnum = -1;
> > +             for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
> > +                      ctx.offnum = OffsetNumberNext(ctx.offnum))
> > +             {
> > +                     ItemId          curr_lp;
> > +                     ItemId          next_lp;
> > +                     HeapTupleHeader curr_htup;
> > +                     HeapTupleHeader next_htup;
> > +                     TransactionId curr_xmax;
> > +                     TransactionId next_xmin;
> > +
> > +                     OffsetNumber nextoffnum = successor[ctx.offnum];
> > +
> > +                     curr_lp = PageGetItemId(ctx.page, ctx.offnum);
>
> Why do we get the item when nextoffnum is 0?
>

Fixed by moving  PageGetItemId() call after the 'if' check.


> > +                     if (nextoffnum == 0 || !lp_valid[ctx.offnum] ||
> !lp_valid[nextoffnum])
> > +                     {
> > +                             /*
> > +                              * This is either the last updated tuple
> in the chain or a
> > +                              * corruption raised for this tuple.
> > +                              */
>
> "or a corruption raised" isn't quite right grammatically.
>

done.

>
> > +                             continue;
> > +                     }
> > +                     if (ItemIdIsRedirected(curr_lp))
> > +                     {
> > +                             next_lp = PageGetItemId(ctx.page,
> nextoffnum);
> > +                             if (ItemIdIsRedirected(next_lp))
> > +                             {
> > +                                     report_corruption(&ctx,
> > +
>  psprintf("redirected line pointer pointing to another redirected line
> pointer at offset %u",
> > +
>                 (unsigned) nextoffnum));
> > +                                     continue;
> > +                             }
> > +                             next_htup = (HeapTupleHeader) PageGetItem(
> ctx.page, next_lp);
> > +                             if (!HeapTupleHeaderIsHeapOnly(next_htup))
> > +                             {
> > +                                     report_corruption(&ctx,
> > +
>  psprintf("redirected tuple at line pointer offset %u is not heap only
> tuple",
> > +
>                 (unsigned) nextoffnum));
> > +                             }
> > +                             if ((next_htup->t_infomask & HEAP_UPDATED)
> == 0)
> > +                             {
> > +                                     report_corruption(&ctx,
> > +
>  psprintf("redirected tuple at line pointer offset %u is not heap updated
> tuple",
> > +
>                 (unsigned) nextoffnum));
> > +                             }
> > +                             continue;
> > +                     }
> > +
> > +                     /*
> > +                      * Add a line pointer offset to the predecessor
> array if xmax is
> > +                      * matching with xmin of next tuple (reaching via
> its t_ctid).
> > +                      * Prior to PostgreSQL 9.4, we actually changed
> the xmin to
> > +                      * FrozenTransactionId
>
> I'm doubtful it's a good idea to try to validate the 9.4 case. The
> likelihood
> of getting that right seems low and I don't see us gaining much by even
> trying.
>
>
>
removed code with regards to frozen tuple checks.

> so we must add offset to predecessor
> > +                      * array(irrespective of xmax-xmin matching) if
> updated tuple xmin
> > +                      * is frozen, so that we can later do validation
> related to frozen
> > +                      * xmin. Raise corruption if we have two tuples
> having the same
> > +                      * predecessor.
> > +                      * We add the offset to the predecessor array
> irrespective of the
> > +                      * transaction (t_xmin) status. We will do
> validation related to
> > +                      * the transaction status (and also all other
> validations) when we
> > +                      * loop over the predecessor array.
> > +                      */
> > +                     curr_htup = (HeapTupleHeader) PageGetItem(ctx.page,
> curr_lp);
> > +                     curr_xmax = HeapTupleHeaderGetUpdateXid(curr_htup);
> > +                     next_lp = PageGetItemId(ctx.page, nextoffnum);
> > +                     next_htup = (HeapTupleHeader) PageGetItem(ctx.page,
> next_lp);
> > +                     next_xmin = HeapTupleHeaderGetXmin(next_htup);
> > +                     if (TransactionIdIsValid(curr_xmax) &&
> > +                             (TransactionIdEquals(curr_xmax, next_xmin)
> ||
> > +                              next_xmin == FrozenTransactionId))
> > +                     {
> > +                             if (predecessor[nextoffnum] != 0)
> > +                             {
> > +                                     report_corruption(&ctx,
> > +
>  psprintf("updated version at offset %u is also the updated version of
> tuple at offset %u",
> > +
>                 (unsigned) nextoffnum, (unsigned) predecessor[nextoffnum]));
> > +                                     continue;
>
> I doubt it is correct to enter this path with next_xmin ==
> FrozenTransactionId. This is following a ctid chain that we normally
> wouldn't
> follow, because it doesn't satisfy the t_self->xmax == t_ctid->xmin
> condition.
>
> removed this frozen check.

> +             }
> > +
> > +             /* Loop over offsets and validate the data in the
> predecessor array. */
> > +             for (OffsetNumber currentoffnum = FirstOffsetNumber;
> currentoffnum <= maxoff;
> > +                      currentoffnum = OffsetNumberNext(currentoffnum))
> > +             {
> > +                     HeapTupleHeader pred_htup;
> > +                     HeapTupleHeader curr_htup;
> > +                     TransactionId pred_xmin;
> > +                     TransactionId curr_xmin;
> > +                     ItemId          pred_lp;
> > +                     ItemId          curr_lp;
> > +
> > +                     ctx.offnum = predecessor[currentoffnum];
> > +                     ctx.attnum = -1;
> > +
> > +                     if (ctx.offnum == 0)
> > +                     {
> > +                             /*
> > +                              * Either the root of the chain or an
> xmin-aborted tuple from
> > +                              * an abandoned portion of the HOT chain.
> > +                              */
>
> Hm - couldn't we check that the tuple could conceivably be at the root of a
> chain? I.e. isn't HEAP_HOT_UPDATED? Or alternatively has an aborted xmin?
>
> Done, I have added code to identify cases of missing offset in the
predecessor[] array and added validation that root of the chain must not be
HEAP_ONLY_TUPLE.

>
> > +                             continue;
> > +                     }
> > +
> > +                     curr_lp = PageGetItemId(ctx.page, currentoffnum);
> > +                     curr_htup = (HeapTupleHeader) PageGetItem(ctx.page,
> curr_lp);
> > +                     curr_xmin = HeapTupleHeaderGetXmin(curr_htup);
> > +
> > +                     ctx.itemid = pred_lp = PageGetItemId(ctx.page,
> ctx.offnum);
> > +                     pred_htup = (HeapTupleHeader) PageGetItem(ctx.page,
> pred_lp);
> > +                     pred_xmin = HeapTupleHeaderGetXmin(pred_htup);
> > +
> > +                     /*
> > +                      * If the predecessor's xmin is aborted or in
> progress, the
> > +                      * current tuples xmin should be aborted or in
> progress
> > +                      * respectively. Also both xmin's must be equal.
> > +                      */
> > +                     if (!TransactionIdEquals(pred_xmin, curr_xmin) &&
> > +                             !TransactionIdDidCommit(pred_xmin))
> > +                     {
> > +                             report_corruption(&ctx,
> > +
>  psprintf("tuple with uncommitted xmin %u was updated to produce a tuple at
> offset %u with differing xmin %u",
> > +
>         (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned)
> curr_xmin));
>
> Is this necessarily true? What about a tuple that was inserted in a
> subtransaction and then updated in another subtransaction of the same
> toplevel
> transaction?
>
>
patch has been updated to handle cases of sub-transaction.


-- 
Regards,
Himanshu Upadhyaya
EnterpriseDB: http://www.enterprisedb.com
From c8ef516cde2578543b23faba3598ed014a0fb95f Mon Sep 17 00:00:00 2001
From: Himanshu Upadhyaya <himanshu.upadhy...@enterprisedb.com>
Date: Wed, 30 Nov 2022 15:43:56 +0530
Subject: [PATCH v7] Implement HOT chain validation in verify_heapam()

Himanshu Upadhyaya, reviewed by Robert Haas, Aleksander Alekseev, Andres Freund
---
 contrib/amcheck/verify_heapam.c           | 276 ++++++++++++++++++++++
 src/bin/pg_amcheck/t/004_verify_heapam.pl | 193 ++++++++++++++-
 2 files changed, 458 insertions(+), 11 deletions(-)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index b72a5c96d1..9d0c8e832b 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -399,6 +399,9 @@ verify_heapam(PG_FUNCTION_ARGS)
 	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
 	{
 		OffsetNumber maxoff;
+		OffsetNumber predecessor[MaxOffsetNumber] = {0};
+		OffsetNumber successor[MaxOffsetNumber] = {0};
+		bool		lp_valid[MaxOffsetNumber] = {false};
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -433,6 +436,8 @@ verify_heapam(PG_FUNCTION_ARGS)
 		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
 			 ctx.offnum = OffsetNumberNext(ctx.offnum))
 		{
+			OffsetNumber nextoffnum;
+
 			ctx.itemid = PageGetItemId(ctx.page, ctx.offnum);
 
 			/* Skip over unused/dead line pointers */
@@ -469,6 +474,13 @@ verify_heapam(PG_FUNCTION_ARGS)
 					report_corruption(&ctx,
 									  psprintf("line pointer redirection to unused item at offset %u",
 											   (unsigned) rdoffnum));
+
+				/*
+				 * make entry in successor array, redirected tuple will be
+				 * validated at the time when we loop over successor array
+				 */
+				successor[ctx.offnum] = rdoffnum;
+				lp_valid[ctx.offnum] = true;
 				continue;
 			}
 
@@ -504,9 +516,266 @@ verify_heapam(PG_FUNCTION_ARGS)
 			/* It should be safe to examine the tuple's header, at least */
 			ctx.tuphdr = (HeapTupleHeader) PageGetItem(ctx.page, ctx.itemid);
 			ctx.natts = HeapTupleHeaderGetNatts(ctx.tuphdr);
+			lp_valid[ctx.offnum] = true;
 
 			/* Ok, ready to check this next tuple */
 			check_tuple(&ctx);
+
+			/*
+			 * Add the data to the successor array if next updated tuple is in
+			 * the same page. It will be used later to generate the
+			 * predecessor array.
+			 *
+			 * We need to access the tuple's header to populate the
+			 * predecessor array. However the tuple is not necessarily sanity
+			 * checked yet so delaying construction of predecessor array until
+			 * all tuples are sanity checked.
+			 */
+			nextoffnum = ItemPointerGetOffsetNumber(&(ctx.tuphdr)->t_ctid);
+			if (ItemPointerGetBlockNumber(&(ctx.tuphdr)->t_ctid) == ctx.blkno &&
+				nextoffnum != ctx.offnum)
+			{
+				successor[ctx.offnum] = nextoffnum;
+			}
+		}
+
+		/*
+		 * Loop over offset and populate predecessor array from all entries
+		 * that are present in successor array.
+		 */
+		ctx.attnum = -1;
+		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
+			 ctx.offnum = OffsetNumberNext(ctx.offnum))
+		{
+			ItemId		curr_lp;
+			ItemId		next_lp;
+			HeapTupleHeader curr_htup;
+			HeapTupleHeader next_htup;
+			TransactionId curr_xmax;
+			TransactionId next_xmin;
+
+			OffsetNumber nextoffnum = successor[ctx.offnum];
+
+			if (nextoffnum == 0 || !lp_valid[ctx.offnum] || !lp_valid[nextoffnum])
+			{
+				/*
+				 * Set lp_valid of nextoffnum to false if current tuple's
+				 * lp_valid is true. We don't add this to predecessor array as
+				 * it's of no use to validate tuple if its predecessor is
+				 * already corrupted but we need to identify all those tuple's
+				 * so that we can differentiate between all the cases of
+				 * missing offset in predecessor array, this will help in
+				 * validating the root of chain when we loop over predecessor
+				 * array.
+				 */
+				if (!lp_valid[ctx.offnum] && lp_valid[nextoffnum])
+					lp_valid[nextoffnum] = false;
+
+				/*
+				 * This is either the last updated tuple in the chain or a
+				 * corrupted Tuple/lp or unused/dead line pointer.
+				 */
+				continue;
+			}
+
+			curr_lp = PageGetItemId(ctx.page, ctx.offnum);
+			if (ItemIdIsRedirected(curr_lp))
+			{
+				next_lp = PageGetItemId(ctx.page, nextoffnum);
+				if (ItemIdIsRedirected(next_lp))
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected line pointer pointing to another redirected line pointer at offset %u",
+											   (unsigned) nextoffnum));
+					continue;
+				}
+				next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);
+				if (!HeapTupleHeaderIsHeapOnly(next_htup))
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected tuple at line pointer offset %u is not heap only tuple",
+											   (unsigned) nextoffnum));
+				}
+				if ((next_htup->t_infomask & HEAP_UPDATED) == 0)
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected tuple at line pointer offset %u is not heap updated tuple",
+											   (unsigned) nextoffnum));
+				}
+
+				/*
+				 * Add data related to redirected offset to predecessor array
+				 * so that we can differentiate between all the cases of
+				 * missing offset in predecessor array, this will help in
+				 * validating the root of chain when we loop over predecessor
+				 * array.
+				 */
+				predecessor[nextoffnum] = ctx.offnum;
+				continue;
+			}
+
+			/*
+			 * Add a line pointer offset to the predecessor array if xmax is
+			 * matching with xmin of next tuple (reaching via its t_ctid).
+			 * Raise corruption if we have two tuples having the same
+			 * predecessor.
+			 *
+			 * We add the offset to the predecessor array irrespective of the
+			 * transaction (t_xmin) status. We will do validation related to
+			 * the transaction status (and also all other validations) when we
+			 * loop over the predecessor array.
+			 */
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmax = HeapTupleHeaderGetUpdateXid(curr_htup);
+
+			next_lp = PageGetItemId(ctx.page, nextoffnum);
+			next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);
+			next_xmin = HeapTupleHeaderGetXmin(next_htup);
+			if (TransactionIdIsValid(curr_xmax) &&
+				TransactionIdEquals(curr_xmax, next_xmin))
+			{
+				if (predecessor[nextoffnum] != 0)
+				{
+					report_corruption(&ctx,
+									  psprintf("updated version at offset %u is also the updated version of tuple at offset %u",
+											   (unsigned) nextoffnum, (unsigned) predecessor[nextoffnum]));
+					continue;
+				}
+				predecessor[nextoffnum] = ctx.offnum;
+			}
+			else
+				lp_valid[nextoffnum] = false;
+			/* Non matching xmax with xmin is not a corruption */
+
+		}
+
+		/* Loop over offsets and validate the data in the predecessor array. */
+		for (OffsetNumber currentoffnum = FirstOffsetNumber; currentoffnum <= maxoff;
+			 currentoffnum = OffsetNumberNext(currentoffnum))
+		{
+			HeapTupleHeader pred_htup;
+			HeapTupleHeader curr_htup;
+			TransactionId pred_xmin;
+			TransactionId curr_xmin;
+			ItemId		pred_lp;
+			ItemId		curr_lp;
+			bool		pred_in_progress;
+			XidCommitStatus xid_commit_status;
+			XidBoundsViolation xid_status;
+
+			ctx.offnum = predecessor[currentoffnum];
+			ctx.attnum = -1;
+			curr_lp = PageGetItemId(ctx.page, currentoffnum);
+
+			if (!lp_valid[currentoffnum] || ItemIdIsRedirected(curr_lp))
+				continue;
+
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmin = HeapTupleHeaderGetXmin(curr_htup);
+			xid_status = get_xid_status(curr_xmin, &ctx, &xid_commit_status);
+			if (!(xid_status == XID_BOUNDS_OK || xid_status == XID_INVALID))
+				continue;
+
+			if (ctx.offnum == 0)
+			{
+				/*
+				 * No harm in overriding value of ctx.offnum as we will always
+				 * continue if we are here.
+				 */
+				ctx.offnum = currentoffnum;
+				if (TransactionIdIsInProgress(curr_xmin) || TransactionIdDidCommit(curr_xmin))
+				{
+					/* Root of chain must not be HEAP_ONLY_TUPLE. */
+					if (HeapTupleHeaderIsHeapOnly(curr_htup))
+					{
+						report_corruption(&ctx,
+										  psprintf("tuple is root of chain but it is marked as heap-only tuple"));
+					}
+					continue;
+				}
+				else
+				{
+					/*
+					 * xmin aborted tuple from an abandoned portion of the
+					 * chain.
+					 */
+					continue;
+				}
+			}
+
+			ctx.itemid = pred_lp = PageGetItemId(ctx.page, ctx.offnum);
+
+			/*
+			 * Redirected LP were validated previously, so don't need any
+			 * validation.
+			 */
+			if (ItemIdIsRedirected(pred_lp))
+				continue;
+
+			pred_htup = (HeapTupleHeader) PageGetItem(ctx.page, pred_lp);
+			pred_xmin = HeapTupleHeaderGetXmin(pred_htup);
+			xid_status = get_xid_status(pred_xmin, &ctx, &xid_commit_status);
+			if (!(xid_status == XID_BOUNDS_OK || xid_status == XID_INVALID))
+				continue;
+
+			/*
+			 * If the predecessor's xmin is in progress then current tuple's
+			 * xmin should either be aborted (in case of subtransaction) or
+			 * in-progress.
+			 */
+			pred_in_progress = TransactionIdIsInProgress(pred_xmin);
+			if (pred_in_progress && TransactionIdDidCommit(curr_xmin))
+			{
+				/* Re-check to avoid race condition */
+				if (TransactionIdIsInProgress(pred_xmin))
+				{
+					report_corruption(&ctx,
+									  psprintf("tuple with in-progress xmin %u was updated to produce a tuple at offset %u with committed xmin %u",
+											   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+				}
+				else
+				{
+					pred_in_progress = false;
+				}
+			}
+
+			/* raise corruption if pred_xmin is aborted but curr_xmin is not */
+			if (!pred_in_progress && !TransactionIdDidCommit(pred_xmin))
+			{
+				if (TransactionIdIsInProgress(curr_xmin))
+					report_corruption(&ctx,
+									  psprintf("tuple with aborted xmin %u was updated to produce a tuple at offset %u with in-progress xmin %u",
+											   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+				else if (TransactionIdDidCommit(curr_xmin))
+					report_corruption(&ctx,
+									  psprintf("tuple with aborted xmin %u was updated to produce a tuple at offset %u with committed xmin %u",
+											   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+
+			}
+
+			/*
+			 * If the current tuple is HOT then it's predecessor's tuple must
+			 * be HEAP_HOT_UPDATED.
+			 */
+			if (!HeapTupleHeaderIsHotUpdated(pred_htup) &&
+				HeapTupleHeaderIsHeapOnly(curr_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("non-heap-only update produced a heap-only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
+
+			/*
+			 * If the current tuple is not HOT then its predecessor's tuple
+			 * must not be HEAP_HOT_UPDATED.
+			 */
+			if (HeapTupleHeaderIsHotUpdated(pred_htup) &&
+				!HeapTupleHeaderIsHeapOnly(curr_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("heap-only update produced a non-heap only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
 		}
 
 		/* clean up */
@@ -638,6 +907,7 @@ check_tuple_header(HeapCheckContext *ctx)
 {
 	HeapTupleHeader tuphdr = ctx->tuphdr;
 	uint16		infomask = tuphdr->t_infomask;
+	TransactionId curr_xmax = HeapTupleHeaderGetUpdateXid(tuphdr);
 	bool		result = true;
 	unsigned	expected_hoff;
 
@@ -649,6 +919,12 @@ check_tuple_header(HeapCheckContext *ctx)
 		result = false;
 	}
 
+	if (!TransactionIdIsValid(curr_xmax) && HeapTupleHeaderIsHotUpdated(tuphdr))
+	{
+		report_corruption(ctx,
+						  psprintf("tuple has been HOT updated, but xmax is 0"));
+		result = false;
+	}
 	if ((ctx->tuphdr->t_infomask & HEAP_XMAX_COMMITTED) &&
 		(ctx->tuphdr->t_infomask & HEAP_XMAX_IS_MULTI))
 	{
diff --git a/src/bin/pg_amcheck/t/004_verify_heapam.pl b/src/bin/pg_amcheck/t/004_verify_heapam.pl
index bbada168f0..a1abdfe862 100644
--- a/src/bin/pg_amcheck/t/004_verify_heapam.pl
+++ b/src/bin/pg_amcheck/t/004_verify_heapam.pl
@@ -174,6 +174,8 @@ sub write_tuple
 # Set umask so test directories and files are created with default permissions
 umask(0077);
 
+my $pred_xmax;
+my $aborted_xid;
 # Set up the node.  Once we create and corrupt the table,
 # autovacuum workers visiting the table could crash the backend.
 # Disable autovacuum so that won't happen.
@@ -217,7 +219,9 @@ my $rel = $node->safe_psql('postgres',
 my $relpath = "$pgdata/$rel";
 
 # Insert data and freeze public.test
-use constant ROWCOUNT => 16;
+use constant ROWCOUNT => 33 ; # Total row count in page.
+use constant ROWCOUNT_HOTCHAIN => 17; # Row count related to test of HOT chains validations and redirected LP.
+# First insert data needed for non-HOT chain validation.
 $node->safe_psql(
 	'postgres', qq(
 	INSERT INTO public.test (a, b, c)
@@ -227,7 +231,37 @@ $node->safe_psql(
 			repeat('w', 10000)
 		);
 	VACUUM FREEZE public.test
-	)) for (1 .. ROWCOUNT);
+	)) for (1 .. ROWCOUNT-ROWCOUNT_HOTCHAIN);
+
+# Data for Redirected LP.
+$node->safe_psql(
+	'postgres', qq(
+		INSERT INTO public.test (a, b, c)
+			VALUES ( x'DEADF9F9DEADF9F9'::bigint, 'abcdefg', generate_series(1,2));
+		UPDATE public.test SET c = 'a' WHERE c = '1';
+		UPDATE public.test SET c = 'a' WHERE c = '2';
+		VACUUM FREEZE public.test;
+	));
+
+# Data for HOT chains validation, so not calling VACUUM FREEZE.
+$node->safe_psql(
+	'postgres', qq(
+		INSERT INTO public.test (a, b, c)
+			VALUES ( x'DEADF9F9DEADF9F9'::bigint, 'abcdefg', generate_series(3,9));
+		UPDATE public.test SET c = 'a' WHERE c = '3';
+		UPDATE public.test SET c = 'a' WHERE c = '6';
+		UPDATE public.test SET c = 'a' WHERE c = '7';
+		UPDATE public.test SET c = 'a' WHERE c = '8';
+		UPDATE public.test SET c = 'a' WHERE c = '9';
+	));
+
+# Need one aborted transaction to test corruption in HOT chain.
+$node->safe_psql(
+	'postgres', qq(
+		BEGIN;
+			UPDATE public.test SET c = 'a' WHERE c = '5';
+		ABORT;
+	));
 
 my $relfrozenxid = $node->safe_psql('postgres',
 	q(select relfrozenxid from pg_class where relname = 'test'));
@@ -249,12 +283,21 @@ if ($datfrozenxid <= 3 || $datfrozenxid >= $relfrozenxid)
 my @lp_off;
 for my $tup (0 .. ROWCOUNT - 1)
 {
-	push(
-		@lp_off,
-		$node->safe_psql(
-			'postgres', qq(
-select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
-	offset $tup limit 1)));
+	my $islpredirected = $node->safe_psql('postgres',
+		qq(select lp_flags from heap_page_items(get_raw_page('test', 'main', 0)) offset $tup limit 1));
+	if ($islpredirected != 2)
+	{
+		push(
+			@lp_off,
+			$node->safe_psql(
+				'postgres', qq(
+			select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
+				offset $tup limit 1)));
+	}
+	else
+	{
+		push(@lp_off, (-1));
+	}
 }
 
 # Sanity check that our 'test' table on disk layout matches expectations.  If
@@ -271,6 +314,10 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 {
 	my $offnum = $tupidx + 1;        # offnum is 1-based, not zero-based
 	my $offset = $lp_off[$tupidx];
+	if ($offset == -1)
+	{
+		next;
+	}
 	my $tup = read_tuple($file, $offset);
 
 	# Sanity-check that the data appears on the page where we expect.
@@ -283,7 +330,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		$node->clean_node;
 		plan skip_all =>
 		  sprintf(
-			"Page layout differs from our expectations: expected (%x, %x, \"%s\"), got (%x, %x, \"%s\")",
+			"Page layout of index %d differs from our expectations: expected (%x, %x, \"%s\"), got (%x, %x, \"%s\")", $tupidx,
 			0xDEADF9F9, 0xDEADF9F9, "abcdefg", $a_1, $a_2, $b);
 		exit;
 	}
@@ -318,6 +365,9 @@ use constant HEAP_XMAX_INVALID   => 0x0800;
 use constant HEAP_NATTS_MASK     => 0x07FF;
 use constant HEAP_XMAX_IS_MULTI  => 0x1000;
 use constant HEAP_KEYS_UPDATED   => 0x2000;
+use constant HEAP_HOT_UPDATED    => 0x4000;
+use constant HEAP_ONLY_TUPLE     => 0x8000;
+use constant HEAP_UPDATED        => 0x2000;
 
 # Helper function to generate a regular expression matching the header we
 # expect verify_heapam() to return given which fields we expect to be non-null.
@@ -349,9 +399,49 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 {
 	my $offnum = $tupidx + 1;        # offnum is 1-based, not zero-based
 	my $offset = $lp_off[$tupidx];
+	my $header = header(0, $offnum, undef);
+	# offset -1 means its redirected lp.
+	if ($offset == -1)
+	{	# at offnum 19 we will unset HEAP_ONLY_TUPLE and HEAP_UPDATED flags.
+		if ($offnum == 17)
+		{
+			push @expected,
+			  qr/${header}redirected tuple at line pointer offset \d+ is not heap only tuple/;
+			push @expected,
+			  qr/${header}redirected tuple at line pointer offset \d+ is not heap updated tuple/;
+		}
+		elsif ($offnum == 18)
+		{
+			# we re-set lp offset to 17, we need to rewrite the 4 bytes values so that line pointer will be
+			# lp.off = 17, lp_flags = 2, lp_len = 0.
+			if ($ENDIANNESS eq 'little')
+			{
+				sysseek($file, 92, 0)
+				  or BAIL_OUT("sysseek failed: $!");
+				syswrite(
+					$file,
+					pack("L",
+						0x00010011)
+				) or BAIL_OUT("syswrite failed: $!");
+			}
+			else
+			{
+				sysseek($file, 92, 0)
+				  or BAIL_OUT("sysseek failed: $!");
+				syswrite(
+					$file,
+					pack("L",
+						0x11000100)
+				) or BAIL_OUT("syswrite failed: $!");
+
+			}
+			push @expected,
+			  qr/${header}redirected line pointer pointing to another redirected line pointer at offset \d+/;
+		}
+		next;
+	}
 	my $tup = read_tuple($file, $offset);
 
-	my $header = header(0, $offnum, undef);
 	if ($offnum == 1)
 	{
 		# Corruptly set xmin < relfrozenxid
@@ -502,7 +592,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 		  qr/${header}multitransaction ID 4 equals or exceeds next valid multitransaction ID 1/;
 	}
-	elsif ($offnum == 15)    # Last offnum must equal ROWCOUNT
+	elsif ($offnum == 15)
 	{
 		# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
 		$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
@@ -512,6 +602,87 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 		  qr/${header}multitransaction ID 4000000000 precedes relation minimum multitransaction ID threshold 1/;
 	}
+	# Test for redirected line pointer.
+	# offnum 17 and 18 are redirected line pointer, so don't need any tuple
+	# validation.
+	elsif ($offnum == 19)
+	{
+		# unset HEAP_ONLY_TUPLE and HEAP_UPDATED flag for redirected tuple.
+		$tup->{t_infomask2} &= ~HEAP_ONLY_TUPLE;
+		$tup->{t_infomask} &= ~HEAP_UPDATED;
+	}
+	# offnum 20 is redirected tuple of lp at offset 18,
+	# We have corrupted it to route its lp.off to point it to line pointer at
+	# offset 17.
+
+	# Test related to HOT chains.
+	elsif ($offnum == 21)
+	{
+		# Unset HEAP_HOT_UPDATED.
+		$tup->{t_infomask2} &= ~HEAP_HOT_UPDATED;
+		$pred_xmax = $tup->{t_xmax}; # to be used for tuple at offnum 22.
+		push @expected,
+		  qr/${header}non-heap-only update produced a heap-only tuple at offset \d+/;
+	}
+	elsif ($offnum == 22)
+	{
+		# Set ip_posid and t_xmax from ip_posid and t_xmax of tuple at offnum 21.
+		$tup->{t_xmax} = $pred_xmax;
+		$tup->{ip_posid} = 28;
+		push @expected,
+		  qr/${header}updated version at offset \d+ is also the updated version of tuple at offset \d+/;
+	}
+	elsif ($offnum == 23)
+	{
+		# Get aborted xid, that is needed to test corruption at offnum 24.
+		$aborted_xid = $tup->{t_xmax};
+	}
+	elsif ($offnum == 24)
+	{
+		# Set xmin to aborted xid.
+		$tup->{t_xmin} = $aborted_xid;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		push @expected,
+		  qr/${header}tuple with aborted xmin \d+ was updated to produce a tuple at offset \d+ with committed xmin \d+/;
+	}
+	elsif ($offnum == 25)
+	{
+		# Raised corruption as next updated tuple at offnum 30 is corrupted.
+		# set HEAP_ONLY_TUPLE.
+		$tup->{t_infomask2} |= HEAP_ONLY_TUPLE;
+		push @expected,
+		  qr/${header}tuple is root of chain but it is marked as heap-only tuple/;
+	}
+	elsif ($offnum == 26)
+	{
+		# Next updated Tuple at offnum 31 is corrupted.
+		push @expected,
+		  qr/${header}heap-only update produced a non-heap only tuple at offset \d+/;
+	}
+	elsif ($offnum == 27)
+	{
+		# set xmax to invalid transaction id.
+		$tup->{t_xmax} = 0;
+		push @expected,
+		  qr/${header}tuple has been HOT updated, but xmax is 0/;
+	}
+	# Tuple at offnum 28 is an update of corrupted tuple at offnum 21.
+
+	# Tuple at offnum 29 is an update of corrupted tuple at offnum 24, and is tested for
+	# corruption related to aborted transaction.
+
+	# Tuple at offnum 30 is an update of tuple at 25.
+
+	# Tuple at offnum 31 is an update of tuple at 26.
+	elsif($offnum == 31)
+	{
+		# Unset HEAP_ONLY_TUPLE
+		$tup->{t_infomask2} &= ~HEAP_ONLY_TUPLE;
+	}
+	# Tuple at offnum 32 is an update of corrupted tuple at offnum 27.
+
+	# offset 33 is an updated tuple of tuple at offset #23 and was updated by an aborted transaction.
+	# this is needed to have aborted transaction xid to test corruption related to aborted transaction at offset #24.
 	write_tuple($file, $offset, $tup);
 }
 close($file)
-- 
2.25.1

Reply via email to