On Tue, Aug 08, 2023 at 05:44:03PM +0900, Kyotaro Horiguchi wrote:
> I like the overall direction. Though, I'm considering enclosing the
> errormsg and errorcode in a struct.

Yes, this suggestion makes sense as it simplifies all the WAL routines
that need to report back a complete error state, and there are four of
them now:
XLogPrefetcherReadRecord()
XLogReadRecord()
XLogNextRecord()
DecodeXLogRecord()

I have spent more time on 0001, polishing it and fixing a few bugs
that I have found while reviewing the whole. Most of them were
related to mistakes in resetting the error state when expected. I
have also expanded DecodeXLogRecord() to use an error structure
instead of only an errmsg, giving more consistency. The error state
now relies on two structures:
+typedef enum XLogReaderErrorCode
+{
+	XLOG_READER_NONE = 0,
+	XLOG_READER_OOM,			/* out-of-memory */
+	XLOG_READER_INVALID_DATA,	/* record data */
+} XLogReaderErrorCode;
+typedef struct XLogReaderError
+{
+	/* Buffer to hold error message */
+	char	   *message;
+	bool		message_deferred;
+	/* Error code when filling *message */
+	XLogReaderErrorCode code;
+} XLogReaderError;

I'm kind of happy with this layer, now.

I have also spent some time on finding a more elegant solution for the
WAL replay, relying on the new facility from 0001. And it happens
that it is easy enough to loop if facing an out-of-memory failure when
reading a record when we are in crash recovery, as the state is
actually close to what a standby does. The trick is that we should
not change the state and avoid tracking a continuation record. This
is done in 0002, making replay more robust. With the addition of the
error injection tweak in 0003, I am able to finish recovery while the
startup process loops if under memory pressure. As mentioned
previously, there are more code paths to consider, but that's a start
to fix the data loss problems.

Comments are welcome.
--
Michael
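
For readers following along, the retry-on-OOM idea described in the message above can be sketched as follows. This is only an illustration under stated assumptions, not the code from 0002: the two type definitions are copied from the patch, while MockReader, mock_read_record() and read_with_oom_retry() are hypothetical stand-ins invented here so that the example compiles outside the PostgreSQL tree. The bounded retry count is also an assumption of the sketch; the message only says that the startup process loops.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Copied from the patch (src/include/access/xlogreader.h). */
typedef enum XLogReaderErrorCode
{
	XLOG_READER_NONE = 0,
	XLOG_READER_OOM,			/* out-of-memory */
	XLOG_READER_INVALID_DATA,	/* record data */
} XLogReaderErrorCode;

typedef struct XLogReaderError
{
	char	   *message;
	bool		message_deferred;
	XLogReaderErrorCode code;
} XLogReaderError;

/* HYPOTHETICAL: a fake reader used only to drive the example. */
typedef struct MockReader
{
	int			oom_failures_left;	/* number of reads that fail with OOM */
	bool		corrupt;			/* then fail with invalid data if true */
} MockReader;

static char oom_msg[] = "out of memory while trying to decode a record";
static char bad_msg[] = "invalid record length";

/*
 * HYPOTHETICAL stand-in for XLogReadRecord(): returns NULL on failure and
 * fills *errordata with a message and an error code.
 */
static const char *
mock_read_record(MockReader *reader, XLogReaderError *errordata)
{
	assert(errordata != NULL);

	/* Reset the caller-visible error state on every call. */
	errordata->message = NULL;
	errordata->message_deferred = false;
	errordata->code = XLOG_READER_NONE;

	if (reader->oom_failures_left > 0)
	{
		reader->oom_failures_left--;
		errordata->message = oom_msg;
		errordata->code = XLOG_READER_OOM;
		return NULL;
	}
	if (reader->corrupt)
	{
		errordata->message = bad_msg;
		errordata->code = XLOG_READER_INVALID_DATA;
		return NULL;
	}
	return "record";
}

/*
 * Retry the read while the failure is an out-of-memory condition, and give
 * up immediately on invalid data.  *attempts reports how many reads were
 * issued.  Returns true if a record was read.
 */
static bool
read_with_oom_retry(MockReader *reader, int max_retries, int *attempts)
{
	*attempts = 0;
	for (;;)
	{
		XLogReaderError errordata = {0};
		const char *record;

		(*attempts)++;
		record = mock_read_record(reader, &errordata);
		if (record != NULL)
			return true;

		/* OOM is transient: loop.  Anything else ends the read. */
		if (errordata.code == XLOG_READER_OOM && *attempts <= max_retries)
			continue;
		return false;
	}
}
```

The point of the sketch is that the caller branches on errordata.code rather than parsing errordata.message, which is what a bare char **errormsg interface could not offer. A caller would declare a MockReader, call read_with_oom_retry(&reader, 5, &attempts), and inspect the boolean result.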
From 9da3751ef46be90cf7a5d4050c5cb281c33dbf70 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 9 Aug 2023 13:47:30 +0900
Subject: [PATCH v2 1/3] Add infrastructure to report error codes in WAL reader

This commit moves the error state coming from WAL readers into a new
structure, that includes the existing pointer to the error message
buffer, but it also gains an error code that is fed back to the callers
of the following routines:
XLogPrefetcherReadRecord()
XLogReadRecord()
XLogNextRecord()
DecodeXLogRecord()

This will help in improving the decisions to take during recovery
depending on the failure mode reported.
---
 src/include/access/xlogprefetcher.h         |   2 +-
 src/include/access/xlogreader.h             |  33 +++-
 src/backend/access/transam/twophase.c       |   8 +-
 src/backend/access/transam/xlog.c           |   6 +-
 src/backend/access/transam/xlogprefetcher.c |   4 +-
 src/backend/access/transam/xlogreader.c     | 167 ++++++++++++------
 src/backend/access/transam/xlogrecovery.c   |  14 +-
 src/backend/access/transam/xlogutils.c      |   2 +-
 src/backend/replication/logical/logical.c   |   9 +-
 .../replication/logical/logicalfuncs.c      |   9 +-
 src/backend/replication/slotfuncs.c         |   8 +-
 src/backend/replication/walsender.c         |   8 +-
 src/bin/pg_rewind/parsexlog.c               |  24 +--
 src/bin/pg_waldump/pg_waldump.c             |  10 +-
 contrib/pg_walinspect/pg_walinspect.c       |  11 +-
 src/tools/pgindent/typedefs.list            |   2 +
 16 files changed, 200 insertions(+), 117 deletions(-)

diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
index 7dd7f20ad0..5563ad1a67 100644
--- a/src/include/access/xlogprefetcher.h
+++ b/src/include/access/xlogprefetcher.h
@@ -48,7 +48,7 @@ extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher,
 									XLogRecPtr recPtr);
 
 extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher,
-											char **errmsg);
+											XLogReaderError *errordata);
 
 extern void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher);
 
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index da32c7db77..2b57b5eb01 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -58,6 +58,25 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
+/* Values for XLogReaderError.code */
+typedef enum XLogReaderErrorCode
+{
+	XLOG_READER_NONE = 0,
+	XLOG_READER_OOM,			/* out-of-memory */
+	XLOG_READER_INVALID_DATA,	/* record data */
+} XLogReaderErrorCode;
+
+/* Error status generated by a WAL reader on failure */
+typedef struct XLogReaderError
+{
+	/* Buffer to hold error message */
+	char	   *message;
+	bool		message_deferred;
+	/* Error code when filling *message */
+	XLogReaderErrorCode code;
+} XLogReaderError;
+
+
 /* Function type definitions for various xlogreader interactions */
 typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
@@ -307,9 +326,8 @@ struct XLogReaderState
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
-	/* Buffer to hold error message */
-	char	   *errormsg_buf;
-	bool		errormsg_deferred;
+	/* Error state data */
+	XLogReaderError errordata;
 
 	/*
 	 * Flag to indicate to XLogPageReadCB that it should not block waiting for
@@ -324,7 +342,8 @@ struct XLogReaderState
 static inline bool
 XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
 {
-	return (state->decode_queue_head != NULL) || state->errormsg_deferred;
+	return (state->decode_queue_head != NULL) ||
+		state->errordata.message_deferred;
 }
 
 /* Get a new XLogReader */
@@ -355,11 +374,11 @@ typedef enum XLogPageReadResult
 
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
-										 char **errormsg);
+										 XLogReaderError *errordata);
 
 /* Consume the next record or error. */
 extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
-										 char **errormsg);
+										 XLogReaderError *errordata);
 
 /* Release the previously returned record, if necessary.
*/ extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state); @@ -399,7 +418,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, DecodedXLogRecord *decoded, XLogRecord *record, XLogRecPtr lsn, - char **errormsg); + XLogReaderError *errordata); /* * Macros that provide access to parts of the record most recently returned by diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c6af8cfd7e..08bd6586ec 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1399,7 +1399,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) { XLogRecord *record; XLogReaderState *xlogreader; - char *errormsg; + XLogReaderError errordata = {0}; xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.page_read = &read_local_xlog_page, @@ -1413,15 +1413,15 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) errdetail("Failed while allocating a WAL reading processor."))); XLogBeginRead(xlogreader, lsn); - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errordata); if (record == NULL) { - if (errormsg) + if (errordata.message) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read two-phase state from WAL at %X/%X: %s", - LSN_FORMAT_ARGS(lsn), errormsg))); + LSN_FORMAT_ARGS(lsn), errordata.message))); else ereport(ERROR, (errcode_for_file_access(), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 60c0b7ec3a..d16acd5e49 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -953,7 +953,7 @@ XLogInsertRecord(XLogRecData *rdata, DecodedXLogRecord *decoded; StringInfoData buf; StringInfoData recordBuf; - char *errormsg = NULL; + XLogReaderError errordata = {0}; MemoryContext oldCxt; oldCxt = MemoryContextSwitchTo(walDebugCxt); @@ -987,10 +987,10 @@ XLogInsertRecord(XLogRecData *rdata, decoded, record, EndPos, - &errormsg)) + &errordata)) { 
appendStringInfo(&buf, "error decoding record: %s", - errormsg ? errormsg : "no error message"); + errordata.message ? errordata.message : "no error message"); } else { diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c index 539928cb85..92d691ca49 100644 --- a/src/backend/access/transam/xlogprefetcher.c +++ b/src/backend/access/transam/xlogprefetcher.c @@ -984,7 +984,7 @@ XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr) * tries to initiate I/O for blocks referenced in future WAL records. */ XLogRecord * -XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) +XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, XLogReaderError *errdata) { DecodedXLogRecord *record; XLogRecPtr replayed_up_to; @@ -1052,7 +1052,7 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) } /* Read the next record. */ - record = XLogNextRecord(prefetcher->reader, errmsg); + record = XLogNextRecord(prefetcher->reader, errdata); if (!record) return NULL; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index c9f9f6e98f..891e38e6e7 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -41,8 +41,10 @@ #include "common/logging.h" #endif -static void report_invalid_record(XLogReaderState *state, const char *fmt,...) - pg_attribute_printf(2, 3); +static void report_invalid_record(XLogReaderState *state, + XLogReaderErrorCode errorcode, + const char *fmt,...) 
+ pg_attribute_printf(3, 4); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen); @@ -66,21 +68,23 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, #define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024) /* - * Construct a string in state->errormsg_buf explaining what's wrong with + * Construct a string in state->errordata.message explaining what's wrong with * the current record being read. */ static void -report_invalid_record(XLogReaderState *state, const char *fmt,...) +report_invalid_record(XLogReaderState *state, XLogReaderErrorCode errorcode, + const char *fmt,...) { va_list args; fmt = _(fmt); va_start(args, fmt); - vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args); + vsnprintf(state->errordata.message, MAX_ERRORMSG_LEN, fmt, args); va_end(args); - state->errormsg_deferred = true; + state->errordata.message_deferred = true; + state->errordata.code = errorcode; } /* @@ -141,15 +145,16 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, /* system_identifier initialized to zeroes above */ state->private_data = private_data; /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ - state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, - MCXT_ALLOC_NO_OOM); - if (!state->errormsg_buf) + state->errordata.message = palloc_extended(MAX_ERRORMSG_LEN + 1, + MCXT_ALLOC_NO_OOM); + if (!state->errordata.message) { pfree(state->readBuf); pfree(state); return NULL; } - state->errormsg_buf[0] = '\0'; + state->errordata.message[0] = '\0'; + state->errordata.code = XLOG_READER_NONE; /* * Allocate an initial readRecordBuf of minimal size, which can later be @@ -157,7 +162,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, */ if (!allocate_recordbuf(state, 0)) { - pfree(state->errormsg_buf); + pfree(state->errordata.message); pfree(state->readBuf); pfree(state); return NULL; @@ -175,7 +180,7 @@ 
XLogReaderFree(XLogReaderState *state) if (state->decode_buffer && state->free_decode_buffer) pfree(state->decode_buffer); - pfree(state->errormsg_buf); + pfree(state->errordata.message); if (state->readRecordBuf) pfree(state->readRecordBuf); pfree(state->readBuf); @@ -351,23 +356,27 @@ XLogReleasePreviousRecord(XLogReaderState *state) * * On success, a record is returned. * - * The returned record (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogNextRecord. + * The returned record (or errordata->message) points to an internal buffer + * that's valid until the next call to XLogNextRecord. */ DecodedXLogRecord * -XLogNextRecord(XLogReaderState *state, char **errormsg) +XLogNextRecord(XLogReaderState *state, XLogReaderError *errordata) { /* Release the last record returned by XLogNextRecord(). */ XLogReleasePreviousRecord(state); if (state->decode_queue_head == NULL) { - *errormsg = NULL; - if (state->errormsg_deferred) + errordata->message = NULL; + errordata->code = XLOG_READER_NONE; + if (state->errordata.message_deferred) { - if (state->errormsg_buf[0] != '\0') - *errormsg = state->errormsg_buf; - state->errormsg_deferred = false; + if (state->errordata.message[0] != '\0') + errordata->message = state->errordata.message; + if (state->errordata.code != XLOG_READER_NONE) + errordata->code = state->errordata.code; + state->errordata.message_deferred = false; + state->errordata.code = XLOG_READER_NONE; } /* @@ -397,7 +406,8 @@ XLogNextRecord(XLogReaderState *state, char **errormsg) state->ReadRecPtr = state->record->lsn; state->EndRecPtr = state->record->next_lsn; - *errormsg = NULL; + errordata->message = NULL; + errordata->code = XLOG_READER_NONE; return state->record; } @@ -409,17 +419,17 @@ XLogNextRecord(XLogReaderState *state, char **errormsg) * to XLogReadRecord(). * * If the page_read callback fails to read the requested data, NULL is - * returned. 
The callback is expected to have reported the error; errormsg - * is set to NULL. + * returned. The callback is expected to have reported the error; + * errordata->message is set to NULL. * * If the reading fails for some other reason, NULL is also returned, and - * *errormsg is set to a string with details of the failure. + * *errordata is set with details of the failure. * - * The returned pointer (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogReadRecord. + * The returned pointer (or *errordata.message) points to an internal + * buffer that's valid until the next call to XLogReadRecord. */ XLogRecord * -XLogReadRecord(XLogReaderState *state, char **errormsg) +XLogReadRecord(XLogReaderState *state, XLogReaderError *errordata) { DecodedXLogRecord *decoded; @@ -437,7 +447,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) XLogReadAhead(state, false /* nonblocking */ ); /* Consume the head record or error. */ - decoded = XLogNextRecord(state, errormsg); + decoded = XLogNextRecord(state, errordata); if (decoded) { /* @@ -546,7 +556,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) bool gotheader; int readOff; DecodedXLogRecord *decoded; - char *errormsg; /* not used */ + XLogReaderError errordata = {0}; /* not used */ /* * randAccess indicates whether to verify the previous-record pointer of @@ -556,7 +566,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) randAccess = false; /* reset error state */ - state->errormsg_buf[0] = '\0'; + state->errordata.message[0] = '\0'; + state->errordata.code = XLOG_READER_NONE; decoded = NULL; state->abortedRecPtr = InvalidXLogRecPtr; @@ -623,7 +634,9 @@ restart: } else if (targetRecOff < pageHeaderSize) { - report_invalid_record(state, "invalid record offset at %X/%X: expected at least %u, got %u", + report_invalid_record(state, + XLOG_READER_INVALID_DATA, + "invalid record offset at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), 
pageHeaderSize, targetRecOff); goto err; @@ -632,7 +645,9 @@ restart: if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && targetRecOff == pageHeaderSize) { - report_invalid_record(state, "contrecord is requested by %X/%X", + report_invalid_record(state, + XLOG_READER_INVALID_DATA, + "contrecord is requested by %X/%X", LSN_FORMAT_ARGS(RecPtr)); goto err; } @@ -673,6 +688,7 @@ restart: if (total_len < SizeOfXLogRecord) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid record length at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), (uint32) SizeOfXLogRecord, total_len); @@ -691,6 +707,7 @@ restart: decoded = XLogReadRecordAlloc(state, total_len, !nonblocking /* allow_oversized */ ); + if (decoded == NULL) { /* @@ -702,6 +719,7 @@ restart: /* We failed to allocate memory for an oversized record. */ report_invalid_record(state, + XLOG_READER_OOM, "out of memory while trying to decode a record of length %u", total_len); goto err; } @@ -724,7 +742,9 @@ restart: !allocate_recordbuf(state, total_len)) { /* We treat this as a "bogus data" condition */ - report_invalid_record(state, "record length %u at %X/%X too long", + report_invalid_record(state, + XLOG_READER_OOM, + "record length %u at %X/%X too long", total_len, LSN_FORMAT_ARGS(RecPtr)); goto err; } @@ -773,6 +793,7 @@ restart: if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "there is no contrecord flag at %X/%X", LSN_FORMAT_ARGS(RecPtr)); goto err; @@ -786,6 +807,7 @@ restart: total_len != (pageHeader->xlp_rem_len + gotlen)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid contrecord length %u (expected %lld) at %X/%X", pageHeader->xlp_rem_len, ((long long) total_len) - gotlen, @@ -867,7 +889,7 @@ restart: state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize); } - if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg)) + if 
(DecodeXLogRecord(state, decoded, record, RecPtr, &errordata)) { /* Record the location of the next record. */ decoded->next_lsn = state->NextRecPtr; @@ -918,7 +940,7 @@ err: * queued so that XLogPrefetcherReadRecord() doesn't bring us back a * second time and clobber the above state. */ - state->errormsg_deferred = true; + state->errordata.message_deferred = true; } if (decoded && decoded->oversized) @@ -931,9 +953,9 @@ err: XLogReaderInvalReadState(state); /* - * If an error was written to errmsg_buf, it'll be returned to the caller - * of XLogReadRecord() after all successfully decoded records from the - * read queue. + * If an error was written to errordata.message, it'll be returned to the + * caller of XLogReadRecord() after all successfully decoded records from + * the read queue. */ return XLREAD_FAIL; @@ -952,7 +974,7 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking) { XLogPageReadResult result; - if (state->errormsg_deferred) + if (state->errordata.message_deferred) return NULL; result = XLogDecodeNextRecord(state, nonblocking); @@ -970,8 +992,8 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking) * via the page_read() callback. * * Returns XLREAD_FAIL if the required page cannot be read for some - * reason; errormsg_buf is set in that case (unless the error occurs in the - * page_read callback). + * reason; errordata.message is set in that case (unless the error occurs in + * the page_read callback). * * Returns XLREAD_WOULDBLOCK if the requested data can't be read without * waiting. 
This can be returned only if the installed page_read callback @@ -1116,6 +1138,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (record->xl_tot_len < SizeOfXLogRecord) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid record length at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), (uint32) SizeOfXLogRecord, record->xl_tot_len); @@ -1124,6 +1147,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (!RmgrIdIsValid(record->xl_rmid)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid resource manager ID %u at %X/%X", record->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); return false; @@ -1137,6 +1161,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (!(record->xl_prev < RecPtr)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "record with incorrect prev-link %X/%X at %X/%X", LSN_FORMAT_ARGS(record->xl_prev), LSN_FORMAT_ARGS(RecPtr)); @@ -1153,6 +1178,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (record->xl_prev != PrevRecPtr) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "record with incorrect prev-link %X/%X at %X/%X", LSN_FORMAT_ARGS(record->xl_prev), LSN_FORMAT_ARGS(RecPtr)); @@ -1189,6 +1215,7 @@ ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr) if (!EQ_CRC32C(record->xl_crc, crc)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "incorrect resource manager data checksum in record at %X/%X", LSN_FORMAT_ARGS(recptr)); return false; @@ -1223,6 +1250,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid magic number %04X in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_magic, fname, @@ -1238,6 +1266,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, 
segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_info, fname, @@ -1254,6 +1283,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, longhdr->xlp_sysid != state->system_identifier) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu", (unsigned long long) longhdr->xlp_sysid, (unsigned long long) state->system_identifier); @@ -1262,12 +1292,14 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "WAL file is from different database system: incorrect segment size in page header"); return false; } else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header"); return false; } @@ -1280,6 +1312,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, /* hmm, first page of file doesn't have a long header? 
*/ report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_info, fname, @@ -1300,6 +1333,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "unexpected pageaddr %X/%X in WAL segment %s, LSN %X/%X, offset %u", LSN_FORMAT_ARGS(hdr->xlp_pageaddr), fname, @@ -1326,6 +1360,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_tli, state->latestPageTLI, @@ -1347,8 +1382,9 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, void XLogReaderResetError(XLogReaderState *state) { - state->errormsg_buf[0] = '\0'; - state->errormsg_deferred = false; + state->errordata.message[0] = '\0'; + state->errordata.message_deferred = false; + state->errordata.code = XLOG_READER_NONE; } /* @@ -1368,7 +1404,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr tmpRecPtr; XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; - char *errormsg; + XLogReaderError errordata = {0}; Assert(!XLogRecPtrIsInvalid(RecPtr)); @@ -1453,7 +1489,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * or we just jumped over the remaining data of a continuation. */ XLogBeginRead(state, tmpRecPtr); - while (XLogReadRecord(state, &errormsg) != NULL) + while (XLogReadRecord(state, &errordata) != NULL) { /* past the record we've found, break out */ if (RecPtr <= state->ReadRecPtr) @@ -1598,8 +1634,9 @@ ResetDecoder(XLogReaderState *state) state->decode_buffer_head = state->decode_buffer; /* Clear error state. 
*/ - state->errormsg_buf[0] = '\0'; - state->errormsg_deferred = false; + state->errordata.message[0] = '\0'; + state->errordata.message_deferred = false; + state->errordata.code = XLOG_READER_NONE; } /* @@ -1649,7 +1686,7 @@ DecodeXLogRecord(XLogReaderState *state, DecodedXLogRecord *decoded, XLogRecord *record, XLogRecPtr lsn, - char **errormsg) + XLogReaderError *errordata) { /* * read next _size bytes from record buffer, but check for overrun first. @@ -1732,6 +1769,7 @@ DecodeXLogRecord(XLogReaderState *state, if (block_id <= decoded->max_block_id) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "out-of-order block_id %u at %X/%X", block_id, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1756,6 +1794,7 @@ DecodeXLogRecord(XLogReaderState *state, if (blk->has_data && blk->data_len == 0) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPBLOCK_HAS_DATA set, but no data included at %X/%X", LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; @@ -1763,6 +1802,7 @@ DecodeXLogRecord(XLogReaderState *state, if (!blk->has_data && blk->data_len != 0) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X", (unsigned int) blk->data_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1799,6 +1839,7 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len == BLCKSZ)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, @@ -1815,6 +1856,7 @@ DecodeXLogRecord(XLogReaderState *state, (blk->hole_offset != 0 || blk->hole_length != 0)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, @@ -1829,6 +1871,7 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len == BLCKSZ) { report_invalid_record(state, + 
XLOG_READER_INVALID_DATA, "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X", (unsigned int) blk->bimg_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1844,6 +1887,7 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len != BLCKSZ) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X", (unsigned int) blk->data_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1860,6 +1904,7 @@ DecodeXLogRecord(XLogReaderState *state, if (rlocator == NULL) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; @@ -1872,6 +1917,7 @@ DecodeXLogRecord(XLogReaderState *state, else { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid block_id %u at %X/%X", block_id, LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; @@ -1939,10 +1985,12 @@ DecodeXLogRecord(XLogReaderState *state, shortdata_err: report_invalid_record(state, + XLOG_READER_INVALID_DATA, "record with invalid length at %X/%X", LSN_FORMAT_ARGS(state->ReadRecPtr)); err: - *errormsg = state->errormsg_buf; + errordata->message = state->errordata.message; + errordata->code = state->errordata.code; return false; } @@ -2049,6 +2097,7 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) !record->record->blocks[block_id].in_use) { report_invalid_record(record, + XLOG_READER_INVALID_DATA, "could not restore image at %X/%X with invalid block %d specified", LSN_FORMAT_ARGS(record->ReadRecPtr), block_id); @@ -2056,7 +2105,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) } if (!record->record->blocks[block_id].has_image) { - report_invalid_record(record, "could not restore image at %X/%X with invalid state, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not restore image at %X/%X with invalid state, block %d", 
LSN_FORMAT_ARGS(record->ReadRecPtr), block_id); return false; @@ -2083,7 +2134,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0) decomp_success = false; #else - report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not restore image at %X/%X compressed with %s not supported by build, block %d", LSN_FORMAT_ARGS(record->ReadRecPtr), "LZ4", block_id); @@ -2100,7 +2153,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) if (ZSTD_isError(decomp_result)) decomp_success = false; #else - report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not restore image at %X/%X compressed with %s not supported by build, block %d", LSN_FORMAT_ARGS(record->ReadRecPtr), "zstd", block_id); @@ -2109,7 +2164,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) } else { - report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not restore image at %X/%X compressed with unknown method, block %d", LSN_FORMAT_ARGS(record->ReadRecPtr), block_id); return false; @@ -2117,7 +2174,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) if (!decomp_success) { - report_invalid_record(record, "could not decompress image at %X/%X, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not decompress image at %X/%X, block %d", LSN_FORMAT_ARGS(record->ReadRecPtr), block_id); return false; diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index becc2bda62..68100bfa4a 100644 --- 
a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -2454,7 +2454,7 @@ verifyBackupPageConsistency(XLogReaderState *record) if (!RestoreBlockImage(record, block_id, primary_image_masked)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg_internal("%s", record->errormsg_buf))); + errmsg_internal("%s", record->errordata.message))); /* * If masking function is defined, mask both the primary and replay @@ -3062,9 +3062,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, for (;;) { - char *errormsg; + XLogReaderError errordata = {0}; - record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg); + record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata); if (record == NULL) { /* @@ -3098,9 +3098,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, * StandbyMode that only happens if we have been triggered, so we * shouldn't loop anymore in that case. */ - if (errormsg) + if (errordata.message) ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr), - (errmsg_internal("%s", errormsg) /* already translated */ )); + (errmsg_internal("%s", errordata.message) /* already translated */ )); } /* @@ -3385,9 +3385,9 @@ retry: * Emit this error right now then retry this page immediately. Use * errmsg_internal() because the message was already translated. 
*/ - if (xlogreader->errormsg_buf[0]) + if (xlogreader->errordata.message[0]) ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr), - (errmsg_internal("%s", xlogreader->errormsg_buf))); + (errmsg_internal("%s", xlogreader->errordata.message))); /* reset any error XLogReaderValidatePageHeader() might have set */ XLogReaderResetError(xlogreader); diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index e174a2a891..5c64454e7e 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -395,7 +395,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, if (!RestoreBlockImage(record, block_id, page)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg_internal("%s", record->errormsg_buf))); + errmsg_internal("%s", record->errordata.message))); /* * The page may be uninitialized. If so, we can't set the LSN because diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 41243d0187..f48feab944 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -641,12 +641,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) for (;;) { XLogRecord *record; - char *err = NULL; + XLogReaderError errordata = {0}; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, &err); - if (err) - elog(ERROR, "could not find logical decoding starting point: %s", err); + record = XLogReadRecord(ctx->reader, &errordata); + if (errordata.message) + elog(ERROR, "could not find logical decoding starting point: %s", + errordata.message); if (!record) elog(ERROR, "could not find logical decoding starting point"); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 55a24c02c9..e7f74809e3 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ 
-244,11 +244,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin while (ctx->reader->EndRecPtr < end_of_wal) { XLogRecord *record; - char *errm = NULL; + XLogReaderError errordata = {0}; - record = XLogReadRecord(ctx->reader, &errm); - if (errm) - elog(ERROR, "could not find record for logical decoding: %s", errm); + record = XLogReadRecord(ctx->reader, &errordata); + if (errordata.message) + elog(ERROR, "could not find record for logical decoding: %s", + errordata.message); /* * The {begin_txn,change,commit_txn}_wrapper callbacks above will diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6035cf4816..4fa4e6bfed 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -503,17 +503,17 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) /* Decode at least one record, until we run out of records */ while (ctx->reader->EndRecPtr < moveto) { - char *errm = NULL; XLogRecord *record; + XLogReaderError errordata = {0}; /* * Read records. No changes are generated in fast_forward mode, * but snapbuilder/slot statuses are updated properly. */ - record = XLogReadRecord(ctx->reader, &errm); - if (errm) + record = XLogReadRecord(ctx->reader, &errordata); + if (errordata.message) elog(ERROR, "could not find record while advancing replication slot: %s", - errm); + errordata.message); /* * Process the record. Storage-level changes are ignored in diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d27ef2985d..d05c60f09f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3045,7 +3045,7 @@ static void XLogSendLogical(void) { XLogRecord *record; - char *errm; + XLogReaderError errordata = {0}; /* * We'll use the current flush point to determine whether we've caught up. 
@@ -3063,12 +3063,12 @@ XLogSendLogical(void) */ WalSndCaughtUp = false; - record = XLogReadRecord(logical_decoding_ctx->reader, &errm); + record = XLogReadRecord(logical_decoding_ctx->reader, &errordata); /* xlog record was invalid */ - if (errm != NULL) + if (errordata.message != NULL) elog(ERROR, "could not find record while sending logically-decoded data: %s", - errm); + errordata.message); if (record != NULL) { diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 27782237d0..2705d9bf45 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -68,7 +68,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, { XLogRecord *record; XLogReaderState *xlogreader; - char *errormsg; + XLogReaderError errordata = {0}; XLogPageReadPrivate private; private.tliIndex = tliIndex; @@ -82,16 +82,16 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, XLogBeginRead(xlogreader, startpoint); do { - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errordata); if (record == NULL) { XLogRecPtr errptr = xlogreader->EndRecPtr; - if (errormsg) + if (errordata.message) pg_fatal("could not read WAL record at %X/%X: %s", LSN_FORMAT_ARGS(errptr), - errormsg); + errordata.message); else pg_fatal("could not read WAL record at %X/%X", LSN_FORMAT_ARGS(errptr)); @@ -126,7 +126,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex, { XLogRecord *record; XLogReaderState *xlogreader; - char *errormsg; + XLogReaderError errordata = {0}; XLogPageReadPrivate private; XLogRecPtr endptr; @@ -139,12 +139,12 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex, pg_fatal("out of memory while allocating a WAL reading processor"); XLogBeginRead(xlogreader, ptr); - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errordata); if (record == NULL) { - if (errormsg) + if (errordata.message) pg_fatal("could not 
read WAL record at %X/%X: %s", - LSN_FORMAT_ARGS(ptr), errormsg); + LSN_FORMAT_ARGS(ptr), errordata.message); else pg_fatal("could not read WAL record at %X/%X", LSN_FORMAT_ARGS(ptr)); @@ -173,7 +173,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, XLogRecord *record; XLogRecPtr searchptr; XLogReaderState *xlogreader; - char *errormsg; + XLogReaderError errordata = {0}; XLogPageReadPrivate private; /* @@ -204,14 +204,14 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, uint8 info; XLogBeginRead(xlogreader, searchptr); - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errordata); if (record == NULL) { - if (errormsg) + if (errordata.message) pg_fatal("could not find previous WAL record at %X/%X: %s", LSN_FORMAT_ARGS(searchptr), - errormsg); + errordata.message); else pg_fatal("could not find previous WAL record at %X/%X", LSN_FORMAT_ARGS(searchptr)); diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index e8b5a6cd61..4129ba901b 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -508,7 +508,7 @@ XLogRecordSaveFPWs(XLogReaderState *record, const char *savepath) /* Full page exists, so let's save it */ if (!RestoreBlockImage(record, block_id, page)) - pg_fatal("%s", record->errormsg_buf); + pg_fatal("%s", record->errordata.message); (void) XLogRecGetBlockTagExtended(record, block_id, &rnode, &fork, &blk, NULL); @@ -796,7 +796,7 @@ main(int argc, char **argv) XLogRecord *record; XLogRecPtr first_record; char *waldir = NULL; - char *errormsg; + XLogReaderError errordata = {0}; static struct option long_options[] = { {"bkp-details", no_argument, NULL, 'b'}, @@ -1239,7 +1239,7 @@ main(int argc, char **argv) } /* try to read the next record */ - record = XLogReadRecord(xlogreader_state, &errormsg); + record = XLogReadRecord(xlogreader_state, &errordata); if (!record) { if (!config.follow || private.endptr_reached) @@ 
-1304,10 +1304,10 @@ main(int argc, char **argv) if (time_to_stop) exit(0); - if (errormsg) + if (errordata.message) pg_fatal("error in WAL record at %X/%X: %s", LSN_FORMAT_ARGS(xlogreader_state->ReadRecPtr), - errormsg); + errordata.message); XLogReaderFree(xlogreader_state); diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c index 796a74f322..e7d30554ed 100644 --- a/contrib/pg_walinspect/pg_walinspect.c +++ b/contrib/pg_walinspect/pg_walinspect.c @@ -146,9 +146,9 @@ static XLogRecord * ReadNextXLogRecord(XLogReaderState *xlogreader) { XLogRecord *record; - char *errormsg; + XLogReaderError errordata = {0}; - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errordata); if (record == NULL) { @@ -161,11 +161,12 @@ ReadNextXLogRecord(XLogReaderState *xlogreader) if (private_data->end_of_wal) return NULL; - if (errormsg) + if (errordata.message) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read WAL at %X/%X: %s", - LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg))); + LSN_FORMAT_ARGS(xlogreader->EndRecPtr), + errordata.message))); else ereport(ERROR, (errcode_for_file_access(), @@ -384,7 +385,7 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record, if (!RestoreBlockImage(record, block_id, page)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg_internal("%s", record->errormsg_buf))); + errmsg_internal("%s", record->errordata.message))); block_fpi_data = (bytea *) palloc(BLCKSZ + VARHDRSZ); SET_VARSIZE(block_fpi_data, BLCKSZ + VARHDRSZ); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 66823bc2a7..53ce72c4c2 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3077,6 +3077,8 @@ XLogPageReadResult XLogPrefetchStats XLogPrefetcher XLogPrefetcherFilter +XLogReaderError +XLogReaderErrorCode XLogReaderRoutine XLogReaderState XLogRecData -- 2.40.1
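For readers following 0001, the calling convention it introduces can be summarized with a small standalone sketch. This is not code from the patch: the two type definitions are copied from the cover letter, while fake_read_record() and read_and_report() are hypothetical stand-ins, invented for illustration, that only mimic how callers of XLogReadRecord() zero-initialize an XLogReaderError, pass its address, and test errordata.message where they used to test errormsg.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Copied from the definitions quoted in the cover letter of this patch set. */
typedef enum XLogReaderErrorCode
{
	XLOG_READER_NONE = 0,
	XLOG_READER_OOM,			/* out-of-memory */
	XLOG_READER_INVALID_DATA	/* record data */
} XLogReaderErrorCode;

typedef struct XLogReaderError
{
	char	   *message;		/* buffer holding the error message, or NULL */
	bool		message_deferred;
	XLogReaderErrorCode code;	/* error code set when filling *message */
} XLogReaderError;

/*
 * Hypothetical stand-in for XLogReadRecord(): returns a fake record on
 * success, or NULL with *errordata filled in.  "outcome" selects the result.
 */
static const char *
fake_read_record(XLogReaderErrorCode outcome, XLogReaderError *errordata)
{
	static char oom_msg[] = "out of memory while decoding record";
	static char bad_msg[] = "invalid record data";

	assert(errordata != NULL);

	/* Reset the error state before reporting anything new. */
	errordata->message = NULL;
	errordata->message_deferred = false;
	errordata->code = outcome;

	switch (outcome)
	{
		case XLOG_READER_NONE:
			return "record";
		case XLOG_READER_OOM:
			errordata->message = oom_msg;
			return NULL;
		case XLOG_READER_INVALID_DATA:
			errordata->message = bad_msg;
			return NULL;
	}
	return NULL;
}

/*
 * Caller pattern: zero-initialize the struct, pass its address, then test
 * errordata.message.  Writes a description into buf and returns the error
 * code seen, which is the new piece of information callers can act on.
 */
static XLogReaderErrorCode
read_and_report(XLogReaderErrorCode outcome, char *buf, size_t buflen)
{
	XLogReaderError errordata = {0};
	const char *record = fake_read_record(outcome, &errordata);

	if (record != NULL)
		snprintf(buf, buflen, "read ok");
	else if (errordata.message)
		snprintf(buf, buflen, "could not read WAL record: %s",
				 errordata.message);
	else
		snprintf(buf, buflen, "could not read WAL record");

	return errordata.code;
}
```

Note that the stub simplifies one case: in the real reader, a NULL record with XLOG_READER_NONE and no message is possible, which is why the callers keep a message-less fallback branch.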
From e374c0ea3a9e775f44ec63582895e4596c65da1c Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Wed, 9 Aug 2023 14:53:26 +0900 Subject: [PATCH v2 2/3] Make WAL replay more robust on OOM failures This takes advantage of the new error facility for WAL readers, allowing WAL replay to loop when an out-of-memory failure happens while reading a record. Such failures were the origin of potential data loss scenarios; WAL replay becomes more robust by acting like a standby here. --- src/backend/access/transam/xlogrecovery.c | 75 ++++++++++++++++------- 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 68100bfa4a..d6811c8678 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -3067,29 +3067,50 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata); if (record == NULL) { - /* - * When we find that WAL ends in an incomplete record, keep track - * of that record. After recovery is done, we'll write a record - * to indicate to downstream WAL readers that that portion is to - * be ignored. - * - * However, when ArchiveRecoveryRequested = true, we're going to - * switch to a new timeline at the end of recovery. We will only - * copy WAL over to the new timeline up to the end of the last - * complete record, so if we did this, we would later create an - * overwrite contrecord in the wrong place, breaking everything.
- */ - if (!ArchiveRecoveryRequested && - !XLogRecPtrIsInvalid(xlogreader->abortedRecPtr)) + switch (errordata.code) { - abortedRecPtr = xlogreader->abortedRecPtr; - missingContrecPtr = xlogreader->missingContrecPtr; - } + case XLOG_READER_NONE: + /* Possible when XLogPageRead() has failed */ + Assert(!errordata.message); + /* FALLTHROUGH */ - if (readFile >= 0) - { - close(readFile); - readFile = -1; + case XLOG_READER_INVALID_DATA: + + /* + * When we find that WAL ends in an incomplete record, + * keep track of that record. After recovery is done, + * we'll write a record to indicate to downstream WAL + * readers that that portion is to be ignored. + * + * However, when ArchiveRecoveryRequested = true, we're + * going to switch to a new timeline at the end of + * recovery. We will only copy WAL over to the new + * timeline up to the end of the last complete record, so + * if we did this, we would later create an overwrite + * contrecord in the wrong place, breaking everything. + */ + if (!ArchiveRecoveryRequested && + !XLogRecPtrIsInvalid(xlogreader->abortedRecPtr)) + { + abortedRecPtr = xlogreader->abortedRecPtr; + missingContrecPtr = xlogreader->missingContrecPtr; + } + + if (readFile >= 0) + { + close(readFile); + readFile = -1; + } + break; + case XLOG_READER_OOM: + + /* + * If we failed because of an out-of-memory problem, just + * give up and retry recovery later. It may be possible + * that the WAL record to decode required a larger memory + * allocation than what the host can offer. + */ + break; } /* @@ -3147,9 +3168,12 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, * WAL from the archive, even if pg_wal is completely empty, but * we'd have no idea how far we'd have to replay to reach * consistency. So err on the safe side and give up. + * + * It may be possible that the record was not decoded because of + * an out-of-memory failure. In this case, just loop.
*/ if (!InArchiveRecovery && ArchiveRecoveryRequested && - !fetching_ckpt) + !fetching_ckpt && errordata.code != XLOG_READER_OOM) { ereport(DEBUG1, (errmsg_internal("reached end of WAL in pg_wal, entering archive recovery"))); @@ -3173,9 +3197,14 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, continue; } - /* In standby mode, loop back to retry. Otherwise, give up. */ + /* + * In standby mode or if the WAL record failed on an out-of-memory + * error, loop back to retry. Otherwise, give up. + */ if (StandbyMode && !CheckForStandbyTrigger()) continue; + else if (errordata.code == XLOG_READER_OOM) + continue; else return NULL; } -- 2.40.1
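The decisions that 0002 adds to ReadRecord() when no record comes back can be condensed into two pure functions. The sketch below is a simplified model written for illustration, not the patch itself: ReplayAction, should_track_aborted_record() and next_action() are hypothetical names, the booleans stand in for ArchiveRecoveryRequested, StandbyMode and CheckForStandbyTrigger(), and the branch that switches to archive recovery is left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Copied from the definition quoted in the cover letter of this patch set. */
typedef enum XLogReaderErrorCode
{
	XLOG_READER_NONE = 0,
	XLOG_READER_OOM,			/* out-of-memory */
	XLOG_READER_INVALID_DATA	/* record data */
} XLogReaderErrorCode;

/* Hypothetical result type, introduced only for this sketch. */
typedef enum ReplayAction
{
	REPLAY_RETRY,				/* "continue" in the ReadRecord() loop */
	REPLAY_GIVE_UP				/* "return NULL" from ReadRecord() */
} ReplayAction;

/*
 * Should the aborted-record state (abortedRecPtr, missingContrecPtr) be
 * remembered?  On OOM the state must be left untouched, as the record may
 * be perfectly valid; otherwise the pre-existing rule applies.
 */
static bool
should_track_aborted_record(XLogReaderErrorCode code,
							bool archive_recovery_requested,
							bool aborted_ptr_valid)
{
	if (code == XLOG_READER_OOM)
		return false;

	/* XLOG_READER_NONE falls through to the invalid-data handling. */
	return !archive_recovery_requested && aborted_ptr_valid;
}

/*
 * What to do once the failure has been reported: a standby that has not
 * been triggered keeps looping as before, and with 0002 an OOM failure
 * also loops, even in crash recovery.
 */
static ReplayAction
next_action(XLogReaderErrorCode code, bool standby_mode,
			bool standby_triggered)
{
	if (standby_mode && !standby_triggered)
		return REPLAY_RETRY;
	else if (code == XLOG_READER_OOM)
		return REPLAY_RETRY;
	else
		return REPLAY_GIVE_UP;
}
```

Keeping the OOM case away from the aborted-record tracking matches the point made in the cover letter: the state should not change and no continuation record should be tracked when the failure is only a lack of memory.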
From 8c834db3da2b5391cebeff7404a8cc8ca15b58b2 Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Wed, 9 Aug 2023 14:53:44 +0900 Subject: [PATCH v2 3/3] Tweak to force OOM behavior when replaying records --- src/backend/access/transam/xlogreader.c | 26 ++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 891e38e6e7..c5f7985d88 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -557,6 +557,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) int readOff; DecodedXLogRecord *decoded; XLogReaderError errordata = {0}; /* not used */ + bool trigger_oom = false; /* * randAccess indicates whether to verify the previous-record pointer of @@ -708,7 +709,30 @@ restart: total_len, !nonblocking /* allow_oversized */ ); - if (decoded == NULL) +#ifndef FRONTEND + + /* + * Trick to emulate an OOM after a hardcoded number of records replayed. + */ + { + struct stat fstat; + static int counter = 0; + + if (stat("/tmp/xlogreader_oom", &fstat) == 0) + { + counter++; + if (counter >= 100) + { + trigger_oom = true; + + /* Reset counter, to not fail when shutting down WAL */ + counter = 0; + } + } + } +#endif + + if (decoded == NULL || trigger_oom) { /* * There is no space in the decode buffer. The caller should help -- 2.40.1
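The injection trick in 0003 boils down to a counter that only advances while a trigger file exists. Below is a minimal standalone sketch of that idea, assuming a POSIX stat(); should_inject_oom() and OOM_INJECTION_THRESHOLD are hypothetical names chosen for illustration, and the path is taken as a parameter here whereas the patch hardcodes /tmp/xlogreader_oom.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/stat.h>

/* Same hardcoded threshold as in the patch; the macro name is made up. */
#define OOM_INJECTION_THRESHOLD 100

/*
 * Returns true once every OOM_INJECTION_THRESHOLD calls made while the
 * trigger file exists.  Calls made while the file is absent are not
 * counted, so the failure can be switched on and off at runtime.
 */
static bool
should_inject_oom(const char *trigger_path)
{
	static int	counter = 0;
	struct stat st;

	assert(trigger_path != NULL);

	/* No trigger file: behave normally and leave the counter alone. */
	if (stat(trigger_path, &st) != 0)
		return false;

	counter++;
	if (counter >= OOM_INJECTION_THRESHOLD)
	{
		/* Reset so that later calls succeed again until the next round. */
		counter = 0;
		return true;
	}

	return false;
}
```

With the retry loop from 0002 in place, each injected failure is followed by a run of successful reads, so replay can keep making progress while the trigger file exists.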