From d3872df7b117425e21c055e4459de1bfd24ddeaa Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Tue, 23 Mar 2021 22:57:50 -0700
Subject: [PATCH v11 2/2] pg_amcheck: extend toast corruption reports

Modify amcheck to perform additional toast corruption checks.  Fix
some amcheck messages not to include the attribute number, which is
redundant given that attnum is already one of the returned columns.
Fix others to include the toast value ID in the message text.

Commit bbe0a81db69bd10bd166907c3701492a29aca294 introduced lz4 as a
toast compression method.  Add checks for the new compression method
ID field and report corruption on unexpected methods and for
compression method TOAST_LZ4_COMPRESSION_ID if encountered on a
server built without lz4.  Stop short of calling the appropriate
decompression function on the toasted attribute, as users may not
appreciate the extra CPU overhead that entails, unless compiled with
new symbol DECOMPRESSION_CORRUPTION_CHECKING defined, in which case
decompression failures are reported as corruption.
---
 contrib/amcheck/Makefile                  |   4 +
 contrib/amcheck/verify_heapam.c           | 327 ++++++++++++++++------
 src/bin/pg_amcheck/t/004_verify_heapam.pl |  70 ++++-
 3 files changed, 312 insertions(+), 89 deletions(-)

diff --git a/contrib/amcheck/Makefile b/contrib/amcheck/Makefile
index b82f221e50..78c4051096 100644
--- a/contrib/amcheck/Makefile
+++ b/contrib/amcheck/Makefile
@@ -12,6 +12,10 @@ PGFILEDESC = "amcheck - function for verifying relation integrity"
 
 REGRESS = check check_btree check_heap
 
+## Uncomment if compiling with DECOMPRESSION_CORRUPTION_CHECKING
+#
+# SHLIB_LINK += $(filter -llz4, $(LIBS))
+
 TAP_TESTS = 1
 
 ifdef USE_PGXS
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index e377b9ab8e..1f80db82f4 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -10,6 +10,10 @@
  */
 #include "postgres.h"
 
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
 #include "access/detoast.h"
 #include "access/genam.h"
 #include "access/heapam.h"
@@ -18,6 +22,7 @@
 #include "access/toast_internals.h"
 #include "access/visibilitymap.h"
 #include "catalog/pg_am.h"
+#include "common/pg_lzcompress.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
@@ -114,6 +119,8 @@ typedef struct HeapCheckContext
 	AttrNumber	attnum;
 
 	/* Values for iterating over toast for the attribute */
+	struct varatt_external toast_pointer;
+	bool		checking_toastptr;
 	int32		chunkno;
 	int32		attrsize;
 	int32		endchunk;
@@ -130,11 +137,11 @@ typedef struct HeapCheckContext
 /* Internal implementation */
 static void sanity_check_relation(Relation rel);
 static void check_tuple(HeapCheckContext *ctx);
-static void check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx);
+static int32 check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx,
+							   bool *toast_error);
 
 static bool check_tuple_attribute(HeapCheckContext *ctx);
-static bool check_tuple_header_and_visibilty(HeapTupleHeader tuphdr,
-											 HeapCheckContext *ctx);
+static bool check_tuple_header(HeapTupleHeader tuphdr, HeapCheckContext *ctx);
 
 static void report_corruption(HeapCheckContext *ctx, char *msg);
 static TupleDesc verify_heapam_tupdesc(void);
@@ -345,6 +352,7 @@ verify_heapam(PG_FUNCTION_ARGS)
 	if (TransactionIdIsNormal(ctx.relfrozenxid))
 		ctx.oldest_xid = ctx.relfrozenxid;
 
+	ctx.checking_toastptr = false;
 	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
 	{
 		OffsetNumber maxoff;
@@ -557,16 +565,11 @@ verify_heapam_tupdesc(void)
 }
 
 /*
- * Check for tuple header corruption and tuple visibility.
- *
- * Since we do not hold a snapshot, tuple visibility is not a question of
- * whether we should be able to see the tuple relative to any particular
- * snapshot, but rather a question of whether it is safe and reasonable to
- * check the tuple attributes.
+ * Check for tuple header corruption.
  *
  * Some kinds of corruption make it unsafe to check the tuple attributes, for
  * example when the line pointer refers to a range of bytes outside the page.
- * In such cases, we return false (not visible) after recording appropriate
+ * In such cases, we return false (not checkable) after recording appropriate
  * corruption messages.
  *
  * Some other kinds of tuple header corruption confuse the question of where
@@ -581,23 +584,11 @@ verify_heapam_tupdesc(void)
  * messages for them but do not base our visibility determination on them.  (In
  * other words, we do not return false merely because we detected them.)
  *
- * For visibility determination not specifically related to corruption, what we
- * want to know is if a tuple is potentially visible to any running
- * transaction.  If you are tempted to replace this function's visibility logic
- * with a call to another visibility checking function, keep in mind that this
- * function does not update hint bits, as it seems imprudent to write hint bits
- * (or anything at all) to a table during a corruption check.  Nor does this
- * function bother classifying tuple visibility beyond a boolean visible vs.
- * not visible.
- *
- * The caller should already have checked that xmin and xmax are not out of
- * bounds for the relation.
- *
- * Returns whether the tuple is both visible and sufficiently sensible to
- * undergo attribute checks.
+ * Returns whether the tuple is sufficiently sensible to undergo attribute
+ * checks.
  */
 static bool
-check_tuple_header_and_visibilty(HeapTupleHeader tuphdr, HeapCheckContext *ctx)
+check_tuple_header(HeapTupleHeader tuphdr, HeapCheckContext *ctx)
 {
 	uint16		infomask = tuphdr->t_infomask;
 	bool		header_garbled = false;
@@ -653,14 +644,7 @@ check_tuple_header_and_visibilty(HeapTupleHeader tuphdr, HeapCheckContext *ctx)
 	if (header_garbled)
 		return false;			/* checking of this tuple should not continue */
 
-	/*
-	 * Ok, we can examine the header for tuple visibility purposes, though we
-	 * still need to be careful about a few remaining types of header
-	 * corruption.  This logic roughly follows that of
-	 * HeapTupleSatisfiesVacuum.  Where possible the comments indicate which
-	 * HTSV_Result we think that function might return for this tuple.
-	 */
-	return heap_tuple_satisfies_corruption_checking(tuphdr);
+	return true;
 }
 
 /*
@@ -673,9 +657,12 @@ check_tuple_header_and_visibilty(HeapTupleHeader tuphdr, HeapCheckContext *ctx)
  * tuples that store the toasted value are retrieved and checked in order, with
  * each toast tuple being checked against where we are in the sequence, as well
  * as each toast tuple having its varlena structure sanity checked.
+ *
+ * Returns the size of the chunk, not including the header, or zero if it
+ * cannot be determined due to corruption.
  */
-static void
-check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
+static int32
+check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx, bool *toast_error)
 {
 	int32		curchunk;
 	Pointer		chunk;
@@ -692,7 +679,8 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 	{
 		report_corruption(ctx,
 						  pstrdup("toast chunk sequence number is null"));
-		return;
+		*toast_error = true;
+		return 0;
 	}
 	chunk = DatumGetPointer(fastgetattr(toasttup, 3,
 										ctx->toast_rel->rd_att, &isnull));
@@ -700,7 +688,8 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 	{
 		report_corruption(ctx,
 						  pstrdup("toast chunk data is null"));
-		return;
+		*toast_error = true;
+		return 0;
 	}
 	if (!VARATT_IS_EXTENDED(chunk))
 		chunksize = VARSIZE(chunk) - VARHDRSZ;
@@ -717,9 +706,11 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 		uint32		header = ((varattrib_4b *) chunk)->va_4byte.va_header;
 
 		report_corruption(ctx,
-						  psprintf("corrupt extended toast chunk has invalid varlena header: %0x (sequence number %d)",
-								   header, curchunk));
-		return;
+						  psprintf("toast value ID %u corrupt extended chunk has invalid varlena header: %0x (sequence number %d)",
+								   ctx->toast_pointer.va_valueid, header,
+								   curchunk));
+		*toast_error = true;
+		return 0;
 	}
 
 	/*
@@ -728,16 +719,20 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 	if (curchunk != ctx->chunkno)
 	{
 		report_corruption(ctx,
-						  psprintf("toast chunk sequence number %u does not match the expected sequence number %u",
-								   curchunk, ctx->chunkno));
-		return;
+						  psprintf("toast value ID %u chunk sequence number %u does not match the expected sequence number %u",
+								   ctx->toast_pointer.va_valueid, curchunk,
+								   ctx->chunkno));
+		*toast_error = true;
+		return chunksize;
 	}
 	if (curchunk > ctx->endchunk)
 	{
 		report_corruption(ctx,
-						  psprintf("toast chunk sequence number %u exceeds the end chunk sequence number %u",
-								   curchunk, ctx->endchunk));
-		return;
+						  psprintf("toast value ID %u chunk sequence number %u exceeds the end chunk sequence number %u",
+								   ctx->toast_pointer.va_valueid, curchunk,
+								   ctx->endchunk));
+		*toast_error = true;
+		return chunksize;
 	}
 
 	expected_size = curchunk < ctx->totalchunks - 1 ? TOAST_MAX_CHUNK_SIZE
@@ -745,10 +740,14 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 	if (chunksize != expected_size)
 	{
 		report_corruption(ctx,
-						  psprintf("toast chunk size %u differs from the expected size %u",
-								   chunksize, expected_size));
-		return;
+						  psprintf("toast value ID %u chunk size %u differs from the expected size %u",
+								   ctx->toast_pointer.va_valueid, chunksize,
+								   expected_size));
+		*toast_error = true;
+		return chunksize;
 	}
+
+	return chunksize;
 }
 
 /*
@@ -774,18 +773,21 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 static bool
 check_tuple_attribute(HeapCheckContext *ctx)
 {
-	struct varatt_external toast_pointer;
 	ScanKeyData toastkey;
 	SysScanDesc toastscan;
 	SnapshotData SnapshotToast;
 	HeapTuple	toasttup;
+	int64		toastsize;		/* corrupt toast could overflow 32 bits */
 	bool		found_toasttup;
+	bool		toast_error;
 	Datum		attdatum;
 	struct varlena *attr;
 	char	   *tp;				/* pointer to the tuple data */
 	uint16		infomask;
 	Form_pg_attribute thisatt;
 
+	Assert(!ctx->checking_toastptr);
+
 	infomask = ctx->tuphdr->t_infomask;
 	thisatt = TupleDescAttr(RelationGetDescr(ctx->rel), ctx->attnum);
 
@@ -794,8 +796,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u with length %u starts at offset %u beyond total tuple length %u",
-								   ctx->attnum,
+						  psprintf("attribute with length %u starts at offset %u beyond total tuple length %u",
 								   thisatt->attlen,
 								   ctx->tuphdr->t_hoff + ctx->offset,
 								   ctx->lp_len));
@@ -815,8 +816,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 		if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 		{
 			report_corruption(ctx,
-							  psprintf("attribute %u with length %u ends at offset %u beyond total tuple length %u",
-									   ctx->attnum,
+							  psprintf("attribute with length %u ends at offset %u beyond total tuple length %u",
 									   thisatt->attlen,
 									   ctx->tuphdr->t_hoff + ctx->offset,
 									   ctx->lp_len));
@@ -848,8 +848,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 		if (va_tag != VARTAG_ONDISK)
 		{
 			report_corruption(ctx,
-							  psprintf("toasted attribute %u has unexpected TOAST tag %u",
-									   ctx->attnum,
+							  psprintf("toasted attribute has unexpected TOAST tag %u",
 									   va_tag));
 			/* We can't know where the next attribute begins */
 			return false;
@@ -863,8 +862,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u with length %u ends at offset %u beyond total tuple length %u",
-								   ctx->attnum,
+						  psprintf("attribute with length %u ends at offset %u beyond total tuple length %u",
 								   thisatt->attlen,
 								   ctx->tuphdr->t_hoff + ctx->offset,
 								   ctx->lp_len));
@@ -891,12 +889,20 @@ check_tuple_attribute(HeapCheckContext *ctx)
 
 	/* It is external, and we're looking at a page on disk */
 
+	/*
+	 * Must copy attr into toast_pointer for alignment considerations
+	 */
+	VARATT_EXTERNAL_GET_POINTER(ctx->toast_pointer, attr);
+	ctx->checking_toastptr = true;
+	toast_error = false;
+
 	/* The tuple header better claim to contain toasted values */
 	if (!(infomask & HEAP_HASEXTERNAL))
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u is external but tuple header flag HEAP_HASEXTERNAL not set",
-								   ctx->attnum));
+						  psprintf("toast value ID %u is external but tuple header flag HEAP_HASEXTERNAL not set",
+								   ctx->toast_pointer.va_valueid));
+		ctx->checking_toastptr = false;
 		return true;
 	}
 
@@ -904,21 +910,41 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	if (!ctx->rel->rd_rel->reltoastrelid)
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u is external but relation has no toast relation",
-								   ctx->attnum));
+						  psprintf("toast value ID %u is external but relation has no toast relation",
+								   ctx->toast_pointer.va_valueid));
+		ctx->checking_toastptr = false;
+		return true;
+	}
+
+	if (VARATT_EXTERNAL_GET_EXTSIZE(ctx->toast_pointer) > ctx->toast_pointer.va_rawsize - VARHDRSZ)
+	{
+		report_corruption(ctx,
+						  psprintf("toast value ID %u external size %u exceeds maximum expected for rawsize %u",
+								   ctx->toast_pointer.va_valueid,
+								   VARATT_EXTERNAL_GET_EXTSIZE(ctx->toast_pointer),
+								   ctx->toast_pointer.va_rawsize));
+		toast_error = true;
+	}
+
+	if (ctx->toast_pointer.va_toastrelid != ctx->rel->rd_rel->reltoastrelid)
+	{
+		report_corruption(ctx,
+						  psprintf("toast value ID %u toast relation oid %u differs from expected oid %u",
+								   ctx->toast_pointer.va_valueid,
+								   ctx->toast_pointer.va_toastrelid,
+								   ctx->rel->rd_rel->reltoastrelid));
+		ctx->checking_toastptr = false;
 		return true;
 	}
 
 	/* If we were told to skip toast checking, then we're done. */
 	if (ctx->toast_rel == NULL)
+	{
+		ctx->checking_toastptr = false;
 		return true;
+	}
 
-	/*
-	 * Must copy attr into toast_pointer for alignment considerations
-	 */
-	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
-
-	ctx->attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);
+	ctx->attrsize = VARATT_EXTERNAL_GET_EXTSIZE(ctx->toast_pointer);
 	ctx->endchunk = (ctx->attrsize - 1) / TOAST_MAX_CHUNK_SIZE;
 	ctx->totalchunks = ctx->endchunk + 1;
 
@@ -928,7 +954,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	ScanKeyInit(&toastkey,
 				(AttrNumber) 1,
 				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(toast_pointer.va_valueid));
+				ObjectIdGetDatum(ctx->toast_pointer.va_valueid));
 
 	/*
 	 * Check if any chunks for this toasted object exist in the toast table,
@@ -941,24 +967,140 @@ check_tuple_attribute(HeapCheckContext *ctx)
 										   &toastkey);
 	ctx->chunkno = 0;
 	found_toasttup = false;
+	toastsize = 0;
 	while ((toasttup =
 			systable_getnext_ordered(toastscan,
 									 ForwardScanDirection)) != NULL)
 	{
 		found_toasttup = true;
-		check_toast_tuple(toasttup, ctx);
+		toastsize += check_toast_tuple(toasttup, ctx, &toast_error);
 		ctx->chunkno++;
 	}
+	systable_endscan_ordered(toastscan);
+
 	if (!found_toasttup)
 		report_corruption(ctx,
-						  psprintf("toasted value for attribute %u missing from toast table",
-								   ctx->attnum));
+						  psprintf("toasted value ID %u missing from toast table",
+								   ctx->toast_pointer.va_valueid));
 	else if (ctx->chunkno != (ctx->endchunk + 1))
 		report_corruption(ctx,
-						  psprintf("final toast chunk number %u differs from expected value %u",
-								   ctx->chunkno, (ctx->endchunk + 1)));
-	systable_endscan_ordered(toastscan);
+						  psprintf("toast value ID %u final chunk number %u differs from expected value %u",
+								   ctx->toast_pointer.va_valueid, ctx->chunkno,
+								   (ctx->endchunk + 1)));
+	else if (toastsize != VARATT_EXTERNAL_GET_EXTSIZE(ctx->toast_pointer))
+		report_corruption(ctx,
+						  psprintf("toast value ID %u total toast size " INT64_FORMAT " differs from expected size %u",
+								   ctx->toast_pointer.va_valueid, toastsize,
+								   VARATT_EXTERNAL_GET_EXTSIZE(ctx->toast_pointer)));
+	else if (!toast_error)
+	{
+		if (!AllocSizeIsValid(ctx->toast_pointer.va_rawsize))
+		{
+			report_corruption(ctx,
+							  psprintf("toast value ID %u rawsize %u too large to be allocated",
+									   ctx->toast_pointer.va_valueid,
+									   ctx->toast_pointer.va_rawsize));
+			toast_error = true;
+		}
+
+		if (!AllocSizeIsValid(VARATT_EXTERNAL_GET_EXTSIZE(ctx->toast_pointer)))
+		{
+			report_corruption(ctx,
+							  psprintf("toast value ID %u extsize %u too large to be allocated",
+									   VARATT_EXTERNAL_GET_EXTSIZE(ctx->toast_pointer),
+									   ctx->toast_pointer.va_valueid));
+			toast_error = true;
+		}
+
+		if (!toast_error)
+		{
+			Size		allocsize;
+			struct varlena *attr;
+
+			/* Fetch all chunks */
+			allocsize = VARATT_EXTERNAL_GET_EXTSIZE(ctx->toast_pointer) + VARHDRSZ;
+			attr = (struct varlena *) palloc(allocsize);
+			if (VARATT_EXTERNAL_IS_COMPRESSED(ctx->toast_pointer))
+				SET_VARSIZE_COMPRESSED(attr, allocsize);
+			else
+				SET_VARSIZE(attr, allocsize);
+
+			table_relation_fetch_toast_slice(ctx->toast_rel, ctx->toast_pointer.va_valueid,
+											 toastsize, 0, toastsize, attr);
 
+			if (VARATT_IS_COMPRESSED(attr))
+			{
+#ifdef DECOMPRESSION_CORRUPTION_CHECKING
+				struct varlena *uncompressed;
+				int32		rawsize;
+#endif
+				Size		allocsize;
+				ToastCompressionId cmid;
+
+				/* allocate memory for the uncompressed data */
+				allocsize = VARDATA_COMPRESSED_GET_EXTSIZE(attr) + VARHDRSZ;
+				if (!AllocSizeIsValid(allocsize))
+					report_corruption(ctx,
+									  psprintf("toast value ID %u invalid uncompressed size %zu",
+											   ctx->toast_pointer.va_valueid,
+											   allocsize));
+				cmid = TOAST_COMPRESS_METHOD(attr);
+				switch (cmid)
+				{
+					case TOAST_PGLZ_COMPRESSION_ID:
+#ifdef DECOMPRESSION_CORRUPTION_CHECKING
+						/* decompress the data */
+						uncompressed = (struct varlena *) palloc(allocsize);
+						rawsize = pglz_decompress((char *) attr + VARHDRSZ_COMPRESSED,
+												  VARSIZE(attr) - VARHDRSZ_COMPRESSED,
+												  VARDATA(uncompressed),
+												  VARDATA_COMPRESSED_GET_EXTSIZE(attr), true);
+						if (rawsize < 0)
+							report_corruption(ctx,
+											  psprintf("toast value ID %u compressed pglz data is corrupt",
+													   ctx->toast_pointer.va_valueid));
+						pfree(uncompressed);
+#endif
+						break;
+					case TOAST_LZ4_COMPRESSION_ID:
+#ifndef USE_LZ4
+						report_corruption(ctx,
+										  psprintf("toast value ID %u unsupported LZ4 compression method",
+												   ctx->toast_pointer.va_valueid));
+#else
+#ifdef DECOMPRESSION_CORRUPTION_CHECKING
+						/* decompress the data */
+						uncompressed = (struct varlena *) palloc(allocsize);
+						rawsize = LZ4_decompress_safe((char *) attr + VARHDRSZ_COMPRESSED,
+													  VARDATA(uncompressed),
+													  VARSIZE(attr) - VARHDRSZ_COMPRESSED,
+													  VARDATA_COMPRESSED_GET_EXTSIZE(attr));
+						if (rawsize < 0)
+							report_corruption(ctx,
+											  psprintf("toast value ID %u compressed lz4 data is corrupt",
+													   ctx->toast_pointer.va_valueid));
+						pfree(uncompressed);
+#endif
+#endif
+						break;
+					default:
+						report_corruption(ctx,
+										  psprintf("toast value ID %u invalid compression method id %d",
+												   ctx->toast_pointer.va_valueid,
+												   cmid));
+				}
+			}
+			else if (VARSIZE(attr) != ctx->toast_pointer.va_rawsize)
+				report_corruption(ctx,
+								  psprintf("toast value ID %u detoasted attribute size %u differs from expected rawsize %u",
+										   ctx->toast_pointer.va_valueid,
+										   VARSIZE(attr),
+										   ctx->toast_pointer.va_rawsize));
+			pfree(attr);
+		}
+	}
+
+	ctx->checking_toastptr = false;
 	return true;
 }
 
@@ -1093,10 +1235,16 @@ check_tuple(HeapCheckContext *ctx)
 
 	/*
 	 * Check various forms of tuple header corruption.  If the header is too
-	 * corrupt to continue checking, or if the tuple is not visible to anyone,
-	 * we cannot continue with other checks.
+	 * corrupt to continue checking, we cannot continue with other checks.
 	 */
-	if (!check_tuple_header_and_visibilty(ctx->tuphdr, ctx))
+	if (!check_tuple_header(ctx->tuphdr, ctx))
+		return;
+
+	/*
+	 * If we know the tuple is visible to at least some running transactions,
+	 * we can check it.  Otherwise, we are done with this tuple.
+	 */
+	if (!heap_tuple_satisfies_corruption_checking(ctx->tuphdr))
 		return;
 
 	/*
@@ -1314,12 +1462,25 @@ get_xid_status(TransactionId xid, HeapCheckContext *ctx,
 /*
  * heap_tuple_satisfies_corruption_checking
  *
- * Determine the visibility of tuples for corruption checking purposes.  If a
- * tuple might not be visible to any running transaction, then we must not
- * check it.  This function is based heavily on
- * HeapTupleSatisfiesVacuumHorizon, with comments indicating what we think that
- * function would return for the tuple.
+ * Since we do not hold a snapshot, tuple visibility is not a question of
+ * whether we should be able to see the tuple relative to any particular
+ * snapshot, but rather a question of whether it is safe and reasonable to
+ * check the tuple attributes.
+ *
+ * For visibility determination, what we want to know is if a tuple is
+ * potentially visible to any running transaction.  If you are tempted to
+ * replace this function's visibility logic with a call to another visibility
+ * checking function, keep in mind that this function does not update hint
+ * bits, as it seems imprudent to write hint bits (or anything at all) to a
+ * table during a corruption check.  Nor does this function bother classifying
+ * tuple visibility beyond a boolean visible vs. not visible.
+ *
+ * The caller should already have checked that xmin and xmax are not out of
+ * bounds for the relation, and that the tuple header is not too garbled for
+ * header fields to be consulted.
  *
+ * This function is based heavily on HeapTupleSatisfiesVacuumHorizon, with
+ * comments indicating what we think that function would return for the tuple.
  * Callers should first check for tuple header corruption that might cause this
  * function to error or assert prior to calling.  Changing this function to be
  * more robust against errors is less desireable than hardening the code prior
diff --git a/src/bin/pg_amcheck/t/004_verify_heapam.pl b/src/bin/pg_amcheck/t/004_verify_heapam.pl
index 36607596b1..ef5043ba30 100644
--- a/src/bin/pg_amcheck/t/004_verify_heapam.pl
+++ b/src/bin/pg_amcheck/t/004_verify_heapam.pl
@@ -224,7 +224,7 @@ my $rel = $node->safe_psql('postgres', qq(SELECT pg_relation_filepath('public.te
 my $relpath = "$pgdata/$rel";
 
 # Insert data and freeze public.test
-use constant ROWCOUNT => 16;
+use constant ROWCOUNT => 21;
 $node->safe_psql('postgres', qq(
 	INSERT INTO public.test (a, b, c)
 		VALUES (
@@ -259,6 +259,13 @@ select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
 	offset $tup limit 1)));
 }
 
+# Find our toast relation id
+my $toastrelid = $node->safe_psql('postgres', qq(
+	SELECT c.reltoastrelid
+		FROM pg_catalog.pg_class c
+		WHERE c.oid = 'public.test'::regclass
+		));
+
 # Sanity check that our 'test' table on disk layout matches expectations.  If
 # this is not so, we will have to skip the test until somebody updates the test
 # to work on this platform.
@@ -296,7 +303,7 @@ close($file)
 $node->start;
 
 # Ok, Xids and page layout look ok.  We can run corruption tests.
-plan tests => 19;
+plan tests => 29;
 
 # Check that pg_amcheck runs against the uncorrupted table without error.
 $node->command_ok(['pg_amcheck', '-p', $port, 'postgres'],
@@ -310,6 +317,7 @@ $node->stop;
 
 # Some #define constants from access/htup_details.h for use while corrupting.
 use constant HEAP_HASNULL            => 0x0001;
+use constant HEAP_HASEXTERNAL        => 0x0004;
 use constant HEAP_XMAX_LOCK_ONLY     => 0x0080;
 use constant HEAP_XMIN_COMMITTED     => 0x0100;
 use constant HEAP_XMIN_INVALID       => 0x0200;
@@ -362,7 +370,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 			qr/${header}xmin $xmin precedes relation freeze threshold 0:\d+/;
 	}
-	if ($offnum == 2)
+	elsif ($offnum == 2)
 	{
 		# Corruptly set xmin < datfrozenxid
 		my $xmin = 3;
@@ -480,7 +488,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 
 		$header = header(0, $offnum, 1);
 		push @expected,
-			qr/${header}attribute \d+ with length \d+ ends at offset \d+ beyond total tuple length \d+/;
+			qr/${header}attribute with length \d+ ends at offset \d+ beyond total tuple length \d+/;
 	}
 	elsif ($offnum == 13)
 	{
@@ -489,9 +497,18 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 
 		$header = header(0, $offnum, 2);
 		push @expected,
-			qr/${header}toasted value for attribute 2 missing from toast table/;
+			qr/${header}toasted value ID \d+ missing from toast table/;
 	}
 	elsif ($offnum == 14)
+	{
+		# Corrupt infomask to claim there are no external attributes, which conflicts
+		# with column 'c' which is toasted
+		$tup->{t_infomask} &= ~HEAP_HASEXTERNAL;
+		$header = header(0, $offnum, 2);
+		push @expected,
+			qr/${header}toast value ID \d+ is external but tuple header flag HEAP_HASEXTERNAL not set/;
+	}
+	elsif ($offnum == 15)
 	{
 		# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
 		$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
@@ -501,7 +518,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 			qr/${header}multitransaction ID 4 equals or exceeds next valid multitransaction ID 1/;
 	}
-	elsif ($offnum == 15)	# Last offnum must equal ROWCOUNT
+	elsif ($offnum == 16)
 	{
 		# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
 		$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
@@ -511,6 +528,47 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 			qr/${header}multitransaction ID 4000000000 precedes relation minimum multitransaction ID threshold 1/;
 	}
+	elsif ($offnum == 17)
+	{
+		# Corrupt column c's toast pointer va_vartag field
+		$tup->{c_va_vartag} = 42;
+		$header = header(0, $offnum, 2);
+		push @expected,
+			qr/$header/,
+			qr/toasted attribute has unexpected TOAST tag 42/;
+	}
+	elsif ($offnum == 18)
+	{
+		# Corrupt column c's toast pointer va_extinfo field
+		$tup->{c_va_extinfo} = 7654321;
+		$header = header(0, $offnum, 2);
+		push @expected,
+			qr/$header/,
+			qr/toast value ID \d+ external size 7654321 exceeds maximum expected for rawsize 10004/,
+			qr/toast value ID \d+ chunk size \d+ differs from the expected size \d+/,
+			qr/toast value ID \d+ final chunk number \d+ differs from expected value \d+/;
+	}
+	elsif ($offnum == 19)
+	{
+		# Corrupt column c's toast pointer va_valueid field.  We have not
+		# consumed enough oids for any valueid in the toast table to be large.
+		# Use a large oid for the corruption to avoid colliding with an
+		# existent entry in the toast.
+		my $corrupt = $tup->{c_va_valueid} + 100000000;
+		$tup->{c_va_valueid} = $corrupt;
+		$header = header(0, $offnum, 2);
+		push @expected,
+			qr/${header}/,
+			qr/toasted value ID \d+ missing from toast table/;
+	}
+	elsif ($offnum == 20)	# Last offnum must less than or equal to ROWCOUNT-1
+	{
+		# Corrupt column c's toast pointer va_toastrelid field
+		my $otherid = $toastrelid + 1;
+		$tup->{c_va_toastrelid} = $otherid;
+		push @expected,
+			qr/toast value ID \d+ toast relation oid $otherid differs from expected oid $toastrelid/;
+	}
 	write_tuple($file, $offset, $tup);
 }
 close($file)
-- 
2.21.1 (Apple Git-122.3)

