Hi,
Our team has previously asked on pgsql-admin/pgsql-general about a standby
that never switches to streaming replication while recovering: [1]
Our investigation has shown that this happens because an xlog record often
straddles a WAL segment boundary, which makes a single XLogDecodeNextRecord
call fetch pages from different archived WAL segments. With prefetching
enabled, this causes the following sequence of events:
1. prefetching successfully reads page 1;
2. prefetching fails to read page 2 because the corresponding WAL has not been
uploaded to the archive yet, gets XLREAD_WOULDBLOCK;
3. all of the prefetched records are decoded, recovery attempts to read the
next record;
4. recovery reads page 1 again, reinvoking restore_command.
Because only one WAL is kept open at a time, this causes PostgreSQL to fetch
one WAL from the archive twice, which can be a very slow operation if the
archive is network-attached; the latency of archive fetches may even be
significant enough that recovery never catches up to the primary. Since the
only piece of information a restore_command receives is the segment number,
it cannot distinguish this situation from the database restarting, so it can't
refuse to redownload the WAL either. We use the CloudNativePG operator, which
prefetches multiple WALs at a time and makes use of a one-off flag to stop,
but the nonmonotonicity of the segment number makes the one-off flag useless.
The attached patch fixes this situation by skipping calls to ReadPageInternal
if the required data is already present in the record reassembly buffer,
reducing the number of I/O operations during recovery and ensuring that
restore_command is only executed with monotonically increasing segment
numbers during a single recovery run.
The patch is for the current master branch, but the nonmonotonicity has
been present since at least v15. I don't know if it makes sense to backport
the patch, since it's technically merely a performance improvement? I'm not
sure how to regression test this either, but the code passes all existing
regression tests and I ran the manual reproduction to confirm that the issue
we've observed has been eliminated.
[1]
https://postgr.es/m/CANOng2i1G_57nvZ4ip4uKKU87jtt%2BfzqWUFV_ou6L8N3bteSXQ%40mail.gmail.com
// Sonya Valchuk
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5e5001b..fe337e2 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -586,53 +586,72 @@ restart:
state->nonblocking = nonblocking;
state->currRecPtr = RecPtr;
assembled = false;
+ gotheader = false;
targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
targetRecOff = RecPtr % XLOG_BLCKSZ;
/*
- * Read the page containing the record into state->readBuf. Request enough
- * byte to cover the whole record header, or at least the part of it that
- * fits on the same page.
+ * If this record spans multiple pages, it's possible that the first
+ * ReadPageInternal call succeeded while one of the following returned
+ * XLREAD_WOULDBLOCK. In that case, we already have the data from some
+ * of the calls in state->readRecordBuf, so skip re-reading it.
+ * This is especially important if the pages are in different WALs,
+ * in which case going back to a previous WAL might require re-fetching
+ * it from an archive - a potentially slow operation.
*/
- readOff = ReadPageInternal(state, targetPagePtr,
- Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
- if (readOff == XLREAD_WOULDBLOCK)
- return XLREAD_WOULDBLOCK;
- else if (readOff < 0)
- goto err;
-
- /*
- * ReadPageInternal always returns at least the page header, so we can
- * examine it now.
- */
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- if (targetRecOff == 0)
- {
+ if (state->readRecordBufUsed == 0) {
/*
- * At page start, so skip over page header.
+ * Read the page containing the record into state->readBuf. Request
+ * enough bytes to cover the whole record header, or at least the part
+ * of it that fits on the same page.
*/
- RecPtr += pageHeaderSize;
- targetRecOff = pageHeaderSize;
- }
- else if (targetRecOff < pageHeaderSize)
- {
- report_invalid_record(state, "invalid record offset at %X/%08X: expected at least %u, got %u",
- LSN_FORMAT_ARGS(RecPtr),
- pageHeaderSize, targetRecOff);
- goto err;
- }
+ readOff = ReadPageInternal(state, targetPagePtr,
+ Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
+ if (readOff == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readOff < 0)
+ goto err;
- if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
- targetRecOff == pageHeaderSize)
- {
- report_invalid_record(state, "contrecord is requested by %X/%08X",
- LSN_FORMAT_ARGS(RecPtr));
- goto err;
- }
+ /*
+ * ReadPageInternal always returns at least the page header, so we can
+ * examine it now.
+ */
+ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ if (targetRecOff == 0)
+ {
+ /*
+ * At page start, so skip over page header.
+ */
+ RecPtr += pageHeaderSize;
+ targetRecOff = pageHeaderSize;
+ }
+ else if (targetRecOff < pageHeaderSize)
+ {
+ report_invalid_record(state,
+ "invalid record offset at %X/%08X: expected at least %u, got %u",
+ LSN_FORMAT_ARGS(RecPtr),
+ pageHeaderSize, targetRecOff);
+ goto err;
+ }
- /* ReadPageInternal has verified the page header */
- Assert(pageHeaderSize <= readOff);
+ if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+ targetRecOff == pageHeaderSize)
+ {
+ report_invalid_record(state, "contrecord is requested by %X/%08X",
+ LSN_FORMAT_ARGS(RecPtr));
+ goto err;
+ }
+
+ /* ReadPageInternal has verified the page header */
+ Assert(pageHeaderSize <= readOff);
+
+ record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
+ } else {
+ record = (XLogRecord *) state->readRecordBuf;
+ /* If we have the header, then it has already been validated. */
+ gotheader = state->readRecordBufUsed >= SizeOfXLogRecord;
+ }
/*
* Read the record length.
@@ -643,28 +662,25 @@ restart:
* cannot access any other fields until we've verified that we got the
* whole header.
*/
- record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
total_len = record->xl_tot_len;
- /*
- * If the whole record header is on this page, validate it immediately.
- * Otherwise do just a basic sanity check on xl_tot_len, and validate the
- * rest of the header after reading it from the next page. The xl_tot_len
- * check is necessary here to ensure that we enter the "Need to reassemble
- * record" code path below; otherwise we might fail to apply
- * ValidXLogRecordHeader at all.
- */
- if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
- {
- if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record,
- randAccess))
- goto err;
- gotheader = true;
- }
- else
- {
- /* There may be no next page if it's too small. */
- if (total_len < SizeOfXLogRecord)
+ if (!gotheader) {
+ /*
+ * If the whole record header is on this page, validate it immediately.
+ * Otherwise do just a basic sanity check on xl_tot_len, and validate
+ * the rest of the header after reading it from the next page. The
+ * xl_tot_len check is necessary here to ensure that we enter the "Need
+ * to reassemble record" code path below; otherwise we might fail to
+ * apply ValidXLogRecordHeader at all.
+ */
+ if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+ {
+ if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr,
+ record, randAccess))
+ goto err;
+ gotheader = true;
+ }
+ else if (total_len < SizeOfXLogRecord)
{
report_invalid_record(state,
"invalid record length at %X/%08X: expected at least %u, got %u",
@@ -672,8 +688,6 @@ restart:
(uint32) SizeOfXLogRecord, total_len);
goto err;
}
- /* We'll validate the header once we have the next page. */
- gotheader = false;
}
/*
@@ -712,11 +726,20 @@ restart:
Assert(state->readRecordBufSize >= XLOG_BLCKSZ * 2);
Assert(state->readRecordBufSize >= len);
- /* Copy the first fragment of the record from the first page. */
- memcpy(state->readRecordBuf,
- state->readBuf + RecPtr % XLOG_BLCKSZ, len);
- buffer = state->readRecordBuf + len;
- gotlen = len;
+ if (state->readRecordBufUsed == 0) {
+ /* Copy the first fragment of the record from the first page. */
+ memcpy(state->readRecordBuf,
+ state->readBuf + RecPtr % XLOG_BLCKSZ, len);
+ state->readRecordBufUsed = len;
+ state->readRecordBufPage = targetPagePtr;
+ buffer = state->readRecordBuf + len;
+ gotlen = len;
+ record = (XLogRecord *) state->readRecordBuf;
+ } else {
+ gotlen = state->readRecordBufUsed;
+ buffer = state->readRecordBuf + gotlen;
+ targetPagePtr = state->readRecordBufPage;
+ }
do
{
@@ -749,6 +772,7 @@ restart:
if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
{
state->overwrittenRecPtr = RecPtr;
+ state->readRecordBufUsed = 0;
RecPtr = targetPagePtr;
goto restart;
}
@@ -839,10 +863,12 @@ restart:
memcpy(state->readRecordBuf, save_copy, gotlen);
buffer = state->readRecordBuf + gotlen;
}
+
+ state->readRecordBufUsed = gotlen;
+ state->readRecordBufPage = targetPagePtr;
} while (gotlen < total_len);
Assert(gotheader);
- record = (XLogRecord *) state->readRecordBuf;
if (!ValidXLogRecord(state, record, RecPtr))
goto err;
@@ -870,6 +896,8 @@ restart:
state->DecodeRecPtr = RecPtr;
}
+ state->readRecordBufUsed = 0;
+
/*
* Special processing if it's an XLOG SWITCH record
*/
@@ -1126,6 +1154,7 @@ XLogReaderInvalReadState(XLogReaderState *state)
state->seg.ws_segno = 0;
state->segoff = 0;
state->readLen = 0;
+ state->readRecordBufUsed = 0;
}
/*
@@ -1626,6 +1655,7 @@ ResetDecoder(XLogReaderState *state)
state->decode_queue_tail = NULL;
state->decode_queue_head = NULL;
state->record = NULL;
+ state->readRecordBufUsed = 0;
/* Reset the decode buffer to empty. */
state->decode_buffer_tail = state->decode_buffer;
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index ae2398d..d4e6820 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -3167,9 +3167,6 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
private->randAccess = !XLogRecPtrIsValid(xlogreader->ReadRecPtr);
private->replayTLI = replayTLI;
- /* This is the first attempt to read this page. */
- lastSourceFailed = false;
-
for (;;)
{
char *errormsg;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index dfabbbd..1b30be4 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -304,7 +304,9 @@ struct XLogReaderState
* crosses a page boundary.
*/
char *readRecordBuf;
- uint32 readRecordBufSize;
+ uint32 readRecordBufSize;
+ uint32 readRecordBufUsed;
+ XLogRecPtr readRecordBufPage;
/* Buffer to hold error message */
char *errormsg_buf;