From 82b5cc8356b49ca4a781e066f9abc703aead15c5 Mon Sep 17 00:00:00 2001
From: David Christensen <david.christensen@crunchydata.com>
Date: Thu, 10 Nov 2022 10:29:38 -0600
Subject: [PATCH v8] Teach pg_waldump to extract FPIs from the WAL stream

Extracts full-page images from the WAL stream into a given target directory.
These images are subject to the same filtering rules as normal display in
pg_waldump, which means that you can isolate the full page writes to a
target relation, among other things.

Files are saved with the filename: <lsn>.<ts>.<db>.<rel>.<blk>_<fork> with
formatting to make things somewhat sortable; for instance:

00000000-010000C0.1663.1.6117.0_main
00000000-01000150.1664.0.6115.0_main
00000000-010001E0.1664.0.6114.0_main
00000000-01000270.1663.1.6116.0_main
00000000-01000300.1663.1.6113.0_main
00000000-01000390.1663.1.6112.0_main
00000000-01000420.1663.1.8903.0_main
00000000-010004B0.1663.1.8902.0_main
00000000-01000540.1663.1.6111.0_main
00000000-010005D0.1663.1.6110.0_main

Note that the raw block images extracted from the WAL stream do not carry
the LSN of the record they came from in their page header (as the on-disk
versions of the blocks would after replay), nor is the checksum in them
updated (though WAL itself has checksums, so there is some protection
there).

These images could be loaded/inspected via `pg_read_binary_file()` and
used in the `pageinspect` suite of tools to perform detailed analysis on
the pages in question, based on historical information, and may come in
handy for forensics work.
---
 doc/src/sgml/ref/pg_waldump.sgml          |  69 ++++++++++++++
 src/bin/pg_waldump/meson.build            |   1 +
 src/bin/pg_waldump/pg_waldump.c           |  95 +++++++++++++++++++-
 src/bin/pg_waldump/t/002_save_fullpage.pl | 104 ++++++++++++++++++++++
 4 files changed, 268 insertions(+), 1 deletion(-)
 create mode 100644 src/bin/pg_waldump/t/002_save_fullpage.pl

diff --git a/doc/src/sgml/ref/pg_waldump.sgml b/doc/src/sgml/ref/pg_waldump.sgml
index d559f091e5..3c5bd9eb3f 100644
--- a/doc/src/sgml/ref/pg_waldump.sgml
+++ b/doc/src/sgml/ref/pg_waldump.sgml
@@ -240,6 +240,75 @@ PostgreSQL documentation
        </listitem>
      </varlistentry>
 
+     <varlistentry>
+       <term><option>-W <replaceable>save_path</replaceable></option></term>
+       <term><option>--save-fpi=<replaceable>save_path</replaceable></option></term>
+       <listitem>
+       <para>
+        Save full page images found in the WAL stream to the
+        <replaceable>save_path</replaceable> directory, which will be created
+        if it does not exist.  The images saved are subject to the same
+        filtering and limiting criteria as the records displayed, but in this
+        mode <application>pg_waldump</application> does not output any other
+        information.
+       </para>
+       <para>
+        The page images are saved using the following file name
+        format: <literal><replaceable>LSN</replaceable>.<replaceable>RELTABLESPACE</replaceable>.<replaceable>DATOID</replaceable>.<replaceable>RELNODE</replaceable>.<replaceable>BLKNO</replaceable><replaceable>FORK</replaceable></literal>
+
+        The components are, in order (all but the last separated by dots):
+
+        <informaltable>
+         <tgroup cols="2">
+          <thead>
+           <row>
+            <entry>Component</entry>
+            <entry>Description</entry>
+           </row>
+          </thead>
+
+          <tbody>
+           <row>
+            <entry>LSN</entry>
+            <entry>The LSN of the record with this block, formatted
+             as two 8-character hexadecimal numbers <literal>%08X-%08X</literal></entry>
+           </row>
+
+           <row>
+            <entry>RELTABLESPACE</entry>
+            <entry>tablespace OID for the block</entry>
+           </row>
+
+           <row>
+            <entry>DATOID</entry>
+            <entry>database OID for the block</entry>
+           </row>
+
+           <row>
+            <entry>RELNODE</entry>
+            <entry>relnode id for the block</entry>
+           </row>
+
+           <row>
+            <entry>BLKNO</entry>
+            <entry>the block number of this block</entry>
+           </row>
+
+           <row>
+            <entry>FORK</entry>
+            <entry>
+             The name of the fork the full page image came from.  One
+             of <literal>_main</literal>, <literal>_fsm</literal>,
+             <literal>_vm</literal>, or <literal>_init</literal>.
+            </entry>
+           </row>
+          </tbody>
+         </tgroup>
+        </informaltable>
+       </para>
+       </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-x <replaceable>xid</replaceable></option></term>
       <term><option>--xid=<replaceable>xid</replaceable></option></term>
diff --git a/src/bin/pg_waldump/meson.build b/src/bin/pg_waldump/meson.build
index 9605976870..34e37bffc3 100644
--- a/src/bin/pg_waldump/meson.build
+++ b/src/bin/pg_waldump/meson.build
@@ -29,6 +29,7 @@ tests += {
   'tap': {
     'tests': [
       't/001_basic.pl',
+      't/002_save_fullpage.pl',
     ],
   },
 }
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 9993378ca5..b79f09ad4f 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -23,9 +23,15 @@
 #include "access/xlogrecord.h"
 #include "access/xlogstats.h"
 #include "common/fe_memutils.h"
+#include "common/file_perm.h"
+#include "common/file_utils.h"
 #include "common/logging.h"
+#include "common/relpath.h"
 #include "getopt_long.h"
 #include "rmgrdesc.h"
+#include "storage/bufpage.h"
+#include "storage/checksum.h"
+#include "storage/checksum_impl.h"
 
 /*
  * NOTE: For any code change or issue fix here, it is highly recommended to
@@ -70,6 +76,9 @@ typedef struct XLogDumpConfig
 	bool		filter_by_relation_block_enabled;
 	ForkNumber	filter_by_relation_forknum;
 	bool		filter_by_fpw;
+
+	/* save options */
+	char	   *save_fpw_path;
 } XLogDumpConfig;
 
 
@@ -439,6 +448,61 @@ XLogRecordHasFPW(XLogReaderState *record)
 	return false;
 }
 
+/*
+ * Save each full-page image (FPI) in the given WAL record to its own file in
+ * "savepath", named <LSN>.<spcOid>.<dbOid>.<relNumber>.<blkno>_<fork> (LSN is
+ * the record's ReadRecPtr).  Failure to restore or write an image is fatal.
+ */
+static void
+XLogRecordSaveFPWs(XLogReaderState *record, const char *savepath)
+{
+	int			block_id;
+
+	for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
+	{
+		PGAlignedBlock buf;
+		Page		page = (Page) buf.data;
+		char		filename[MAXPGPATH];
+		char		forkname[FORKNAMECHARS + 2];	/* _ + \0 */
+		FILE	   *file;
+		BlockNumber blk;
+		RelFileLocator rnode;
+		ForkNumber	fork;
+
+		/* Only block references that carry an image are of interest. */
+		if (!XLogRecHasBlockRef(record, block_id) ||
+			!XLogRecHasBlockImage(record, block_id))
+			continue;
+
+		/* Do not silently skip an image that cannot be restored. */
+		if (!RestoreBlockImage(record, block_id, page))
+			pg_fatal("%s", record->errormsg_buf);
+
+		XLogRecGetBlockTagExtended(record, block_id,
+								   &rnode, &fork, &blk, NULL);
+
+		if (fork >= 0 && fork <= MAX_FORKNUM)
+			snprintf(forkname, sizeof(forkname), "_%s", forkNames[fork]);
+		else
+			pg_fatal("found invalid fork number: %u", fork);
+
+		snprintf(filename, MAXPGPATH, "%s/%08X-%08X.%u.%u.%u.%u%s", savepath,
+				 LSN_FORMAT_ARGS(record->ReadRecPtr),
+				 rnode.spcOid, rnode.dbOid, rnode.relNumber, blk, forkname);
+
+		file = fopen(filename, PG_BINARY_W);
+		if (!file)
+			pg_fatal("could not open file \"%s\": %m", filename);
+
+		if (fwrite(page, BLCKSZ, 1, file) != 1)
+			pg_fatal("could not write file \"%s\": %m", filename);
+
+		/* Buffered data is flushed at close, so a failed close is fatal too. */
+		if (fclose(file) != 0)
+			pg_fatal("could not close file \"%s\": %m", filename);
+	}
+}
+
 /*
  * Print a record to stdout
  */
@@ -679,6 +743,8 @@ usage(void)
 			 "                         (default: 1 or the value used in STARTSEG)\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
 	printf(_("  -w, --fullpage         only show records with a full page write\n"));
+	printf(_("  -W, --save-fpi=path    save full page images to given path as\n"
+			 "                         LSN.T.D.R.B_F\n"));
 	printf(_("  -x, --xid=XID          only show records with transaction ID XID\n"));
 	printf(_("  -z, --stats[=record]   show statistics instead of records\n"
 			 "                         (optionally, show per-record statistics)\n"));
@@ -712,6 +778,7 @@ main(int argc, char **argv)
 		{"limit", required_argument, NULL, 'n'},
 		{"path", required_argument, NULL, 'p'},
 		{"quiet", no_argument, NULL, 'q'},
+		{"save-fpi", required_argument, NULL, 'W'},
 		{"relation", required_argument, NULL, 'R'},
 		{"rmgr", required_argument, NULL, 'r'},
 		{"start", required_argument, NULL, 's'},
@@ -772,6 +839,7 @@ main(int argc, char **argv)
 	config.filter_by_fpw = false;
 	config.stats = false;
 	config.stats_per_record = false;
+	config.save_fpw_path = NULL;
 
 	stats.startptr = InvalidXLogRecPtr;
 	stats.endptr = InvalidXLogRecPtr;
@@ -782,7 +850,7 @@ main(int argc, char **argv)
 		goto bad_argument;
 	}
 
-	while ((option = getopt_long(argc, argv, "bB:e:fF:n:p:qr:R:s:t:wx:z",
+	while ((option = getopt_long(argc, argv, "bB:e:fF:n:p:qr:R:s:t:wW:x:z",
 								 long_options, &optindex)) != -1)
 	{
 		switch (option)
@@ -918,6 +986,9 @@ main(int argc, char **argv)
 			case 'w':
 				config.filter_by_fpw = true;
 				break;
+			case 'W':
+				config.save_fpw_path = pg_strdup(optarg);
+				break;
 			case 'x':
 				if (sscanf(optarg, "%u", &config.filter_by_xid) != 1)
 				{
@@ -972,6 +1043,17 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (config.save_fpw_path != NULL)
+	{
+		/* Create the dir if it doesn't exist */
+		if (pg_mkdir_p(config.save_fpw_path, pg_dir_create_mode) < 0)
+		{
+			pg_log_error("could not create output directory \"%s\": %m",
+						 config.save_fpw_path);
+			goto bad_argument;
+		}
+	}
+
 	/* parse files as start/end boundaries, extract path if not specified */
 	if (optind < argc)
 	{
@@ -1150,6 +1232,11 @@ main(int argc, char **argv)
 				XLogRecStoreStats(&stats, xlogreader_state);
 				stats.endptr = xlogreader_state->EndRecPtr;
 			}
+			else if (config.save_fpw_path)
+			{
+				if (XLogRecordHasFPW(xlogreader_state))
+					XLogRecordSaveFPWs(xlogreader_state, config.save_fpw_path);
+			}
 			else
 				XLogDumpDisplayRecord(&config, xlogreader_state);
 		}
@@ -1167,6 +1254,12 @@ main(int argc, char **argv)
 	if (time_to_stop)
 		exit(0);
 
+	if (config.save_fpw_path != NULL)
+	{
+		/* Fsync our output directory */
+		fsync_fname(config.save_fpw_path, true);
+	}
+
 	if (errormsg)
 		pg_fatal("error in WAL record at %X/%X: %s",
 				 LSN_FORMAT_ARGS(xlogreader_state->ReadRecPtr),
diff --git a/src/bin/pg_waldump/t/002_save_fullpage.pl b/src/bin/pg_waldump/t/002_save_fullpage.pl
new file mode 100644
index 0000000000..c9a2ac733a
--- /dev/null
+++ b/src/bin/pg_waldump/t/002_save_fullpage.pl
@@ -0,0 +1,104 @@
+
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use File::Basename;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::RecursiveCopy;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $blocksize;
+
+# Return the LSN (as two '%08X'-formatted halves) and the checksum stored in
+# the header of the page image saved at $path.
+sub get_block_info
+{
+	my $path = shift;
+	my $block;
+
+	open my $fh, '<', $path or die "couldn't open file: $path: $!\n";
+	binmode $fh;
+	die "couldn't read full block\n" if $blocksize != read($fh, $block, $blocksize);
+
+	# The header is in the platform's native byte order, hence 'LLS' not 'VVv'.
+	my ($lsn_hi, $lsn_lo, $checksum) = unpack('LLS', $block);
+	return (sprintf('%08X', $lsn_hi), sprintf('%08X', $lsn_lo), $checksum);
+}
+
+# Set umask so test directories and files are created with default permissions
+umask(0077);
+
+my $node =  PostgreSQL::Test::Cluster->new('primary');
+$node->init(extra => ['-k'], allows_streaming => 1);
+$node->start;
+
+# Sanity check: --save-fpi must be rejected when no target path is given.
+$node->command_fails(
+	[ 'pg_waldump', '--save-fpi' ],
+	'--save-fpi fails without path');
+
+# Generate WAL to examine; the UPDATE following the CHECKPOINT should log FPIs.
+$node->safe_psql('postgres', <<EOF);
+SELECT 'init' FROM pg_create_physical_replication_slot('regress_pg_waldump_slot', true, false);
+CREATE TABLE test_table AS SELECT generate_series(1,100) a;
+CHECKPOINT;
+SELECT pg_switch_wal();
+UPDATE test_table SET a = a + 1;
+SELECT pg_switch_wal();
+EOF
+
+# Ask the server for its block size; get_block_info() reads that many bytes.
+$blocksize = $node->safe_psql('postgres', "SELECT current_setting('block_size')");
+
+# Build the tablespace/database/relfilenode triple passed to --relation below.
+my $relation = $node->safe_psql('postgres',
+	q{SELECT format('%s/%s/%s', CASE WHEN reltablespace = 0 THEN dattablespace ELSE reltablespace END, pg_database.oid, pg_relation_filenode(pg_class.oid)) FROM pg_class, pg_database WHERE relname = 'test_table' AND datname = current_database()}
+);
+
+diag $relation;
+
+# Use the second WAL segment, which is complete and has FPIs for our relation.
+# Segment names are fixed-width hex strings, so they must be sorted as strings.
+my $waldir = $node->basedir . '/pgdata/pg_wal';
+my $walfile = (sort { $a cmp $b } glob("$waldir/00*"))[1];
+my $tmp_folder = PostgreSQL::Test::Utils::tempdir;
+ok($walfile, "Got a WAL file");
+diag "using walfile: $walfile";
+$node->command_ok(['pg_waldump', '--save-fpi', "$tmp_folder/raw", '--relation', $relation, $walfile], 'pg_waldump --save-fpi runs');
+
+# pg_waldump always appends the fork name, so the suffix is mandatory here.
+my $file_re =
+  qr/^([0-9A-F]{8})-([0-9A-F]{8})[.][0-9]+[.][0-9]+[.][0-9]+[.][0-9]+(?:_vm|_init|_fsm|_main)$/;
+
+my %files;
+
+# Check every saved image: its file name must follow the documented format,
+# and the LSN stored in the page header must be older than the LSN of the
+# WAL record the image was extracted from (which is the LSN in the file name).
+for my $fullpath (glob "$tmp_folder/raw/*")
+{
+	my $file = File::Basename::basename($fullpath);
+
+	like($file, $file_re, "verify filename format for file $file");
+
+	# remember the file so we can check below that something was written
+	$files{$file}++;
+
+	my ($hi_lsn_fn, $lo_lsn_fn) = ($file =~ $file_re);
+	my ($hi_lsn_bk, $lo_lsn_bk) = get_block_info($fullpath);
+
+	# Both LSNs are fixed-width uppercase hex, so string comparison orders
+	# them correctly.
+	ok( $hi_lsn_fn . $lo_lsn_fn gt $hi_lsn_bk . $lo_lsn_bk,
+		'verify block-based LSN precedes file-based one');
+}
+
+# validate that pg_waldump wrote at least one full page image
+ok(keys %files > 0, 'verify we processed some files');
+
+$node->safe_psql('postgres', <<EOQ);
+SELECT pg_drop_replication_slot('regress_pg_waldump_slot');
+EOQ
+done_testing();
-- 
2.37.1 (Apple Git-137.1)

