On Tue, Aug 08, 2023 at 05:44:03PM +0900, Kyotaro Horiguchi wrote:
> I like the overall direction. Though, I'm considering enclosing the
> errormsg and errorcode in a struct.

Yes, this suggestion makes sense as it simplifies all the WAL routines
that need to report back a complete error state, and there are four of
them now:
XLogPrefetcherReadRecord()
XLogReadRecord()
XLogNextRecord()
DecodeXLogRecord()

I have spent more time on 0001, polishing it and fixing a few bugs
that I have found while reviewing the whole patch.  Most of them were
related to mistakes in resetting the error state where expected.  I
have also expanded DecodeXLogRecord() to use an error structure
instead of only an errmsg, giving more consistency.  The error state
now relies on two structures:
+typedef enum XLogReaderErrorCode
+{
+   XLOG_READER_NONE = 0,
+   XLOG_READER_OOM,            /* out-of-memory */
+   XLOG_READER_INVALID_DATA,   /* record data */
+} XLogReaderErrorCode;
+typedef struct XLogReaderError
+{
+   /* Buffer to hold error message */
+   char       *message;
+   bool        message_deferred;
+   /* Error code when filling *message */
+   XLogReaderErrorCode code;
+} XLogReaderError;

I'm kind of happy with this layer, now.

I have also spent some time on finding a more elegant solution for the
WAL replay, relying on the new facility from 0001.  And it happens
that it is easy enough to loop if facing an out-of-memory failure when
reading a record when we are in crash recovery, as the state is
actually close to what a standby does.  The trick is that we should
not change the state and avoid tracking a continuation record.  This
is done in 0002, making replay more robust.  With the addition of the
error injection tweak in 0003, I am able to finish recovery while the
startup process loops if under memory pressure.  As mentioned
previously, there are more code paths to consider, but that's a start
to fix the data loss problems.

Comments are welcome.
--
Michael
From 9da3751ef46be90cf7a5d4050c5cb281c33dbf70 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 9 Aug 2023 13:47:30 +0900
Subject: [PATCH v2 1/3] Add infrastructure to report error codes in WAL reader

This commit moves the error state coming from WAL readers into a new
structure that includes the existing pointer to the error message
buffer, and also gains an error code that is fed back to the callers of
the following routines:
XLogPrefetcherReadRecord()
XLogReadRecord()
XLogNextRecord()
DecodeXLogRecord()

This will help in improving the decisions to take during recovery
depending on the failure mode reported.
---
 src/include/access/xlogprefetcher.h           |   2 +-
 src/include/access/xlogreader.h               |  33 +++-
 src/backend/access/transam/twophase.c         |   8 +-
 src/backend/access/transam/xlog.c             |   6 +-
 src/backend/access/transam/xlogprefetcher.c   |   4 +-
 src/backend/access/transam/xlogreader.c       | 167 ++++++++++++------
 src/backend/access/transam/xlogrecovery.c     |  14 +-
 src/backend/access/transam/xlogutils.c        |   2 +-
 src/backend/replication/logical/logical.c     |   9 +-
 .../replication/logical/logicalfuncs.c        |   9 +-
 src/backend/replication/slotfuncs.c           |   8 +-
 src/backend/replication/walsender.c           |   8 +-
 src/bin/pg_rewind/parsexlog.c                 |  24 +--
 src/bin/pg_waldump/pg_waldump.c               |  10 +-
 contrib/pg_walinspect/pg_walinspect.c         |  11 +-
 src/tools/pgindent/typedefs.list              |   2 +
 16 files changed, 200 insertions(+), 117 deletions(-)

diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
index 7dd7f20ad0..5563ad1a67 100644
--- a/src/include/access/xlogprefetcher.h
+++ b/src/include/access/xlogprefetcher.h
@@ -48,7 +48,7 @@ extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher,
 									XLogRecPtr recPtr);
 
 extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher,
-											char **errmsg);
+											XLogReaderError *errordata);
 
 extern void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher);
 
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index da32c7db77..2b57b5eb01 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -58,6 +58,25 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
+/* Values for XLogReaderError.code */
+typedef enum XLogReaderErrorCode
+{
+	XLOG_READER_NONE = 0,
+	XLOG_READER_OOM,			/* out-of-memory */
+	XLOG_READER_INVALID_DATA,	/* record data */
+} XLogReaderErrorCode;
+
+/* Error status generated by a WAL reader on failure */
+typedef struct XLogReaderError
+{
+	/* Buffer to hold error message */
+	char	   *message;
+	bool		message_deferred;
+	/* Error code when filling *message */
+	XLogReaderErrorCode code;
+} XLogReaderError;
+
+
 /* Function type definitions for various xlogreader interactions */
 typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
@@ -307,9 +326,8 @@ struct XLogReaderState
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
-	/* Buffer to hold error message */
-	char	   *errormsg_buf;
-	bool		errormsg_deferred;
+	/* Error state data */
+	XLogReaderError errordata;
 
 	/*
 	 * Flag to indicate to XLogPageReadCB that it should not block waiting for
@@ -324,7 +342,8 @@ struct XLogReaderState
 static inline bool
 XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
 {
-	return (state->decode_queue_head != NULL) || state->errormsg_deferred;
+	return (state->decode_queue_head != NULL) ||
+		state->errordata.message_deferred;
 }
 
 /* Get a new XLogReader */
@@ -355,11 +374,11 @@ typedef enum XLogPageReadResult
 
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
-										 char **errormsg);
+										 XLogReaderError *errordata);
 
 /* Consume the next record or error. */
 extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
-										 char **errormsg);
+										 XLogReaderError *errordata);
 
 /* Release the previously returned record, if necessary. */
 extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state);
@@ -399,7 +418,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state,
 							 DecodedXLogRecord *decoded,
 							 XLogRecord *record,
 							 XLogRecPtr lsn,
-							 char **errormsg);
+							 XLogReaderError *errordata);
 
 /*
  * Macros that provide access to parts of the record most recently returned by
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c6af8cfd7e..08bd6586ec 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1399,7 +1399,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
 									XL_ROUTINE(.page_read = &read_local_xlog_page,
@@ -1413,15 +1413,15 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 				 errdetail("Failed while allocating a WAL reading processor.")));
 
 	XLogBeginRead(xlogreader, lsn);
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 
 	if (record == NULL)
 	{
-		if (errormsg)
+		if (errordata.message)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read two-phase state from WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(lsn), errormsg)));
+							LSN_FORMAT_ARGS(lsn), errordata.message)));
 		else
 			ereport(ERROR,
 					(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 60c0b7ec3a..d16acd5e49 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -953,7 +953,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		DecodedXLogRecord *decoded;
 		StringInfoData buf;
 		StringInfoData recordBuf;
-		char	   *errormsg = NULL;
+		XLogReaderError	errordata = {0};
 		MemoryContext oldCxt;
 
 		oldCxt = MemoryContextSwitchTo(walDebugCxt);
@@ -987,10 +987,10 @@ XLogInsertRecord(XLogRecData *rdata,
 								   decoded,
 								   record,
 								   EndPos,
-								   &errormsg))
+								   &errordata))
 		{
 			appendStringInfo(&buf, "error decoding record: %s",
-							 errormsg ? errormsg : "no error message");
+							 errordata.message ? errordata.message : "no error message");
 		}
 		else
 		{
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 539928cb85..92d691ca49 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -984,7 +984,7 @@ XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
  * tries to initiate I/O for blocks referenced in future WAL records.
  */
 XLogRecord *
-XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
+XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, XLogReaderError *errdata)
 {
 	DecodedXLogRecord *record;
 	XLogRecPtr	replayed_up_to;
@@ -1052,7 +1052,7 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	}
 
 	/* Read the next record. */
-	record = XLogNextRecord(prefetcher->reader, errmsg);
+	record = XLogNextRecord(prefetcher->reader, errdata);
 	if (!record)
 		return NULL;
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c9f9f6e98f..891e38e6e7 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -41,8 +41,10 @@
 #include "common/logging.h"
 #endif
 
-static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
-			pg_attribute_printf(2, 3);
+static void report_invalid_record(XLogReaderState *state,
+								  XLogReaderErrorCode errorcode,
+								  const char *fmt,...)
+			pg_attribute_printf(3, 4);
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
 static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
 							 int reqLen);
@@ -66,21 +68,23 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 #define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
 
 /*
- * Construct a string in state->errormsg_buf explaining what's wrong with
+ * Construct a string in state->errordata.message explaining what's wrong with
  * the current record being read.
  */
 static void
-report_invalid_record(XLogReaderState *state, const char *fmt,...)
+report_invalid_record(XLogReaderState *state, XLogReaderErrorCode errorcode,
+					  const char *fmt,...)
 {
 	va_list		args;
 
 	fmt = _(fmt);
 
 	va_start(args, fmt);
-	vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
+	vsnprintf(state->errordata.message, MAX_ERRORMSG_LEN, fmt, args);
 	va_end(args);
 
-	state->errormsg_deferred = true;
+	state->errordata.message_deferred = true;
+	state->errordata.code = errorcode;
 }
 
 /*
@@ -141,15 +145,16 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	/* system_identifier initialized to zeroes above */
 	state->private_data = private_data;
 	/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
-	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
-										  MCXT_ALLOC_NO_OOM);
-	if (!state->errormsg_buf)
+	state->errordata.message = palloc_extended(MAX_ERRORMSG_LEN + 1,
+											   MCXT_ALLOC_NO_OOM);
+	if (!state->errordata.message)
 	{
 		pfree(state->readBuf);
 		pfree(state);
 		return NULL;
 	}
-	state->errormsg_buf[0] = '\0';
+	state->errordata.message[0] = '\0';
+	state->errordata.code = XLOG_READER_NONE;
 
 	/*
 	 * Allocate an initial readRecordBuf of minimal size, which can later be
@@ -157,7 +162,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	 */
 	if (!allocate_recordbuf(state, 0))
 	{
-		pfree(state->errormsg_buf);
+		pfree(state->errordata.message);
 		pfree(state->readBuf);
 		pfree(state);
 		return NULL;
@@ -175,7 +180,7 @@ XLogReaderFree(XLogReaderState *state)
 	if (state->decode_buffer && state->free_decode_buffer)
 		pfree(state->decode_buffer);
 
-	pfree(state->errormsg_buf);
+	pfree(state->errordata.message);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
 	pfree(state->readBuf);
@@ -351,23 +356,27 @@ XLogReleasePreviousRecord(XLogReaderState *state)
  *
  * On success, a record is returned.
  *
- * The returned record (or *errormsg) points to an internal buffer that's
- * valid until the next call to XLogNextRecord.
+ * The returned record (or errordata->message) points to an internal buffer
+ * that's valid until the next call to XLogNextRecord.
  */
 DecodedXLogRecord *
-XLogNextRecord(XLogReaderState *state, char **errormsg)
+XLogNextRecord(XLogReaderState *state, XLogReaderError *errordata)
 {
 	/* Release the last record returned by XLogNextRecord(). */
 	XLogReleasePreviousRecord(state);
 
 	if (state->decode_queue_head == NULL)
 	{
-		*errormsg = NULL;
-		if (state->errormsg_deferred)
+		errordata->message = NULL;
+		errordata->code = XLOG_READER_NONE;
+		if (state->errordata.message_deferred)
 		{
-			if (state->errormsg_buf[0] != '\0')
-				*errormsg = state->errormsg_buf;
-			state->errormsg_deferred = false;
+			if (state->errordata.message[0] != '\0')
+				errordata->message = state->errordata.message;
+			if (state->errordata.code != XLOG_READER_NONE)
+				errordata->code = state->errordata.code;
+			state->errordata.message_deferred = false;
+			state->errordata.code = XLOG_READER_NONE;
 		}
 
 		/*
@@ -397,7 +406,8 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
 	state->ReadRecPtr = state->record->lsn;
 	state->EndRecPtr = state->record->next_lsn;
 
-	*errormsg = NULL;
+	errordata->message = NULL;
+	errordata->code = XLOG_READER_NONE;
 
 	return state->record;
 }
@@ -409,17 +419,17 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
  * to XLogReadRecord().
  *
  * If the page_read callback fails to read the requested data, NULL is
- * returned.  The callback is expected to have reported the error; errormsg
- * is set to NULL.
+ * returned.  The callback is expected to have reported the error;
+ * errordata->message is set to NULL.
  *
  * If the reading fails for some other reason, NULL is also returned, and
- * *errormsg is set to a string with details of the failure.
+ * *errordata is set with details of the failure.
  *
- * The returned pointer (or *errormsg) points to an internal buffer that's
- * valid until the next call to XLogReadRecord.
+ * The returned pointer (or errordata->message) points to an internal
+ * buffer that's valid until the next call to XLogReadRecord.
  */
 XLogRecord *
-XLogReadRecord(XLogReaderState *state, char **errormsg)
+XLogReadRecord(XLogReaderState *state, XLogReaderError *errordata)
 {
 	DecodedXLogRecord *decoded;
 
@@ -437,7 +447,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		XLogReadAhead(state, false /* nonblocking */ );
 
 	/* Consume the head record or error. */
-	decoded = XLogNextRecord(state, errormsg);
+	decoded = XLogNextRecord(state, errordata);
 	if (decoded)
 	{
 		/*
@@ -546,7 +556,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	bool		gotheader;
 	int			readOff;
 	DecodedXLogRecord *decoded;
-	char	   *errormsg;		/* not used */
+	XLogReaderError errordata = {0};		/* not used */
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -556,7 +566,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	randAccess = false;
 
 	/* reset error state */
-	state->errormsg_buf[0] = '\0';
+	state->errordata.message[0] = '\0';
+	state->errordata.code = XLOG_READER_NONE;
 	decoded = NULL;
 
 	state->abortedRecPtr = InvalidXLogRecPtr;
@@ -623,7 +634,9 @@ restart:
 	}
 	else if (targetRecOff < pageHeaderSize)
 	{
-		report_invalid_record(state, "invalid record offset at %X/%X: expected at least %u, got %u",
+		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
+							  "invalid record offset at %X/%X: expected at least %u, got %u",
 							  LSN_FORMAT_ARGS(RecPtr),
 							  pageHeaderSize, targetRecOff);
 		goto err;
@@ -632,7 +645,9 @@ restart:
 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
 		targetRecOff == pageHeaderSize)
 	{
-		report_invalid_record(state, "contrecord is requested by %X/%X",
+		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
+							  "contrecord is requested by %X/%X",
 							  LSN_FORMAT_ARGS(RecPtr));
 		goto err;
 	}
@@ -673,6 +688,7 @@ restart:
 		if (total_len < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "invalid record length at %X/%X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
 								  (uint32) SizeOfXLogRecord, total_len);
@@ -691,6 +707,7 @@ restart:
 	decoded = XLogReadRecordAlloc(state,
 								  total_len,
 								  !nonblocking /* allow_oversized */ );
+
 	if (decoded == NULL)
 	{
 		/*
@@ -702,6 +719,7 @@ restart:
 
 		/* We failed to allocate memory for an oversized record. */
 		report_invalid_record(state,
+							  XLOG_READER_OOM,
 							  "out of memory while trying to decode a record of length %u", total_len);
 		goto err;
 	}
@@ -724,7 +742,9 @@ restart:
 			!allocate_recordbuf(state, total_len))
 		{
 			/* We treat this as a "bogus data" condition */
-			report_invalid_record(state, "record length %u at %X/%X too long",
+			report_invalid_record(state,
+								  XLOG_READER_OOM,
+								  "record length %u at %X/%X too long",
 								  total_len, LSN_FORMAT_ARGS(RecPtr));
 			goto err;
 		}
@@ -773,6 +793,7 @@ restart:
 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "there is no contrecord flag at %X/%X",
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
@@ -786,6 +807,7 @@ restart:
 				total_len != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "invalid contrecord length %u (expected %lld) at %X/%X",
 									  pageHeader->xlp_rem_len,
 									  ((long long) total_len) - gotlen,
@@ -867,7 +889,7 @@ restart:
 		state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
 	}
 
-	if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
+	if (DecodeXLogRecord(state, decoded, record, RecPtr, &errordata))
 	{
 		/* Record the location of the next record. */
 		decoded->next_lsn = state->NextRecPtr;
@@ -918,7 +940,7 @@ err:
 		 * queued so that XLogPrefetcherReadRecord() doesn't bring us back a
 		 * second time and clobber the above state.
 		 */
-		state->errormsg_deferred = true;
+		state->errordata.message_deferred = true;
 	}
 
 	if (decoded && decoded->oversized)
@@ -931,9 +953,9 @@ err:
 	XLogReaderInvalReadState(state);
 
 	/*
-	 * If an error was written to errmsg_buf, it'll be returned to the caller
-	 * of XLogReadRecord() after all successfully decoded records from the
-	 * read queue.
+	 * If an error was written to errordata.message, it'll be returned to the
+	 * caller of XLogReadRecord() after all successfully decoded records from
+	 * the read queue.
 	 */
 
 	return XLREAD_FAIL;
@@ -952,7 +974,7 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
 {
 	XLogPageReadResult result;
 
-	if (state->errormsg_deferred)
+	if (state->errordata.message_deferred)
 		return NULL;
 
 	result = XLogDecodeNextRecord(state, nonblocking);
@@ -970,8 +992,8 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
  * via the page_read() callback.
  *
  * Returns XLREAD_FAIL if the required page cannot be read for some
- * reason; errormsg_buf is set in that case (unless the error occurs in the
- * page_read callback).
+ * reason; errordata.message is set in that case (unless the error occurs in
+ * the page_read callback).
  *
  * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
  * waiting.  This can be returned only if the installed page_read callback
@@ -1116,6 +1138,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (record->xl_tot_len < SizeOfXLogRecord)
 	{
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid record length at %X/%X: expected at least %u, got %u",
 							  LSN_FORMAT_ARGS(RecPtr),
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
@@ -1124,6 +1147,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (!RmgrIdIsValid(record->xl_rmid))
 	{
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid resource manager ID %u at %X/%X",
 							  record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
 		return false;
@@ -1137,6 +1161,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 		if (!(record->xl_prev < RecPtr))
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "record with incorrect prev-link %X/%X at %X/%X",
 								  LSN_FORMAT_ARGS(record->xl_prev),
 								  LSN_FORMAT_ARGS(RecPtr));
@@ -1153,6 +1178,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 		if (record->xl_prev != PrevRecPtr)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "record with incorrect prev-link %X/%X at %X/%X",
 								  LSN_FORMAT_ARGS(record->xl_prev),
 								  LSN_FORMAT_ARGS(RecPtr));
@@ -1189,6 +1215,7 @@ ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
 	if (!EQ_CRC32C(record->xl_crc, crc))
 	{
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "incorrect resource manager data checksum in record at %X/%X",
 							  LSN_FORMAT_ARGS(recptr));
 		return false;
@@ -1223,6 +1250,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid magic number %04X in WAL segment %s, LSN %X/%X, offset %u",
 							  hdr->xlp_magic,
 							  fname,
@@ -1238,6 +1266,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
 							  hdr->xlp_info,
 							  fname,
@@ -1254,6 +1283,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 			longhdr->xlp_sysid != state->system_identifier)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
 								  (unsigned long long) longhdr->xlp_sysid,
 								  (unsigned long long) state->system_identifier);
@@ -1262,12 +1292,14 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "WAL file is from different database system: incorrect segment size in page header");
 			return false;
 		}
 		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
 			return false;
 		}
@@ -1280,6 +1312,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 
 		/* hmm, first page of file doesn't have a long header? */
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
 							  hdr->xlp_info,
 							  fname,
@@ -1300,6 +1333,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "unexpected pageaddr %X/%X in WAL segment %s, LSN %X/%X, offset %u",
 							  LSN_FORMAT_ARGS(hdr->xlp_pageaddr),
 							  fname,
@@ -1326,6 +1360,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 			XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%X, offset %u",
 								  hdr->xlp_tli,
 								  state->latestPageTLI,
@@ -1347,8 +1382,9 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 void
 XLogReaderResetError(XLogReaderState *state)
 {
-	state->errormsg_buf[0] = '\0';
-	state->errormsg_deferred = false;
+	state->errordata.message[0] = '\0';
+	state->errordata.message_deferred = false;
+	state->errordata.code = XLOG_READER_NONE;
 }
 
 /*
@@ -1368,7 +1404,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	tmpRecPtr;
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
 
@@ -1453,7 +1489,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	 * or we just jumped over the remaining data of a continuation.
 	 */
 	XLogBeginRead(state, tmpRecPtr);
-	while (XLogReadRecord(state, &errormsg) != NULL)
+	while (XLogReadRecord(state, &errordata) != NULL)
 	{
 		/* past the record we've found, break out */
 		if (RecPtr <= state->ReadRecPtr)
@@ -1598,8 +1634,9 @@ ResetDecoder(XLogReaderState *state)
 	state->decode_buffer_head = state->decode_buffer;
 
 	/* Clear error state. */
-	state->errormsg_buf[0] = '\0';
-	state->errormsg_deferred = false;
+	state->errordata.message[0] = '\0';
+	state->errordata.message_deferred = false;
+	state->errordata.code = XLOG_READER_NONE;
 }
 
 /*
@@ -1649,7 +1686,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				 DecodedXLogRecord *decoded,
 				 XLogRecord *record,
 				 XLogRecPtr lsn,
-				 char **errormsg)
+				 XLogReaderError *errordata)
 {
 	/*
 	 * read next _size bytes from record buffer, but check for overrun first.
@@ -1732,6 +1769,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			if (block_id <= decoded->max_block_id)
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "out-of-order block_id %u at %X/%X",
 									  block_id,
 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1756,6 +1794,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			if (blk->has_data && blk->data_len == 0)
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
 				goto err;
@@ -1763,6 +1802,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			if (!blk->has_data && blk->data_len != 0)
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
 									  (unsigned int) blk->data_len,
 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1799,6 +1839,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					 blk->bimg_len == BLCKSZ))
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
 										  (unsigned int) blk->hole_offset,
 										  (unsigned int) blk->hole_length,
@@ -1815,6 +1856,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					(blk->hole_offset != 0 || blk->hole_length != 0))
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
 										  (unsigned int) blk->hole_offset,
 										  (unsigned int) blk->hole_length,
@@ -1829,6 +1871,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					blk->bimg_len == BLCKSZ)
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
 										  (unsigned int) blk->bimg_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1844,6 +1887,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X",
 										  (unsigned int) blk->data_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1860,6 +1904,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				if (rlocator == NULL)
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
@@ -1872,6 +1917,7 @@ DecodeXLogRecord(XLogReaderState *state,
 		else
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "invalid block_id %u at %X/%X",
 								  block_id, LSN_FORMAT_ARGS(state->ReadRecPtr));
 			goto err;
@@ -1939,10 +1985,12 @@ DecodeXLogRecord(XLogReaderState *state,
 
 shortdata_err:
 	report_invalid_record(state,
+						  XLOG_READER_INVALID_DATA,
 						  "record with invalid length at %X/%X",
 						  LSN_FORMAT_ARGS(state->ReadRecPtr));
 err:
-	*errormsg = state->errormsg_buf;
+	errordata->message = state->errordata.message;
+	errordata->code = state->errordata.code;
 
 	return false;
 }
@@ -2049,6 +2097,7 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 		!record->record->blocks[block_id].in_use)
 	{
 		report_invalid_record(record,
+							  XLOG_READER_INVALID_DATA,
 							  "could not restore image at %X/%X with invalid block %d specified",
 							  LSN_FORMAT_ARGS(record->ReadRecPtr),
 							  block_id);
@@ -2056,7 +2105,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	}
 	if (!record->record->blocks[block_id].has_image)
 	{
-		report_invalid_record(record, "could not restore image at %X/%X with invalid state, block %d",
+		report_invalid_record(record,
+							  XLOG_READER_INVALID_DATA,
+							  "could not restore image at %X/%X with invalid state, block %d",
 							  LSN_FORMAT_ARGS(record->ReadRecPtr),
 							  block_id);
 		return false;
@@ -2083,7 +2134,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
 				decomp_success = false;
 #else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with %s not supported by build, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  "LZ4",
 								  block_id);
@@ -2100,7 +2153,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 			if (ZSTD_isError(decomp_result))
 				decomp_success = false;
 #else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with %s not supported by build, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  "zstd",
 								  block_id);
@@ -2109,7 +2164,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 		}
 		else
 		{
-			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with unknown method, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  block_id);
 			return false;
@@ -2117,7 +2174,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 
 		if (!decomp_success)
 		{
-			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not decompress image at %X/%X, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  block_id);
 			return false;
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index becc2bda62..68100bfa4a 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -2454,7 +2454,7 @@ verifyBackupPageConsistency(XLogReaderState *record)
 		if (!RestoreBlockImage(record, block_id, primary_image_masked))
 			ereport(ERROR,
 					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg_internal("%s", record->errormsg_buf)));
+					 errmsg_internal("%s", record->errordata.message)));
 
 		/*
 		 * If masking function is defined, mask both the primary and replay
@@ -3062,9 +3062,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 
 	for (;;)
 	{
-		char	   *errormsg;
+		XLogReaderError errordata = {0};
 
-		record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg);
+		record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata);
 		if (record == NULL)
 		{
 			/*
@@ -3098,9 +3098,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			 * StandbyMode that only happens if we have been triggered, so we
 			 * shouldn't loop anymore in that case.
 			 */
-			if (errormsg)
+			if (errordata.message)
 				ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
-						(errmsg_internal("%s", errormsg) /* already translated */ ));
+						(errmsg_internal("%s", errordata.message) /* already translated */ ));
 		}
 
 		/*
@@ -3385,9 +3385,9 @@ retry:
 		 * Emit this error right now then retry this page immediately. Use
 		 * errmsg_internal() because the message was already translated.
 		 */
-		if (xlogreader->errormsg_buf[0])
+		if (xlogreader->errordata.message[0])
 			ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
-					(errmsg_internal("%s", xlogreader->errormsg_buf)));
+					(errmsg_internal("%s", xlogreader->errordata.message)));
 
 		/* reset any error XLogReaderValidatePageHeader() might have set */
 		XLogReaderResetError(xlogreader);
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index e174a2a891..5c64454e7e 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -395,7 +395,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 		if (!RestoreBlockImage(record, block_id, page))
 			ereport(ERROR,
 					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg_internal("%s", record->errormsg_buf)));
+					 errmsg_internal("%s", record->errordata.message)));
 
 		/*
 		 * The page may be uninitialized. If so, we can't set the LSN because
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187..f48feab944 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -641,12 +641,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	for (;;)
 	{
 		XLogRecord *record;
-		char	   *err = NULL;
+		XLogReaderError errordata = {0};
 
 		/* the read_page callback waits for new WAL */
-		record = XLogReadRecord(ctx->reader, &err);
-		if (err)
-			elog(ERROR, "could not find logical decoding starting point: %s", err);
+		record = XLogReadRecord(ctx->reader, &errordata);
+		if (errordata.message)
+			elog(ERROR, "could not find logical decoding starting point: %s",
+				 errordata.message);
 		if (!record)
 			elog(ERROR, "could not find logical decoding starting point");
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 55a24c02c9..e7f74809e3 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,11 +244,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		while (ctx->reader->EndRecPtr < end_of_wal)
 		{
 			XLogRecord *record;
-			char	   *errm = NULL;
+			XLogReaderError errordata = {0};
 
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
-				elog(ERROR, "could not find record for logical decoding: %s", errm);
+			record = XLogReadRecord(ctx->reader, &errordata);
+			if (errordata.message)
+				elog(ERROR, "could not find record for logical decoding: %s",
+					 errordata.message);
 
 			/*
 			 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..4fa4e6bfed 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -503,17 +503,17 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		/* Decode at least one record, until we run out of records */
 		while (ctx->reader->EndRecPtr < moveto)
 		{
-			char	   *errm = NULL;
 			XLogRecord *record;
+			XLogReaderError errordata = {0};
 
 			/*
 			 * Read records.  No changes are generated in fast_forward mode,
 			 * but snapbuilder/slot statuses are updated properly.
 			 */
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
+			record = XLogReadRecord(ctx->reader, &errordata);
+			if (errordata.message)
 				elog(ERROR, "could not find record while advancing replication slot: %s",
-					 errm);
+					 errordata.message);
 
 			/*
 			 * Process the record.  Storage-level changes are ignored in
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d27ef2985d..d05c60f09f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3045,7 +3045,7 @@ static void
 XLogSendLogical(void)
 {
 	XLogRecord *record;
-	char	   *errm;
+	XLogReaderError errordata = {0};
 
 	/*
 	 * We'll use the current flush point to determine whether we've caught up.
@@ -3063,12 +3063,12 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
-	record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
+	record = XLogReadRecord(logical_decoding_ctx->reader, &errordata);
 
 	/* xlog record was invalid */
-	if (errm != NULL)
+	if (errordata.message != NULL)
 		elog(ERROR, "could not find record while sending logically-decoded data: %s",
-			 errm);
+			 errordata.message);
 
 	if (record != NULL)
 	{
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 27782237d0..2705d9bf45 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -68,7 +68,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 
 	private.tliIndex = tliIndex;
@@ -82,16 +82,16 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 	XLogBeginRead(xlogreader, startpoint);
 	do
 	{
-		record = XLogReadRecord(xlogreader, &errormsg);
+		record = XLogReadRecord(xlogreader, &errordata);
 
 		if (record == NULL)
 		{
 			XLogRecPtr	errptr = xlogreader->EndRecPtr;
 
-			if (errormsg)
+			if (errordata.message)
 				pg_fatal("could not read WAL record at %X/%X: %s",
 						 LSN_FORMAT_ARGS(errptr),
-						 errormsg);
+						 errordata.message);
 			else
 				pg_fatal("could not read WAL record at %X/%X",
 						 LSN_FORMAT_ARGS(errptr));
@@ -126,7 +126,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 	XLogRecPtr	endptr;
 
@@ -139,12 +139,12 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 		pg_fatal("out of memory while allocating a WAL reading processor");
 
 	XLogBeginRead(xlogreader, ptr);
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 	if (record == NULL)
 	{
-		if (errormsg)
+		if (errordata.message)
 			pg_fatal("could not read WAL record at %X/%X: %s",
-					 LSN_FORMAT_ARGS(ptr), errormsg);
+					 LSN_FORMAT_ARGS(ptr), errordata.message);
 		else
 			pg_fatal("could not read WAL record at %X/%X",
 					 LSN_FORMAT_ARGS(ptr));
@@ -173,7 +173,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	XLogRecord *record;
 	XLogRecPtr	searchptr;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 
 	/*
@@ -204,14 +204,14 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 		uint8		info;
 
 		XLogBeginRead(xlogreader, searchptr);
-		record = XLogReadRecord(xlogreader, &errormsg);
+		record = XLogReadRecord(xlogreader, &errordata);
 
 		if (record == NULL)
 		{
-			if (errormsg)
+			if (errordata.message)
 				pg_fatal("could not find previous WAL record at %X/%X: %s",
 						 LSN_FORMAT_ARGS(searchptr),
-						 errormsg);
+						 errordata.message);
 			else
 				pg_fatal("could not find previous WAL record at %X/%X",
 						 LSN_FORMAT_ARGS(searchptr));
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e8b5a6cd61..4129ba901b 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -508,7 +508,7 @@ XLogRecordSaveFPWs(XLogReaderState *record, const char *savepath)
 
 		/* Full page exists, so let's save it */
 		if (!RestoreBlockImage(record, block_id, page))
-			pg_fatal("%s", record->errormsg_buf);
+			pg_fatal("%s", record->errordata.message);
 
 		(void) XLogRecGetBlockTagExtended(record, block_id,
 										  &rnode, &fork, &blk, NULL);
@@ -796,7 +796,7 @@ main(int argc, char **argv)
 	XLogRecord *record;
 	XLogRecPtr	first_record;
 	char	   *waldir = NULL;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	static struct option long_options[] = {
 		{"bkp-details", no_argument, NULL, 'b'},
@@ -1239,7 +1239,7 @@ main(int argc, char **argv)
 		}
 
 		/* try to read the next record */
-		record = XLogReadRecord(xlogreader_state, &errormsg);
+		record = XLogReadRecord(xlogreader_state, &errordata);
 		if (!record)
 		{
 			if (!config.follow || private.endptr_reached)
@@ -1304,10 +1304,10 @@ main(int argc, char **argv)
 	if (time_to_stop)
 		exit(0);
 
-	if (errormsg)
+	if (errordata.message)
 		pg_fatal("error in WAL record at %X/%X: %s",
 				 LSN_FORMAT_ARGS(xlogreader_state->ReadRecPtr),
-				 errormsg);
+				 errordata.message);
 
 	XLogReaderFree(xlogreader_state);
 
diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 796a74f322..e7d30554ed 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -146,9 +146,9 @@ static XLogRecord *
 ReadNextXLogRecord(XLogReaderState *xlogreader)
 {
 	XLogRecord *record;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 
 	if (record == NULL)
 	{
@@ -161,11 +161,12 @@ ReadNextXLogRecord(XLogReaderState *xlogreader)
 		if (private_data->end_of_wal)
 			return NULL;
 
-		if (errormsg)
+		if (errordata.message)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr),
+							errordata.message)));
 		else
 			ereport(ERROR,
 					(errcode_for_file_access(),
@@ -384,7 +385,7 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 			if (!RestoreBlockImage(record, block_id, page))
 				ereport(ERROR,
 						(errcode(ERRCODE_INTERNAL_ERROR),
-						 errmsg_internal("%s", record->errormsg_buf)));
+						 errmsg_internal("%s", record->errordata.message)));
 
 			block_fpi_data = (bytea *) palloc(BLCKSZ + VARHDRSZ);
 			SET_VARSIZE(block_fpi_data, BLCKSZ + VARHDRSZ);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 66823bc2a7..53ce72c4c2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3077,6 +3077,8 @@ XLogPageReadResult
 XLogPrefetchStats
 XLogPrefetcher
 XLogPrefetcherFilter
+XLogReaderError
+XLogReaderErrorCode
 XLogReaderRoutine
 XLogReaderState
 XLogRecData
-- 
2.40.1

From e374c0ea3a9e775f44ec63582895e4596c65da1c Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 9 Aug 2023 14:53:26 +0900
Subject: [PATCH v2 2/3] Make WAL replay more robust on OOM failures

This takes advantage of the new error facility for WAL readers, allowing
WAL replay to loop when an out-of-memory failure happens while reading a
record.  Such failures were the origin of potential data loss scenarios;
WAL replay is now more robust, acting like a standby in this case.
---
 src/backend/access/transam/xlogrecovery.c | 75 ++++++++++++++++-------
 1 file changed, 52 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 68100bfa4a..d6811c8678 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -3067,29 +3067,50 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 		record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata);
 		if (record == NULL)
 		{
-			/*
-			 * When we find that WAL ends in an incomplete record, keep track
-			 * of that record.  After recovery is done, we'll write a record
-			 * to indicate to downstream WAL readers that that portion is to
-			 * be ignored.
-			 *
-			 * However, when ArchiveRecoveryRequested = true, we're going to
-			 * switch to a new timeline at the end of recovery. We will only
-			 * copy WAL over to the new timeline up to the end of the last
-			 * complete record, so if we did this, we would later create an
-			 * overwrite contrecord in the wrong place, breaking everything.
-			 */
-			if (!ArchiveRecoveryRequested &&
-				!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+			switch (errordata.code)
 			{
-				abortedRecPtr = xlogreader->abortedRecPtr;
-				missingContrecPtr = xlogreader->missingContrecPtr;
-			}
+				case XLOG_READER_NONE:
+					/* Possible when XLogPageRead() has failed */
+					Assert(!errordata.message);
+					/* FALLTHROUGH */
 
-			if (readFile >= 0)
-			{
-				close(readFile);
-				readFile = -1;
+				case XLOG_READER_INVALID_DATA:
+
+					/*
+					 * When we find that WAL ends in an incomplete record,
+					 * keep track of that record.  After recovery is done,
+					 * we'll write a record to indicate to downstream WAL
+					 * readers that that portion is to be ignored.
+					 *
+					 * However, when ArchiveRecoveryRequested = true, we're
+					 * going to switch to a new timeline at the end of
+					 * recovery. We will only copy WAL over to the new
+					 * timeline up to the end of the last complete record, so
+					 * if we did this, we would later create an overwrite
+					 * contrecord in the wrong place, breaking everything.
+					 */
+					if (!ArchiveRecoveryRequested &&
+						!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+					{
+						abortedRecPtr = xlogreader->abortedRecPtr;
+						missingContrecPtr = xlogreader->missingContrecPtr;
+					}
+
+					if (readFile >= 0)
+					{
+						close(readFile);
+						readFile = -1;
+					}
+					break;
+				case XLOG_READER_OOM:
+
+					/*
+					 * If we failed because of an out-of-memory problem, just
+					 * give up and retry recovery later.  It may be possible
+					 * that the WAL record to decode required a larger memory
+					 * allocation than what the host can offer.
+					 */
+					break;
 			}
 
 			/*
@@ -3147,9 +3168,12 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			 * WAL from the archive, even if pg_wal is completely empty, but
 			 * we'd have no idea how far we'd have to replay to reach
 			 * consistency.  So err on the safe side and give up.
+			 *
+			 * It may be possible that the record was not decoded because of
+			 * an out-of-memory failure.  In this case, just loop.
 			 */
 			if (!InArchiveRecovery && ArchiveRecoveryRequested &&
-				!fetching_ckpt)
+				!fetching_ckpt && errordata.code != XLOG_READER_OOM)
 			{
 				ereport(DEBUG1,
 						(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
@@ -3173,9 +3197,14 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 				continue;
 			}
 
-			/* In standby mode, loop back to retry. Otherwise, give up. */
+			/*
+			 * In standby mode, or if reading the WAL record failed because of
+			 * an out-of-memory error, loop back to retry.  Otherwise, give up.
+			 */
 			if (StandbyMode && !CheckForStandbyTrigger())
 				continue;
+			else if (errordata.code == XLOG_READER_OOM)
+				continue;
 			else
 				return NULL;
 		}
-- 
2.40.1

From 8c834db3da2b5391cebeff7404a8cc8ca15b58b2 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 9 Aug 2023 14:53:44 +0900
Subject: [PATCH v2 3/3] Tweak to force OOM behavior when replaying records

---
 src/backend/access/transam/xlogreader.c | 26 ++++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 891e38e6e7..c5f7985d88 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -557,6 +557,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	int			readOff;
 	DecodedXLogRecord *decoded;
 	XLogReaderError errordata = {0};		/* not used */
+	bool		trigger_oom = false;
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -708,7 +709,30 @@ restart:
 								  total_len,
 								  !nonblocking /* allow_oversized */ );
 
-	if (decoded == NULL)
+#ifndef FRONTEND
+
+	/*
+	 * Trick to emulate an OOM after a hardcoded number of records replayed.
+	 */
+	{
+		struct stat fstat;
+		static int	counter = 0;
+
+		if (stat("/tmp/xlogreader_oom", &fstat) == 0)
+		{
+			counter++;
+			if (counter >= 100)
+			{
+				trigger_oom = true;
+
+				/* Reset counter, to not fail when shutting down WAL */
+				counter = 0;
+			}
+		}
+	}
+#endif
+
+	if (decoded == NULL || trigger_oom)
 	{
 		/*
 		 * There is no space in the decode buffer.  The caller should help
-- 
2.40.1

Attachment: signature.asc
Description: PGP signature

Reply via email to