diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f7800f01a6..6855380057 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1332,6 +1332,13 @@ LANGUAGE INTERNAL
 STRICT STABLE PARALLEL SAFE
 AS 'jsonb_path_query_first_tz';
 
+CREATE OR REPLACE FUNCTION pg_check_relation(
+  IN relation regclass DEFAULT NULL::regclass, IN fork text DEFAULT NULL::text,
+  OUT relid oid, OUT forknum integer, OUT failed_blocknum bigint,
+  OUT expected_checksum integer, OUT found_checksum integer)
+  RETURNS SETOF record VOLATILE LANGUAGE internal AS 'pg_check_relation'
+  PARALLEL RESTRICTED;
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
diff --git a/src/backend/storage/page/checksum.c b/src/backend/storage/page/checksum.c
index 8f7a110e81..8bfaaae740 100644
--- a/src/backend/storage/page/checksum.c
+++ b/src/backend/storage/page/checksum.c
@@ -15,8 +15,530 @@
 
 #include "storage/checksum.h"
 /*
- * The actual code is in storage/checksum_impl.h.  This is done so that
- * external programs can incorporate the checksum code by #include'ing
- * that file from the exported Postgres headers.  (Compare our CRC code.)
+ * The actual checksum computation code is in storage/checksum_impl.h.  This
+ * is done so that external programs can incorporate the checksum code by
+ * #include'ing that file from the exported Postgres headers.  (Compare our
+ * CRC code.)
  */
 #include "storage/checksum_impl.h"
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "catalog/pg_authid_d.h"
+#include "commands/dbcommands.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/buf_internals.h"
+#include "storage/checksum.h"
+#include "storage/lockdefs.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+
+/*
+ * A zero checksum can never be computed, see pg_checksum_page() */
+#define NoComputedChecksum	0
+
+/* The rest of this module provides a set of functions that can be used to
+ * safely check all checksums on a running cluster.
+ *
+ * Please note that those only perform standard buffered reads, and don't try
+ * to bypass or discard the operating system cache.  If you want to check the
+ * actual storage, you have to discard the operating system cache before
+ * running those functions.
+ *
+ * To avoid torn page and possible false postives when reading data, and
+ * keeping overhead as low as possible, the following heuristics are used:
+ *
+ * - a shared LWLock is taken on the target buffer pool partition mapping, and
+ *   we detect if a block is in shared_buffers or not.  See get_buffer()
+ *   comments for more details about the locking strategy.
+ *
+ * - if a block is dirty in shared_buffers, it's ignored as it'll be flushed to
+ *   disk either before the end of the next checkpoint or during recovery in
+ *   case of unsafe shutdown
+ *
+ * - if a block is otherwise found in shared_buffers, an IO lock is taken on
+ *   the block and the block is then read from storage, ignoring the block in
+ *   shared_buffers
+ *
+ * - if a block is not found in shared_buffers, the LWLock is relased and the
+ *   block is read from disk without taking any lock.  If an error is detected,
+ *   the read block will be discarded and retrieved again while holding the
+ *   LWLock.  This is because an error due to concurrent write is possible but
+ *   very unlikely, so it's better to have an optimistic approach to limit
+ *   locking overhead
+ *
+ * The check can be performed using an SQL function, returning the list of
+ * problematic blocks.
+ *
+ * Vacuum's GUCs are used to avoid consuming too much resources while running
+ * this tool.
+ */
+
+static void check_all_relations(TupleDesc tupdesc, Tuplestorestate *tupstore,
+									  ForkNumber forknum);
+static bool check_buffer(char *buffer, uint32 blkno, uint16 *chk_expected,
+							   uint16 *chk_found);
+static void check_one_relation(TupleDesc tupdesc, Tuplestorestate *tupstore,
+								 Oid relid, ForkNumber single_forknum);
+static void check_relation_fork(TupleDesc tupdesc, Tuplestorestate *tupstore,
+									  Relation relation, ForkNumber forknum);
+static void check_delay_point(void);
+static void check_get_buffer(Relation relation, ForkNumber forknum,
+							 BlockNumber blkno, char *buffer, bool needlock,
+							 bool *checkit, bool *found_in_sb);
+
+/*
+ * Iterate over all relation having a physical storage in the current database
+ * and perform a check for all of them.  Note that this function can run for
+ * a very long time, and as such can cause bloat or other issues.  Client
+ * should iterate over a local list of oid using the per-table mode if
+ * keeping an open transaction for very long can be a concern.
+ */
+static void
+check_all_relations(TupleDesc tupdesc, Tuplestorestate *tupstore,
+						  ForkNumber forknum)
+{
+	List	   *relids = NIL;
+	ListCell   *lc;
+	Relation	relation;
+	TableScanDesc scan;
+	HeapTuple	tuple;
+
+	relation = table_open(RelationRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(relation, 0, NULL);
+
+	while (HeapTupleIsValid(tuple = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_class class = (Form_pg_class) GETSTRUCT(tuple);
+
+		if (!RELKIND_HAS_STORAGE(class->relkind))
+			continue;
+
+		relids = lappend_oid(relids, class->oid);
+	}
+
+	table_endscan(scan);
+	table_close(relation, AccessShareLock);	/* release lock */
+
+	foreach(lc, relids)
+	{
+		Oid			relid = lfirst_oid(lc);
+
+		check_one_relation(tupdesc, tupstore, relid, forknum);
+	}
+}
+
+/*
+ * Perform a checksum check on the passed page.  Returns whether the page is
+ * valid or not, and assign the expected and found checksum in chk_expected and
+ * chk_found, respectively.  Note that a page can look like new but could be
+ * the result of a corruption.  We still check for this case, but we can't
+ * compute its checksum as pg_checksum_page() is explictly checking for non-new
+ * pages, so NoComputedChecksum will be set in chk_found.
+ */
+static bool
+check_buffer(char *buffer, uint32 blkno, uint16 *chk_expected,
+				   uint16 *chk_found)
+{
+	PageHeader	hdr = (PageHeader) buffer;
+
+	Assert(chk_expected && chk_found);
+
+	if (PageIsNew(hdr))
+	{
+		if (PageIsVerified(buffer, blkno))
+		{
+			/*
+			 * If the page is really new, there won't by any checksum to be
+			 * computed or expected.
+			 */
+			*chk_expected = *chk_found = NoComputedChecksum;
+			return true;
+		}
+		else
+		{
+			/*
+			 * There's a corruption, but since this affect PageIsNew, we
+			 * can't compute a checksum, so set NoComputedChecksum for the
+			 * expected checksum.
+			 */
+			*chk_expected = NoComputedChecksum;
+			*chk_found = hdr->pd_checksum;
+		}
+		return false;
+	}
+
+	*chk_expected = pg_checksum_page(buffer, blkno);
+	*chk_found = hdr->pd_checksum;
+
+	return (*chk_expected == *chk_found);
+}
+
+/*
+ * Perform the check on a single relation, possibly filtered with a single
+ * fork.  This function will check if the given relation exists or not, as
+ * a relation could be dropped after checking for the list of relations and
+ * before getting here, and we don't want to error out in this case.
+ */
+static void
+check_one_relation(TupleDesc tupdesc, Tuplestorestate *tupstore,
+					 Oid relid, ForkNumber single_forknum)
+{
+	Relation	relation = NULL;
+	ForkNumber	forknum;
+
+	/* Check if the relation (still) exists */
+	if (SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
+	{
+		/*
+		 * We use a standard relation_open() to acquire the initial lock.  It
+		 * means that this will block until the lock is acquired, or will
+		 * raise an ERROR if lock_timeout has been set.  If caller wants to
+		 * check multiple tables while relying on a maximum wait time, it
+		 * should process tables one by one instead of relying on a global
+		 * processing with the main SRF.
+		 */
+		relation = relation_open(relid, AccessShareLock);
+	}
+
+	if (!RelationIsValid(relation))
+	{
+		elog(WARNING, "relation %u does not exists in database \"%s\"",
+			 relid, get_database_name(MyDatabaseId));
+		return;
+	}
+
+	if (!RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("Relation \"%s\" does not have storage to be checked.",
+				 quote_qualified_identifier(
+					 get_namespace_name(get_rel_namespace(relid)),
+					 get_rel_name(relid)))));
+
+	RelationOpenSmgr(relation);
+
+	for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
+	{
+		if (single_forknum != InvalidForkNumber && single_forknum != forknum)
+			continue;
+
+		if (smgrexists(relation->rd_smgr, forknum))
+			check_relation_fork(tupdesc, tupstore, relation, forknum);
+	}
+
+	relation_close(relation, AccessShareLock);	/* release the lock */
+}
+
+/*
+ * For a given relation and fork, Do the real work of iterating over all pages
+ * and doing the check.  Caller must hold an AccessShareLock lock on the given
+ * relation.
+ */
+static void
+check_relation_fork(TupleDesc tupdesc, Tuplestorestate *tupstore,
+						  Relation relation, ForkNumber forknum)
+{
+	BlockNumber blkno,
+				nblocks;
+	char		buffer[BLCKSZ];
+	bool		checkit,
+				found_in_sb;
+
+	/*
+	 * We remember the number of blocks here.  Since caller must hold a lock on
+	 * the relation, we know that it won't be truncated while we're iterating
+	 * over the blocks.  Any block added after this function started won't be
+	 * checked, but this is out of scope as such pages will be flushed before
+	 * the next checkpoint's completion.
+	 */
+	nblocks = RelationGetNumberOfBlocksInFork(relation, forknum);
+
+#define CRF_COLS			5	/* Number of output arguments in the SRF */
+	for (blkno = 0; blkno < nblocks; blkno++)
+	{
+		bool		force_lock = false;
+		uint16		chk_expected,
+					chk_found;
+		Datum		values[CRF_COLS];
+		bool		nulls[CRF_COLS];
+		int			i = 0;
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		/*
+		 * To avoid too much overhead, the buffer will be first read without
+		 * the locks that would guarantee the lack of false positive, as such
+		 * events should be quite rare.
+		 */
+Retry:
+		check_get_buffer(relation, forknum, blkno, buffer, force_lock,
+						 &checkit, &found_in_sb);
+
+		if (!checkit)
+			continue;
+
+		if (check_buffer(buffer, blkno, &chk_expected, &chk_found))
+			continue;
+
+		/*
+		 * If we get a failure and the buffer wasn't found in shared buffers,
+		 * reread the buffer with suitable lock to avoid false positive.  See
+		 * check_get_buffer for more details.
+		 */
+		if (!found_in_sb && !force_lock)
+		{
+			force_lock = true;
+			goto Retry;
+		}
+
+		values[i++] = ObjectIdGetDatum(relation->rd_id);
+		values[i++] = Int32GetDatum(forknum);
+		values[i++] = UInt32GetDatum(blkno);
+		/*
+		 * This can happen if a corruption makes the block appears as
+		 * PageIsNew() but isn't a new page.
+		 */
+		if (chk_expected == NoComputedChecksum)
+			nulls[i++] = true;
+		else
+			values[i++] = UInt16GetDatum(chk_expected);
+		values[i++] = UInt16GetDatum(chk_found);
+
+		Assert(i == CRF_COLS);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+}
+
+/*
+ * Check for interrupts and cost-based delay.
+ */
+static void
+check_delay_point(void)
+{
+	/* Always check for interrupts */
+	CHECK_FOR_INTERRUPTS();
+
+	/* Nap if appropriate */
+	if (!InterruptPending && VacuumCostBalance >= VacuumCostLimit)
+	{
+		int			msec;
+
+		msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
+		if (msec > VacuumCostDelay * 4)
+			msec = VacuumCostDelay * 4;
+
+		pg_usleep(msec * 1000L);
+
+		VacuumCostBalance = 0;
+
+		/* Might have gotten an interrupt while sleeping */
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+
+/*
+ * Safely read the wanted buffer from disk, dealing with possible concurrency
+ * issue.  Note that if a buffer is found dirty in shared_buffers, no read will
+ * be performed and the caller will be informed that no check should be done.
+ * We can safely ignore such buffers as they'll be written before next
+ * checkpoint's completion..
+ *
+ * The following locks can be used in this function:
+ *
+ *   - shared LWLock on the target buffer pool partition mapping.
+ *   - IOLock on the buffer
+ *
+ * The IOLock is taken when reading the buffer from disk if it exists in
+ * shared_buffers, to avoid torn pages.
+ *
+ * If the buffer isn't in shared_buffers, it'll be read from disk without any
+ * lock unless caller asked otherwise, setting needlock.  In this case, the
+ * read will be done while the buffer mapping partition LWLock is still being
+ * held.  Reading with this lock is to avoid the unlikely but possible case
+ * that a buffer wasn't present in shared buffers when we checked but it then
+ * alloc'ed in shared_buffers, modified and flushed concurrently when we
+ * later try to read it, leading to false positive due to torn page.  Caller
+ * can read first the buffer without holding the target buffer mapping
+ * partition LWLock to have an optimistic approach, and reread the buffer
+ * from disk in case of error.
+ *
+ * Caller should hold an AccessShareLock on the Relation
+ */
+static void
+check_get_buffer(Relation relation, ForkNumber forknum,
+				 BlockNumber blkno, char *buffer, bool needlock, bool *checkit,
+				 bool *found_in_sb)
+{
+	BufferTag	buf_tag;		/* identity of requested block */
+	uint32		buf_hash;		/* hash value for buf_tag */
+	LWLock	   *partLock;		/* buffer partition lock for the buffer */
+	BufferDesc *bufdesc;
+	int			buf_id;
+
+	*checkit = true;
+	*found_in_sb = false;
+
+	/* Check for interrupts and take throttling into account. */
+	check_delay_point();
+
+	/* create a tag so we can lookup the buffer */
+	INIT_BUFFERTAG(buf_tag, relation->rd_smgr->smgr_rnode.node, forknum, blkno);
+
+	/* determine its hash code and partition lock ID */
+	buf_hash = BufTableHashCode(&buf_tag);
+	partLock = BufMappingPartitionLock(buf_hash);
+
+	/* see if the block is in the buffer pool already */
+	LWLockAcquire(partLock, LW_SHARED);
+	buf_id = BufTableLookup(&buf_tag, buf_hash);
+	if (buf_id >= 0)
+	{
+		uint32		buf_state;
+
+		*found_in_sb = true;
+
+		/*
+		 * Found it.  Now, retrieve its state to know what to do with it, and
+		 * release the pin immediately.  We do so to limit overhead as much
+		 * as possible.  We'll keep the shared lightweight lock on the target
+		 * buffer mapping partition, so this buffer can't be evicted, and
+		 * we'll acquire an IOLock on the buffer if we need to read the
+		 * content on disk.
+		 */
+		bufdesc = GetBufferDescriptor(buf_id);
+
+		buf_state = LockBufHdr(bufdesc);
+		UnlockBufHdr(bufdesc, buf_state);
+
+		/* Dirty pages are ignored as they'll be flushed soon. */
+		if (buf_state & BM_DIRTY)
+			*checkit = false;
+
+		/*
+		 * Read the buffer from disk, taking on IO lock to prevent torn-page
+		 * reads, in the unlikely event that it was concurrently dirted and
+		 * flushed.
+		 */
+		if (*checkit)
+		{
+			LWLockAcquire(BufferDescriptorGetIOLock(bufdesc), LW_SHARED);
+			smgrread(relation->rd_smgr, forknum, blkno, buffer);
+			LWLockRelease(BufferDescriptorGetIOLock(bufdesc));
+
+			/*
+			 * Add a page miss cost, as we're always reading outside the
+			 * shared buffers.
+			 */
+			VacuumCostBalance += VacuumCostPageMiss;
+		}
+	}
+	else if (needlock)
+	{
+		/*
+		 * Caller asked to read the buffer while we have a lock on the target
+		 * partition.
+		 */
+		smgrread(relation->rd_smgr, forknum, blkno, buffer);
+
+		/*
+		 * Add a page miss cost, as we're always reading outside the shared
+		 * buffers.
+		 */
+		VacuumCostBalance += VacuumCostPageMiss;
+	}
+
+	LWLockRelease(partLock);
+
+	if (*found_in_sb || needlock)
+		return;
+
+	/*
+	 * Didn't find it in the buffer pool and didn't read it while holding the
+	 * buffer mapping partition lock.  We'll have to try to read it from
+	 * disk, after releasing the target partition lock to avoid too much
+	 * overhead.  It means that it's possible to get a torn page later, so
+	 * we'll have to retry with a suitable lock in case of error to avoid
+	 * false positive.
+	 */
+	smgrread(relation->rd_smgr, forknum, blkno, buffer);
+
+	/*
+	 * Add a page miss cost, as we're always reading outside the shared
+	 * buffers.
+	 */
+	VacuumCostBalance += VacuumCostPageMiss;
+}
+
+Datum
+pg_check_relation(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	ForkNumber	forknum = InvalidForkNumber;
+	Oid			relid = InvalidOid;
+	const char *forkname;
+
+	if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("only superuser or a member of the pg_read_server_files role may use this function")));
+
+	if (!DataChecksumsEnabled())
+		elog(ERROR, "Data checksums are not enabled");
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+
+	/* Switch into long-lived context to construct returned data structures */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (PG_NARGS() >= 1 && !PG_ARGISNULL(0))
+		relid = PG_GETARG_OID(0);
+
+	if (PG_NARGS() == 2 && !PG_ARGISNULL(1))
+	{
+		forkname = TextDatumGetCString(PG_GETARG_TEXT_PP(1));
+		forknum = forkname_to_number(forkname);
+	}
+
+	VacuumCostBalance = 0;
+
+	if (OidIsValid(relid))
+		check_one_relation(tupdesc, tupstore, relid, forknum);
+	else
+		check_all_relations(tupdesc, tupstore, forknum);
+
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ac8f64b219..6a1c760b15 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10703,6 +10703,14 @@
   proallargtypes => '{oid,text,int8,timestamptz}', proargmodes => '{i,o,o,o}',
   proargnames => '{tablespace,name,size,modification}',
   prosrc => 'pg_ls_tmpdir_1arg' },
+{ oid => '9147', descr => 'check data integrity for one or all relations',
+  proname => 'pg_check_relation', proisstrict => 'f', procost => '10000',
+  prorows => '20', proretset => 't', proparallel => 'r',
+  provolatile => 'v', prorettype => 'record', proargtypes => 'regclass text',
+  proallargtypes => '{regclass,text,oid,int4,int8,int4,int4}',
+  proargmodes => '{i,i,o,o,o,o,o}',
+  proargnames => '{relation,fork,relid,forknum,failed_blocknum,expected_checksum,found_checksum}',
+  prosrc => 'pg_check_relation' },
 
 # hash partitioning constraint function
 { oid => '5028', descr => 'hash partition CHECK constraint',
diff --git a/src/test/Makefile b/src/test/Makefile
index efb206aa75..832868099f 100644
--- a/src/test/Makefile
+++ b/src/test/Makefile
@@ -12,7 +12,8 @@ subdir = src/test
 top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS = perl regress isolation modules authentication recovery subscription
+SUBDIRS = perl regress isolation modules authentication check_relation \
+	  recovery subscription
 
 # Test suites that are not safe by default but can be run if selected
 # by the user via the whitespace-separated list in variable
diff --git a/src/test/check_relation/.gitignore b/src/test/check_relation/.gitignore
new file mode 100644
index 0000000000..871e943d50
--- /dev/null
+++ b/src/test/check_relation/.gitignore
@@ -0,0 +1,2 @@
+# Generated by test suite
+/tmp_check/
diff --git a/src/test/check_relation/Makefile b/src/test/check_relation/Makefile
new file mode 100644
index 0000000000..792098fa65
--- /dev/null
+++ b/src/test/check_relation/Makefile
@@ -0,0 +1,23 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/test/check_relation
+#
+# Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/test/check_relation/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/test/check_relation
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+check:
+	$(prove_check)
+
+installcheck:
+	$(prove_installcheck)
+
+clean distclean maintainer-clean:
+	rm -rf tmp_check
diff --git a/src/test/check_relation/README b/src/test/check_relation/README
new file mode 100644
index 0000000000..415c4b21ad
--- /dev/null
+++ b/src/test/check_relation/README
@@ -0,0 +1,23 @@
+src/test/check_relation/README
+
+Regression tests for online checksums verification
+==================================================
+
+This directory contains a test suite for online checksums verification.
+
+Running the tests
+=================
+
+NOTE: You must have given the --enable-tap-tests argument to configure.
+
+Run
+    make check
+or
+    make installcheck
+You can use "make installcheck" if you previously did "make install".
+In that case, the code in the installation tree is tested.  With
+"make check", a temporary installation tree is built from the current
+sources and then tested.
+
+Either way, this test initializes, starts, and stops a test Postgres
+cluster.
diff --git a/src/test/check_relation/t/01_checksums_check.pl b/src/test/check_relation/t/01_checksums_check.pl
new file mode 100644
index 0000000000..e98385a677
--- /dev/null
+++ b/src/test/check_relation/t/01_checksums_check.pl
@@ -0,0 +1,250 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 41;
+
+our $CHECKSUM_UINT16_OFFSET = 4;
+our $PD_UPPER_UINT16_OFFSET = 7;
+our $BLOCKSIZE;
+
+sub get_block
+{
+	my ($filename, $blkno) = @_;
+	my $block;
+
+	open(my $infile, '<', $filename) or die;
+	binmode($infile);
+
+	my $success = read($infile, $block, $BLOCKSIZE, ($blkno * $BLOCKSIZE));
+	die($!) if not defined $success;
+
+	close($infile);
+
+	return($block);
+}
+
+sub overwrite_block
+{
+	my ($filename, $block, $blkno) = @_;
+
+	open(my $outfile, '>', $filename) or die;
+	binmode ($outfile);
+
+	my $nb = syswrite($outfile, $block, $BLOCKSIZE, ($blkno * $BLOCKSIZE));
+
+	die($!) if not defined $nb;
+	die("Write error") if ($nb != $BLOCKSIZE);
+
+	$outfile->flush();
+
+	close($outfile);
+}
+
+sub get_uint16_from_page
+{
+	my ($block, $offset) = @_;
+
+	return (unpack("S*", $block))[$offset];
+}
+
+sub set_uint16_to_page
+{
+	my ($block, $data, $offset) = @_;
+
+	my $pack = pack("S", $data);
+
+	# vec with 16B or more won't preserve endianness
+	vec($block, 2*$offset, 8) = (unpack('C*', $pack))[0];
+	vec($block, (2*$offset) + 1, 8) = (unpack('C*', $pack))[1];
+
+	return $block;
+}
+
+sub check_checksums_call
+{
+	my ($node, $relname) = @_;
+
+	my ($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT COUNT(*)"
+		. " FROM pg_catalog.pg_check_relation('$relname')"
+	);
+
+	 return ($stderr eq '');
+}
+
+sub check_checksums_nb_error
+{
+	my ($node, $nb) = @_;
+
+	my ($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT COUNT(*)"
+		. " FROM pg_catalog.pg_check_relation()"
+	);
+
+	 is($stderr, '', 'Function should run successfully');
+	 is($stdout, $nb, "Should have $nb error");
+}
+
+sub get_checksums_errors
+{
+	my ($node, $nb) = @_;
+
+	my ($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT"
+		. " relid::regclass::text, forknum, failed_blocknum,"
+		. " expected_checksum, found_checksum"
+		. " FROM pg_catalog.pg_check_relation()"
+	);
+
+	 is($stderr, '', 'Function should run successfully');
+
+	 return $stdout;
+}
+
+# This function will perform various test by modifying the specified block at
+# the specified uint16 offset, checking that the corruption is correctly
+# detected, and finally restore the specified block to its original content.
+sub corrupt_and_test_block
+{
+	my ($node, $filename, $blkno, $offset, $fake_data) = @_;
+
+	check_checksums_nb_error($node, 0,
+		"There shouldn't be checksums errors before modifications");
+
+	$node->stop();
+
+	my $original_block = get_block($filename, 0);
+	my $original_data = get_uint16_from_page($original_block, $offset);
+
+	isnt($original_data, $fake_data,
+		"The fake data at offset $offset should be different"
+		. " from the existing one");
+
+	my $new_block = set_uint16_to_page($original_block, $fake_data, $offset);
+	isnt($original_data, get_uint16_from_page($new_block, $offset),
+		"The fake data at offset $offset should have been changed in memory");
+
+	overwrite_block($filename, $new_block, 0);
+
+	my $written_data = get_uint16_from_page(get_block($filename, 0), $offset);
+	isnt($original_data, $written_data,
+		"The data written at offset $offset should be different"
+		. " from the original one");
+	is(get_uint16_from_page($new_block, $offset), $written_data,
+		"The data written at offset $offset should be the same"
+		. " as the one in memory");
+	is($written_data, $fake_data,
+		"The data written at offset $offset should be the one"
+	   . "	we wanted to write");
+
+	$node->start();
+
+	check_checksums_nb_error($node, 1, "There should be one checksums errors");
+
+	my $expected_checksum;
+	my $found_checksum = get_uint16_from_page($new_block,
+		$CHECKSUM_UINT16_OFFSET);
+	if ($offset == $PD_UPPER_UINT16_OFFSET)
+	{
+		# A checksum can't be computed if it's detected as PageIsNew(), so the
+		# function returns NULL for the computed checksum
+		$expected_checksum = '';
+	}
+	else
+	{
+		$expected_checksum = get_uint16_from_page($original_block,
+			$CHECKSUM_UINT16_OFFSET);
+	}
+
+	my $det = get_checksums_errors($node);
+	is($det, "t1|0|0|$expected_checksum|$found_checksum",
+		"The checksums error for modification at offset $offset"
+		. " should be detected");
+
+	$node->stop();
+
+	$new_block = set_uint16_to_page($original_block, $original_data, $offset);
+	is($original_data, get_uint16_from_page($new_block, $offset),
+		"The data at offset $offset should have been restored in memory");
+
+	overwrite_block($filename, $new_block, 0);
+	is($original_data, get_uint16_from_page(get_block($filename, $blkno),
+			$offset),
+		"The data at offset $offset should have been restored on disk");
+
+	$node->start();
+
+	check_checksums_nb_error($node, 0, "There shouldn't be checksums errors");
+}
+
+if (exists $ENV{MY_PG_REGRESS})
+{
+	$ENV{PG_REGRESS} = $ENV{MY_PG_REGRESS};
+}
+
+my $node = get_new_node('main');
+
+my %params;
+$params{'extra'} = ['--data-checksums'];
+$node->init(%params);
+
+$node->start();
+
+$ENV{PGOPTIONS} = '--client-min-messages=WARNING';
+
+my ($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT"
+	. " current_setting('data_checksums')");
+
+is($stdout, 'on', 'Data checksums shoud be enabled');
+
+($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT"
+	. " current_setting('block_size')");
+
+$BLOCKSIZE = $stdout;
+
+$node->safe_psql(
+	'postgres', q|
+	CREATE TABLE public.t1(id integer);
+	CREATE INDEX t1_id_idx ON public.t1 (id);
+	INSERT INTO public.t1 SELECT generate_series(1, 100);
+	CREATE VIEW public.v1 AS SELECT * FROM t1;
+	CREATE MATERIALIZED VIEW public.mv1 AS SELECT * FROM t1;
+	CREATE SEQUENCE public.s1;
+	CREATE UNLOGGED TABLE public.u_t1(id integer);
+	CREATE INDEX u_t1_id_idx ON public.u_t1 (id);
+	INSERT INTO public.u_t1 SELECT generate_series(1, 100);
+	CHECKPOINT;
+|);
+
+# Check sane behavior on various objects type, including those that  don't have
+# a storage.
+is(check_checksums_call($node, 't1'), '1', 'Can check a table');
+is(check_checksums_call($node, 't1_id_idx'), '1', 'Can check an index');
+is(check_checksums_call($node, 'v1'), '', 'Cannot check a view');
+is(check_checksums_call($node, 'mv1'), '1', 'Can check a materialized view');
+is(check_checksums_call($node, 's1'), '1', 'Can check a sequence');
+is(check_checksums_call($node, 'u_t1'), '1', 'Can check an unlogged table');
+is(check_checksums_call($node, 'u_t1_id_idx'), '1', 'Can check an unlogged index');
+
+# get the underlying heap absolute path
+($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT"
+	. " current_setting('data_directory') || '/' || pg_relation_filepath('t1')"
+);
+
+isnt($stdout, '', 'A relfinode should be returned');
+
+my $filename = $stdout;
+
+check_checksums_nb_error($node, 0, "There shouldn't be checksums errors");
+
+my $fake_uint16 = hex '0x0000';
+
+# Test with a modified checksum.  We use a zero checksum here as it's the only
+# one that cannot exist on a checksummed page.  We also don't have an easy way
+# to compute what the checksum would be after a modification in a random place
+# in the block.
+corrupt_and_test_block($node, $filename, 0, $CHECKSUM_UINT16_OFFSET,
+	$fake_uint16);
+
+# Test corruption making the block looks like it's PageIsNew().
+corrupt_and_test_block($node, $filename, 0, $PD_UPPER_UINT16_OFFSET,
+	$fake_uint16);
