From 56855e25fc9e21a86c21a38328736ad727797d05 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 7 Mar 2023 06:51:40 +0000
Subject: [PATCH v8] Improve WALRead() to suck data directly from WAL buffers

---
 src/backend/access/transam/xlog.c       | 145 ++++++++++++++++++++++++
 src/backend/access/transam/xlogreader.c |  42 ++++++-
 src/include/access/xlog.h               |   6 +
 3 files changed, 191 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 543d4d897a..9dd97a66d3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1639,6 +1639,151 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 	return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Read WAL from WAL buffers.
+ *
+ * Read 'count' bytes of WAL from WAL buffers into 'buf', starting at location
+ * 'startptr', on timeline 'tli' and return total read bytes.
+ *
+ * Note that this function reads as much as it can from WAL buffers, meaning,
+ * it may not read all the requested 'count' bytes. Caller must be aware of
+ * this and deal with it. A page is copied and counted only after its header
+ * has passed the sanity checks below.
+ *
+ * 'state' is used only by assert-enabled builds for full page header
+ * validation and may be NULL.
+ *
+ * Returns the number of bytes copied into 'buf', which is 0 if nothing could
+ * be read, for instance because WALBufMappingLock wasn't immediately
+ * available.
+ */
+Size
+XLogReadFromBuffers(XLogReaderState *state PG_USED_FOR_ASSERTS_ONLY,
+					XLogRecPtr startptr,
+					TimeLineID tli,
+					Size count,
+					char *buf)
+{
+	XLogRecPtr	ptr;
+	Size		nbytes;
+	Size		ntotal;
+
+	Assert(!XLogRecPtrIsInvalid(startptr));
+	Assert(count > 0);
+	Assert(startptr <= GetFlushRecPtr(NULL));
+	Assert(!RecoveryInProgress());
+	Assert(tli == GetWALInsertionTimeLine());
+
+	ptr = startptr;
+	nbytes = count;
+	ntotal = 0;
+
+	/*
+	 * Holding WALBufMappingLock ensures that WAL buffer pages aren't replaced
+	 * while we are reading them. We try to acquire it in shared mode so that
+	 * the concurrent WAL readers are also allowed. We try to do as little work
+	 * as possible while holding the lock to not impact concurrent WAL writers
+	 * much. We quickly exit to not cause any contention, if the lock isn't
+	 * immediately available.
+	 */
+	if (!LWLockConditionalAcquire(WALBufMappingLock, LW_SHARED))
+		return ntotal;
+
+	while (nbytes > 0)
+	{
+		XLogRecPtr	pageptr;
+		XLogRecPtr	expectedEndPtr;
+		XLogRecPtr	endptr;
+		int			idx;
+		char	   *page;
+		char	   *data;
+		Size		offset;
+		Size		nread;
+		XLogPageHeader phdr;
+
+		offset = ptr % XLOG_BLCKSZ;
+		pageptr = ptr - offset;
+		idx = XLogRecPtrToBufIdx(ptr);
+		expectedEndPtr = pageptr + XLOG_BLCKSZ;
+		endptr = XLogCtl->xlblocks[idx];
+
+		/*
+		 * Requested WAL isn't available in WAL buffers, so return with what we
+		 * have read so far.
+		 */
+		if (expectedEndPtr != endptr)
+			break;
+
+		/*
+		 * We found WAL buffer page containing given XLogRecPtr. Get starting
+		 * address of the page and a pointer to the right location of given
+		 * XLogRecPtr in that page.
+		 */
+		page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+		data = page + offset;
+
+		/*
+		 * The fact that we hold WALBufMappingLock while reading the WAL
+		 * buffer page itself guarantees that no one else initializes it or
+		 * makes it ready for next use in AdvanceXLInsertBuffer(). However, we
+		 * need to ensure that we are not reading a page that just got
+		 * initialized. For this, we look at the needed page header.
+		 *
+		 * This check must come before the copy below: if the page doesn't
+		 * look valid, we return with what we have read so far, and that must
+		 * not include any bytes of the rejected page.
+		 */
+		phdr = (XLogPageHeader) page;
+
+		if (!(phdr->xlp_magic == XLOG_PAGE_MAGIC &&
+			  phdr->xlp_pageaddr == pageptr &&
+			  phdr->xlp_tli == tli))
+			break;
+
+		/*
+		 * Note that we don't perform all page header checks here to avoid
+		 * extra work in production builds, callers will anyway do those checks
+		 * extensively. However, in an assert-enabled build, we perform all the
+		 * checks here and raise an error if failed.
+		 */
+#ifdef USE_ASSERT_CHECKING
+		if (state != NULL &&
+			!XLogReaderValidatePageHeader(state, pageptr,
+										  (char *) phdr))
+			ereport(ERROR,
+					(errcode(ERRCODE_INTERNAL_ERROR),
+					 errmsg_internal("error while reading WAL from WAL buffers: %s", state->errormsg_buf)));
+#endif
+
+		/*
+		 * Read what is wanted, not the whole page: copy either all the
+		 * remaining bytes, if they fit on this page, or else the rest of this
+		 * page and continue with the next one.
+		 */
+		nread = XLOG_BLCKSZ - offset;
+		if (nread > nbytes)
+			nread = nbytes;
+
+		Assert(nread > 0 && nread <= nbytes);
+		memcpy(buf, data, nread);
+		ptr += nread;
+		buf += nread;
+		nbytes -= nread;
+		ntotal += nread;
+	}
+
+	LWLockRelease(WALBufMappingLock);
+
+	/* We never read more than what the caller has asked for. */
+	Assert(ntotal <= count);
+
+	ereport(DEBUG1,
+			(errmsg_internal("read %zu bytes out of %zu bytes from WAL buffers for given LSN %X/%X, Timeline ID %u",
+							 ntotal, count, LSN_FORMAT_ARGS(startptr), tli)));
+
+	return ntotal;
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index cadea21b37..03f0cca1e6 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1486,8 +1486,7 @@ err:
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
  *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
+ * When possible, this function reads data directly from WAL buffers.
  */
 bool
 WALRead(XLogReaderState *state,
@@ -1498,6 +1497,45 @@ WALRead(XLogReaderState *state,
 	XLogRecPtr	recptr;
 	Size		nbytes;
 
+#ifndef FRONTEND
+	/* Frontend tools have no idea of WAL buffers. */
+	Size        nread;
+
+	/*
+	 * Try reading WAL from WAL buffers. We skip this step and continue the
+	 * usual way, that is to read from WAL file, either when server is in
+	 * recovery (standby mode, archive or crash recovery), in which case the
+	 * WAL buffers are not used or when the server is inserting in a different
+	 * timeline from that of the timeline that we're trying to read WAL from.
+	 */
+	if (!RecoveryInProgress() &&
+		tli == GetWALInsertionTimeLine())
+	{
+		nread = XLogReadFromBuffers(state, startptr, tli, count, buf);
+
+		Assert(nread >= 0);
+
+		/*
+		 * Check if we have read fully (hit), partially (partial hit) or
+		 * nothing (miss) from WAL buffers. If we have read either partially or
+		 * nothing, then continue to read the remaining bytes the usual way,
+		 * that is, read from WAL file.
+		 */
+		if (count == nread)
+			return true;	/* Buffer hit, so return. */
+		else if (count > nread)
+		{
+			/*
+			 * Buffer partial hit, so reset the state to count the read bytes
+			 * and continue.
+			 */
+			buf += nread;
+			startptr += nread;
+			count -= nread;
+		}
+	}
+#endif	/* FRONTEND */
+
 	p = buf;
 	recptr = startptr;
 	nbytes = count;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738..4fdd8c8b17 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -247,6 +247,12 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern Size XLogReadFromBuffers(struct XLogReaderState *state,
+								XLogRecPtr startptr,
+								TimeLineID tli,
+								Size count,
+								char *buf);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
-- 
2.34.1

