From bcbabe645ce807ce115cf394f6badc7eeff45a82 Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Mon, 20 Apr 2020 08:05:58 -0700
Subject: [PATCH v3] Adding verify_heapam to amcheck contrib module.

Adding a new function for checking a heap relation and associated
toast relation, if any, for corruption.

The postgres backend already defends against certain forms of
corruption, by checking the page header of each page before allowing
it into the page cache, and by checking the page checksum, if
enabled.  Experience shows that broken or ill-conceived backup and
restore mechanisms can result in a page, or an entire file, being
overwritten with an earlier version of itself, restored from backup.
Pages thus overwritten will appear to have valid page headers and
checksums, while potentially containing xmin, xmax, and toast
pointers that are invalid.

contrib/amcheck now has a function, verify_heapam, that takes a
regclass argument, scans the given heap relation, and returns rows
containing information about corruption found within the table.  The
main focus of the scan is to find invalid xmin, xmax, and toast
pointer values.  It also checks for structural corruption within the
page (such as invalid t_hoff values) that could lead to the backend
aborting should the function blindly trust the data as it finds it.
A second boolean argument, stop_on_error, can be used to return
after the first corrupt page is detected.
---
 contrib/amcheck/Makefile                      |   7 +-
 contrib/amcheck/amcheck--1.2--1.3.sql         |  28 +
 contrib/amcheck/amcheck.control               |   2 +-
 .../amcheck/expected/disallowed_reltypes.out  |  27 +
 contrib/amcheck/sql/disallowed_reltypes.sql   |  29 +
 contrib/amcheck/t/verify_heapam.pl            | 387 +++++++
 contrib/amcheck/verify_heapam.c               | 966 ++++++++++++++++++
 contrib/heapcheck/.gitignore                  |   4 +
 contrib/heapcheck/Makefile                    |  25 +
 .../expected/001_create_extension.out         |   1 +
 contrib/heapcheck/heapcheck--1.0.sql          |  21 +
 contrib/heapcheck/heapcheck.control           |   5 +
 .../heapcheck/sql/001_create_extension.sql    |   1 +
 doc/src/sgml/amcheck.sgml                     | 102 ++
 14 files changed, 1602 insertions(+), 3 deletions(-)
 create mode 100644 contrib/amcheck/amcheck--1.2--1.3.sql
 create mode 100644 contrib/amcheck/expected/disallowed_reltypes.out
 create mode 100644 contrib/amcheck/sql/disallowed_reltypes.sql
 create mode 100644 contrib/amcheck/t/verify_heapam.pl
 create mode 100644 contrib/amcheck/verify_heapam.c
 create mode 100644 contrib/heapcheck/.gitignore
 create mode 100644 contrib/heapcheck/Makefile
 create mode 100644 contrib/heapcheck/expected/001_create_extension.out
 create mode 100644 contrib/heapcheck/heapcheck--1.0.sql
 create mode 100644 contrib/heapcheck/heapcheck.control
 create mode 100644 contrib/heapcheck/sql/001_create_extension.sql

diff --git a/contrib/amcheck/Makefile b/contrib/amcheck/Makefile
index a2b1b1036b..410f0a76ad 100644
--- a/contrib/amcheck/Makefile
+++ b/contrib/amcheck/Makefile
@@ -3,13 +3,16 @@
 MODULE_big	= amcheck
 OBJS = \
 	$(WIN32RES) \
+	verify_heapam.o \
 	verify_nbtree.o
 
 EXTENSION = amcheck
-DATA = amcheck--1.1--1.2.sql amcheck--1.0--1.1.sql amcheck--1.0.sql
+DATA = amcheck--1.2--1.3.sql amcheck--1.1--1.2.sql amcheck--1.0--1.1.sql amcheck--1.0.sql
 PGFILEDESC = "amcheck - function for verifying relation integrity"
 
-REGRESS = check check_btree
+REGRESS = check check_btree disallowed_reltypes
+
+TAP_TESTS = 1
 
 ifdef USE_PGXS
 PG_CONFIG = pg_config
diff --git a/contrib/amcheck/amcheck--1.2--1.3.sql b/contrib/amcheck/amcheck--1.2--1.3.sql
new file mode 100644
index 0000000000..f685ccd868
--- /dev/null
+++ b/contrib/amcheck/amcheck--1.2--1.3.sql
@@ -0,0 +1,28 @@
+/* contrib/amcheck/amcheck--1.2--1.3.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "ALTER EXTENSION amcheck UPDATE TO '1.3'" to load this file. \quit
+
+-- Version 1.3 adds verify_heapam(), which scans a heap relation and returns
+-- one row per corruption found.  No pre-1.3 objects are changed here.
+
+--
+-- verify_heapam(relation regclass, stop_on_error boolean)
+--
+CREATE FUNCTION verify_heapam(
+	regclass,
+	boolean,
+	OUT blkno bigint,
+	OUT offnum integer,
+	OUT lp_off smallint,
+	OUT lp_flags smallint,
+	OUT lp_len smallint,
+	OUT attnum integer,
+	OUT chunk integer,
+	OUT msg text)
+RETURNS SETOF record
+LANGUAGE C STRICT
+AS 'MODULE_PATHNAME', 'verify_heapam';
+
+-- Withdraw the default EXECUTE privilege that PUBLIC would otherwise hold
+REVOKE ALL ON FUNCTION verify_heapam(regclass, boolean) FROM PUBLIC;
diff --git a/contrib/amcheck/amcheck.control b/contrib/amcheck/amcheck.control
index c6e310046d..ab50931f75 100644
--- a/contrib/amcheck/amcheck.control
+++ b/contrib/amcheck/amcheck.control
@@ -1,5 +1,5 @@
 # amcheck extension
 comment = 'functions for verifying relation integrity'
-default_version = '1.2'
+default_version = '1.3'
 module_pathname = '$libdir/amcheck'
 relocatable = true
diff --git a/contrib/amcheck/expected/disallowed_reltypes.out b/contrib/amcheck/expected/disallowed_reltypes.out
new file mode 100644
index 0000000000..1829320a2f
--- /dev/null
+++ b/contrib/amcheck/expected/disallowed_reltypes.out
@@ -0,0 +1,27 @@
+--
+-- check that using the module's functions with unsupported relations will fail
+--
+-- partitioned tables (the parent ones) are not a supported relkind
+create table test_partitioned (a int, b text default repeat('x', 5000)) partition by list (a);
+-- so this should fail
+select * from verify_heapam('test_partitioned', false);
+ERROR:  "test_partitioned" is not a table, materialized view, or TOAST table
+create table test_partition partition of test_partitioned for values in (1);
+create index test_index on test_partition (a);
+-- indexes are not a supported relkind, so this fails
+select * from verify_heapam('test_index', false);
+ERROR:  "test_index" is not a table, materialized view, or TOAST table
+create view test_view as select 1;
+-- views are not a supported relkind, so this fails
+select * from verify_heapam('test_view', false);
+ERROR:  "test_view" is not a table, materialized view, or TOAST table
+create sequence test_sequence;
+-- sequences are not a supported relkind, so this fails
+select * from verify_heapam('test_sequence', false);
+ERROR:  "test_sequence" is not a table, materialized view, or TOAST table
+create foreign data wrapper dummy;
+create server dummy_server foreign data wrapper dummy;
+create foreign table test_foreign_table () server dummy_server;
+-- foreign tables are not a supported relkind, so this fails
+select * from verify_heapam('test_foreign_table', false);
+ERROR:  "test_foreign_table" is not a table, materialized view, or TOAST table
diff --git a/contrib/amcheck/sql/disallowed_reltypes.sql b/contrib/amcheck/sql/disallowed_reltypes.sql
new file mode 100644
index 0000000000..c923e54b6f
--- /dev/null
+++ b/contrib/amcheck/sql/disallowed_reltypes.sql
@@ -0,0 +1,29 @@
+--
+-- check that using the module's functions with unsupported relations will fail
+--
+
+-- partitioned tables (the parent ones) are not a supported relkind
+create table test_partitioned (a int, b text default repeat('x', 5000)) partition by list (a);
+-- so this should fail
+select * from verify_heapam('test_partitioned', false);
+
+create table test_partition partition of test_partitioned for values in (1);
+create index test_index on test_partition (a);
+-- indexes are not a supported relkind, so this fails
+select * from verify_heapam('test_index', false);
+
+create view test_view as select 1;
+-- views are not a supported relkind, so this fails
+select * from verify_heapam('test_view', false);
+
+create sequence test_sequence;
+-- sequences are not a supported relkind, so this fails
+select * from verify_heapam('test_sequence', false);
+
+create foreign data wrapper dummy;
+create server dummy_server foreign data wrapper dummy;
+create foreign table test_foreign_table () server dummy_server;
+-- foreign tables are not a supported relkind, so this fails
+select * from verify_heapam('test_foreign_table', false);
+
+
diff --git a/contrib/amcheck/t/verify_heapam.pl b/contrib/amcheck/t/verify_heapam.pl
new file mode 100644
index 0000000000..65be5963ec
--- /dev/null
+++ b/contrib/amcheck/t/verify_heapam.pl
@@ -0,0 +1,387 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+
+use Test::More;
+
+# This regression test demonstrates that the verify_heapam() function
+# supplied with this contrib module correctly identifies specific kinds of
+# corruption within pages.  To test this, we need a mechanism to create corrupt
+# pages with predictable, repeatable corruption.  The postgres backend cannot be
+# expected to help us with this, as its design is not consistent with the goal
+# of intentionally corrupting pages.
+#
+# Instead, we create a table to corrupt, and with careful consideration of how
+# postgresql lays out heap pages, we seek to offsets within the page and
+# overwrite deliberately chosen bytes with specific values calculated to
+# corrupt the page in expected ways.  We then verify that verify_heapam
+# reports the corruption, and that it runs without crashing.  Note that the
+# backend cannot simply be started to run queries against the corrupt table, as
+# the backend will crash, at least for some of the corruption types we
+# generate.
+#
+# Autovacuum potentially touching the table in the background makes the exact
+# behavior of this test harder to reason about.  We turn it off to keep things
+# simpler.  We use a "belt and suspenders" approach, turning it off for the
+# system generally in postgresql.conf, and turning it off specifically for the
+# test table.
+#
+# This test depends on the table being written to the heap file exactly as we
+# expect it to be, so we take care to arrange the columns of the table, and
+# insert rows of the table, that give predictable sizes and locations within
+# the table page.
+#
+# The HeapTupleHeaderData has 23 bytes of fixed size fields before the variable
+# length t_bits[] array.  We have exactly 3 columns in the table, so natts = 3,
+# t_bits is 1 byte long, and t_hoff = MAXALIGN(23 + 1) = 24.
+#
+# We're not too fussy about which datatypes we use for the test, but we do care
+# about some specific properties.  We'd like to test both fixed size and
+# varlena types.  We'd like some varlena data inline and some toasted.  And
+# we'd like the layout of the table such that the datums land at predictable
+# offsets within the tuple.  We choose a structure without padding on all
+# supported architectures:
+#
+# 	a BIGINT
+#	b TEXT
+#	c TEXT
+#
+# We always insert a 7-ascii character string into field 'b', which with a
+# 1-byte varlena header gives an 8 byte inline value.  We always insert a long
+# text string in field 'c', long enough to force toast storage.
+#
+# This formatting produces heap pages where each tuple is 58 bytes long, padded
+# out to 64 bytes for alignment, with the first one on the page starting at
+# offset 8128, as follows:
+#
+#    [ lp_off: 8128 lp_len:   58]
+#    [ lp_off: 8064 lp_len:   58]
+#    [ lp_off: 8000 lp_len:   58]
+#    [ lp_off: 7936 lp_len:   58]
+#    [ lp_off: 7872 lp_len:   58]
+#    [ lp_off: 7808 lp_len:   58]
+#               ...
+#
+
+use constant LP_OFF_BEGIN => 8128;
+use constant LP_OFF_DELTA => 64;
+
+# We choose to read and write binary copies of our table's tuples, using perl's
+# pack() and unpack() functions.  Perl uses a packing code system in which:
+#
+#	L = "Unsigned 32-bit Long",
+#	S = "Unsigned 16-bit Short",
+#	C = "Unsigned 8-bit Octet",
+#	c = "signed 8-bit octet",
+#	q = "signed 64-bit quadword"
+#	
+# Each tuple in our table has a layout as follows:
+#
+#    xx xx xx xx            t_xmin: xxxx		offset = 0		L
+#    xx xx xx xx            t_xmax: xxxx		offset = 4		L
+#    xx xx xx xx          t_field3: xxxx		offset = 8		L
+#    xx xx                   bi_hi: xx			offset = 12		S
+#    xx xx                   bi_lo: xx			offset = 14		S
+#    xx xx                ip_posid: xx			offset = 16		S
+#    xx xx             t_infomask2: xx			offset = 18		S
+#    xx xx              t_infomask: xx			offset = 20		S
+#    xx                     t_hoff: x			offset = 22		C
+#    xx                     t_bits: x			offset = 23		C
+#    xx xx xx xx xx xx xx xx   'a': xxxxxxxx	offset = 24		q
+#    xx xx xx xx xx xx xx xx   'b': xxxxxxxx	offset = 32		Cccccccc
+#    xx xx xx xx xx xx xx xx   'c': xxxxxxxx	offset = 40		SSSS
+#    xx xx xx xx xx xx xx xx      : xxxxxxxx	 ...continued	SSSS
+#    xx xx                        : xx      	 ...continued	S
+#	
+# We could choose to read and write columns 'b' and 'c' in other ways, but
+# it is convenient enough to do it this way.  We define packing code
+# constants here, where they can be compared easily against the layout.
+
+use constant HEAPTUPLE_PACK_CODE => 'LLLSSSSSCCqCcccccccSSSSSSSSS';
+use constant HEAPTUPLE_PACK_LENGTH => 58;     # Total size
+
+# Read a tuple of our table from a heap page.
+#
+# Takes an open filehandle to the heap file, and the offset of the tuple.
+#
+# Rather than returning the binary data from the file, unpacks the data into a
+# perl hash with named fields.  These fields exactly match the ones understood
+# by write_tuple(), below.  Returns a reference to this hash.
+# Positions with sysseek(), since sysread() bypasses perl's buffered I/O.
+sub read_tuple ($$)
+{
+	my ($fh, $offset) = @_;
+	my ($buffer, %tup);
+	sysseek($fh, $offset, 0) or BAIL_OUT("sysseek failed: $!");
+	defined(sysread($fh, $buffer, HEAPTUPLE_PACK_LENGTH)) or BAIL_OUT("sysread failed: $!");
+	# Unpack into a flat list, then name each field in on-disk layout order.
+	@_ = unpack(HEAPTUPLE_PACK_CODE, $buffer);
+	%tup = (t_xmin => shift,
+			t_xmax => shift,
+			t_field3 => shift,
+			bi_hi => shift,
+			bi_lo => shift,
+			ip_posid => shift,
+			t_infomask2 => shift,
+			t_infomask => shift,
+			t_hoff => shift,
+			t_bits => shift,
+			a => shift,
+			b_header => shift,
+			b_body1 => shift,
+			b_body2 => shift,
+			b_body3 => shift,
+			b_body4 => shift,
+			b_body5 => shift,
+			b_body6 => shift,
+			b_body7 => shift,
+			c1 => shift,
+			c2 => shift,
+			c3 => shift,
+			c4 => shift,
+			c5 => shift,
+			c6 => shift,
+			c7 => shift,
+			c8 => shift,
+			c9 => shift);
+	# Stitch together the text for column 'b'
+	$tup{b} = join('', map { chr($tup{"b_body$_"}) } (1..7));
+	return \%tup;
+}
+
+# Write a tuple of our table to a heap page.
+#
+# Takes an open filehandle to the heap file, the offset of the tuple, and a
+# reference to a hash with the tuple values, as returned by read_tuple().
+# Writes the tuple fields from the hash into the heap file.
+#
+# The purpose of this function is to write a tuple back to disk with some
+# subset of fields modified.  The field values themselves are not validated.
+# Positions with sysseek(), since syswrite() bypasses perl's buffered I/O;
+# a failed seek or write aborts the test run.
+sub write_tuple($$$)
+{
+	my ($fh, $offset, $tup) = @_;
+	my $buffer = pack(HEAPTUPLE_PACK_CODE,
+					$tup->{t_xmin},
+					$tup->{t_xmax},
+					$tup->{t_field3},
+					$tup->{bi_hi},
+					$tup->{bi_lo},
+					$tup->{ip_posid},
+					$tup->{t_infomask2},
+					$tup->{t_infomask},
+					$tup->{t_hoff},
+					$tup->{t_bits},
+					$tup->{a},
+					$tup->{b_header},
+					$tup->{b_body1},
+					$tup->{b_body2},
+					$tup->{b_body3},
+					$tup->{b_body4},
+					$tup->{b_body5},
+					$tup->{b_body6},
+					$tup->{b_body7},
+					$tup->{c1},
+					$tup->{c2},
+					$tup->{c3},
+					$tup->{c4},
+					$tup->{c5},
+					$tup->{c6},
+					$tup->{c7},
+					$tup->{c8},
+					$tup->{c9});
+	sysseek($fh, $offset, 0) or BAIL_OUT("sysseek failed: $!");
+	defined(syswrite($fh, $buffer, HEAPTUPLE_PACK_LENGTH)) or BAIL_OUT("syswrite failed: $!");
+	return;
+}
+
+# Set umask so test directories and files are created with default permissions
+umask(0077);
+
+my ($result, $node);
+
+# Set up the node and test table.
+$node = get_new_node('test');
+$node->init;
+$node->append_conf('postgresql.conf', 'autovacuum=off');
+$node->start;
+my $pgdata = $node->data_dir;
+$node->safe_psql('postgres', "CREATE EXTENSION amcheck");
+
+$node->safe_psql(
+	'postgres', qq(
+		CREATE TABLE public.test (a BIGINT, b TEXT, c TEXT);
+		ALTER TABLE public.test SET (autovacuum_enabled=false);
+		ALTER TABLE public.test ALTER COLUMN c SET STORAGE EXTERNAL;
+	));
+
+$result = $node->safe_psql('postgres', q(SHOW block_size));
+if ($result != 8192)
+{
+	$node->teardown_node;
+	$node->clean_node;
+	# plan skip_all exits the script, so the cleanup has to come first
+	plan skip_all => 'Only default 8192 byte block size supported by this test';
+}
+
+my $rel = $node->safe_psql('postgres', qq(SELECT pg_relation_filepath('public.test')));
+my $relpath = "$pgdata/$rel";
+
+use constant ROWCOUNT => 12;
+$node->safe_psql('postgres', qq(
+	INSERT INTO public.test (a, b, c)
+		VALUES (
+			12345678,
+			'abcdefg',
+			repeat('w', 10000)
+		);
+	VACUUM FREEZE public.test
+	)) for (1..ROWCOUNT);
+
+my $relfrozenxid = $node->safe_psql('postgres',
+	q(select relfrozenxid from pg_class where relname = 'test'));
+
+$node->stop;
+
+# Some #define constants from access/htup_details.h for use while corrupting.
+use constant HEAP_HASNULL            => 0x0001;
+use constant HEAP_XMIN_COMMITTED     => 0x0100;
+use constant HEAP_XMIN_INVALID       => 0x0200;
+use constant HEAP_XMAX_INVALID       => 0x0800;
+use constant HEAP_NATTS_MASK         => 0x07FF;
+
+# Corrupt the tuples, one type of corruption per tuple.  Some types of
+# corruption cause verify_heapam to skip to the next tuple without
+# performing any remaining checks, so we can't exercise the system properly if
+# we focus all our corruption on a single tuple.
+#
+# If we (this regression test) are being run on a system with different alignment
+# our offsets into the page may be wrong.  Rather than automatically configuring
+# for different alignment sizes, we just skip the test if the alignments aren't
+# what we expect.
+#
+my $file;
+open($file, '+<', $relpath) or BAIL_OUT("open failed: $!");
+binmode $file;
+
+for (my $offset = LP_OFF_BEGIN, my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++, $offset -= LP_OFF_DELTA)
+{
+	my $tup = read_tuple($file, $offset);
+
+	# Verify the data appears to be where we would expect on the page.  If alignment
+	# issues have caused data to be placed elsewhere, we should be able to tell.
+	if ($tup->{a} ne '12345678' || $tup->{b} ne 'abcdefg')
+	{
+		close($file);
+		$node->clean_node;		# must precede plan skip_all, which exits
+		plan skip_all => 'Page layout differs from our expectations';
+	}
+
+	if ($tupidx == 0)
+	{
+		# Corruptly set xmin < relfrozenxid
+		$tup->{t_xmin} = 3;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		$tup->{t_infomask} &= ~HEAP_XMIN_INVALID;
+	}
+	elsif ($tupidx == 1)
+	{
+		# Corruptly set xmin < relfrozenxid, further back
+		$tup->{t_xmin} = 4026531839;		# Note circularity of xid comparison
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		$tup->{t_infomask} &= ~HEAP_XMIN_INVALID;
+	}
+	elsif ($tupidx == 2)
+	{
+		# Corruptly set xmax < relfrozenxid
+		$tup->{t_xmax} = 4026531839;		# Note circularity of xid comparison
+		$tup->{t_infomask} &= ~HEAP_XMAX_INVALID;
+	}
+	elsif ($tupidx == 3)
+	{
+		# Corrupt the tuple t_hoff, but keep it aligned properly
+		$tup->{t_hoff} += 128;
+	}
+	elsif ($tupidx == 4)
+	{
+		# Corrupt the tuple t_hoff, wrong alignment
+		$tup->{t_hoff} += 3;
+	}
+	elsif ($tupidx == 5)
+	{
+		# Corrupt the tuple t_hoff, underflow but correct alignment
+		$tup->{t_hoff} -= 8;
+	}
+	elsif ($tupidx == 6)
+	{
+		# Corrupt the tuple t_hoff, underflow and wrong alignment
+		$tup->{t_hoff} -= 3;
+	}
+	elsif ($tupidx == 7)
+	{
+		# Corrupt the tuple to look like it has lots of attributes, not just 3
+		$tup->{t_infomask2} |= HEAP_NATTS_MASK;
+	}
+	elsif ($tupidx == 8)
+	{
+		# Corrupt the tuple to look like it has lots of attributes, some of
+		# them null.  This falsely creates the impression that the t_bits
+		# array is longer than just one byte, but t_hoff still says otherwise.
+		$tup->{t_infomask} |= HEAP_HASNULL;
+		$tup->{t_infomask2} |= HEAP_NATTS_MASK;
+		$tup->{t_bits} = 0xAA;
+	}
+	elsif ($tupidx == 9)
+	{
+		# Same as above, but this time t_hoff plays along
+		$tup->{t_infomask} |= HEAP_HASNULL;
+		$tup->{t_infomask2} |= (HEAP_NATTS_MASK & 0x40);
+		$tup->{t_bits} = 0xAA;
+		$tup->{t_hoff} = 32;
+	}
+	elsif ($tupidx == 10)
+	{
+		# Corrupt the bits in column 'b' 1-byte varlena header
+		$tup->{b_header} = 0x80;
+	}
+	elsif ($tupidx == 11)
+	{
+		# Corrupt the bits in column 'c' toast pointer
+		$tup->{c6} = 41;
+		$tup->{c7} = 41;
+	}
+	write_tuple($file, $offset, $tup);
+}
+close($file);
+
+# Each expected row below is blkno|offnum|lp_off|lp_flags|lp_len|attnum|chunk|msg,
+# with NULL columns appearing as empty fields.
+# Run verify_heapam on the corrupted file
+$node->start;
+
+plan tests => 1;
+
+$result = $node->safe_psql('postgres', q(SELECT * FROM verify_heapam('test', false)));
+is ($result,
+"0|1|8128|1|58|||tuple xmin = 3 precedes relation relfrozenxid = $relfrozenxid
+0|2|8064|1|58|||tuple xmin = 4026531839 precedes relation relfrozenxid = $relfrozenxid
+0|3|8000|1|58|||tuple xmax = 4026531839 precedes relation relfrozenxid = $relfrozenxid
+0|4|7936|1|58|||t_hoff > lp_len (152 > 58)
+0|5|7872|1|58|||t_hoff not max-aligned (27)
+0|6|7808|1|58|||t_hoff < SizeofHeapTupleHeader (16 < 23)
+0|7|7744|1|58|||t_hoff < SizeofHeapTupleHeader (21 < 23)
+0|7|7744|1|58|||t_hoff not max-aligned (21)
+0|8|7680|1|58|||relation natts < tuple natts (3 < 2047)
+0|9|7616|1|58|||SizeofHeapTupleHeader + BITMAPLEN(natts) > t_hoff (23 + 256 > 24)
+0|10|7552|1|58|||relation natts < tuple natts (3 < 67)
+0|11|7488|1|58|2||t_hoff + offset > lp_len (24 + 416847976 > 58)
+0|12|7424|1|58|2|0|final chunk number differs from expected (0 vs. 6)
+0|12|7424|1|58|2|0|toasted value missing from toast table",
+"Expected verify_heapam output");
+
+$node->teardown_node;
+$node->clean_node;
+
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
new file mode 100644
index 0000000000..5d547f2ff9
--- /dev/null
+++ b/contrib/amcheck/verify_heapam.c
@@ -0,0 +1,966 @@
+/*-------------------------------------------------------------------------
+ *
+ * verify_heapam.c
+ *	  Functions to check postgresql heap relations for corruption
+ *
+ * Copyright (c) 2016-2020, PostgreSQL Global Development Group
+ *
+ *	  contrib/amcheck/verify_heapam.c
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/detoast.h"
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/heaptoast.h"
+#include "access/htup_details.h"
+#include "access/multixact.h"
+#include "access/toast_internals.h"
+#include "access/visibilitymap.h"
+#include "access/xact.h"
+#include "catalog/pg_am.h"
+#include "catalog/pg_type.h"
+#include "catalog/storage_xlog.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/procarray.h"
+#include "storage/smgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+
+PG_FUNCTION_INFO_V1(verify_heapam);
+
+/*
+ * Struct holding the running context information during
+ * a lifetime of a verify_heapam() execution.
+ */
+typedef struct HeapCheckContext
+{
+	TransactionId nextKnownValidXid;	/* xid part of nextFullXid at start */
+	TransactionId oldestValidXid;	/* oldestXid, or relfrozenxid if normal */
+
+	/* Values concerning the heap relation being checked */
+	Relation	rel;
+	TransactionId relfrozenxid;
+	TransactionId relminmxid;
+	Relation	toastrel;		/* stays NULL if rel has no toast relation */
+	Relation   *toast_indexes;
+	Relation	valid_toast_index;	/* entry picked by toast_open_indexes() */
+	int			num_toast_indexes;
+
+	/* Values for iterating over pages in the relation */
+	BlockNumber nblocks;
+	BlockNumber blkno;
+	BufferAccessStrategy bstrategy;
+	Buffer		buffer;
+	Page		page;
+
+	/* Values for iterating over tuples within a page */
+	OffsetNumber offnum;
+	ItemId		itemid;			/* NULL while no line pointer is current */
+	uint16		lp_len;
+	HeapTupleHeader tuphdr;
+	int			natts;
+
+	/* Values for iterating over attributes within the tuple */
+	uint32		offset;			/* offset in tuple data */
+	AttrNumber	attnum;			/* negative => reported as NULL */
+
+	/* Values for iterating over toast for the attribute */
+	int32		chunkno;		/* negative => reported as NULL */
+	int32		attrsize;
+	int32		endchunk;
+	int32		totalchunks;
+
+	/* Values for returning tuples */
+	bool		is_corrupt;		/* have we encountered any corruption? */
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+}			HeapCheckContext;
+
+/* Public API */
+Datum		verify_heapam(PG_FUNCTION_ARGS);
+
+/* Internal implementation */
+static void check_relation_relkind_and_relam(Relation rel);
+
+static void confess(HeapCheckContext * ctx, char *msg);
+static TupleDesc verify_heapam_tupdesc(void);
+
+static bool TransactionIdValidInRel(TransactionId xid, HeapCheckContext * ctx);
+static bool check_tuphdr_xids(HeapTupleHeader tuphdr, HeapCheckContext * ctx);
+static void check_toast_tuple(HeapTuple toasttup, HeapCheckContext * ctx);
+static bool check_tuple_attribute(HeapCheckContext * ctx);
+static void check_tuple(HeapCheckContext * ctx);
+
+/*
+ * verify_heapam
+ *   Scan a heap relation, and its toast relation if any, for corruption.
+ *   Each problem found is returned as one row of a materialized tuplestore.
+ */
+Datum
+verify_heapam(PG_FUNCTION_ARGS)
+{
+#define HEAPCHECK_RELATION_COLS 8
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	MemoryContext oldcontext;
+	bool		randomAccess;
+	HeapCheckContext ctx;
+	FullTransactionId nextFullXid;
+
+	Oid			relid = PG_GETARG_OID(0);
+	bool		stop_on_error = PG_GETARG_BOOL(1);
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	memset(&ctx, 0, sizeof(HeapCheckContext));
+
+	/* The tupdesc and tuplestore must be created in ecxt_per_query_memory */
+	oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+	randomAccess = (rsinfo->allowedModes & SFRM_Materialize_Random) != 0;
+	ctx.tupdesc = verify_heapam_tupdesc();
+	ctx.tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = ctx.tupstore;
+	rsinfo->setDesc = ctx.tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * Open the relation.  We use ShareUpdateExclusive to prevent concurrent
+	 * vacuums from changing the relfrozenxid, relminmxid, or advancing the
+	 * global oldestXid to be newer than those.  This protection saves us from
+	 * having to reacquire the locks and recheck those minimums for every
+	 * tuple, which would be expensive.
+	 */
+	ctx.rel = relation_open(relid, ShareUpdateExclusiveLock);
+	check_relation_relkind_and_relam(ctx.rel);
+
+	/*
+	 * Open the toast relation, if any, also protected from concurrent
+	 * vacuums.
+	 */
+	if (ctx.rel->rd_rel->reltoastrelid)
+	{
+		int			offset;
+
+		/* Main relation has associated toast relation */
+		ctx.toastrel = table_open(ctx.rel->rd_rel->reltoastrelid,
+								  ShareUpdateExclusiveLock);
+		offset = toast_open_indexes(ctx.toastrel,
+									ShareUpdateExclusiveLock,
+									&(ctx.toast_indexes),
+									&(ctx.num_toast_indexes));
+		ctx.valid_toast_index = ctx.toast_indexes[offset];
+	}
+	else
+	{
+		/* Main relation has no associated toast relation */
+		ctx.toast_indexes = NULL;
+		ctx.num_toast_indexes = 0;
+	}
+
+	/*
+	 * Now that we have our relation(s) locked, oldestXid cannot advance
+	 * beyond the oldest valid xid in our table, nor can our relfrozenxid
+	 * advance.  We keep a cached copy of the oldest valid xid that we may
+	 * encounter in the table, which is relfrozenxid if valid, and oldestXid
+	 * otherwise.
+	 */
+	ctx.relfrozenxid = ctx.rel->rd_rel->relfrozenxid;
+	ctx.relminmxid = ctx.rel->rd_rel->relminmxid;
+
+	LWLockAcquire(XidGenLock, LW_SHARED);
+	nextFullXid = ShmemVariableCache->nextFullXid;
+	ctx.oldestValidXid = ShmemVariableCache->oldestXid;
+	LWLockRelease(XidGenLock);
+	ctx.nextKnownValidXid = XidFromFullTransactionId(nextFullXid);
+
+	if (TransactionIdIsNormal(ctx.relfrozenxid) &&
+		TransactionIdPrecedes(ctx.relfrozenxid, ctx.oldestValidXid))
+	{
+		confess(&ctx, psprintf("relfrozenxid %u precedes global "
+							   "oldest valid xid %u ",
+							   ctx.relfrozenxid, ctx.oldestValidXid));
+		/* fall through to the cleanup code; the page scan is skipped */
+	}
+	else if (TransactionIdIsNormal(ctx.relminmxid) &&
+			 TransactionIdPrecedes(ctx.relminmxid, ctx.oldestValidXid))
+	{
+		/* NOTE(review): relminmxid is compared against an xid -- confirm */
+		confess(&ctx, psprintf("relminmxid %u precedes global "
+							   "oldest valid xid %u ",
+							   ctx.relminmxid, ctx.oldestValidXid));
+		/* fall through to the cleanup code; the page scan is skipped */
+	}
+
+	if (TransactionIdIsNormal(ctx.relfrozenxid))
+		ctx.oldestValidXid = ctx.relfrozenxid;
+
+	/* check all blocks, unless a relation-level check above already failed */
+	ctx.nblocks = ctx.is_corrupt ? 0 : RelationGetNumberOfBlocks(ctx.rel);
+	ctx.bstrategy = GetAccessStrategy(BAS_BULKREAD);
+	ctx.buffer = InvalidBuffer;
+	ctx.page = NULL;
+
+	for (ctx.blkno = 0; ctx.blkno < ctx.nblocks; ctx.blkno++)
+	{
+		OffsetNumber maxoff;
+
+		/* Read and lock the next page. */
+		ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
+										RBM_NORMAL, ctx.bstrategy);
+		LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+		ctx.page = BufferGetPage(ctx.buffer);
+
+		/* We must unlock the page from the prior iteration, if any */
+		Assert(ctx.blkno == InvalidBlockNumber || ctx.buffer != InvalidBuffer);
+
+		/* Sanity check: valid offsets start right after InvalidOffsetNumber */
+		StaticAssertStmt(InvalidOffsetNumber + 1 == FirstOffsetNumber,
+						 "InvalidOffsetNumber increments to FirstOffsetNumber");
+
+		ctx.offnum = InvalidOffsetNumber;
+		ctx.itemid = NULL;
+		ctx.lp_len = 0;
+		ctx.tuphdr = NULL;
+		ctx.natts = 0;
+
+		/* Perform tuple checks; line pointers are numbered from 1, not 0 */
+		maxoff = PageGetMaxOffsetNumber(ctx.page);
+		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
+			 ctx.offnum = OffsetNumberNext(ctx.offnum))
+		{
+			ctx.itemid = PageGetItemId(ctx.page, ctx.offnum);
+
+			/* Skip over unused/dead/redirected line pointers */
+			if (!ItemIdIsUsed(ctx.itemid) ||
+				ItemIdIsDead(ctx.itemid) ||
+				ItemIdIsRedirected(ctx.itemid))
+				continue;
+
+			/* Set up context information about this next tuple */
+			ctx.lp_len = ItemIdGetLength(ctx.itemid);
+			ctx.tuphdr = (HeapTupleHeader) PageGetItem(ctx.page, ctx.itemid);
+			ctx.natts = HeapTupleHeaderGetNatts(ctx.tuphdr);
+
+			/*
+			 * Reset information about individual attributes and related toast
+			 * values, so they show as NULL in the corruption report if we
+			 * record a corruption before beginning to iterate over the
+			 * attributes.
+			 */
+			ctx.attnum = -1;
+			ctx.chunkno = -1;
+
+			/* Ok, ready to check this next tuple */
+			check_tuple(&ctx);
+		}
+
+		/* clean up */
+		ctx.offnum = InvalidOffsetNumber;
+		ctx.itemid = NULL;
+		ctx.lp_len = 0;
+		UnlockReleaseBuffer(ctx.buffer);
+
+		if (stop_on_error && ctx.is_corrupt)
+			break;
+	}
+
+	/* Close the associated toast table and indexes, if any. */
+	if (ctx.rel->rd_rel->reltoastrelid)
+	{
+		toast_close_indexes(ctx.toast_indexes, ctx.num_toast_indexes,
+							ShareUpdateExclusiveLock);
+		table_close(ctx.toastrel, ShareUpdateExclusiveLock);
+	}
+
+	/* Close the main relation */
+	relation_close(ctx.rel, ShareUpdateExclusiveLock);
+
+	PG_RETURN_NULL();
+}
+
+/*
+ * check_relation_relkind_and_relam
+ *
+ *   Raise an error unless rel is a table, matview, or toast table using heap AM.
+ */
+static void
+check_relation_relkind_and_relam(Relation rel)
+{
+	char		relkind = rel->rd_rel->relkind;
+
+	if (!(relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW ||
+		  relkind == RELKIND_TOASTVALUE))
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("\"%s\" is not a table, materialized view, or TOAST table",
+						RelationGetRelationName(rel))));
+	if (rel->rd_rel->relam != HEAP_TABLE_AM_OID)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("\"%s\" is not a heap AM",
+						RelationGetRelationName(rel))));
+}
+
+/*
+ * confess
+ *
+ *   Append one row to the result set describing a corruption and where it
+ *   was found.  Location columns that do not apply are returned as NULL.
+ *
+ *   The msg argument is pfree'd by this function.
+ */
+static void
+confess(HeapCheckContext * ctx, char *msg)
+{
+	Datum		values[HEAPCHECK_RELATION_COLS];
+	bool		nulls[HEAPCHECK_RELATION_COLS];
+	HeapTuple	tuple;
+	bool		has_item = (ctx->itemid != NULL);
+
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+	values[0] = Int64GetDatum(ctx->blkno);
+	values[1] = Int32GetDatum(ctx->offnum);
+	nulls[1] = (ctx->offnum == InvalidOffsetNumber);
+	/* Relation-level reports are made while no line pointer is current, so */
+	/* ctx->itemid may be NULL; never dereference it in that case. */
+	values[2] = Int16GetDatum(has_item ? ItemIdGetOffset(ctx->itemid) : 0);
+	nulls[2] = !has_item;
+	values[3] = Int16GetDatum(has_item ? ItemIdGetFlags(ctx->itemid) : 0);
+	nulls[3] = !has_item;
+	values[4] = Int16GetDatum(has_item ? ItemIdGetLength(ctx->itemid) : 0);
+	nulls[4] = !has_item;
+	values[5] = Int32GetDatum(ctx->attnum);
+	nulls[5] = (ctx->attnum < 0);
+	values[6] = Int32GetDatum(ctx->chunkno);
+	nulls[6] = (ctx->chunkno < 0);
+	values[7] = CStringGetTextDatum(msg);
+
+	/*
+	 * In principle, there is nothing to prevent a scan over a large, highly
+	 * corrupted table from using workmem worth of memory building up the
+	 * tuplestore.  Don't leak the msg argument memory.
+	 */
+	pfree(msg);
+
+	tuple = heap_form_tuple(ctx->tupdesc, values, nulls);
+	tuplestore_puttuple(ctx->tupstore, tuple);
+	ctx->is_corrupt = true;
+}
+
+/*
+ * Build and bless the row descriptor for verify_heapam's result set: the
+ * location columns (block, line pointer fields, attribute, chunk) and msg.
+ */
+static TupleDesc
+verify_heapam_tupdesc(void)
+{
+	static const struct { const char *name; Oid typid; } cols[] = {
+		{"blkno", INT8OID}, {"offnum", INT4OID}, {"lp_off", INT2OID},
+		{"lp_flags", INT2OID}, {"lp_len", INT2OID}, {"attnum", INT4OID},
+		{"chunk", INT4OID}, {"msg", TEXTOID}
+	};
+	TupleDesc	desc = CreateTemplateTupleDesc(HEAPCHECK_RELATION_COLS);
+	int			i;
+
+	for (i = 0; i < (int) (sizeof(cols) / sizeof(cols[0])); i++)
+		TupleDescInitEntry(desc, (AttrNumber) (i + 1), cols[i].name,
+						   cols[i].typid, -1, 0);
+	Assert(i == HEAPCHECK_RELATION_COLS);
+
+	return BlessTupleDesc(desc);
+}
+
+/*
+ * Is xid within [oldestValidXid, nextKnownValidXid)?  The lower bound is
+ * inclusive because the oldest valid xid is itself a valid xid; the upper
+ * bound is exclusive because the next xid has not been assigned yet.
+ */
+static inline bool
+XidInValidRange(TransactionId xid, HeapCheckContext * ctx)
+{
+	return (TransactionIdPrecedesOrEquals(ctx->oldestValidXid, xid) &&
+			TransactionIdPrecedes(xid, ctx->nextKnownValidXid));
+}
+
+/*
+ * Decide whether xid is plausible for this relation: InvalidTransactionId
+ * is rejected, the bootstrap and frozen xids are accepted, and any other
+ * xid must fall within the cached valid range, which is refreshed once
+ * from the next-xid counter before giving up.
+ */
+static bool
+TransactionIdValidInRel(TransactionId xid, HeapCheckContext * ctx)
+{
+	/* Quick return for special xids */
+	switch (xid)
+	{
+		case InvalidTransactionId:
+			return false;
+		case BootstrapTransactionId:
+		case FrozenTransactionId:
+			return true;
+	}
+
+	/*
+	 * An xid inside the last known valid range has to be ok: the oldest valid
+	 * xid cannot advance under our lock on the relation, and the newest
+	 * advancing doesn't invalidate anything already inside the range.
+	 */
+	if (XidInValidRange(xid, ctx))
+		return true;
+
+	/* The latest valid xid may have advanced.  Recheck. */
+	ctx->nextKnownValidXid =
+		XidFromFullTransactionId(ReadNextFullTransactionId());
+	if (XidInValidRange(xid, ctx))
+		return true;
+	return false;				/* No good.  This xid is invalid. */
+}
+
+/*
+ * check_tuphdr_xids
+ *
+ *	Decide whether a tuple should be examined further, i.e. whether it looks
+ *  live according to its header and clog.  Similar to
+ *  HeapTupleSatisfiesVacuum, but with critical differences.
+ *
+ *  1) Does not touch hint bits.  It seems imprudent to write hint bits
+ *     to a table during a corruption check.
+ *  2) Only makes a boolean determination of whether verification should
+ *     see the tuple, rather than doing extra work for vacuum-related
+ *     categorization.
+ *
+ *  Returns false for tuples to skip; an invalid xvac or multixact update
+ *  xid is additionally reported via confess().  The caller should already
+ *  have checked that xmin and xmax are not out of bounds for the relation.
+ */
+static bool
+check_tuphdr_xids(HeapTupleHeader tuphdr, HeapCheckContext * ctx)
+{
+	uint16		infomask = tuphdr->t_infomask;
+
+	if (!HeapTupleHeaderXminCommitted(tuphdr))
+	{
+		TransactionId raw_xmin = HeapTupleHeaderGetRawXmin(tuphdr);
+
+		if (HeapTupleHeaderXminInvalid(tuphdr))
+			return false;		/* HEAPTUPLE_DEAD */
+		/* Used by pre-9.0 binary upgrades */
+		else if (infomask & (HEAP_MOVED_OFF | HEAP_MOVED_IN))
+		{
+			TransactionId xvac = HeapTupleHeaderGetXvac(tuphdr);
+			bool		xvac_committed;
+
+			if (TransactionIdIsCurrentTransactionId(xvac))
+				return false;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			if (TransactionIdIsInProgress(xvac))
+				return false;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			if (!TransactionIdValidInRel(xvac, ctx))
+			{
+				confess(ctx, psprintf("tuple xvac = %u invalid", xvac));
+				return false;
+			}
+			/* Committed move kills the MOVED_OFF copy; else the MOVED_IN one */
+			xvac_committed = TransactionIdDidCommit(xvac);
+			if ((infomask & HEAP_MOVED_OFF) ? xvac_committed : !xvac_committed)
+				return false;	/* HEAPTUPLE_DEAD */
+		}
+		else if (TransactionIdIsCurrentTransactionId(raw_xmin))
+			return false;		/* insert or delete in progress */
+		else if (TransactionIdIsInProgress(raw_xmin))
+			return false;		/* HEAPTUPLE_INSERT_IN_PROGRESS */
+		else if (!TransactionIdDidCommit(raw_xmin))
+			return false;		/* HEAPTUPLE_DEAD */
+	}
+
+	if (!(infomask & HEAP_XMAX_INVALID) && !HEAP_XMAX_IS_LOCKED_ONLY(infomask))
+	{
+		if (infomask & HEAP_XMAX_IS_MULTI)
+		{
+			TransactionId xmax = HeapTupleGetUpdateXid(tuphdr);
+
+			/* not LOCKED_ONLY, so it has to have an xmax */
+			if (!TransactionIdIsValid(xmax))
+			{
+				confess(ctx, pstrdup("heap tuple with XMAX_IS_MULTI is "
+									 "neither LOCKED_ONLY nor has a "
+									 "valid xmax"));
+				return false;
+			}
+			if (TransactionIdIsInProgress(xmax))
+				return false;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			else if (TransactionIdDidCommit(xmax))
+				return false;	/* HEAPTUPLE_RECENTLY_DEAD or HEAPTUPLE_DEAD */
+			/* Ok, the tuple is live */
+		}
+		else if (!(infomask & HEAP_XMAX_COMMITTED))
+		{
+			TransactionId raw_xmax = HeapTupleHeaderGetRawXmax(tuphdr);
+
+			if (TransactionIdIsInProgress(raw_xmax))
+				return false;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			/* The hint bit may just be unset; ask clog, as for multixacts */
+			if (TransactionIdDidCommit(raw_xmax))
+				return false;	/* HEAPTUPLE_RECENTLY_DEAD or HEAPTUPLE_DEAD */
+			/* Ok, the deleter did not commit; the tuple is live */
+		}
+		else
+			return false;		/* HEAPTUPLE_RECENTLY_DEAD or HEAPTUPLE_DEAD */
+	}
+	return true;
+}
+
+/*
+ * check_toast_tuple
+ *
+ *   Checks one chunk tuple fetched from the toast relation against the
+ *   state tracked in ctx: its sequence number must equal ctx->chunkno and
+ *   not exceed ctx->endchunk, and its payload size must match what
+ *   ctx->attrsize implies for that position.  Problems are reported via
+ *   confess(); ctx->chunkno is advanced only when the chunk passes.
+ */
+static void
+check_toast_tuple(HeapTuple toasttup, HeapCheckContext * ctx)
+{
+	int32		curchunk;
+	Pointer		chunk;
+	bool		isnull;
+	int32		chunksize;
+	int32		expected_size;
+
+	/*
+	 * Have a chunk, extract the sequence number and the data
+	 */
+	curchunk = DatumGetInt32(fastgetattr(toasttup, 2,
+										 ctx->toastrel->rd_att, &isnull));
+	if (isnull)
+	{
+		confess(ctx, pstrdup("toast chunk sequencenumber is null"));
+		return;
+	}
+	chunk = DatumGetPointer(fastgetattr(toasttup, 3,
+										ctx->toastrel->rd_att, &isnull));
+	if (isnull)
+	{
+		confess(ctx, pstrdup("toast chunk data is null"));
+		return;
+	}
+
+	/* Only the payload size is verified here; the bytes are not examined. */
+	if (!VARATT_IS_EXTENDED(chunk))
+	{
+		chunksize = VARSIZE(chunk) - VARHDRSZ;
+	}
+	else if (VARATT_IS_SHORT(chunk))
+	{
+		/*
+		 * could happen due to heap_form_tuple doing its thing
+		 */
+		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
+	}
+	else
+	{
+		/* should never happen */
+		confess(ctx, pstrdup("toast chunk is neither short nor extended"));
+		return;
+	}
+
+	/*
+	 * Some checks on the data we've found
+	 */
+	if (curchunk != ctx->chunkno)
+	{
+		confess(ctx, psprintf("toast chunk sequence number %u "
+							  "not the expected sequence number %u",
+							  curchunk, ctx->chunkno));
+		return;
+	}
+	if (curchunk > ctx->endchunk)
+	{
+		confess(ctx, psprintf("toast chunk sequence number %u "
+							  "exceeds the end chunk sequence "
+							  "number %u",
+							  curchunk, ctx->endchunk));
+		return;
+	}
+
+	expected_size = curchunk < ctx->totalchunks - 1 ? TOAST_MAX_CHUNK_SIZE
+		: ctx->attrsize - ((ctx->totalchunks - 1) * TOAST_MAX_CHUNK_SIZE);
+	if (chunksize != expected_size)
+	{
+		confess(ctx, psprintf("chunk size %u differs from "
+							  "expected size %u",
+							  chunksize, expected_size));
+		return;
+	}
+
+	ctx->chunkno++;
+}
+
+/*
+ * check_tuple_attribute
+ *
+ *   Checks attribute ctx->attnum of the current tuple, advancing ctx->offset
+ *   past it; for an on-disk toast pointer, also scans its toast chunks.
+ *   Corruption is reported via confess().
+ *
+ *   Returns false if later attributes cannot be located, true otherwise.
+ */
+static bool
+check_tuple_attribute(HeapCheckContext * ctx)
+{
+	Datum		attdatum;
+	struct varlena *attr;
+	char	   *tp;				/* pointer to the tuple data */
+	uint16		infomask = ctx->tuphdr->t_infomask;
+	Form_pg_attribute thisatt = TupleDescAttr(RelationGetDescr(ctx->rel),
+											  ctx->attnum);
+
+	tp = (char *) ctx->tuphdr + ctx->tuphdr->t_hoff;
+
+	if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
+	{
+		confess(ctx, psprintf("t_hoff + offset > lp_len (%u + %u > %u)",
+							  ctx->tuphdr->t_hoff, ctx->offset,
+							  ctx->lp_len));
+		return false;
+	}
+
+	/* Skip null values */
+	if (infomask & HEAP_HASNULL && att_isnull(ctx->attnum, ctx->tuphdr->t_bits))
+		return true;
+
+	/* Skip non-varlena values, but update offset first */
+	if (thisatt->attlen != -1)
+	{
+		ctx->offset = att_align_nominal(ctx->offset, thisatt->attalign);
+		ctx->offset = att_addlength_pointer(ctx->offset, thisatt->attlen,
+											tp + ctx->offset);
+		return true;
+	}
+
+	/* Ok, we're looking at a varlena attribute. */
+	ctx->offset = att_align_pointer(ctx->offset, thisatt->attalign, -1,
+									tp + ctx->offset);
+
+	/* Get the (possibly corrupt) varlena datum */
+	attdatum = fetchatt(thisatt, tp + ctx->offset);
+
+	/*
+	 * We have the datum, but we cannot decode it carelessly, as it may still
+	 * be corrupt.
+	 */
+
+	/*
+	 * Check that VARTAG_SIZE won't hit a TrapMacro on a corrupt va_tag before
+	 * risking a call into att_addlength_pointer
+	 */
+	if (VARATT_IS_1B_E(tp + ctx->offset))
+	{
+		uint8		va_tag = VARTAG_EXTERNAL(tp + ctx->offset);
+
+		if (va_tag != VARTAG_ONDISK)
+		{
+			confess(ctx, psprintf("unexpected TOAST vartag %u for "
+								  "attribute #%u at t_hoff = %u, "
+								  "offset = %u",
+								  va_tag, ctx->attnum,
+								  ctx->tuphdr->t_hoff, ctx->offset));
+			return false;		/* We can't know where the next attribute
+								 * begins */
+		}
+	}
+
+	/* Ok, should be safe now */
+	ctx->offset = att_addlength_pointer(ctx->offset, thisatt->attlen,
+										tp + ctx->offset);
+
+	/*
+	 * heap_deform_tuple would be done with this attribute at this point,
+	 * having stored it in values[], and would continue to the next attribute.
+	 * We go further, because we need to check if the toast datum is corrupt.
+	 */
+
+	attr = (struct varlena *) DatumGetPointer(attdatum);
+
+	/*
+	 * Now we follow the logic of detoast_external_attr(), with the same
+	 * caveats about being paranoid about corruption.
+	 */
+
+	/* Skip values that are not external */
+	if (!VARATT_IS_EXTERNAL(attr))
+		return true;
+
+	/* It is external, and we're looking at a page on disk */
+	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
+	{
+		confess(ctx, pstrdup("attribute is external but not marked as on disk"));
+		return true;
+	}
+
+	/* The tuple header better claim to contain toasted values */
+	if (!(infomask & HEAP_HASEXTERNAL))
+	{
+		confess(ctx, pstrdup("attribute is external but tuple header "
+							 "flag HEAP_HASEXTERNAL not set"));
+		return true;
+	}
+
+	/* The relation better have a toast table */
+	if (!ctx->rel->rd_rel->reltoastrelid)
+	{
+		confess(ctx, pstrdup("attribute is external but relation has "
+							 "no toast relation"));
+		return true;
+	}
+
+	/*
+	 * NOTE(review): unreachable; attr was just verified to be ONDISK above.
+	 */
+	if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		struct varatt_indirect redirect;
+
+		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
+		attr = (struct varlena *) redirect.pointer;
+
+		/* nested indirect Datums aren't allowed */
+		if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+		{
+			confess(ctx, pstrdup("attribute has nested external "
+								 "indirect toast pointer"));
+			return true;
+		}
+	}
+
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
+	{
+		struct varatt_external toast_pointer;
+		ScanKeyData toastkey;
+		SysScanDesc toastscan;
+		SnapshotData SnapshotToast;
+		HeapTuple	toasttup;
+		bool		found_toasttup;
+
+		/*
+		 * Must copy attr into toast_pointer for alignment considerations
+		 */
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+
+		ctx->attrsize = toast_pointer.va_extsize;
+		ctx->endchunk = (ctx->attrsize - 1) / TOAST_MAX_CHUNK_SIZE;
+		ctx->totalchunks = ctx->endchunk + 1;
+
+		/*
+		 * Setup a scan key to find chunks in toast table with matching
+		 * va_valueid
+		 */
+		ScanKeyInit(&toastkey,
+					(AttrNumber) 1,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(toast_pointer.va_valueid));
+
+		/*
+		 * Check if any chunks for this toasted object exist in the toast
+		 * table, accessible via the index.
+		 */
+		init_toast_snapshot(&SnapshotToast);
+		toastscan = systable_beginscan_ordered(ctx->toastrel,
+											   ctx->valid_toast_index,
+											   &SnapshotToast, 1,
+											   &toastkey);
+		ctx->chunkno = 0;
+
+		found_toasttup = false;
+		while ((toasttup =
+				systable_getnext_ordered(toastscan,
+										 ForwardScanDirection)) != NULL)
+		{
+			found_toasttup = true;
+			check_toast_tuple(toasttup, ctx);
+		}
+		if (ctx->chunkno != (ctx->endchunk + 1))
+			confess(ctx, psprintf("final chunk number differs from "
+								  "expected (%u vs. %u)",
+								  ctx->chunkno, (ctx->endchunk + 1)));
+		if (!found_toasttup)
+			confess(ctx, pstrdup("toasted value missing from toast table"));
+		systable_endscan_ordered(toastscan);
+		/* Don't let later attributes' reports inherit this chunk number. */
+		ctx->chunkno = -1;
+	}
+	return true;
+}
+
+/*
+ * check_tuple
+ *
+ *   Checks the current tuple as tracked in ctx: xmin/xmax sanity, header
+ *   layout, then each attribute.  Corruption is reported via confess().
+ */
+static void
+check_tuple(HeapCheckContext * ctx)
+{
+	TransactionId xmin;
+	TransactionId xmax;
+	bool		fatal = false;
+	uint16		infomask = ctx->tuphdr->t_infomask;
+
+	/* Check relminmxid against mxid, if any */
+	xmax = HeapTupleHeaderGetRawXmax(ctx->tuphdr);
+	if (infomask & HEAP_XMAX_IS_MULTI &&
+		MultiXactIdPrecedes(xmax, ctx->relminmxid))
+	{
+		confess(ctx, psprintf("tuple xmax = %u precedes relation "
+							  "relminmxid = %u",
+							  xmax, ctx->relminmxid));
+		fatal = true;
+	}
+
+	/* Check xmin against relfrozenxid */
+	xmin = HeapTupleHeaderGetXmin(ctx->tuphdr);
+	if (TransactionIdIsNormal(ctx->relfrozenxid) &&
+		TransactionIdIsNormal(xmin))
+	{
+		if (TransactionIdPrecedes(xmin, ctx->relfrozenxid))
+		{
+			confess(ctx, psprintf("tuple xmin = %u precedes relation "
+								  "relfrozenxid = %u",
+								  xmin, ctx->relfrozenxid));
+			fatal = true;
+		}
+		else if (!TransactionIdValidInRel(xmin, ctx))
+		{
+			confess(ctx, psprintf("tuple xmin = %u is in the future",
+								  xmin));
+			fatal = true;
+		}
+	}
+
+	/* Check xmax against relfrozenxid; a MultiXactId is not comparable */
+	if (!(infomask & HEAP_XMAX_IS_MULTI) &&
+		TransactionIdIsNormal(ctx->relfrozenxid) &&
+		TransactionIdIsNormal(xmax))
+	{
+		if (TransactionIdPrecedes(xmax, ctx->relfrozenxid))
+		{
+			confess(ctx, psprintf("tuple xmax = %u precedes relation "
+								  "relfrozenxid = %u",
+								  xmax, ctx->relfrozenxid));
+			fatal = true;
+		}
+		else if (!TransactionIdValidInRel(xmax, ctx))
+		{
+			confess(ctx, psprintf("tuple xmax = %u is in the future",
+								  xmax));
+			fatal = true;
+		}
+	}
+
+	/* Check for tuple header corruption */
+	if (ctx->tuphdr->t_hoff < SizeofHeapTupleHeader)
+	{
+		confess(ctx, psprintf("t_hoff < SizeofHeapTupleHeader (%u < %u)",
+							  ctx->tuphdr->t_hoff,
+							  (unsigned) SizeofHeapTupleHeader));
+		fatal = true;
+	}
+	if (ctx->tuphdr->t_hoff > ctx->lp_len)
+	{
+		confess(ctx, psprintf("t_hoff > lp_len (%u > %u)",
+							  ctx->tuphdr->t_hoff, ctx->lp_len));
+		fatal = true;
+	}
+	if (ctx->tuphdr->t_hoff != MAXALIGN(ctx->tuphdr->t_hoff))
+	{
+		confess(ctx, psprintf("t_hoff not max-aligned (%u)",
+							  ctx->tuphdr->t_hoff));
+		fatal = true;
+	}
+
+	/*
+	 * If the tuple has nulls, check that the implied length of the variable
+	 * length nulls bitmap field t_bits does not overflow the allowed space.
+	 * We don't know if the corruption is in the natts field or the infomask
+	 * bit HEAP_HASNULL.
+	 */
+	if (infomask & HEAP_HASNULL &&
+		SizeofHeapTupleHeader + BITMAPLEN(ctx->natts) > ctx->tuphdr->t_hoff)
+	{
+		confess(ctx, psprintf("SizeofHeapTupleHeader + "
+							  "BITMAPLEN(natts) > t_hoff "
+							  "(%u + %u > %u)",
+							  (unsigned) SizeofHeapTupleHeader,
+							  BITMAPLEN(ctx->natts),
+							  ctx->tuphdr->t_hoff));
+		fatal = true;
+	}
+
+	/*
+	 * Cannot process tuple data if tuple header was corrupt, as the offsets
+	 * within the page cannot be trusted, leaving too much risk of reading
+	 * garbage if we continue.
+	 *
+	 * We also cannot process the tuple if the xmin or xmax were invalid
+	 * relative to relfrozenxid or relminmxid, as clog entries for the xids
+	 * may already be gone.
+	 */
+	if (fatal)
+		return;
+
+	/*
+	 * Skip tuples that are invisible, as we cannot assume the TupleDesc we
+	 * are using is appropriate.
+	 */
+	if (!check_tuphdr_xids(ctx->tuphdr, ctx))
+		return;
+
+	/*
+	 * If we get this far, the tuple is visible to us, so it must not be
+	 * incompatible with our relDesc.  The natts field could be legitimately
+	 * shorter than rel's natts, but it cannot be longer than rel's natts.
+	 */
+	if (RelationGetDescr(ctx->rel)->natts < ctx->natts)
+	{
+		confess(ctx,
+				psprintf("relation natts < tuple natts (%u < %u)",
+						 RelationGetDescr(ctx->rel)->natts,
+						 ctx->natts));
+		return;
+	}
+
+	/*
+	 * Iterate over the attributes looking for broken toast values. This
+	 * roughly follows the logic of heap_deform_tuple, except that it doesn't
+	 * bother building up isnull[] and values[] arrays, since nobody wants
+	 * them, and it unrolls anything that might trip over an Assert when
+	 * processing corrupt data.
+	 */
+	ctx->offset = 0;
+	for (ctx->attnum = 0; ctx->attnum < ctx->natts; ctx->attnum++)
+	{
+		if (!check_tuple_attribute(ctx))
+			break;
+	}
+	ctx->offset = -1;
+	ctx->attnum = -1;
+}
diff --git a/contrib/heapcheck/.gitignore b/contrib/heapcheck/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/contrib/heapcheck/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/contrib/heapcheck/Makefile b/contrib/heapcheck/Makefile
new file mode 100644
index 0000000000..8d780a41ab
--- /dev/null
+++ b/contrib/heapcheck/Makefile
@@ -0,0 +1,25 @@
+# contrib/heapcheck/Makefile
+
+MODULE_big = heapcheck
+OBJS = \
+	$(WIN32RES) \
+	heapcheck.o
+
+EXTENSION = heapcheck
+DATA = heapcheck--1.0.sql
+PGFILEDESC = "heapcheck - page corruption information"
+
+REGRESS = 001_create_extension 002_disallowed_reltypes
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/heapcheck
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/heapcheck/expected/001_create_extension.out b/contrib/heapcheck/expected/001_create_extension.out
new file mode 100644
index 0000000000..0ca79c22be
--- /dev/null
+++ b/contrib/heapcheck/expected/001_create_extension.out
@@ -0,0 +1 @@
+create extension heapcheck;
diff --git a/contrib/heapcheck/heapcheck--1.0.sql b/contrib/heapcheck/heapcheck--1.0.sql
new file mode 100644
index 0000000000..48251e6781
--- /dev/null
+++ b/contrib/heapcheck/heapcheck--1.0.sql
@@ -0,0 +1,21 @@
+/* contrib/heapcheck/heapcheck--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION heapcheck" to load this file. \quit
+
+-- Examine a relation for corruption, returning a set of rows describing what was found.
+CREATE FUNCTION heapcheck_relation(regclass,
+								  blkno OUT bigint,
+								  offnum OUT integer,
+								  lp_off OUT smallint,
+								  lp_flags OUT smallint,
+								  lp_len OUT smallint,
+								  attnum OUT integer,
+								  chunk OUT integer,
+								  msg OUT text
+								  )
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'heapcheck_relation'
+LANGUAGE C STRICT;
+REVOKE ALL ON FUNCTION heapcheck_relation(regclass) FROM PUBLIC;
+GRANT EXECUTE ON FUNCTION heapcheck_relation(regclass) TO pg_stat_scan_tables;
diff --git a/contrib/heapcheck/heapcheck.control b/contrib/heapcheck/heapcheck.control
new file mode 100644
index 0000000000..23b076169e
--- /dev/null
+++ b/contrib/heapcheck/heapcheck.control
@@ -0,0 +1,5 @@
+# heapcheck extension
+comment = 'examine relations for corruption'
+default_version = '1.0'
+module_pathname = '$libdir/heapcheck'
+relocatable = true
diff --git a/contrib/heapcheck/sql/001_create_extension.sql b/contrib/heapcheck/sql/001_create_extension.sql
new file mode 100644
index 0000000000..0ca79c22be
--- /dev/null
+++ b/contrib/heapcheck/sql/001_create_extension.sql
@@ -0,0 +1 @@
+create extension heapcheck;
diff --git a/doc/src/sgml/amcheck.sgml b/doc/src/sgml/amcheck.sgml
index 75518a7820..6bf3110bb3 100644
--- a/doc/src/sgml/amcheck.sgml
+++ b/doc/src/sgml/amcheck.sgml
@@ -165,6 +165,108 @@ ORDER BY c.relpages DESC LIMIT 10;
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term>
+     <function>
+      verify_heapam(relation regclass,
+                    stop_on_error boolean,
+                    blkno OUT bigint,
+                    offnum OUT integer,
+                    lp_off OUT smallint,
+                    lp_flags OUT smallint,
+                    lp_len OUT smallint,
+                    attnum OUT integer,
+                    chunk OUT integer,
+                    msg OUT text)
+      returns setof record
+     </function>
+    </term>
+    <listitem>
+     <para>
+      Checks for "logical" corruption, where the page is valid but inconsistent
+      with the rest of the database cluster. This can happen due to faulty or
+      ill-conceived backup and restore tools, or bad storage, or user error, or
+      bugs in the server itself.  It checks xmin and xmax values against
+      relfrozenxid and relminmxid, and also validates TOAST pointers.
+     </para>
+
+     <para>
+      Returns one row for each corruption detected.  All blocks of the
+      relation are checked unless stop_on_error is true, in which case the
+      scan stops after the first block in which corruption is found.
+     </para>
+     <variablelist>
+      <varlistentry>
+       <term>blkno</term>
+       <listitem>
+        <para>
+         The number of the block containing the corrupt page.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>offnum</term>
+       <listitem>
+        <para>
+         The OffsetNumber of the corrupt tuple.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>lp_off</term>
+       <listitem>
+        <para>
+         The offset of the corrupt tuple within the page, as recorded in its line pointer.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>lp_flags</term>
+       <listitem>
+        <para>
+         The flags in the line pointer for the corrupt tuple.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>lp_len</term>
+       <listitem>
+        <para>
+         The length of the corrupt tuple as recorded in the line pointer.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>attnum</term>
+       <listitem>
+        <para>
+         The attribute number of the corrupt column in the tuple, if the corruption
+         is specific to a column and not the tuple as a whole.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>chunk</term>
+       <listitem>
+        <para>
+         The chunk number of the corrupt toasted attribute, if the corruption
+         is specific to a toasted value.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>msg</term>
+       <listitem>
+        <para>
+         A human readable message describing the corruption in the page.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
   <tip>
    <para>
-- 
2.21.1 (Apple Git-122.3)

