On Thu, Aug 10, 2023 at 02:59:07PM +0900, Michael Paquier wrote:
> My apologies if I sounded unclear here.  It seems to me that we should
> wrap the patch on [1] first, and get it backpatched.  At least that
> makes for less conflicts when 0001 gets merged for HEAD when we are
> able to set a proper error code.  (Was looking at it, actually.)

Now that Thomas Munro has addressed the original problem of being able
to trust xl_tot_len, with bae868caf22, I am coming back to this thread.

First, attached is a rebased set:
- 0001 to introduce the new error infra for xlogreader.c with an error
code, so that callers can tell the difference between an OOM and an
invalid record.
- 0002 to tweak the startup process.  Once again, I've taken the
approach of making the startup process behave like a standby on crash
recovery: each time an OOM is found, we loop and retry.
- 0003 to emulate an OOM failure, which can be used with the script
attached to see that we don't stop recovery too early.

>>> I guess that it depends on how much responsiveness one may want.
>>> Forcing a failure on OOM is at least something that users would be
>>> immediately able to act on when we don't run a standby but just
>>> recover from a crash, while a standby would do what it is designed to
>>> do, aka continue to replay what it can see.  One issue with the
>>> wait-and-continue is that a standby may loop continuously on OOM,
>>> which could be also bad if there's a replication slot retaining WAL on
>>> the primary.  Perhaps that's just OK to keep doing that for a
>>> standby.  At least this makes the discussion easier for the sake of
>>> this thread: just consider the case of crash recovery when we don't
>>> have a standby.
>>
>> Yeah, I'm with you on focusing on crash recovery cases; that's what I
>> intended. Sorry for any confusion.
>
> Okay, so we're on the same page here, keeping standbys as they are and
> do something for the crash recovery case.

For the crash recovery case, one argument that stood out in my mind is
that causing a hard failure has the disadvantage of forcing users to
redo WAL replay from the last redo position, which may be far away even
if the checkpointer now runs during crash recovery.  What I am
proposing on this thread has the merit of avoiding that.

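To illustrate the idea behind 0001 and 0002, here is a minimal,
standalone sketch of a caller that uses the error code to tell an OOM
from an invalid record, and retries only on OOM.  The enum and struct
mirror what 0001 adds to xlogreader.h; everything else (MockReader,
mock_read_record(), replay_until_end(), and the retry cap) is
hypothetical and only there so the example compiles without the
backend.  It is not the code of 0002.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Mirrors the definitions that 0001 adds to xlogreader.h. */
typedef enum XLogReaderErrorCode
{
	XLOG_READER_NO_ERROR = 0,
	XLOG_READER_OOM,			/* out-of-memory */
	XLOG_READER_INVALID_DATA	/* record data */
} XLogReaderErrorCode;

typedef struct XLogReaderError
{
	char	   *message;
	XLogReaderErrorCode code;
} XLogReaderError;

/* Hypothetical stand-in for a WAL reader: one scripted outcome per read. */
typedef struct MockReader
{
	const XLogReaderErrorCode *outcomes;
	int			noutcomes;
	int			pos;
} MockReader;

static char oom_msg[] = "out of memory (emulated)";
static char invalid_msg[] = "invalid record (emulated)";

/*
 * Hypothetical read routine.  Returns true if a record was read.  On
 * failure, returns false and fills *errordata, in the same spirit as
 * XLogReadRecord() in the patch.  Running off the end of the script is
 * reported as invalid data, i.e. the end of WAL.
 */
static bool
mock_read_record(MockReader *reader, XLogReaderError *errordata)
{
	XLogReaderErrorCode code;

	errordata->message = NULL;
	errordata->code = XLOG_READER_NO_ERROR;

	if (reader->pos >= reader->noutcomes)
		code = XLOG_READER_INVALID_DATA;
	else
		code = reader->outcomes[reader->pos++];

	if (code == XLOG_READER_NO_ERROR)
		return true;

	errordata->code = code;
	errordata->message = (code == XLOG_READER_OOM) ? oom_msg : invalid_msg;
	return false;
}

/*
 * Replay records until an invalid record is found.  An OOM is treated
 * as transient and retried.  max_oom_retries is an assumption of this
 * sketch, used only to guarantee termination.  Returns the number of
 * records replayed.
 */
static int
replay_until_end(MockReader *reader, int max_oom_retries, int *oom_retries)
{
	int			replayed = 0;

	assert(reader != NULL && oom_retries != NULL);
	*oom_retries = 0;

	for (;;)
	{
		XLogReaderError errordata = {0};

		if (mock_read_record(reader, &errordata))
		{
			replayed++;
			continue;
		}

		if (errordata.code == XLOG_READER_OOM &&
			*oom_retries < max_oom_retries)
		{
			/* Transient failure: loop and retry the read. */
			(*oom_retries)++;
			continue;
		}

		/* Invalid data (or too many OOMs): stop replay here. */
		break;
	}

	return replayed;
}
```

With a script of "record, OOM, record, invalid", the loop above replays
both records when retries are allowed, and stops after the first one
when they are not, which is the "stop recovery too early" case that
0003 is meant to exercise.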
Anyway, let's discuss more before settling this point for the crash
recovery case.

By the way, anything that I am proposing here cannot be backpatched
because of the infrastructure changes required in xlogreader.c, so I am
going to create a second thread with something that could be
backpatched (yeah, likely FATALs on OOM to stop recovery from doing
something bad).
--
Michael
From 3d7bb24fc2f9f070273b63208819c4e54e428d18 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Tue, 26 Sep 2023 15:40:05 +0900
Subject: [PATCH v4 1/3] Add infrastructure to report error codes in WAL reader

This commit moves the error state coming from WAL readers into a new
structure, that includes the existing pointer to the error message
buffer, but it also gains an error code that is fed back to the callers
of the following routines:
XLogPrefetcherReadRecord()
XLogReadRecord()
XLogNextRecord()
DecodeXLogRecord()

This will help in improving the decisions to take during recovery
depending on the failure mode reported.
---
 src/include/access/xlogprefetcher.h         |   2 +-
 src/include/access/xlogreader.h             |  33 +++-
 src/backend/access/transam/twophase.c       |   8 +-
 src/backend/access/transam/xlog.c           |   6 +-
 src/backend/access/transam/xlogprefetcher.c |   4 +-
 src/backend/access/transam/xlogreader.c     | 170 ++++++++++++------
 src/backend/access/transam/xlogrecovery.c   |  14 +-
 src/backend/access/transam/xlogutils.c      |   2 +-
 src/backend/replication/logical/logical.c   |   9 +-
 .../replication/logical/logicalfuncs.c      |   9 +-
 src/backend/replication/slotfuncs.c         |   8 +-
 src/backend/replication/walsender.c         |   8 +-
 src/bin/pg_rewind/parsexlog.c               |  24 +--
 src/bin/pg_waldump/pg_waldump.c             |  10 +-
 contrib/pg_walinspect/pg_walinspect.c       |  11 +-
 src/tools/pgindent/typedefs.list            |   2 +
 16 files changed, 201 insertions(+), 119 deletions(-)

diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
index 7dd7f20ad0..5563ad1a67 100644
--- a/src/include/access/xlogprefetcher.h
+++ b/src/include/access/xlogprefetcher.h
@@ -48,7 +48,7 @@ extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher,
 									XLogRecPtr recPtr);
 
 extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher,
-											char **errmsg);
+											XLogReaderError *errordata);
 
 extern void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher);
 
diff --git a/src/include/access/xlogreader.h
b/src/include/access/xlogreader.h index da32c7db77..06664dc6fb 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -58,6 +58,24 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; +/* Values for XLogReaderError.errorcode */ +typedef enum XLogReaderErrorCode +{ + XLOG_READER_NO_ERROR = 0, + XLOG_READER_OOM, /* out-of-memory */ + XLOG_READER_INVALID_DATA, /* record data */ +} XLogReaderErrorCode; + +/* Error status generated by a WAL reader on failure */ +typedef struct XLogReaderError +{ + /* Buffer to hold error message */ + char *message; + /* Error code when filling *message */ + XLogReaderErrorCode code; +} XLogReaderError; + + /* Function type definitions for various xlogreader interactions */ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, @@ -307,9 +325,9 @@ struct XLogReaderState char *readRecordBuf; uint32 readRecordBufSize; - /* Buffer to hold error message */ - char *errormsg_buf; - bool errormsg_deferred; + /* Error state data */ + XLogReaderError errordata; + bool errordata_deferred; /* * Flag to indicate to XLogPageReadCB that it should not block waiting for @@ -324,7 +342,8 @@ struct XLogReaderState static inline bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state) { - return (state->decode_queue_head != NULL) || state->errormsg_deferred; + return (state->decode_queue_head != NULL) || + state->errordata_deferred; } /* Get a new XLogReader */ @@ -355,11 +374,11 @@ typedef enum XLogPageReadResult /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, - char **errormsg); + XLogReaderError *errordata); /* Consume the next record or error. */ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state, - char **errormsg); + XLogReaderError *errordata); /* Release the previously returned record, if necessary. 
*/ extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state); @@ -399,7 +418,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, DecodedXLogRecord *decoded, XLogRecord *record, XLogRecPtr lsn, - char **errormsg); + XLogReaderError *errordata); /* * Macros that provide access to parts of the record most recently returned by diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c6af8cfd7e..08bd6586ec 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1399,7 +1399,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) { XLogRecord *record; XLogReaderState *xlogreader; - char *errormsg; + XLogReaderError errordata = {0}; xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.page_read = &read_local_xlog_page, @@ -1413,15 +1413,15 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) errdetail("Failed while allocating a WAL reading processor."))); XLogBeginRead(xlogreader, lsn); - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errordata); if (record == NULL) { - if (errormsg) + if (errordata.message) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read two-phase state from WAL at %X/%X: %s", - LSN_FORMAT_ARGS(lsn), errormsg))); + LSN_FORMAT_ARGS(lsn), errordata.message))); else ereport(ERROR, (errcode_for_file_access(), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index fcbde10529..56dd9f5b64 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -953,7 +953,7 @@ XLogInsertRecord(XLogRecData *rdata, DecodedXLogRecord *decoded; StringInfoData buf; StringInfoData recordBuf; - char *errormsg = NULL; + XLogReaderError errordata = {0}; MemoryContext oldCxt; oldCxt = MemoryContextSwitchTo(walDebugCxt); @@ -987,10 +987,10 @@ XLogInsertRecord(XLogRecData *rdata, decoded, record, EndPos, - &errormsg)) + &errordata)) { 
appendStringInfo(&buf, "error decoding record: %s", - errormsg ? errormsg : "no error message"); + errordata.message ? errordata.message : "no error message"); } else { diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c index 539928cb85..92d691ca49 100644 --- a/src/backend/access/transam/xlogprefetcher.c +++ b/src/backend/access/transam/xlogprefetcher.c @@ -984,7 +984,7 @@ XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr) * tries to initiate I/O for blocks referenced in future WAL records. */ XLogRecord * -XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) +XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, XLogReaderError *errdata) { DecodedXLogRecord *record; XLogRecPtr replayed_up_to; @@ -1052,7 +1052,7 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) } /* Read the next record. */ - record = XLogNextRecord(prefetcher->reader, errmsg); + record = XLogNextRecord(prefetcher->reader, errdata); if (!record) return NULL; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index a17263df20..fd1413b6d3 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -41,8 +41,10 @@ #include "common/logging.h" #endif -static void report_invalid_record(XLogReaderState *state, const char *fmt,...) - pg_attribute_printf(2, 3); +static void report_invalid_record(XLogReaderState *state, + XLogReaderErrorCode errorcode, + const char *fmt,...) 
+ pg_attribute_printf(3, 4); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen); @@ -66,21 +68,23 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, #define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024) /* - * Construct a string in state->errormsg_buf explaining what's wrong with + * Construct a string in state->errordata.message explaining what's wrong with * the current record being read. */ static void -report_invalid_record(XLogReaderState *state, const char *fmt,...) +report_invalid_record(XLogReaderState *state, XLogReaderErrorCode errorcode, + const char *fmt,...) { va_list args; fmt = _(fmt); va_start(args, fmt); - vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args); + vsnprintf(state->errordata.message, MAX_ERRORMSG_LEN, fmt, args); va_end(args); - state->errormsg_deferred = true; + state->errordata_deferred = true; + state->errordata.code = errorcode; } /* @@ -141,15 +145,16 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, /* system_identifier initialized to zeroes above */ state->private_data = private_data; /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ - state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, - MCXT_ALLOC_NO_OOM); - if (!state->errormsg_buf) + state->errordata.message = palloc_extended(MAX_ERRORMSG_LEN + 1, + MCXT_ALLOC_NO_OOM); + if (!state->errordata.message) { pfree(state->readBuf); pfree(state); return NULL; } - state->errormsg_buf[0] = '\0'; + state->errordata.message[0] = '\0'; + state->errordata.code = XLOG_READER_NO_ERROR; /* * Allocate an initial readRecordBuf of minimal size, which can later be @@ -157,7 +162,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, */ if (!allocate_recordbuf(state, 0)) { - pfree(state->errormsg_buf); + pfree(state->errordata.message); pfree(state->readBuf); pfree(state); return NULL; @@ -175,7 +180,7 @@ 
XLogReaderFree(XLogReaderState *state) if (state->decode_buffer && state->free_decode_buffer) pfree(state->decode_buffer); - pfree(state->errormsg_buf); + pfree(state->errordata.message); if (state->readRecordBuf) pfree(state->readRecordBuf); pfree(state->readBuf); @@ -335,23 +340,27 @@ XLogReleasePreviousRecord(XLogReaderState *state) * * On success, a record is returned. * - * The returned record (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogNextRecord. + * The returned record (or errordata->message) points to an internal buffer + * that's valid until the next call to XLogNextRecord. */ DecodedXLogRecord * -XLogNextRecord(XLogReaderState *state, char **errormsg) +XLogNextRecord(XLogReaderState *state, XLogReaderError *errordata) { /* Release the last record returned by XLogNextRecord(). */ XLogReleasePreviousRecord(state); if (state->decode_queue_head == NULL) { - *errormsg = NULL; - if (state->errormsg_deferred) + errordata->message = NULL; + errordata->code = XLOG_READER_NO_ERROR; + if (state->errordata_deferred) { - if (state->errormsg_buf[0] != '\0') - *errormsg = state->errormsg_buf; - state->errormsg_deferred = false; + if (state->errordata.message[0] != '\0') + errordata->message = state->errordata.message; + if (state->errordata.code != XLOG_READER_NO_ERROR) + errordata->code = state->errordata.code; + state->errordata_deferred = false; + state->errordata.code = XLOG_READER_NO_ERROR; } /* @@ -381,7 +390,8 @@ XLogNextRecord(XLogReaderState *state, char **errormsg) state->ReadRecPtr = state->record->lsn; state->EndRecPtr = state->record->next_lsn; - *errormsg = NULL; + errordata->message = NULL; + errordata->code = XLOG_READER_NO_ERROR; return state->record; } @@ -393,17 +403,17 @@ XLogNextRecord(XLogReaderState *state, char **errormsg) * to XLogReadRecord(). * * If the page_read callback fails to read the requested data, NULL is - * returned. 
The callback is expected to have reported the error; errormsg - * is set to NULL. + * returned. The callback is expected to have reported the error; + * errordata->message is set to NULL. * * If the reading fails for some other reason, NULL is also returned, and - * *errormsg is set to a string with details of the failure. + * *errordata is set with details of the failure. * - * The returned pointer (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogReadRecord. + * The returned pointer (or *errordata.message) points to an internal + * buffer that's valid until the next call to XLogReadRecord. */ XLogRecord * -XLogReadRecord(XLogReaderState *state, char **errormsg) +XLogReadRecord(XLogReaderState *state, XLogReaderError *errordata) { DecodedXLogRecord *decoded; @@ -421,7 +431,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) XLogReadAhead(state, false /* nonblocking */ ); /* Consume the head record or error. */ - decoded = XLogNextRecord(state, errormsg); + decoded = XLogNextRecord(state, errordata); if (decoded) { /* @@ -530,7 +540,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) bool gotheader; int readOff; DecodedXLogRecord *decoded; - char *errormsg; /* not used */ + XLogReaderError errordata = {0}; /* not used */ /* * randAccess indicates whether to verify the previous-record pointer of @@ -540,7 +550,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) randAccess = false; /* reset error state */ - state->errormsg_buf[0] = '\0'; + state->errordata.message[0] = '\0'; + state->errordata.code = XLOG_READER_NO_ERROR; decoded = NULL; state->abortedRecPtr = InvalidXLogRecPtr; @@ -607,7 +618,9 @@ restart: } else if (targetRecOff < pageHeaderSize) { - report_invalid_record(state, "invalid record offset at %X/%X: expected at least %u, got %u", + report_invalid_record(state, + XLOG_READER_INVALID_DATA, + "invalid record offset at %X/%X: expected at least %u, got %u", 
LSN_FORMAT_ARGS(RecPtr), pageHeaderSize, targetRecOff); goto err; @@ -616,7 +629,9 @@ restart: if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && targetRecOff == pageHeaderSize) { - report_invalid_record(state, "contrecord is requested by %X/%X", + report_invalid_record(state, + XLOG_READER_INVALID_DATA, + "contrecord is requested by %X/%X", LSN_FORMAT_ARGS(RecPtr)); goto err; } @@ -657,6 +672,7 @@ restart: if (total_len < SizeOfXLogRecord) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid record length at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), (uint32) SizeOfXLogRecord, total_len); @@ -746,6 +762,7 @@ restart: if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "there is no contrecord flag at %X/%X", LSN_FORMAT_ARGS(RecPtr)); goto err; @@ -759,6 +776,7 @@ restart: total_len != (pageHeader->xlp_rem_len + gotlen)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid contrecord length %u (expected %lld) at %X/%X", pageHeader->xlp_rem_len, ((long long) total_len) - gotlen, @@ -817,8 +835,10 @@ restart: memcpy(save_copy, state->readRecordBuf, gotlen); if (!allocate_recordbuf(state, total_len)) { - /* We treat this as a "bogus data" condition */ - report_invalid_record(state, "record length %u at %X/%X too long", + /* We treat this as an out-of-memory error */ + report_invalid_record(state, + XLOG_READER_OOM, + "record length %u at %X/%X too long", total_len, LSN_FORMAT_ARGS(RecPtr)); goto err; } @@ -881,15 +901,16 @@ restart: { /* * We failed to allocate memory for an oversized record. As - * above, we currently treat this as a "bogus data" condition. + * above, we currently treat this as an out-of-memory error. 
*/ report_invalid_record(state, + XLOG_READER_OOM, "out of memory while trying to decode a record of length %u", total_len); goto err; } } - if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg)) + if (DecodeXLogRecord(state, decoded, record, RecPtr, &errordata)) { /* Record the location of the next record. */ decoded->next_lsn = state->NextRecPtr; @@ -938,7 +959,7 @@ err: * queued so that XLogPrefetcherReadRecord() doesn't bring us back a * second time and clobber the above state. */ - state->errormsg_deferred = true; + state->errordata_deferred = true; } if (decoded && decoded->oversized) @@ -951,9 +972,9 @@ err: XLogReaderInvalReadState(state); /* - * If an error was written to errmsg_buf, it'll be returned to the caller - * of XLogReadRecord() after all successfully decoded records from the - * read queue. + * If an error was written to errordata.message, it'll be returned to the + * caller of XLogReadRecord() after all successfully decoded records from + * the read queue. */ return XLREAD_FAIL; @@ -972,7 +993,7 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking) { XLogPageReadResult result; - if (state->errormsg_deferred) + if (state->errordata_deferred) return NULL; result = XLogDecodeNextRecord(state, nonblocking); @@ -990,8 +1011,8 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking) * via the page_read() callback. * * Returns XLREAD_FAIL if the required page cannot be read for some - * reason; errormsg_buf is set in that case (unless the error occurs in the - * page_read callback). + * reason; errordata.message is set in that case (unless the error occurs in + * the page_read callback). * * Returns XLREAD_WOULDBLOCK if the requested data can't be read without * waiting. 
This can be returned only if the installed page_read callback @@ -1136,6 +1157,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (record->xl_tot_len < SizeOfXLogRecord) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid record length at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), (uint32) SizeOfXLogRecord, record->xl_tot_len); @@ -1144,6 +1166,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (!RmgrIdIsValid(record->xl_rmid)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid resource manager ID %u at %X/%X", record->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); return false; @@ -1157,6 +1180,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (!(record->xl_prev < RecPtr)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "record with incorrect prev-link %X/%X at %X/%X", LSN_FORMAT_ARGS(record->xl_prev), LSN_FORMAT_ARGS(RecPtr)); @@ -1173,6 +1197,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (record->xl_prev != PrevRecPtr) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "record with incorrect prev-link %X/%X at %X/%X", LSN_FORMAT_ARGS(record->xl_prev), LSN_FORMAT_ARGS(RecPtr)); @@ -1211,6 +1236,7 @@ ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr) if (!EQ_CRC32C(record->xl_crc, crc)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "incorrect resource manager data checksum in record at %X/%X", LSN_FORMAT_ARGS(recptr)); return false; @@ -1245,6 +1271,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid magic number %04X in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_magic, fname, @@ -1260,6 +1287,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, 
segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_info, fname, @@ -1276,6 +1304,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, longhdr->xlp_sysid != state->system_identifier) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu", (unsigned long long) longhdr->xlp_sysid, (unsigned long long) state->system_identifier); @@ -1284,12 +1313,14 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "WAL file is from different database system: incorrect segment size in page header"); return false; } else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header"); return false; } @@ -1302,6 +1333,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, /* hmm, first page of file doesn't have a long header? 
*/ report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_info, fname, @@ -1322,6 +1354,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "unexpected pageaddr %X/%X in WAL segment %s, LSN %X/%X, offset %u", LSN_FORMAT_ARGS(hdr->xlp_pageaddr), fname, @@ -1348,6 +1381,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_tli, state->latestPageTLI, @@ -1369,8 +1403,9 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, void XLogReaderResetError(XLogReaderState *state) { - state->errormsg_buf[0] = '\0'; - state->errormsg_deferred = false; + state->errordata.message[0] = '\0'; + state->errordata_deferred = false; + state->errordata.code = XLOG_READER_NO_ERROR; } /* @@ -1390,7 +1425,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr tmpRecPtr; XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; - char *errormsg; + XLogReaderError errordata = {0}; Assert(!XLogRecPtrIsInvalid(RecPtr)); @@ -1475,7 +1510,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * or we just jumped over the remaining data of a continuation. */ XLogBeginRead(state, tmpRecPtr); - while (XLogReadRecord(state, &errormsg) != NULL) + while (XLogReadRecord(state, &errordata) != NULL) { /* past the record we've found, break out */ if (RecPtr <= state->ReadRecPtr) @@ -1620,8 +1655,9 @@ ResetDecoder(XLogReaderState *state) state->decode_buffer_head = state->decode_buffer; /* Clear error state. 
*/ - state->errormsg_buf[0] = '\0'; - state->errormsg_deferred = false; + state->errordata.message[0] = '\0'; + state->errordata_deferred = false; + state->errordata.code = XLOG_READER_NO_ERROR; } /* @@ -1671,7 +1707,7 @@ DecodeXLogRecord(XLogReaderState *state, DecodedXLogRecord *decoded, XLogRecord *record, XLogRecPtr lsn, - char **errormsg) + XLogReaderError *errordata) { /* * read next _size bytes from record buffer, but check for overrun first. @@ -1754,6 +1790,7 @@ DecodeXLogRecord(XLogReaderState *state, if (block_id <= decoded->max_block_id) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "out-of-order block_id %u at %X/%X", block_id, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1778,6 +1815,7 @@ DecodeXLogRecord(XLogReaderState *state, if (blk->has_data && blk->data_len == 0) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPBLOCK_HAS_DATA set, but no data included at %X/%X", LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; @@ -1785,6 +1823,7 @@ DecodeXLogRecord(XLogReaderState *state, if (!blk->has_data && blk->data_len != 0) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X", (unsigned int) blk->data_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1821,6 +1860,7 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len == BLCKSZ)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, @@ -1837,6 +1877,7 @@ DecodeXLogRecord(XLogReaderState *state, (blk->hole_offset != 0 || blk->hole_length != 0)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, @@ -1851,6 +1892,7 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len == BLCKSZ) { report_invalid_record(state, + 
XLOG_READER_INVALID_DATA, "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X", (unsigned int) blk->bimg_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1866,6 +1908,7 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len != BLCKSZ) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X", (unsigned int) blk->data_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1882,6 +1925,7 @@ DecodeXLogRecord(XLogReaderState *state, if (rlocator == NULL) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; @@ -1894,6 +1938,7 @@ DecodeXLogRecord(XLogReaderState *state, else { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid block_id %u at %X/%X", block_id, LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; @@ -1961,10 +2006,12 @@ DecodeXLogRecord(XLogReaderState *state, shortdata_err: report_invalid_record(state, + XLOG_READER_INVALID_DATA, "record with invalid length at %X/%X", LSN_FORMAT_ARGS(state->ReadRecPtr)); err: - *errormsg = state->errormsg_buf; + errordata->message = state->errordata.message; + errordata->code = state->errordata.code; return false; } @@ -2071,6 +2118,7 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) !record->record->blocks[block_id].in_use) { report_invalid_record(record, + XLOG_READER_INVALID_DATA, "could not restore image at %X/%X with invalid block %d specified", LSN_FORMAT_ARGS(record->ReadRecPtr), block_id); @@ -2078,7 +2126,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) } if (!record->record->blocks[block_id].has_image) { - report_invalid_record(record, "could not restore image at %X/%X with invalid state, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not restore image at %X/%X with invalid state, block %d", 
 							  LSN_FORMAT_ARGS(record->ReadRecPtr),
 							  block_id);
 		return false;
@@ -2105,7 +2155,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
 				decomp_success = false;
 #else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with %s not supported by build, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  "LZ4",
 								  block_id);
@@ -2122,7 +2174,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 			if (ZSTD_isError(decomp_result))
 				decomp_success = false;
 #else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with %s not supported by build, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  "zstd",
 								  block_id);
@@ -2131,7 +2185,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 		}
 		else
 		{
-			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with unknown method, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  block_id);
 			return false;
@@ -2139,7 +2195,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 
 		if (!decomp_success)
 		{
-			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not decompress image at %X/%X, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  block_id);
 			return false;
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index becc2bda62..68100bfa4a 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -2454,7 +2454,7 @@ verifyBackupPageConsistency(XLogReaderState *record)
 		if (!RestoreBlockImage(record, block_id, primary_image_masked))
 			ereport(ERROR,
 					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg_internal("%s", record->errormsg_buf)));
+					 errmsg_internal("%s", record->errordata.message)));
 
 		/*
 		 * If masking function is defined, mask both the primary and replay
@@ -3062,9 +3062,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 
 	for (;;)
 	{
-		char	   *errormsg;
+		XLogReaderError errordata = {0};
 
-		record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg);
+		record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata);
 		if (record == NULL)
 		{
 			/*
@@ -3098,9 +3098,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			 * StandbyMode that only happens if we have been triggered, so we
 			 * shouldn't loop anymore in that case.
 			 */
-			if (errormsg)
+			if (errordata.message)
 				ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
-						(errmsg_internal("%s", errormsg) /* already translated */ ));
+						(errmsg_internal("%s", errordata.message) /* already translated */ ));
 		}
 
 		/*
@@ -3385,9 +3385,9 @@ retry:
 		 * Emit this error right now then retry this page immediately.  Use
 		 * errmsg_internal() because the message was already translated.
 		 */
-		if (xlogreader->errormsg_buf[0])
+		if (xlogreader->errordata.message[0])
 			ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
-					(errmsg_internal("%s", xlogreader->errormsg_buf)));
+					(errmsg_internal("%s", xlogreader->errordata.message)));
 
 		/* reset any error XLogReaderValidatePageHeader() might have set */
 		XLogReaderResetError(xlogreader);
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 43f7b31205..a50fc9cb97 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -395,7 +395,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 		if (!RestoreBlockImage(record, block_id, page))
 			ereport(ERROR,
 					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg_internal("%s", record->errormsg_buf)));
+					 errmsg_internal("%s", record->errordata.message)));
 
 		/*
 		 * The page may be uninitialized. If so, we can't set the LSN because
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187..f48feab944 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -641,12 +641,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	for (;;)
 	{
 		XLogRecord *record;
-		char	   *err = NULL;
+		XLogReaderError errordata = {0};
 
 		/* the read_page callback waits for new WAL */
-		record = XLogReadRecord(ctx->reader, &err);
-		if (err)
-			elog(ERROR, "could not find logical decoding starting point: %s", err);
+		record = XLogReadRecord(ctx->reader, &errordata);
+		if (errordata.message)
+			elog(ERROR, "could not find logical decoding starting point: %s",
+				 errordata.message);
 		if (!record)
 			elog(ERROR, "could not find logical decoding starting point");
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 197169d6b0..ca372e5f66 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,11 +244,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		while (ctx->reader->EndRecPtr < end_of_wal)
 		{
 			XLogRecord *record;
-			char	   *errm = NULL;
+			XLogReaderError errordata = {0};
 
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
-				elog(ERROR, "could not find record for logical decoding: %s", errm);
+			record = XLogReadRecord(ctx->reader, &errordata);
+			if (errordata.message)
+				elog(ERROR, "could not find record for logical decoding: %s",
+					 errordata.message);
 
 			/*
 			 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..4fa4e6bfed 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -503,17 +503,17 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		/* Decode at least one record, until we run out of records */
 		while (ctx->reader->EndRecPtr < moveto)
 		{
-			char	   *errm = NULL;
 			XLogRecord *record;
+			XLogReaderError errordata = {0};
 
 			/*
 			 * Read records.  No changes are generated in fast_forward mode,
 			 * but snapbuilder/slot statuses are updated properly.
 			 */
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
+			record = XLogReadRecord(ctx->reader, &errordata);
+			if (errordata.message)
 				elog(ERROR, "could not find record while advancing replication slot: %s",
-					 errm);
+					 errordata.message);
 
 			/*
 			 * Process the record.  Storage-level changes are ignored in
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e250b0567e..55109bfa51 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3045,7 +3045,7 @@ static void
 XLogSendLogical(void)
 {
 	XLogRecord *record;
-	char	   *errm;
+	XLogReaderError errordata = {0};
 
 	/*
 	 * We'll use the current flush point to determine whether we've caught up.
@@ -3063,12 +3063,12 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
-	record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
+	record = XLogReadRecord(logical_decoding_ctx->reader, &errordata);
 
 	/* xlog record was invalid */
-	if (errm != NULL)
+	if (errordata.message != NULL)
 		elog(ERROR, "could not find record while sending logically-decoded data: %s",
-			 errm);
+			 errordata.message);
 
 	if (record != NULL)
 	{
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 27782237d0..2705d9bf45 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -68,7 +68,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 
 	private.tliIndex = tliIndex;
@@ -82,16 +82,16 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 	XLogBeginRead(xlogreader, startpoint);
 	do
 	{
-		record = XLogReadRecord(xlogreader, &errormsg);
+		record = XLogReadRecord(xlogreader, &errordata);
 
 		if (record == NULL)
 		{
 			XLogRecPtr	errptr = xlogreader->EndRecPtr;
 
-			if (errormsg)
+			if (errordata.message)
 				pg_fatal("could not read WAL record at %X/%X: %s",
 						 LSN_FORMAT_ARGS(errptr),
-						 errormsg);
+						 errordata.message);
 			else
 				pg_fatal("could not read WAL record at %X/%X",
 						 LSN_FORMAT_ARGS(errptr));
@@ -126,7 +126,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 	XLogRecPtr	endptr;
 
@@ -139,12 +139,12 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 		pg_fatal("out of memory while allocating a WAL reading processor");
 
 	XLogBeginRead(xlogreader, ptr);
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 	if (record == NULL)
 	{
-		if (errormsg)
+		if (errordata.message)
 			pg_fatal("could not read WAL record at %X/%X: %s",
-					 LSN_FORMAT_ARGS(ptr), errormsg);
+					 LSN_FORMAT_ARGS(ptr), errordata.message);
 		else
 			pg_fatal("could not read WAL record at %X/%X",
 					 LSN_FORMAT_ARGS(ptr));
@@ -173,7 +173,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	XLogRecord *record;
 	XLogRecPtr	searchptr;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 
 	/*
@@ -204,14 +204,14 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 		uint8		info;
 
 		XLogBeginRead(xlogreader, searchptr);
-		record = XLogReadRecord(xlogreader, &errormsg);
+		record = XLogReadRecord(xlogreader, &errordata);
 
 		if (record == NULL)
 		{
-			if (errormsg)
+			if (errordata.message)
 				pg_fatal("could not find previous WAL record at %X/%X: %s",
 						 LSN_FORMAT_ARGS(searchptr),
-						 errormsg);
+						 errordata.message);
 			else
 				pg_fatal("could not find previous WAL record at %X/%X",
 						 LSN_FORMAT_ARGS(searchptr));
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index a3535bdfa9..880c93b51b 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -512,7 +512,7 @@ XLogRecordSaveFPWs(XLogReaderState *record, const char *savepath)
 
 		/* Full page exists, so let's save it */
 		if (!RestoreBlockImage(record, block_id, page))
-			pg_fatal("%s", record->errormsg_buf);
+			pg_fatal("%s", record->errordata.message);
 
 		(void) XLogRecGetBlockTagExtended(record, block_id,
 										  &rnode, &fork, &blk, NULL);
@@ -800,7 +800,7 @@ main(int argc, char **argv)
 	XLogRecord *record;
 	XLogRecPtr	first_record;
 	char	   *waldir = NULL;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	static struct option long_options[] = {
 		{"bkp-details", no_argument, NULL, 'b'},
@@ -1243,7 +1243,7 @@ main(int argc, char **argv)
 		}
 
 		/* try to read the next record */
-		record = XLogReadRecord(xlogreader_state, &errormsg);
+		record = XLogReadRecord(xlogreader_state, &errordata);
 		if (!record)
 		{
 			if (!config.follow || private.endptr_reached)
@@ -1308,10 +1308,10 @@ main(int argc, char **argv)
 	if (time_to_stop)
 		exit(0);
 
-	if (errormsg)
+	if (errordata.message)
 		pg_fatal("error in WAL record at %X/%X: %s",
 				 LSN_FORMAT_ARGS(xlogreader_state->ReadRecPtr),
-				 errormsg);
+				 errordata.message);
 
 	XLogReaderFree(xlogreader_state);
 
diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 796a74f322..e7d30554ed 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -146,9 +146,9 @@ static XLogRecord *
 ReadNextXLogRecord(XLogReaderState *xlogreader)
 {
 	XLogRecord *record;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 
 	if (record == NULL)
 	{
@@ -161,11 +161,12 @@ ReadNextXLogRecord(XLogReaderState *xlogreader)
 		if (private_data->end_of_wal)
 			return NULL;
 
-		if (errormsg)
+		if (errordata.message)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr),
+							errordata.message)));
 		else
 			ereport(ERROR,
 					(errcode_for_file_access(),
@@ -384,7 +385,7 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 			if (!RestoreBlockImage(record, block_id, page))
 				ereport(ERROR,
 						(errcode(ERRCODE_INTERNAL_ERROR),
-						 errmsg_internal("%s", record->errormsg_buf)));
+						 errmsg_internal("%s", record->errordata.message)));
 
 			block_fpi_data = (bytea *) palloc(BLCKSZ + VARHDRSZ);
 			SET_VARSIZE(block_fpi_data, BLCKSZ + VARHDRSZ);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b5bbdd1608..35cb4b82f4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3080,6 +3080,8 @@ XLogPageReadResult
 XLogPrefetchStats
 XLogPrefetcher
 XLogPrefetcherFilter
+XLogReaderError
+XLogReaderErrorCode
 XLogReaderRoutine
 XLogReaderState
 XLogRecData
--
2.40.1
From 619c8bc3c2bd3a2cf24a283dcfc9666b9769b50c Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Tue, 26 Sep 2023 15:23:37 +0900
Subject: [PATCH v4 2/3] Make WAL replay more robust on OOM failures

This takes advantage of the new error facility for WAL readers, allowing
WAL replay to loop when an out-of-memory failure happens while reading a
record.  Such failures were the origin of potential data loss scenarios.
Crash recovery becomes more robust by acting the same way as a standby
here: each time a record cannot be read because of an OOM, loop and try
to read the record again.
---
 src/backend/access/transam/xlogrecovery.c | 75 ++++++++++++++++-------
 1 file changed, 52 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 68100bfa4a..ed5ac06938 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -3067,29 +3067,50 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 		record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata);
 		if (record == NULL)
 		{
-			/*
-			 * When we find that WAL ends in an incomplete record, keep track
-			 * of that record.  After recovery is done, we'll write a record
-			 * to indicate to downstream WAL readers that that portion is to
-			 * be ignored.
-			 *
-			 * However, when ArchiveRecoveryRequested = true, we're going to
-			 * switch to a new timeline at the end of recovery. We will only
-			 * copy WAL over to the new timeline up to the end of the last
-			 * complete record, so if we did this, we would later create an
-			 * overwrite contrecord in the wrong place, breaking everything.
-			 */
-			if (!ArchiveRecoveryRequested &&
-				!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+			switch (errordata.code)
 			{
-				abortedRecPtr = xlogreader->abortedRecPtr;
-				missingContrecPtr = xlogreader->missingContrecPtr;
-			}
+				case XLOG_READER_NO_ERROR:
+					/* Possible when XLogPageRead() has failed */
+					Assert(!errordata.message);
+					/* FALLTHROUGH */
 
-			if (readFile >= 0)
-			{
-				close(readFile);
-				readFile = -1;
+				case XLOG_READER_INVALID_DATA:
+
+					/*
+					 * When we find that WAL ends in an incomplete record,
+					 * keep track of that record.  After recovery is done,
+					 * we'll write a record to indicate to downstream WAL
+					 * readers that that portion is to be ignored.
+					 *
+					 * However, when ArchiveRecoveryRequested = true, we're
+					 * going to switch to a new timeline at the end of
+					 * recovery. We will only copy WAL over to the new
+					 * timeline up to the end of the last complete record, so
+					 * if we did this, we would later create an overwrite
+					 * contrecord in the wrong place, breaking everything.
+					 */
+					if (!ArchiveRecoveryRequested &&
+						!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+					{
+						abortedRecPtr = xlogreader->abortedRecPtr;
+						missingContrecPtr = xlogreader->missingContrecPtr;
+					}
+
+					if (readFile >= 0)
+					{
+						close(readFile);
+						readFile = -1;
+					}
+					break;
+				case XLOG_READER_OOM:
+
+					/*
+					 * If we failed because of an out-of-memory problem, just
+					 * give up and retry recovery later.  It may be possible
+					 * that the WAL record to decode required a larger memory
+					 * allocation than what the host can offer.
+					 */
+					break;
 			}
 
 			/*
@@ -3147,9 +3168,12 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			 * WAL from the archive, even if pg_wal is completely empty, but
 			 * we'd have no idea how far we'd have to replay to reach
 			 * consistency.  So err on the safe side and give up.
+			 *
+			 * It may be possible that the record was not decoded because of
+			 * an out-of-memory failure.  In this case, just loop.
 			 */
 			if (!InArchiveRecovery && ArchiveRecoveryRequested &&
-				!fetching_ckpt)
+				!fetching_ckpt && errordata.code != XLOG_READER_OOM)
 			{
 				ereport(DEBUG1,
 						(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
@@ -3173,9 +3197,14 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 				continue;
 			}
 
-			/* In standby mode, loop back to retry. Otherwise, give up. */
+			/*
+			 * In standby mode or if the WAL record failed on an
+			 * out-of-memory, loop back and retry. Otherwise, give up.
+			 */
 			if (StandbyMode && !CheckForStandbyTrigger())
 				continue;
+			else if (errordata.code == XLOG_READER_OOM)
+				continue;
 			else
 				return NULL;
 		}
--
2.40.1
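The retry rule that the 0002 patch adds at the end of the failed-read branch in ReadRecord() can be looked at in isolation. What follows is only a minimal standalone sketch, not the PostgreSQL code: the ReaderErrorCode and ReaderError types and the should_retry_read() function are hypothetical stand-ins for XLogReaderErrorCode, XLogReaderError and the inline if/else chain, and the two booleans stand in for StandbyMode and CheckForStandbyTrigger().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the error codes named in the patch. */
typedef enum ReaderErrorCode
{
	READER_NO_ERROR = 0,
	READER_OOM,
	READER_INVALID_DATA
} ReaderErrorCode;

/* Hypothetical stand-in for XLogReaderError: a code plus a message. */
typedef struct ReaderError
{
	ReaderErrorCode code;
	const char *message;
} ReaderError;

/*
 * Decide whether the read loop should try the same record again.
 *
 * A standby that has not been triggered always retries.  Outside of
 * that, only an out-of-memory failure is worth retrying; invalid data
 * or a plain end of WAL ends the loop.
 */
static bool
should_retry_read(const ReaderError *err, bool standby_mode,
				  bool promote_triggered)
{
	assert(err != NULL);

	if (standby_mode && !promote_triggered)
		return true;
	if (err->code == READER_OOM)
		return true;
	return false;
}
```

With this shape, a crash recovery run (standby_mode false) keeps looping on READER_OOM and stops on READER_INVALID_DATA, which is the distinction the old char * error message could not express.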
From 146d50748f8a0c308ac39ea20b88a3b289c8d269 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Tue, 26 Sep 2023 15:23:50 +0900
Subject: [PATCH v4 3/3] Tweak to force OOM behavior when replaying records

---
 src/backend/access/transam/xlogreader.c | 31 +++++++++++++++++++++----
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index fd1413b6d3..854f584e30 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -541,6 +541,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	int			readOff;
 	DecodedXLogRecord *decoded;
 	XLogReaderError errordata = {0};	/* not used */
+	bool		trigger_oom = false;
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -690,7 +691,29 @@ restart:
 		decoded = XLogReadRecordAlloc(state,
 									  total_len,
 									  false /* allow_oversized */ );
-		if (decoded == NULL && nonblocking)
+
+#ifndef FRONTEND
+		/*
+		 * Trick to emulate an OOM after a hardcoded number of records
+		 * replayed.
+		 */
+		{
+			struct stat fstat;
+			static int counter = 0;
+			if (stat("/tmp/xlogreader_oom", &fstat) == 0)
+			{
+				counter++;
+				if (counter >= 100)
+				{
+					trigger_oom = true;
+					/* Reset counter, to not fail when shutting down WAL */
+					counter = 0;
+				}
+			}
+		}
+#endif
+
+		if ((decoded == NULL || trigger_oom) && nonblocking)
 		{
 			/*
 			 * There is no space in the circular decode buffer, and the caller is
@@ -833,7 +856,7 @@ restart:
 		Assert(gotlen <= lengthof(save_copy));
 		Assert(gotlen <= state->readRecordBufSize);
 		memcpy(save_copy, state->readRecordBuf, gotlen);
-		if (!allocate_recordbuf(state, total_len))
+		if (!allocate_recordbuf(state, total_len) || trigger_oom)
 		{
 			/* We treat this as an out-of-memory error */
 			report_invalid_record(state,
@@ -891,13 +914,13 @@ restart:
 	 * If we got here without a DecodedXLogRecord, it means we needed to
 	 * validate total_len before trusting it, but by now now we've done that.
 	 */
-	if (decoded == NULL)
+	if (decoded == NULL || trigger_oom)
 	{
 		Assert(!nonblocking);
 		decoded = XLogReadRecordAlloc(state,
 									  total_len,
 									  true /* allow_oversized */ );
-		if (decoded == NULL)
+		if (decoded == NULL || trigger_oom)
 		{
 			/*
 			 * We failed to allocate memory for an oversized record.  As
--
2.40.1
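The fault-injection trick in the 0003 patch (a trigger file checked with stat() plus a call counter) can be tried outside the server. The following is a rough sketch under stated assumptions rather than the patch itself: fault_injection_due() is a hypothetical helper name, and the path, threshold and counter are passed in by the caller instead of being the hardcoded "/tmp/xlogreader_oom", 100 and a function-local static as in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/stat.h>

/*
 * Report true once every `threshold` calls, but only while the file or
 * directory named by `trigger_path` exists.  The counter is owned by the
 * caller and is reset each time a fault is reported, so the fault keeps
 * recurring for as long as the trigger stays in place.
 */
static bool
fault_injection_due(const char *trigger_path, int threshold, int *counter)
{
	struct stat st;

	assert(trigger_path != NULL && counter != NULL && threshold > 0);

	/* No trigger: never inject, and leave the counter untouched. */
	if (stat(trigger_path, &st) != 0)
		return false;

	if (++(*counter) >= threshold)
	{
		*counter = 0;
		return true;
	}
	return false;
}
```

Keeping the counter in the caller makes the helper easy to exercise in a test; the patch uses a static variable instead, which is simpler when there is a single call site inside XLogDecodeNextRecord().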
#!/bin/bash

killall -9 postgres

# Setup Postgres
rm -rf /tmp/data
initdb -D /tmp/data
cat <<EOL >> /tmp/data/postgresql.conf
logging_collector = on
log_min_messages = debug1
log_min_error_statement = debug1
#log_line_prefix = '%m [%p]: [%l-1] db=%d,user=%u,app=%a,client=%h '
recovery_prefetch = off
EOL

pg_ctl start -D /tmp/data
createdb $USER

psql <<EOL
CREATE TABLE aa (a int);
CHECKPOINT;
INSERT INTO aa VALUES (1);
-- Generate enough records to trigger the OOM counter in xlogreader.c.
DO \$\$
BEGIN
  FOR i IN 1..1000 LOOP
    EXECUTE 'SELECT pg_logical_emit_message(true, ''foo'', ''bar'');';
  END LOOP;
END;
\$\$;
INSERT INTO aa VALUES (2);
SELECT pg_current_wal_lsn();
EOL

# Force OOM at recovery.
killall -9 postgres
touch /tmp/xlogreader_oom

# Startup hits the emulated OOM and finishes recovery early, but it
# should not.
pg_ctl start -D /tmp/data

# When recovery stops early, this loses some data: only one record is
# returned.
psql -c 'table aa'

rm /tmp/xlogreader_oom