From d41b37f65f5a8266d0e18bdfb320079a40a7999b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 20 Dec 2023 10:00:19 +0000
Subject: [PATCH v18] Allow WAL reading from WAL buffers

This commit adds WALRead() the capability to read WAL from WAL
buffers when possible. When requested WAL isn't available in WAL
buffers, the WAL is read from the WAL file as usual.

This commit benefits the callers of WALRead(), that are walsenders
and pg_walinspect. They can now avoid reading WAL from the WAL
file (possibly avoiding disk IO). Tests show that the WAL buffers
hit ratio stood at 95% for 1 primary, 1 sync standby, 1 async
standby, with pgbench --scale=300 --client=32 --time=900. In other
words, the walsenders avoided 95% of the time reading from the
file/avoided pread system calls:
https://www.postgresql.org/message-id/CALj2ACXKKK%3DwbiG5_t6dGao5GoecMwRkhr7GjVBM_jg54%2BNa%3DQ%40mail.gmail.com

This commit also benefits when direct IO is enabled for WAL.
Reading WAL from WAL buffers puts back the performance close to
that of without direct IO for WAL:
https://www.postgresql.org/message-id/CALj2ACV6rS%2B7iZx5%2BoAvyXJaN4AG-djAQeM1mrM%3DYSDkVrUs7g%40mail.gmail.com

This commit paves the way for the following features in future:
- Improves synchronous replication performance by replicating
directly from WAL buffers.
- A opt-in way for the walreceivers to receive unflushed WAL.
More details here:
https://www.postgresql.org/message-id/20231011224353.cl7c2s222dw3de4j%40awork3.anarazel.de

Author: Bharath Rupireddy
Reviewed-by: Dilip Kumar, Andres Freund
Reviewed-by: Nathan Bossart, Kuntal Ghosh
Discussion: https://www.postgresql.org/message-id/CALj2ACXKKK%3DwbiG5_t6dGao5GoecMwRkhr7GjVBM_jg54%2BNa%3DQ%40mail.gmail.com
---
 src/backend/access/transam/xlog.c       | 189 ++++++++++++++++++++++++
 src/backend/access/transam/xlogreader.c |  47 +++++-
 src/backend/access/transam/xlogutils.c  |  11 +-
 src/backend/replication/walsender.c     |  10 +-
 src/include/access/xlog.h               |  23 +++
 5 files changed, 268 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 56e4d6fb02..86dbea0a26 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1704,6 +1704,195 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 	return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Read WAL from WAL buffers.
+ *
+ * This function reads 'bytes_to_read' bytes of WAL from WAL buffers into
+ * 'buf' starting at location 'startptr' on timeline 'tli' and returns
+ * appropriate result code and fills total read bytes if any into
+ * 'bytes_read'.
+ *
+ * Points to note:
+ *
+ * - This function reads as much as it can from WAL buffers, meaning, it may
+ * not read all the requested 'bytes_to_read' bytes. Caller must be aware of
+ * this and deal with it.
+ *
+ * - This function reads WAL from WAL buffers without holding any lock. First
+ * it reads xlblocks atomically for checking page existence, then it reads the
+ * page contents, validates. Finally, it rechecks the page existence by
+ * rereading xlblocks, if the read page is replaced, it discards read page and
+ * returns.
+ *
+ * - This function is not available for frontend code as WAL buffers is an
+ * internal mechanism to the server.
+ *
+ * - Caller must look at the result code to take appropriate action such as
+ * error out on failure or emit warning or continue.
+ *
+ * - This function waits for any in-progress WAL insertions to WAL buffers to
+ * finish.
+ */
+XLogReadFromBuffersResult
+XLogReadFromBuffers(XLogRecPtr startptr,
+					TimeLineID tli,
+					Size bytes_to_read,
+					char *buf,
+					Size *bytes_read)
+{
+	XLogRecPtr	ptr;
+	Size		nbytes;
+	char	   *dst;
+	uint64		bytepos;
+	XLogReadFromBuffersResult result = XLREADBUFS_OK;
+
+	*bytes_read = 0;
+
+	/* WAL buffers aren't in use when server is in recovery. */
+	if (RecoveryInProgress())
+		return XLREADBUFS_IN_RECOVERY;
+
+	/* WAL is inserted into WAL buffers on current server's insertion TLI. */
+	if (tli != GetWALInsertionTimeLine())
+		return XLREADBUFS_NOT_INSERT_TLI;
+
+	if (XLogRecPtrIsInvalid(startptr))
+		return XLREADBUFS_INVALID_INPUT;
+
+	ptr = startptr;
+	nbytes = bytes_to_read;
+	dst = buf;
+
+	while (nbytes > 0)
+	{
+		XLogRecPtr	expectedEndPtr;
+		XLogRecPtr	endptr;
+		int			idx;
+		char	   *page;
+		char	   *data;
+		XLogPageHeader phdr;
+		Size		nread;
+		XLogRecPtr	reservedUpto;
+		XLogwrtResult LogwrtResult;
+
+		idx = XLogRecPtrToBufIdx(ptr);
+		expectedEndPtr = ptr;
+		expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+
+		/* Requested WAL isn't available in WAL buffers. */
+		if (expectedEndPtr != endptr)
+			break;
+
+		/*
+		 * We found WAL buffer page containing given XLogRecPtr. Get starting
+		 * address of the page and a pointer to the right location of given
+		 * XLogRecPtr in that page.
+		 */
+		page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+		data = page + ptr % XLOG_BLCKSZ;
+
+		/*
+		 * Make sure we don't read xlblocks up above before the page contents
+		 * down below.
+		 */
+		pg_read_barrier();
+
+		nread = 0;
+
+		/* Read what is wanted, not the whole page. */
+		if ((data + nbytes) <= (page + XLOG_BLCKSZ))
+		{
+			/* All the bytes are in one page. */
+			nread = nbytes;
+		}
+		else
+		{
+			/*
+			 * All the bytes are not in one page. Read available bytes on the
+			 * current page, copy them over to output buffer and continue to
+			 * read remaining bytes.
+			 */
+			nread = XLOG_BLCKSZ - (data - page);
+			Assert(nread > 0 && nread <= nbytes);
+		}
+
+		Assert(nread > 0);
+		memcpy(dst, data, nread);
+
+		/*
+		 * Make sure we don't read xlblocks down below before the page
+		 * contents up above.
+		 */
+		pg_read_barrier();
+
+		/* Recheck if the read page still exists in WAL buffers. */
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+
+		/* Return if the page got initalized while we were reading it. */
+		if (expectedEndPtr != endptr)
+			break;
+
+		/* Read the current insert position */
+		SpinLockAcquire(&XLogCtl->Insert.insertpos_lck);
+		bytepos = XLogCtl->Insert.CurrBytePos;
+		SpinLockRelease(&XLogCtl->Insert.insertpos_lck);
+
+		reservedUpto = XLogBytePosToEndRecPtr(bytepos);
+
+		/*
+		 * We can't allow WAL being read is past the current insert position
+		 * as it does not yet exist.
+		 */
+		if ((ptr + nread) > reservedUpto)
+		{
+			result = XLREADBUFS_NON_EXISTENT_WAL;
+			break;
+		}
+
+		SpinLockAcquire(&XLogCtl->info_lck);
+		LogwrtResult = XLogCtl->LogwrtResult;
+		SpinLockRelease(&XLogCtl->info_lck);
+
+		/* Wait for any in-progress WAL insertions to WAL buffers to finish. */
+		if ((ptr + nread) > LogwrtResult.Write &&
+			(ptr + nread) <= reservedUpto)
+			WaitXLogInsertionsToFinish(ptr + nread);
+
+		/*
+		 * Typically, we must not read a WAL buffer page that just got
+		 * initialized, because we waited enough for the in-progress WAL
+		 * insertions to finish above. However, there can exists a slight
+		 * window after the above wait finishes in which the read buffer page
+		 * can get replaced especially under high WAL generation rates. So,
+		 * let's not account such buffer page.
+		 */
+		phdr = (XLogPageHeader) page;
+		if (!(phdr->xlp_magic == XLOG_PAGE_MAGIC &&
+			  phdr->xlp_pageaddr == (ptr - (ptr % XLOG_BLCKSZ)) &&
+			  phdr->xlp_tli == tli))
+		{
+			result = XLREADBUFS_UNINITIALIZED_WAL;
+			break;
+		}
+
+		dst += nread;
+		ptr += nread;
+		*bytes_read += nread;
+		nbytes -= nread;
+	}
+
+	/* We never read more than what the caller has asked for. */
+	Assert(*bytes_read <= bytes_to_read);
+
+	ereport(DEBUG1,
+			errmsg_internal("read %zu bytes out of %zu bytes from WAL buffers for given start LSN %X/%X, timeline ID %u",
+							*bytes_read, bytes_to_read,
+							LSN_FORMAT_ARGS(startptr), tli));
+
+	return result;
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 6b404b8169..631bd4fe6b 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1501,17 +1501,54 @@ err:
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
  *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
+ * When possible, this function reads data directly from WAL buffers. When
+ * requested WAL isn't available in WAL buffers, the WAL is read from the WAL
+ * file as usual. The callers may avoid reading WAL from the WAL file thus
+ * reducing read system calls or even disk IOs.
  */
 bool
-WALRead(XLogReaderState *state,
-		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
-		WALReadError *errinfo)
+WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr,
+		Size count, TimeLineID tli, WALReadError *errinfo)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
 	Size		nbytes;
+#ifndef FRONTEND
+	Size		nread;
+#endif
+
+#ifndef FRONTEND
+
+	/*
+	 * Try reading WAL from WAL buffers. Frontend code has no idea of WAL
+	 * buffers.
+	 */
+	(void) XLogReadFromBuffers(startptr, tli, count, buf, &nread);
+
+	if (nread > 0)
+	{
+		/*
+		 * Check if we have read fully (hit), partially (partial hit) or
+		 * nothing (miss) from WAL buffers. If we have read either partially
+		 * or nothing, then continue to read the remaining bytes the usual
+		 * way, that is, read from WAL file.
+		 *
+		 * XXX: It might be worth to expose WAL buffer read stats.
+		 */
+		if (nread == count)
+			return true;		/* Buffer hit, so return. */
+		else if (nread < count)
+		{
+			/*
+			 * Buffer partial hit, so reset the state to count the read bytes
+			 * and continue.
+			 */
+			buf += nread;
+			startptr += nread;
+			count -= nread;
+		}
+	}
+#endif
 
 	p = buf;
 	recptr = startptr;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 43f7b31205..057c9b4ea0 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -1007,12 +1007,13 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	}
 
 	/*
-	 * Even though we just determined how much of the page can be validly read
-	 * as 'count', read the whole page anyway. It's guaranteed to be
-	 * zero-padded up to the page boundary if it's incomplete.
+	 * We determined how much of the page can be validly read as 'count', read
+	 * that much only, not the entire page. Since WALRead() can read the page
+	 * from WAL buffers, in which case, the page is not guaranteed to be
+	 * zero-padded up to the page boundary because of the concurrent
+	 * insertions.
 	 */
-	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
-				 &errinfo))
+	if (!WALRead(state, cur_page, targetPagePtr, count, tli, &errinfo))
 		WALReadRaiseError(&errinfo);
 
 	/* number of valid bytes in the buffer */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3bc9c82389..a00bcd30bf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -943,11 +943,17 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	else
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
-	/* now actually read the data, we know it's there */
+	/*
+	 * We determined how much of the page can be validly read as 'count', read
+	 * that much only, not the entire page. Since WALRead() can read the page
+	 * from WAL buffers, in which case, the page is not guaranteed to be
+	 * zero-padded up to the page boundary because of the concurrent
+	 * insertions.
+	 */
 	if (!WALRead(state,
 				 cur_page,
 				 targetPagePtr,
-				 XLOG_BLCKSZ,
+				 count,
 				 currTLI,		/* Pass the current TLI because only
 								 * WalSndSegmentOpen controls whether new TLI
 								 * is needed. */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index a14126d164..9035a12a7b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -193,6 +193,23 @@ typedef enum WALAvailability
 	WALAVAIL_REMOVED,			/* WAL segment has been removed */
 } WALAvailability;
 
+/* Return values from XLogReadFromBuffers. */
+typedef enum XLogReadFromBuffersResult
+{
+	XLREADBUFS_OK = 0,			/* no error */
+	XLREADBUFS_INVALID_INPUT = -1,	/* invalid startptr */
+	XLREADBUFS_IN_RECOVERY = -2,	/* read attempted when in recovery */
+
+	/* read attempted with TLI that's different from server insertion TLI */
+	XLREADBUFS_NOT_INSERT_TLI = -3,
+
+	/* read attempted for non-existent WAL */
+	XLREADBUFS_NON_EXISTENT_WAL = -4,
+
+	/* uninitialized WAL buffer page */
+	XLREADBUFS_UNINITIALIZED_WAL = -5
+} XLogReadFromBuffersResult;
+
 struct XLogRecData;
 struct XLogReaderState;
 
@@ -251,6 +268,12 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern XLogReadFromBuffersResult XLogReadFromBuffers(XLogRecPtr startptr,
+													 TimeLineID tli,
+													 Size bytes_to_read,
+													 char *buf,
+													 Size *bytes_read);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
-- 
2.34.1

