I reran my little testing tool that compares buffer page contents after every modification, in master and in WAL replay. Previously discussed here: http://www.postgresql.org/message-id/5357b582.7060...@vmware.com. Here's an updated version of my original hack, for current git master. (Michael posted less-hacky versions of that, but unfortunately I haven't gotten around to reviewing his stuff.)

I did not find any new bugs. There were a couple of false positives however. Firstly, the post-processing tool needed to be taught that BRIN pages can have the PD_HAS_FREE_LINES flag set, and ignore that (like it does for heap and other indexam pages).

Another issue was with the new speculative insertions. Replaying a speculative insertion record sets the tuple's CTID to point to itself, like in a regular insertion. But in the original system, the CTID is set to a special speculative insertion token. The tool flagged up that difference.

I propose the attached patch (mark-speculative-insertions-in-replay.patch) to fix that in the replay routine. This is not required for correctness, but helps this tool, and seems like a good idea for debugging purposes anyway.

Any objections?

- Heikki
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7776,8 +7776,27 @@ heap_xlog_insert(XLogReaderState *record)
 	XLogRedoAction action;
 
 	XLogRecGetBlockTag(record, 0, &target_node, NULL, &blkno);
-	ItemPointerSetBlockNumber(&target_tid, blkno);
-	ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
+
+	if (xlrec->flags & XLH_INSERT_IS_SPECULATIVE)
+	{
+		/*
+		 * In a speculative insertion, the 'token' field is set to a token
+		 * used to arbitrate between concurrent insertions. The token is not
+		 * included in the WAL record, so we just set it to zero here. (There
+		 * can be no competing updates during recovery, so we don't need the
+		 * token.) It will be overwritten by a later XLOG_HEAP_CONFIRM or
+		 * XLOG_HEAP_DELETE record, or the transaction will abort, so the
+		 * value we store here doesn't matter, but it's nice to set it to
+		 * something that shows that this was a speculative insertion, for
+		 * debugging purposes.
+		 */
+		ItemPointerSet(&target_tid, 0, SpecTokenOffsetNumber);
+	}
+	else
+	{
+		/* In a normal insertion, point ctid to the tuple itself */
+		ItemPointerSet(&target_tid, blkno, xlrec->offnum);
+	}
 
 	/*
 	 * The visibility map may need to be fixed even if the heap page is
>From 28b7382958272e399f5f611bf0d399e6778c9a59 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Mon, 7 Apr 2014 14:51:45 +0300
Subject: [PATCH 1/1] Write buffer image before and after every WAL record.


diff --git a/contrib/page_image_logging/Makefile b/contrib/page_image_logging/Makefile
new file mode 100644
index 0000000..9c68bbc
--- /dev/null
+++ b/contrib/page_image_logging/Makefile
@@ -0,0 +1,20 @@
+# contrib/page_image_logging/Makefile
+
+PGFILEDESC = "postprocess-images - "
+
+PROGRAM = postprocess-images
+OBJS	= postprocess-images.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS = $(libpq_pgport)
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/postprocess-images
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/page_image_logging/README b/contrib/page_image_logging/README
new file mode 100644
index 0000000..e2f8225
--- /dev/null
+++ b/contrib/page_image_logging/README
@@ -0,0 +1,50 @@
+Usage
+-----
+
+1. Apply the patch
+
+2. Set up a master and standby.
+
+3. stop master, then standby.
+
+4. Remove $PGDATA/buffer-images from both servers.
+
+5. Start master and standby
+
+6. Run "make installcheck", or whatever you want to test
+
+7. Stop master, then standby
+
+8. compare the logged page images using the postprocessing tool:
+
+./postprocess-images ~/data-master/buffer-images ~/data-standby/buffer-images  > differences
+
+9. The 'differences' file should be empty. If not, investigate.
+
+Tips
+----
+
+The page images take up a lot of disk space! The PostgreSQL regression
+suite generates about 11GB - double that when the same is generated also
+in a standby.
+
+Always stop the master first, then standby. Otherwise, when you restart
+the standby, it will start WAL replay from the previous checkpoint, and
+log some page images already. Stopping the master creates a checkpoint
+record, avoiding the problem.
+
+If you get errors like this from postprocess-images:
+
+    could not reorder line XXX
+
+It can be caused by an all-zeros page being logged with XLOG HEAP_NEWPAGE
+records. Look at the line in the buffer-image file, see if it's all-zeros.
+This can happen e.g. when you change the tablespace of a table. See
+log_newpage() in xlog.c.
+
+You can use pg_xlogdump to see which WAL record a page image corresponds
+to. But beware that the LSN in the page image points to the *end* of the
+WAL record, while the LSN that pg_xlogdump prints is the *beginning* of
+the WAL record. So to find which WAL record a page image corresponds to,
+find the LSN from the page image in pg_xlogdump output, and back off one
+record. (you can't just grep for the line containing the LSN).
diff --git a/contrib/page_image_logging/postprocess-images.c b/contrib/page_image_logging/postprocess-images.c
new file mode 100644
index 0000000..f440cfd
--- /dev/null
+++ b/contrib/page_image_logging/postprocess-images.c
@@ -0,0 +1,624 @@
+#include "postgres_fe.h"
+
+typedef uintptr_t Datum;
+typedef struct MemoryContextData *MemoryContext;
+
+#include "access/gin_private.h"
+#include "access/htup_details.h"
+#include "access/nbtree.h"
+#include "storage/bufpage.h"
+
+#define LINESZ (BLCKSZ*2 + 100)
+
+/* ----------------------------------------------------------------
+ * Masking functions.
+ *
+ * Most pages cannot be compared directly, because some parts of the
+ * page are not expected to be byte-by-byte identical. For example,
+ * hint bits. Our strategy is to normalize all pages by creating a
+ * mask of those bits that are not expected to match.
+ */
+
+/*
+ * Build a mask that covers unused space between pd_lower and pd_upper.
+ */
+static void
+mask_unused_space(char *page, uint8 *mask)
+{
+	int			pd_lower = ((PageHeader) page)->pd_lower;
+	int			pd_upper = ((PageHeader) page)->pd_upper;
+	int			pd_special = ((PageHeader) page)->pd_special;
+
+	if (pd_lower > pd_upper || pd_special < pd_upper ||
+		pd_lower < SizeOfPageHeaderData || pd_special > BLCKSZ)
+	{
+		fprintf(stderr, "invalid page at %X/%08X\n",
+				((PageHeader) page)->pd_lsn.xlogid,
+				((PageHeader) page)->pd_lsn.xrecoff);
+	}
+
+	memset(mask + pd_lower, 0xFF, pd_upper - pd_lower);
+}
+
+static void
+build_heap_mask(char *page, uint8 *mask)
+{
+	OffsetNumber off;
+	PageHeader mask_phdr = (PageHeader) mask;
+
+	mask_unused_space(page, mask);
+
+	/* Ignore prune_xid (it's like a hint-bit) */
+	mask_phdr->pd_prune_xid = 0xFFFFFFFF;
+
+	/* Ignore PD_PAGE_FULL and PD_HAS_FREE_LINES flags, they are just hints */
+	mask_phdr->pd_flags |= PD_PAGE_FULL | PD_HAS_FREE_LINES;
+
+	/*
+	 * Also mask the all-visible flag.
+	 *
+	 * XXX: It is unfortunate that we have to do this. If the flag is set
+	 * incorrectly, that's serious, and we would like to catch it. If the flag
+	 * is cleared incorrectly, that's serious too. But redo of HEAP_CLEAN
+	 * records don't currently set the flag, even though it is set in the
+	 * master, so we must silence failures that that causes.
+	 */
+	mask_phdr->pd_flags |= PD_ALL_VISIBLE;
+
+	for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
+	{
+		ItemId		iid = PageGetItemId(page, off);
+		Item		item = PageGetItem(page, iid);
+		char	   *mask_item;
+
+		mask_item = (char *) (mask + ItemIdGetOffset(iid));
+
+		if (ItemIdIsNormal(iid))
+		{
+			HeapTupleHeader mask_htup = (HeapTupleHeader) mask_item;
+
+			/*
+			 * Ignore hint bits and command ID.
+			 */
+			mask_htup->t_infomask =
+				HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID |
+				HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID;
+			mask_htup->t_infomask |= HEAP_COMBOCID;
+			mask_htup->t_choice.t_heap.t_field3.t_cid = 0xFFFFFFFF;
+
+			/*
+			 * Ignore speculative insertion token on speculatively
+			 * inserted tuples.
+			 */
+
+			if (HeapTupleHeaderIsSpeculative((HeapTupleHeader) item))
+			{
+				mask_htup->t_ctid.ip_blkid.bi_hi = 0xFFFF;
+				mask_htup->t_ctid.ip_blkid.bi_lo = 0xFFFF;
+			}
+		}
+
+		/*
+		 * Ignore any padding bytes after the tuple, when the length of
+		 * the item is not MAXALIGNed.
+		 */
+		if (ItemIdHasStorage(iid))
+		{
+			int		len = ItemIdGetLength(iid);
+			int		padlen = MAXALIGN(len) - len;
+
+			if (padlen > 0)
+				memset(mask_item + len, 0xFF, padlen);
+		}
+	}
+}
+
+static void
+build_spgist_mask(char *page, uint8 *mask)
+{
+	mask_unused_space(page, mask);
+}
+
+static void
+build_gist_mask(char *page, uint8 *mask)
+{
+	mask_unused_space(page, mask);
+}
+
+static void
+build_gin_mask(BlockNumber blkno, char *page, uint8 *mask)
+{
+	/* GIN metapage doesn't use pd_lower/pd_upper. Other page types do. */
+	if (blkno != 0)
+	{
+		/*
+		 * Mask everything on a DELETED page.
+		 */
+		if (GinPageIsDeleted(page))
+		{
+			/* page content, between standard page header and opaque struct */
+			memset(mask + SizeOfPageHeaderData, 0xFF, BLCKSZ - MAXALIGN(sizeof(GinPageOpaqueData)));
+
+			/* pd_lower and upper */
+			memset(&((PageHeader) mask)->pd_lower, 0xFF, sizeof(uint16));
+			memset(&((PageHeader) mask)->pd_upper, 0xFF, sizeof(uint16));
+		}
+		else
+			mask_unused_space(page, mask);
+	}
+}
+
+static void
+build_brin_mask(char *page, uint8 *mask)
+{
+	/* BRIN pages use the standard page layout; mask the unused space */
+	PageHeader mask_phdr = (PageHeader) mask;
+
+	mask_unused_space(page, mask);
+
+	/* Ignore PD_HAS_FREE_LINES flag, it is just a hint */
+	mask_phdr->pd_flags |= PD_HAS_FREE_LINES;
+}
+
+static void
+build_sequence_mask(char *page, uint8 *mask)
+{
+	/*
+	 * FIXME: currently, we just ignore sequence records altogether. nextval
+	 * records a different value in the WAL record than it writes to the
+	 * buffer. Ideally we would only mask out the value in the tuple.
+	 */
+	memset(mask, 0xFF, BLCKSZ);
+}
+
+static void
+build_btree_mask(char *page, uint8 *mask)
+{
+	OffsetNumber off;
+	OffsetNumber maxoff;
+
+	mask_unused_space(page, mask);
+
+	/*
+	 * Mask everything on a DELETED page.
+	 */
+	if (((BTPageOpaque) PageGetSpecialPointer(page))->btpo_flags & BTP_DELETED)
+	{
+		/* page content, between standard page header and opaque struct */
+		memset(mask + SizeOfPageHeaderData, 0xFF, BLCKSZ - MAXALIGN(sizeof(BTPageOpaqueData)));
+
+		/* pd_lower and upper */
+		memset(&((PageHeader) mask)->pd_lower, 0xFF, sizeof(uint16));
+		memset(&((PageHeader) mask)->pd_upper, 0xFF, sizeof(uint16));
+	}
+	else
+	{
+		/* Mask DEAD line pointer bits */
+		maxoff = PageGetMaxOffsetNumber(page);
+		for (off = 1; off <= maxoff; off++)
+		{
+			ItemId iid = PageGetItemId(page, off);
+			ItemIdData m;
+
+			if (ItemIdIsDead(iid))
+			{
+				memset(&m, 0, sizeof(ItemIdData));
+				m.lp_flags = 2;
+
+				memcpy((char *) mask + (((char *) iid) - page), &m, sizeof(ItemIdData));
+			}
+		}
+	}
+
+	/* Mask BTP_HAS_GARBAGE flag and cycle-ID in the opaque struct */
+	{
+		BTPageOpaque maskopaq = (BTPageOpaque) (((char *) mask) + ((PageHeader) page)->pd_special);
+
+		maskopaq->btpo_flags |= BTP_HAS_GARBAGE | BTP_SPLIT_END;
+
+		maskopaq->btpo_cycleid = 0xFFFF;
+	}
+}
+
+static inline unsigned char
+parsehex(char digit, bool *success)
+{
+	if (digit >= '0' && digit <= '9')
+	{
+		*success = true;
+		return (unsigned char) (digit - '0');
+	}
+	if (digit >= 'a' && digit <= 'f')
+	{
+		*success = true;
+		return (unsigned char) (digit - 'a' + 10);
+	}
+	if (digit >= 'A' && digit <= 'F')
+	{
+		*success = true;
+		return (unsigned char) (digit - 'A' + 10);
+	}
+	*success = false;
+	return 0;
+}
+
+static inline void
+tohex(uint8 byte, char *out)
+{
+	const char *digits = "0123456789ABCDEF";
+
+	out[0] = digits[byte >> 4];
+	out[1] = digits[byte & 0x0F];
+}
+
+/*
+ * Mask any known changing parts, like hint bits, from the line. The line
+ * is modified in place. Full nibbles to be ignored are set to 'X' in the
+ * hex output, and individual bits are set to 0.
+ */
+static void
+maskline(char *line)
+{
+	char		page[BLCKSZ];
+	uint8		mask[BLCKSZ];
+	int			i;
+	uint16		tail;
+	char	   *pstart;
+	char	   *p;
+	BlockNumber blkno;
+
+	/* Parse the line */
+	p = strstr(line, " blk: ");
+	if (p == NULL)
+		return;
+
+	sscanf(p, " blk: %u", &blkno);
+
+	pstart = strstr(line, "after: ");
+	if (pstart == NULL)
+		return;
+	pstart += strlen("after: ");
+
+	/* Decode the hex-encoded page back to raw bytes */
+	p = pstart;
+	for (i = 0; i < BLCKSZ; i++)
+	{
+		bool		success;
+		unsigned char c;
+
+		c = parsehex(*(p++), &success) << 4;
+		if (!success)
+			return;
+		c |= parsehex(*(p++), &success);
+		if (!success)
+			return;
+
+		page[i] = (char) c;
+	}
+
+	/*
+	 * Ok, we now have the original block contents in 'page'. Look at the
+	 * size of the special area, and the last two bytes in it, to detect
+	 * what kind of a page it is. Call the appropriate masking function.
+	 */
+
+	/* begin with an empty mask */
+	memset(mask, 0, BLCKSZ);
+
+	memcpy(&tail, &page[BLCKSZ - 2], 2);
+
+	/* Try to detect what kind of a page it is */
+	if (PageGetSpecialSize(page) == 0)
+	{
+		build_heap_mask(page, mask);
+	}
+	else if (PageGetSpecialSize(page) == 16)
+	{
+		if (tail == 0xFF81)
+			build_gist_mask(page, mask);
+		else if (tail <= 0xFF7F)
+			build_btree_mask(page, mask);
+	}
+	else if (PageGetSpecialSize(page) == 8)
+	{
+		if (tail == 0xFF82)
+			build_spgist_mask(page, mask);
+		else if (tail == 0xF093)
+			build_brin_mask(page, mask);
+		else if (*((uint32 *) (page + BLCKSZ - MAXALIGN(sizeof(uint32)))) == 0x1717)
+			build_sequence_mask(page, mask);
+		else
+			build_gin_mask(blkno, page, mask);
+	}
+
+	/* Apply the mask, replacing masked nibbles with # */
+	for (i = 0; i < BLCKSZ; i++)
+	{
+		uint8 c;
+
+		if (mask[i] == 0)
+			continue;
+
+		c = ((uint8) page[i]) & ~mask[i];
+
+		tohex(c, &pstart[2 * i]);
+
+		if ((mask[i] & 0xF0) == 0xF0)
+			pstart[2 * i] = '#';
+		if ((mask[i] & 0x0F) == 0x0F)
+			pstart[2 * i + 1] = '#';
+	}
+}
+
+
+
+/* ----------------------------------------------------------------
+ * Line reordering
+ *
+ * When the page images are logged in master and standby, they are
+ * not necessarily written out in the same order. For example, if a
+ * single WAL-logged operation modifies multiple pages, like an index
+ * page split, the standby might release the locks in different order
+ * than the master. Another cause is concurrent operations; writing
+ * the page images is not atomic with WAL insertion, so if two
+ * backends are running concurrently, their modifications in the
+ * image log can be interleaved in different order than their WAL
+ * records.
+ *
+ * To fix that, the lines are read into a reorder buffer, and sorted
+ * there. Sorting the whole file would be overkill, as the lines are
+ * mostly in order already. The fixed-size reorder buffer works as
+ * long as the lines are not out-of-order by more than
+ * REORDER_BUFFER_SIZE lines.
+ */
+
+#define REORDER_BUFFER_SIZE 1000
+
+typedef struct
+{
+	char	    *lines[REORDER_BUFFER_SIZE];
+	int 		nlines;		/* number of lines currently in buffer */
+
+	FILE	   *fp;
+	int			lineno;		/* current input line number (for debugging) */
+	bool		eof;		/* have we reached EOF from this source? */
+} reorder_buffer;
+
+/*
+ * Read lines from the file into the reorder buffer, until the buffer is full.
+ */
+static void
+fill_reorder_buffer(reorder_buffer *buf)
+{
+	if (buf->eof)
+		return;
+
+	while (buf->nlines < REORDER_BUFFER_SIZE)
+	{
+		char *linebuf = pg_malloc(LINESZ);
+
+		if (fgets(linebuf, LINESZ, buf->fp) == NULL)
+		{
+			buf->eof = true;
+			pg_free(linebuf);
+			break;
+		}
+		buf->lineno++;
+
+		/* common case: the new line goes to the end */
+		if (buf->nlines == 0 ||
+			strcmp(linebuf, buf->lines[buf->nlines - 1]) >= 0)
+		{
+			buf->lines[buf->nlines] = linebuf;
+		}
+		else
+		{
+			/* find the right place in the queue */
+			int			i;
+
+			for (i = buf->nlines - 2; i >= 0; i--)
+			{
+				if (strcmp(linebuf, buf->lines[i]) >= 0)
+					break;
+			}
+			if (i < 0)
+			{
+				fprintf(stderr, "could not reorder line %d\n", buf->lineno);
+				pg_free(linebuf);
+				continue;
+			}
+			i++;
+			memmove(&buf->lines[i + 1], &buf->lines[i],
+					(buf->nlines - i) * sizeof(char *));
+			buf->lines[i] = linebuf;
+		}
+		buf->nlines++;
+	}
+}
+
+static reorder_buffer *
+init_reorder_buffer(FILE *fp)
+{
+	reorder_buffer *buf;
+
+	buf = pg_malloc(sizeof(reorder_buffer));
+	buf->fp = fp;
+	buf->eof = false;
+	buf->lineno = 0;
+	buf->nlines = 0;
+
+	fill_reorder_buffer(buf);
+
+	return buf;
+}
+
+/*
+ * Read all the lines that belong to the next WAL record from the reorder
+ * buffer.
+ */
+static int
+readrecord(reorder_buffer *buf, char **lines, uint64 *lsn)
+{
+	int			nlines;
+	uint32		line_xlogid = 0;	/* init: sscanf may not match */
+	uint32		line_xrecoff = 0;
+	uint64		line_lsn = 0;
+	uint64		rec_lsn = 0;
+
+	/* Get all the lines with the same LSN */
+	for (nlines = 0; nlines < buf->nlines; nlines++)
+	{
+		sscanf(buf->lines[nlines], "LSN: %X/%08X", &line_xlogid, &line_xrecoff);
+		line_lsn = ((uint64) line_xlogid) << 32 | (uint64) line_xrecoff;
+
+		if (nlines == 0)
+			*lsn = rec_lsn = line_lsn;
+		else
+		{
+			if (line_lsn != rec_lsn)
+				break;
+		}
+	}
+	if (nlines == buf->nlines)
+	{
+		if (!buf->eof)
+		{
+			fprintf(stderr, "max number of lines in record reached, LSN: %X/%08X\n",
+					line_xlogid, line_xrecoff);
+			exit(1);
+		}
+	}
+
+	/* consume the lines from the reorder buffer */
+	memcpy(lines, buf->lines, sizeof(char *) * nlines);
+	memmove(&buf->lines[0], &buf->lines[nlines],
+			sizeof(char *) * (buf->nlines - nlines));
+	buf->nlines -= nlines;
+
+	fill_reorder_buffer(buf);
+
+	return nlines;
+}
+
+static void
+freerecord(char **lines, int nlines)
+{
+	int			i;
+
+	for (i = 0; i < nlines; i++)
+		pg_free(lines[i]);
+}
+
+static void
+printrecord(char **lines, int nlines)
+{
+	int			i;
+
+	for (i = 0; i < nlines; i++)
+		printf("%s", lines[i]);
+}
+
+static bool
+diffrecords(char **lines_a, int nlines_a, char **lines_b, int nlines_b)
+{
+	int i;
+
+	if (nlines_a != nlines_b)
+		return true;
+	for (i = 0; i < nlines_a; i++)
+	{
+		/* First try a straight byte-per-byte comparison. */
+		if (strcmp(lines_a[i], lines_b[i]) != 0)
+		{
+			/* They were not byte-per-byte identical. Try again after masking */
+			maskline(lines_a[i]);
+			maskline(lines_b[i]);
+			if (strcmp(lines_a[i], lines_b[i]) != 0)
+				return true;
+		}
+	}
+
+	return false;
+}
+
+static void
+usage(void)
+{
+	printf("usage: postprocess-images <master's image file> <standby's image file>\n");
+}
+
+int
+main(int argc, char **argv)
+{
+	char	   *lines_a[REORDER_BUFFER_SIZE];
+	int			nlines_a;
+	char	   *lines_b[REORDER_BUFFER_SIZE];
+	int			nlines_b;
+	FILE	   *fp_a;
+	FILE	   *fp_b;
+	uint64		lsn_a;
+	uint64		lsn_b;
+	reorder_buffer *buf_a;
+	reorder_buffer *buf_b;
+
+	if (argc != 3)
+	{
+		usage();
+		exit(1);
+	}
+
+	fp_a = fopen(argv[1], "rb");
+	if (fp_a == NULL)
+	{
+		fprintf(stderr, "could not open file \"%s\"\n", argv[1]);
+		exit(1);
+	}
+	fp_b = fopen(argv[2], "rb");
+	if (fp_b == NULL)
+	{
+		fprintf(stderr, "could not open file \"%s\"\n", argv[2]);
+		exit(1);
+	}
+
+	buf_a = init_reorder_buffer(fp_a);
+	buf_b = init_reorder_buffer(fp_b);
+
+	/* read first record from both */
+	nlines_a = readrecord(buf_a, lines_a, &lsn_a);
+	nlines_b = readrecord(buf_b, lines_b, &lsn_b);
+
+	while (nlines_a > 0 || nlines_b > 0)
+	{
+		/* compare the records; test for exhaustion first, as the stale
+		 * LSN of an exhausted side must not be compared */
+		if (nlines_b == 0 || (nlines_a > 0 && lsn_a < lsn_b))
+		{
+			printf("Only in A:\n");
+			printrecord(lines_a, nlines_a);
+			freerecord(lines_a, nlines_a);
+			nlines_a = readrecord(buf_a, lines_a, &lsn_a);
+		}
+		else if (nlines_a == 0 || lsn_a > lsn_b)
+		{
+			printf("Only in B:\n");
+			printrecord(lines_b, nlines_b);
+			freerecord(lines_b, nlines_b);
+			nlines_b = readrecord(buf_b, lines_b, &lsn_b);
+		}
+		else			/* lsn_a == lsn_b */
+		{
+			if (diffrecords(lines_a, nlines_a, lines_b, nlines_b))
+			{
+				printf("lines differ, A:\n");
+				printrecord(lines_a, nlines_a);
+				printf("B:\n");
+				printrecord(lines_b, nlines_b);
+			}
+			freerecord(lines_a, nlines_a);
+			freerecord(lines_b, nlines_b);
+			nlines_a = readrecord(buf_a, lines_a, &lsn_a);
+			nlines_b = readrecord(buf_b, lines_b, &lsn_b);
+		}
+	}
+
+	return 0;
+}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index abd8420..fa82f30 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -957,6 +957,12 @@ log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
 		PageSetLSN(page, recptr);
 	}
 
+	/*
+	 * the normal mechanism that hooks into LockBuffer doesn't work for this,
+	 * because we're bypassing buffer manager.
+	 */
+	log_page_change(page, rnode, forkNum, blkno);
+
 	return recptr;
 }
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index cd3aaad..b06973f 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -414,6 +414,154 @@ static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
 
+/* Support for capturing changes to pages */
+typedef struct
+{
+	Buffer		buffer;
+	char		content[BLCKSZ];
+} BufferImage;
+
+#define MAX_BEFORE_IMAGES		100
+static BufferImage *before_images[MAX_BEFORE_IMAGES];
+int		   before_images_cnt = 0;
+
+static FILE *imagefp;
+
+static bool
+log_before_after_images(char *msg, BufferImage *img)
+{
+	Page		newcontent = BufferGetPage(img->buffer);
+	Page		oldcontent = (Page) img->content;
+	XLogRecPtr	oldlsn;
+	XLogRecPtr	newlsn;
+	RelFileNode rnode;
+	ForkNumber	forknum;
+	BlockNumber	blkno;
+
+	oldlsn = PageGetLSN(oldcontent);
+	newlsn = PageGetLSN(newcontent);
+
+	if (oldlsn == newlsn)
+	{
+		/* no change */
+		return false;
+	}
+
+	BufferGetTag(img->buffer, &rnode, &forknum, &blkno);
+
+	log_page_change(newcontent, &rnode, forknum, blkno);
+
+	return true;
+}
+
+void
+log_page_change(char *newcontent, RelFileNode *rnode, int forknum, uint32 blkno)
+{
+	XLogRecPtr newlsn = PageGetLSN((Page) newcontent);
+	int			i;
+
+	/*
+	 * We need a lock to make sure that only one backend writes to the file
+	 * at a time. Abuse SyncScanLock for that - it happens to never be used
+	 * while a buffer is locked/unlocked.
+	 */
+	LWLockAcquire(SyncScanLock, LW_EXCLUSIVE);
+
+	fprintf(imagefp, "LSN: %X/%08X, rel: %u/%u/%u, blk: %u; ",
+			(uint32) (newlsn >> 32), (uint32) newlsn,
+			rnode->spcNode, rnode->dbNode, rnode->relNode,
+			blkno);
+	if (forknum != MAIN_FORKNUM)
+		fprintf(imagefp, "forknum: %u; ", forknum);
+
+	/* write the page contents, in hex */
+	{
+		char		buf[BLCKSZ * 2];
+		int			j = 0;
+
+		fprintf(imagefp, "after: ");
+		for (i = 0; i < BLCKSZ; i++)
+		{
+			const char *digits = "0123456789ABCDEF";
+			uint8		byte = (uint8) newcontent[i];
+
+			buf[j++] = digits[byte >> 4];
+			buf[j++] = digits[byte & 0x0F];
+		}
+		fwrite(buf, BLCKSZ * 2, 1, imagefp);
+	}
+
+	fprintf(imagefp, "\n");
+	fflush(imagefp);
+
+	LWLockRelease(SyncScanLock);
+}
+
+static void
+remember_before_image(Buffer buffer)
+{
+	BufferImage *img;
+
+	Assert(before_images_cnt < MAX_BEFORE_IMAGES);
+
+	img = before_images[before_images_cnt];
+	img->buffer = buffer;
+	memcpy (img->content, BufferGetPage(buffer), BLCKSZ);
+	before_images_cnt++;
+}
+
+/*
+ * Forget a buffer image. If the page was modified, log the new contents.
+ */
+static void
+forget_image(Buffer buffer)
+{
+	int			i;
+
+	for (i = 0; i < before_images_cnt; i++)
+	{
+		BufferImage *img = before_images[i];
+
+		if (img->buffer == buffer)
+		{
+
+			log_before_after_images("forget", before_images[i]);
+			if (i != before_images_cnt - 1)
+			{
+				/* swap the last still-used slot with this one */
+				before_images[i] = before_images[before_images_cnt - 1];
+				before_images[before_images_cnt - 1] = img;
+			}
+			before_images_cnt--;
+
+			return;
+		}
+	}
+	elog(LOG, "could not find image for buffer %u", buffer);
+}
+
+/*
+ * See if any of the buffers we've memorized have changed.
+ */
+void
+log_page_changes(char *msg)
+{
+	int i;
+
+	for (i = 0; i < before_images_cnt; i++)
+	{
+		BufferImage *img = before_images[i];
+
+		/*
+		 * Print the contents of the page if it was changed. Remember the
+		 * new contents as the current image.
+		 */
+		if (log_before_after_images(msg, img))
+		{
+			memcpy(img->content, BufferGetPage(img->buffer), BLCKSZ);
+		}
+	}
+}
 
 /*
  * PrefetchBuffer -- initiate asynchronous read of a block of a relation
@@ -677,7 +825,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 			if (!isLocalBuf)
 			{
 				if (mode == RBM_ZERO_AND_LOCK)
+				{
 					LWLockAcquire(bufHdr->content_lock, LW_EXCLUSIVE);
+					remember_before_image(BufferDescriptorGetBuffer(bufHdr));
+				}
 				else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
 					LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
 			}
@@ -819,6 +970,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		!isLocalBuf)
 	{
 		LWLockAcquire(bufHdr->content_lock, LW_EXCLUSIVE);
+		remember_before_image(BufferDescriptorGetBuffer(bufHdr));
 	}
 
 	if (isLocalBuf)
@@ -2077,6 +2229,10 @@ AtEOXact_Buffers(bool isCommit)
 {
 	CheckForBufferLeaks();
 
+	if (before_images_cnt > 0)
+		elog(LOG, "released all page-images (AtEOXact_Buffers)");
+	before_images_cnt = 0;
+
 	AtEOXact_LocalBuffers(isCommit);
 
 	Assert(PrivateRefCountOverflowed == 0);
@@ -2121,7 +2277,18 @@ InitBufferPoolAccess(void)
 void
 InitBufferPoolBackend(void)
 {
+	int			i;
+	BufferImage *images;
+
 	on_shmem_exit(AtProcExit_Buffers, 0);
+
+	/* Initialize page image capturing */
+	images = palloc(MAX_BEFORE_IMAGES * sizeof(BufferImage));
+
+	for (i = 0; i < MAX_BEFORE_IMAGES; i++)
+		before_images[i] = &images[i];
+
+	imagefp = fopen("buffer-images", "ab");
 }
 
 /*
@@ -3107,7 +3274,30 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 			 * sometime later in this checkpoint cycle.
 			 */
 			if (!XLogRecPtrIsInvalid(lsn))
+			{
 				PageSetLSN(page, lsn);
+
+				/*
+				 * the normal mechanism that hooks into LockBuffer doesn't work
+				 * for this, if we're only holding a SHARE lock on the buffer.
+				 */
+				if (!LWLockHeldByMeInExclusiveMode(bufHdr->content_lock))
+				{
+					RelFileNode rnode;
+					ForkNumber forkno;
+					BlockNumber blkno;
+
+					/*
+					 * XXX: we pass the original page to log_page_change,
+					 * ignoring the possibility that the buffer is modified
+					 * concurrently by another hint bit update. That's OK,
+					 * the comparison tool ignores differences in hint bits
+					 * anyway.
+					 */
+					BufferGetTag(buffer, &rnode, &forkno, &blkno);
+					log_page_change(BufferGetPage(buffer), &rnode, forkno, blkno);
+				}
+			}
 		}
 		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
 		UnlockBufHdr(bufHdr);
@@ -3171,6 +3361,16 @@ LockBuffer(Buffer buffer, int mode)
 
 	buf = GetBufferDescriptor(buffer - 1);
 
+	/*
+	 * If we were holding it in exclusive-mode, log any changes we made to
+	 * it while holding the log in the page-image log.
+	 */
+	if (mode == BUFFER_LOCK_UNLOCK &&
+		LWLockHeldByMeInExclusiveMode(buf->content_lock))
+	{
+		forget_image(buffer);
+	}
+
 	if (mode == BUFFER_LOCK_UNLOCK)
 		LWLockRelease(buf->content_lock);
 	else if (mode == BUFFER_LOCK_SHARE)
@@ -3179,6 +3379,9 @@ LockBuffer(Buffer buffer, int mode)
 		LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
 	else
 		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
+
+	if (mode == BUFFER_LOCK_EXCLUSIVE)
+		remember_before_image(buffer);
 }
 
 /*
@@ -3190,6 +3393,7 @@ bool
 ConditionalLockBuffer(Buffer buffer)
 {
 	volatile BufferDesc *buf;
+	bool	res;
 
 	Assert(BufferIsValid(buffer));
 	if (BufferIsLocal(buffer))
@@ -3197,7 +3401,11 @@ ConditionalLockBuffer(Buffer buffer)
 
 	buf = GetBufferDescriptor(buffer - 1);
 
-	return LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE);
+	res = LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE);
+	if (res)
+		remember_before_image(buffer);
+
+	return res;
 }
 
 /*
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 687ed63..a98b35d 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -1629,6 +1629,10 @@ LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
 void
 LWLockReleaseAll(void)
 {
+	if (before_images_cnt > 0)
+		elog(LOG, "released all page images");
+	before_images_cnt = 0;
+
 	while (num_held_lwlocks > 0)
 	{
 		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */
@@ -1656,3 +1660,21 @@ LWLockHeldByMe(LWLock *l)
 	}
 	return false;
 }
+
+bool
+LWLockHeldByMeInExclusiveMode(LWLock *l)
+{
+	int			i;
+
+	for (i = 0; i < num_held_lwlocks; i++)
+	{
+		if (held_lwlocks[i].lock == l)
+		{
+			if (held_lwlocks[i].mode == LW_EXCLUSIVE)
+				return true;
+			else
+				return false;
+		}
+	}
+	return false;
+}
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index e0cc69f..50692ea 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -91,6 +91,14 @@ extern PGDLLIMPORT volatile uint32 CritSectionCount;
 /* in tcop/postgres.c */
 extern void ProcessInterrupts(void);
 
+/* in bufmgr.c, related to capturing page images */
+extern void log_page_changes(char *msg);
+struct RelFileNode;
+extern void log_page_change(char *newcontent, struct RelFileNode *rnode, int forknum, uint32 blkno);
+
+extern int before_images_cnt;
+
+
 #ifndef WIN32
 
 #define CHECK_FOR_INTERRUPTS() \
@@ -132,6 +140,8 @@ do { \
 do { \
 	Assert(CritSectionCount > 0); \
 	CritSectionCount--; \
+	if (CritSectionCount == 0) \
+		log_page_changes("END_CRIT_SECTION");		\
 } while(0)
 
 
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index f2ff6a0..6e9ad48 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -189,6 +189,7 @@ extern void LWLockRelease(LWLock *lock);
 extern void LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val);
 extern void LWLockReleaseAll(void);
 extern bool LWLockHeldByMe(LWLock *lock);
+extern bool LWLockHeldByMeInExclusiveMode(LWLock *lock);
 
 extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval);
 extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 value);
-- 
2.1.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to