From e9063dba6d6eac55db15c148380a85aa394f647b Mon Sep 17 00:00:00 2001
From: David Christensen <david.christensen@crunchydata.com>
Date: Wed, 20 Apr 2022 19:59:35 -0500
Subject: [PATCH] Teach pg_waldump to extract FPIs from the WAL stream

Extracts full-page images from the WAL stream into a target directory, which must be empty or not
exist.  These images are subject to the same filtering rules as normal display in pg_waldump, which
means that you can isolate the full page writes to a target relation, among other things.

Files are saved with the filename: <lsn>.<ts>.<db>.<rel>.<blk> with formatting to make things
somewhat sortable; for instance:

00000000-010000C0.1663.1.6117.0
00000000-01000150.1664.0.6115.0
00000000-010001E0.1664.0.6114.0
00000000-01000270.1663.1.6116.0
00000000-01000300.1663.1.6113.0
00000000-01000390.1663.1.6112.0
00000000-01000420.1663.1.8903.0
00000000-010004B0.1663.1.8902.0
00000000-01000540.1663.1.6111.0
00000000-010005D0.1663.1.6110.0

It's noteworthy that the raw images do not have the current LSN stored with them in the WAL
stream (as would be true for on-heap versions of the blocks), nor would the checksum be valid in
them (though WAL itself has checksums, so there is some protection there).  This patch chooses to
place the LSN and calculate the proper checksum (if non-zero in the source image) in the output
block.  (This could perhaps be a targeted flag if we decide we don't always want this.)

These images could be loaded/inspected via `pg_read_binary_file()` and used in the `pageinspect`
suite of tools to perform detailed analysis on the pages in question, based on historical
information, and may come in handy for forensics work.
---
 doc/src/sgml/ref/pg_waldump.sgml |  57 +++++++++++++
 src/bin/pg_waldump/pg_waldump.c  | 132 ++++++++++++++++++++++++++++++-
 2 files changed, 188 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/ref/pg_waldump.sgml b/doc/src/sgml/ref/pg_waldump.sgml
index 57746d9421..215e63d962 100644
--- a/doc/src/sgml/ref/pg_waldump.sgml
+++ b/doc/src/sgml/ref/pg_waldump.sgml
@@ -240,6 +240,63 @@ PostgreSQL documentation
        </listitem>
      </varlistentry>
 
+     <varlistentry>
+       <term><option>-W <replaceable>save_path</replaceable></option></term>
+       <term><option>--raw-fpi=<replaceable>save_path</replaceable></option></term>
+       <listitem>
+       <para>
+        Save full page images seen in the WAL stream to the
+        given <replaceable>save_path</replaceable>, which must either not exist
+        or be empty.  The images saved will be subject to the same
+        filtering and limiting criteria as display records, but in this
+        mode <application>pg_waldump</application> will not output any other
+        information.
+       </para>
+       <para>
+        The page images will be saved with file names of the
+        form: <literal><replaceable>LSN</replaceable>.<replaceable>TSOID</replaceable>.<replaceable>DBOID</replaceable>.<replaceable>RELNODE</replaceable>.<replaceable>BLKNO</replaceable></literal>.
+
+        The dot-separated components are (in order):
+
+        <table id="pgwaldump-save-fpi-components">
+         <tgroup cols="2">
+          <thead>
+           <row>
+            <entry>Component</entry>
+            <entry>Description</entry>
+           </row>
+          </thead>
+          <tbody>
+           <row>
+            <entry>LSN</entry>
+            <entry>The LSN of the record with this block, formatted
+            as <literal>%08X-%08X</literal> instead of the
+            conventional <literal>%X/%X</literal> due to filesystem naming
+            limits</entry>
+           </row>
+           <row>
+            <entry>TSOID</entry>
+            <entry>tablespace OID for the block</entry>
+           </row>
+           <row>
+            <entry>DBOID</entry>
+            <entry>database OID for the block</entry>
+           </row>
+           <row>
+            <entry>RELNODE</entry>
+            <entry>relnode id for the block</entry>
+           </row>
+           <row>
+            <entry>BLKNO</entry>
+            <entry>the block number of this block</entry>
+           </row>
+          </tbody>
+         </tgroup>
+        </table>
+       </para>
+       </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-x <replaceable>xid</replaceable></option></term>
       <term><option>--xid=<replaceable>xid</replaceable></option></term>
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 4f265ef546..bcb3e2c350 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -26,6 +26,9 @@
 #include "common/logging.h"
 #include "getopt_long.h"
 #include "rmgrdesc.h"
+#include "storage/bufpage.h"
+#include "storage/checksum.h"
+#include "storage/checksum_impl.h"
 
 /*
  * NOTE: For any code change or issue fix here, it is highly recommended to
@@ -70,6 +73,10 @@ typedef struct XLogDumpConfig
 	bool		filter_by_relation_block_enabled;
 	ForkNumber	filter_by_relation_forknum;
 	bool		filter_by_fpw;
+
+	/* output options */
+	bool        save_fpw;
+	char        *save_fpw_path;
 } XLogDumpConfig;
 
 
@@ -439,6 +446,68 @@ XLogRecordHasFPW(XLogReaderState *record)
 	return false;
 }
 
+/*
+ * Save each full-page image (FPI) carried by the given WAL record as its own
+ * file in directory "savepath", named <lsn>.<ts>.<db>.<relnode>.<block>.
+ *
+ * The record's LSN is stamped into every initialized page, and the page
+ * checksum is recomputed when the stored one is non-zero.  Failure to
+ * restore or write out an image is fatal.
+ */
+static void
+XLogRecordSaveFPWs(XLogReaderState *record, const char *savepath)
+{
+	int			block_id;
+
+	for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
+	{
+		PGAlignedBlock buf;		/* aligned, so the page header casts are safe */
+		char	   *page = buf.data;
+		char		filename[MAXPGPATH];
+		FILE	   *file;
+		BlockNumber blk;
+		RelFileNode rnode;
+
+		/* skip unused block slots, then those that carry no image */
+		if (!XLogRecHasBlockRef(record, block_id) ||
+			!XLogRecHasBlockImage(record, block_id))
+			continue;
+
+		memset(page, 0, BLCKSZ);
+		if (!RestoreBlockImage(record, block_id, page))
+			pg_fatal("could not restore full page image of block %d at %X/%X",
+					 block_id, LSN_FORMAT_ARGS(record->ReadRecPtr));
+
+		XLogRecGetBlockTagExtended(record, block_id,
+								   &rnode, NULL, &blk, NULL);
+
+		/*
+		 * An uninitialized page must be left alone: setting its LSN would
+		 * corrupt it.  Otherwise stamp the LSN and, if the stored checksum
+		 * is non-zero (i.e. checksums look enabled), recompute it.
+		 */
+		if (!PageIsNew(page))
+		{
+			PageSetLSN(page, record->ReadRecPtr);
+			if (((PageHeader) page)->pd_checksum)
+				((PageHeader) page)->pd_checksum = pg_checksum_page(page, blk);
+		}
+
+		/* TODO: do we need fork number here? */
+		snprintf(filename, MAXPGPATH, "%s/%08X-%08X.%u.%u.%u.%u", savepath,
+				 LSN_FORMAT_ARGS(record->ReadRecPtr),
+				 rnode.spcNode, rnode.dbNode, rnode.relNode, blk);
+
+		file = fopen(filename, PG_BINARY_W);
+		if (!file)
+			pg_fatal("could not open file \"%s\": %m", filename);
+		if (fwrite(page, BLCKSZ, 1, file) != 1)
+			pg_fatal("could not write file \"%s\": %m", filename);
+		if (fclose(file) != 0)
+			pg_fatal("could not close file \"%s\": %m", filename);
+	}
+}
+
 /*
  * Print a record to stdout
  */
@@ -679,6 +748,7 @@ usage(void)
 			 "                         (default: 1 or the value used in STARTSEG)\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
 	printf(_("  -w, --fullpage         only show records with a full page write\n"));
+	printf(_("  -W, --raw-fpi=path     save found full page images to given path\n"));
 	printf(_("  -x, --xid=XID          only show records with transaction ID XID\n"));
 	printf(_("  -z, --stats[=record]   show statistics instead of records\n"
 			 "                         (optionally, show per-record statistics)\n"));
@@ -712,6 +782,7 @@ main(int argc, char **argv)
 		{"limit", required_argument, NULL, 'n'},
 		{"path", required_argument, NULL, 'p'},
 		{"quiet", no_argument, NULL, 'q'},
+		{"raw-fpi", required_argument, NULL, 'W'},
 		{"relation", required_argument, NULL, 'R'},
 		{"rmgr", required_argument, NULL, 'r'},
 		{"start", required_argument, NULL, 's'},
@@ -772,6 +843,8 @@ main(int argc, char **argv)
 	config.filter_by_fpw = false;
 	config.stats = false;
 	config.stats_per_record = false;
+	config.save_fpw = false;
+	config.save_fpw_path = NULL;
 
 	stats.startptr = InvalidXLogRecPtr;
 	stats.endptr = InvalidXLogRecPtr;
@@ -782,7 +855,7 @@ main(int argc, char **argv)
 		goto bad_argument;
 	}
 
-	while ((option = getopt_long(argc, argv, "bB:e:fF:n:p:qr:R:s:t:wx:z",
+	while ((option = getopt_long(argc, argv, "bB:e:fF:n:p:qr:R:s:t:wW:x:z",
 								 long_options, &optindex)) != -1)
 	{
 		switch (option)
@@ -919,6 +992,10 @@ main(int argc, char **argv)
 			case 'w':
 				config.filter_by_fpw = true;
 				break;
+			case 'W':
+				config.save_fpw = true;
+				config.save_fpw_path = pg_strdup(optarg);
+				break;
 			case 'x':
 				if (sscanf(optarg, "%u", &config.filter_by_xid) != 1)
 				{
@@ -972,6 +1049,54 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (config.save_fpw_path != NULL)
+	{
+		struct stat st;
+		bool		is_empty = false;
+
+		/* we accept an empty existing directory */
+		if (stat(config.save_fpw_path, &st) == 0 && S_ISDIR(st.st_mode))
+		{
+			DIR		   *dir = opendir(config.save_fpw_path);
+
+			if (dir)
+			{
+				struct dirent *d;
+
+				is_empty = true;
+				while (errno = 0, (d = readdir(dir)))
+				{
+					if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
+					{
+						is_empty = false;
+						break;
+					}
+				}
+
+				if (errno)
+				{
+					pg_log_error("could not read raw-fpi directory \"%s\": %m",
+								 config.save_fpw_path);
+					closedir(dir);	/* don't leak the handle on the way out */
+					goto bad_argument;
+				}
+
+				if (closedir(dir))
+				{
+					pg_log_error("could not close raw-fpi directory \"%s\": %m",
+								 config.save_fpw_path);
+					goto bad_argument;
+				}
+			}
+		}
+		/* otherwise create it; mkdir() fails if the path already exists */
+		if (!is_empty && mkdir(config.save_fpw_path, 0700) < 0)
+		{
+			pg_fatal("could not create raw-fpi output directory \"%s\": %m",
+					 config.save_fpw_path);
+		}
+	}
+
 	/* parse files as start/end boundaries, extract path if not specified */
 	if (optind < argc)
 	{
@@ -1150,6 +1275,11 @@ main(int argc, char **argv)
 				XLogRecStoreStats(&stats, xlogreader_state);
 				stats.endptr = xlogreader_state->EndRecPtr;
 			}
+			else if (config.save_fpw)
+			{
+				if (XLogRecordHasFPW(xlogreader_state))
+					XLogRecordSaveFPWs(xlogreader_state, config.save_fpw_path);
+			}
 			else
 				XLogDumpDisplayRecord(&config, xlogreader_state);
 		}
-- 
2.32.0 (Apple Git-132)

