Attached are two patches. Per Andres's suggestion, 0001 adds an assertion: Assert(startptr + count <= LogwrtResult.Write)
However, if we want to allow the caller (e.g. an extension) to determine the valid range itself, perhaps using WaitXLogInsertionsToFinish(), then that assertion is wrong, because such a range can extend past LogwrtResult.Write. Maybe we should just get rid of that check entirely and trust the caller to request a reasonable range?

On Mon, 2024-02-12 at 17:33 -0800, Jeff Davis wrote:
> That makes me wonder whether my previous idea[1] might matter: when
> some buffers have been evicted, should WALReadFromBuffers() keep going
> through the loop and return the end portion of the requested data
> rather than the beginning?
> [1]
> https://www.postgresql.org/message-id/2b36bf99e762e65db0dafbf8d338756cf5fa6ece.ca...@j-davis.com

0002 illustrates the above idea. It's a strange API, so I don't intend to commit it in this form, but I think we will ultimately need something like it when we want to replicate unflushed data.

The idea is that data past the Write pointer is always (and only) available in the WAL buffers, so WALReadFromBuffers() should always return it. That way we can always safely fall through to the ordinary WALRead(), which can only see data before the Write pointer. There's also data before the Write pointer that could still be in the WAL buffers, and we might as well copy that too, if it hasn't been evicted.

If some buffers have been evicted, WALReadFromBuffers() fills in the *end* of the destination buffer, leaving a gap at the beginning. The nice thing is that any gap will always lie before the Write pointer, so we can always fall back to WALRead() to fill it, and that should always succeed.

Regards,
	Jeff Davis
From f890362e9f5cefd04ee3f9406c7fcafb6a277e45 Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Tue, 13 Feb 2024 11:17:08 -0800 Subject: [PATCH 1/2] Add assert to WALReadFromBuffers(). Per suggestion from Andres. --- src/backend/access/transam/xlog.c | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 4e14c242b1..50c347a679 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1710,12 +1710,13 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli) * of bytes read successfully. * * Fewer than 'count' bytes may be read if some of the requested WAL data has - * already been evicted from the WAL buffers, or if the caller requests data - * that is not yet available. + * already been evicted. * * No locks are taken. * - * The 'tli' argument is only used as a convenient safety check so that + * Caller should ensure that it reads no further than LogwrtResult.Write + * (which should have been updated by the caller when determining how far to + * read). The 'tli' argument is only used as a convenient safety check so that * callers do not read from WAL buffers on a historical timeline. */ Size @@ -1724,26 +1725,13 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, { char *pdst = dstbuf; XLogRecPtr recptr = startptr; - XLogRecPtr upto; - Size nbytes; + Size nbytes = count; if (RecoveryInProgress() || tli != GetWALInsertionTimeLine()) return 0; Assert(!XLogRecPtrIsInvalid(startptr)); - - /* - * Don't read past the available WAL data. - * - * Check using local copy of LogwrtResult. Ordinarily it's been updated by - * the caller when determining how far to read; but if not, it just means - * we'll read less data. - * - * XXX: the available WAL could be extended to the WAL insert pointer by - * calling WaitXLogInsertionsToFinish(). 
- */ - upto = Min(startptr + count, LogwrtResult.Write); - nbytes = upto - startptr; + Assert(startptr + count <= LogwrtResult.Write); /* * Loop through the buffers without a lock. For each buffer, atomically -- 2.34.1
From 6fc92fa74a033c881624177365afbbffc37ed873 Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Tue, 13 Feb 2024 16:56:46 -0800 Subject: [PATCH 2/2] WALReadFromBuffers: read end of the requested range --- src/backend/access/transam/xlog.c | 109 ++++++++++++++++------------ src/backend/replication/walsender.c | 14 ++-- src/include/access/xlog.h | 2 +- 3 files changed, 70 insertions(+), 55 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 50c347a679..ea55e1b77b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1706,11 +1706,21 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli) } /* - * Read WAL data directly from WAL buffers, if available. Returns the number - * of bytes read successfully. + * Read WAL data directly from WAL buffers, if available. * - * Fewer than 'count' bytes may be read if some of the requested WAL data has - * already been evicted. + * Some pages in the requested range may already be evicted from the WAL + * buffers, in which case this function continues on and reads the *end* of + * the range requested (filling in the end of the buffer rather than the + * beginning). + * + * 'count' is an in/out parameter. On return, it's updated to represent the + * range of bytes that haven't been filled in. For example: + * remaining = nbytes; + * WALReadFromBuffers(buf, startptr, &remaining, ...); + * WALRead(xlogreader, buf, startptr, remaining, ...); + * + * On return, startptr + *count <= LogwrtResult.Write, so it will always be + * safe to fall back to WALRead(). * * No locks are taken. * @@ -1719,19 +1729,20 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli) * read). The 'tli' argument is only used as a convenient safety check so that * callers do not read from WAL buffers on a historical timeline. 
*/ -Size -WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, +void +WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size *count, TimeLineID tli) { char *pdst = dstbuf; XLogRecPtr recptr = startptr; - Size nbytes = count; + XLogRecPtr valid = startptr; + Size nbytes = *count; if (RecoveryInProgress() || tli != GetWALInsertionTimeLine()) - return 0; + return; Assert(!XLogRecPtrIsInvalid(startptr)); - Assert(startptr + count <= LogwrtResult.Write); + Assert(startptr + *count <= LogwrtResult.Write); /* * Loop through the buffers without a lock. For each buffer, atomically @@ -1744,8 +1755,8 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, * copy. Read barriers are necessary to ensure that the data copy actually * happens between the two verification steps. * - * If either verification fails, we simply terminate the loop and return - * with the data that had been already copied out successfully. + * If either verification fails, we advance 'valid' to the next page + * boundary and continue the loop. */ while (nbytes > 0) { @@ -1753,8 +1764,6 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, int idx = XLogRecPtrToBufIdx(recptr); XLogRecPtr expectedEndPtr; XLogRecPtr endptr; - const char *page; - const char *psrc; Size npagebytes; /* @@ -1763,54 +1772,62 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, */ expectedEndPtr = recptr + (XLOG_BLCKSZ - offset); + /* determine how much data we intend to read from this page */ + npagebytes = Min(nbytes, XLOG_BLCKSZ - offset); + /* * First verification step: check that the correct page is present in * the WAL buffers. */ endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]); - if (expectedEndPtr != endptr) - break; - - /* - * The correct page is present (or was at the time the endptr was - * read; must re-verify later). Calculate pointer to source data and - * determine how much data to read from this page. 
- */ - page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ; - psrc = page + offset; - npagebytes = Min(nbytes, XLOG_BLCKSZ - offset); + if (expectedEndPtr == endptr) + { + /* + * The correct page is present (or was at the time the endptr was + * read; must re-verify later). Calculate pointer to source data. + */ + const char *page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ; + const char *psrc = page + offset; - /* - * Ensure that the data copy and the first verification step are not - * reordered. - */ - pg_read_barrier(); + /* + * Ensure that the data copy and the first verification step are + * not reordered. + */ + pg_read_barrier(); - /* data copy */ - memcpy(pdst, psrc, npagebytes); + /* data copy */ + memcpy(pdst, psrc, npagebytes); - /* - * Ensure that the data copy and the second verification step are not - * reordered. - */ - pg_read_barrier(); + /* + * Ensure that the data copy and the second verification step are + * not reordered. + */ + pg_read_barrier(); - /* - * Second verification step: check that the page we read from wasn't - * evicted while we were copying the data. - */ - endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]); - if (expectedEndPtr != endptr) - break; + /* + * Second verification step: check that the page we read from wasn't + * evicted while we were copying the data. 
+ */ + endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]); + if (expectedEndPtr != endptr) + { + /* discard previously-copied data but keep going */ + valid = recptr + npagebytes; + } + } + else + { + /* discard previously-copied data but keep going */ + valid = recptr + npagebytes; + } pdst += npagebytes; recptr += npagebytes; nbytes -= npagebytes; } - Assert(pdst - dstbuf <= count); - - return pdst - dstbuf; + *count = valid - startptr; + Assert(startptr + *count <= LogwrtResult.Write); } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 146826d5db..f50328c01d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2966,7 +2966,7 @@ XLogSendPhysical(void) Size nbytes; XLogSegNo segno; WALReadError errinfo; - Size rbytes; + Size bytes_remaining; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -3182,19 +3182,17 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: + bytes_remaining = nbytes; /* attempt to read WAL from WAL buffers first */ - rbytes = WALReadFromBuffers(&output_message.data[output_message.len], - startptr, nbytes, xlogreader->seg.ws_tli); - output_message.len += rbytes; - startptr += rbytes; - nbytes -= rbytes; + WALReadFromBuffers(&output_message.data[output_message.len], + startptr, &bytes_remaining, xlogreader->seg.ws_tli); /* now read the remaining WAL from WAL file */ - if (nbytes > 0 && + if (bytes_remaining > 0 && !WALRead(xlogreader, &output_message.data[output_message.len], startptr, - nbytes, + bytes_remaining, xlogreader->seg.ws_tli, /* Pass the current TLI because * only WalSndSegmentOpen controls * whether new TLI is needed. 
*/ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 76787a8267..8709e97be0 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -252,7 +252,7 @@ extern XLogRecPtr GetLastImportantRecPtr(void); extern void SetWalWriterSleeping(bool sleeping); -extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, +extern void WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size *count, TimeLineID tli); /* -- 2.34.1