From 047c58df6aaae586efe58b6a4068b17f25976b0a Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 26 Dec 2022 08:36:28 +0000
Subject: [PATCH v2] Improve WALRead() to suck data directly from WAL buffers
 when possible

---
 src/backend/access/transam/xlog.c       | 154 ++++++++++++++++++++++++
 src/backend/access/transam/xlogreader.c |  47 +++++++-
 src/include/access/xlog.h               |   6 +
 3 files changed, 205 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 91473b00d9..c3138493be 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -689,6 +689,7 @@ static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 							  XLogRecPtr *PrevPtr);
 static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
+static char *GetXLogBufferForRead(XLogRecPtr ptr, TimeLineID tli, char *page);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
@@ -1639,6 +1640,159 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 	return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Copy the WAL buffer page containing 'ptr' into the caller-supplied 'page'
+ * (which must have room for XLOG_BLCKSZ bytes) and return a pointer to the
+ * offset of 'ptr' within that copy.
+ *
+ * Returns NULL, leaving the caller to read from the WAL file instead, if
+ * WALBufMappingLock is busy, if WAL buffers do not hold the page, or if the
+ * copied page header does not match 'ptr' and 'tli'.
+ */
+static char *
+GetXLogBufferForRead(XLogRecPtr ptr, TimeLineID tli, char *page)
+{
+	XLogRecPtr	expectedEndPtr;
+	XLogRecPtr	endptr;
+	int 	idx;
+	char    *recptr = NULL;
+
+	/* Buffer slot for 'ptr' and the end LSN of the page that contains it. */
+	idx = XLogRecPtrToBufIdx(ptr);
+	expectedEndPtr = ptr;
+	expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+	/*
+	 * Try to acquire WALBufMappingLock in shared mode so that other
+	 * concurrent WAL readers are also allowed. We do as little work as
+	 * possible while holding the lock as it might impact concurrent WAL
+	 * writers.
+	 *
+	 * If we cannot immediately acquire the lock, meaning the lock was busy,
+	 * then exit quickly to not cause any contention. The caller can then
+	 * fall back to reading WAL from the WAL file.
+	 */
+	if (!LWLockConditionalAcquire(WALBufMappingLock, LW_SHARED))
+		return recptr;
+
+	/*
+	 * Holding WALBufMappingLock ensures inserters don't overwrite this value
+	 * while we are reading it.
+	 */
+	endptr = XLogCtl->xlblocks[idx];
+
+	if (expectedEndPtr == endptr)
+	{
+		XLogPageHeader phdr;
+
+		/*
+		 * We have found the WAL buffer page holding the given LSN. Copy the
+		 * whole page into the caller's buffer while we still hold the lock.
+		 */
+		memcpy(page, (XLogCtl->pages + idx * (Size) XLOG_BLCKSZ),
+			   (Size) XLOG_BLCKSZ);
+
+		/* Release the lock as early as possible to limit contention. */
+		LWLockRelease(WALBufMappingLock);
+
+		/*
+		 * The fact that we hold WALBufMappingLock while reading the WAL
+		 * buffer page itself guarantees that no one else initializes it or
+		 * makes it ready for next use in AdvanceXLInsertBuffer().
+		 *
+		 * However, we perform basic page header checks to ensure that we
+		 * are not reading a page that just got initialized. The callers will
+		 * anyway perform extensive page-level and record-level checks.
+		 */
+		phdr = (XLogPageHeader) page;
+
+		if (phdr->xlp_magic == XLOG_PAGE_MAGIC &&
+			phdr->xlp_pageaddr == (ptr - (ptr % XLOG_BLCKSZ)) &&
+			phdr->xlp_tli == tli)
+		{
+			/* Page looks valid; point at the requested LSN in the copy. */
+			recptr = page + ptr % XLOG_BLCKSZ;
+		}
+	}
+	else
+	{
+		/* The slot holds some other page, so there is nothing to return. */
+		LWLockRelease(WALBufMappingLock);
+	}
+
+	return recptr;
+}
+
+/*
+ * Read up to 'count' bytes of WAL starting at 'startptr' on timeline 'tli'
+ * from WAL buffers into the caller-supplied buffer 'buf', which must have
+ * room for at least 'count' bytes.
+ *
+ * WAL is copied page by page for as long as the needed pages are found in
+ * WAL buffers; we stop at the first page that is not. The number of bytes
+ * actually copied is returned in '*read_bytes': it equals 'count' on a full
+ * hit, is smaller on a partial hit and is zero on a miss. Reading any
+ * remaining WAL, for example from WAL files, is the caller's
+ * responsibility.
+ *
+ * This must not be called during recovery, and 'startptr' must not be past
+ * the current flush pointer.
+ */
+void
+XLogReadFromBuffers(XLogRecPtr startptr,
+					TimeLineID tli,
+					Size count,
+					char *buf,
+					Size *read_bytes)
+{
+	PGAlignedXLogBlock page;
+	XLogRecPtr	ptr;
+	char	   *dst;
+	Size		nbytes;
+
+	Assert(!XLogRecPtrIsInvalid(startptr));
+	Assert(count > 0);
+	Assert(startptr <= GetFlushRecPtr(NULL));
+	Assert(!RecoveryInProgress());
+
+	ptr = startptr;
+	nbytes = count;
+	dst = buf;
+	*read_bytes = 0;
+
+	while (nbytes > 0)
+	{
+		char	   *recptr;
+		Size		page_avail;
+		Size		ncopy;
+
+		/*
+		 * Get a private copy of the page containing 'ptr'. The scratch page
+		 * is a PGAlignedXLogBlock so that its header can safely be accessed
+		 * as an XLogPageHeader, and it needs no initialization because it is
+		 * looked at only after it has been filled in completely.
+		 */
+		recptr = GetXLogBufferForRead(ptr, tli, page.data);
+
+		/* Stop at the first page that cannot be served from WAL buffers. */
+		if (recptr == NULL)
+			break;
+
+		/* Copy whatever we need from this page, at most up to its end. */
+		page_avail = XLOG_BLCKSZ - (recptr - page.data);
+		ncopy = Min(nbytes, page_avail);
+
+		memcpy(dst, recptr, ncopy);
+		dst += ncopy;
+		ptr += ncopy;
+		nbytes -= ncopy;
+		*read_bytes += ncopy;
+	}
+
+	elog(DEBUG1, "read %zu bytes out of %zu bytes from WAL buffers for given LSN %X/%X, Timeline ID %u",
+		 *read_bytes, count, LSN_FORMAT_ARGS(startptr), tli);
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a38a80e049..4a2e7af169 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1485,8 +1485,7 @@ err:
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
  *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
+ * When possible, this function reads data directly from WAL buffers.
  */
 bool
 WALRead(XLogReaderState *state,
@@ -1497,6 +1496,50 @@ WALRead(XLogReaderState *state,
 	XLogRecPtr	recptr;
 	Size		nbytes;
 
+#ifndef FRONTEND
+	/* Frontend tools have no idea of WAL buffers. */
+	Size        read_bytes;
+
+	/*
+	 * When possible, read WAL from WAL buffers. We skip this step and read
+	 * from the WAL file the usual way in two cases: when the server is in
+	 * recovery (standby mode, archive or crash recovery), because WAL buffers
+	 * are not used then, and when the server is inserting WAL on a timeline
+	 * other than the one we are asked to read from.
+	 */
+	if (!RecoveryInProgress() &&
+		tli == GetWALInsertionTimeLine())
+	{
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+		XLogReadFromBuffers(startptr, tli, count, buf, &read_bytes);
+		pgstat_report_wait_end();
+
+		/*
+		 * Check if we have read fully (hit), partially (partial hit) or
+		 * nothing (miss) from WAL buffers. If we have read either partially or
+		 * nothing, then continue to read the remaining bytes the usual way,
+		 * that is, read from WAL file.
+		 */
+		if (count == read_bytes)
+		{
+			/* Buffer hit, so return. */
+			return true;
+		}
+		else if (read_bytes > 0 && count > read_bytes)
+		{
+			/*
+			 * Buffer partial hit, so reset the state to count the read bytes
+			 * and continue.
+			 */
+			buf += read_bytes;
+			startptr += read_bytes;
+			count -= read_bytes;
+		}
+
+		/* Buffer miss i.e., read_bytes = 0, so continue */
+	}
+#endif	/* FRONTEND */
+
 	p = buf;
 	recptr = startptr;
 	nbytes = count;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1fbd48fbda..f4e1c46b23 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -247,6 +247,12 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern void XLogReadFromBuffers(XLogRecPtr startptr,
+								TimeLineID tli,
+								Size count,
+								char *buf,
+								Size *read_bytes);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
-- 
2.34.1

