On Mon, 2024-01-22 at 12:12 -0800, Andres Freund wrote:
> I still think that anything that requires such checks shouldn't be
> merged. It's completely bogus to check page contents for validity
> when we
> should have metadata telling us which range of the buffers is valid
> and which
> not.

The check seems entirely unnecessary, to me. A leftover from v18?

I have attached a new patch (version "19j") to illustrate some of my
previous suggestions. I didn't spend a lot of time on it so it's not
ready for commit, but I believe my suggestions are easier to understand
in code form.

Note that, right now, it only works for XLogSendPhysical(). I believe
it's best to just make it work for 1-3 callers that we understand well,
and we can generalize later if it makes sense.

I'm still not clear on why some callers are reading XLOG_BLCKSZ
(expecting zeros at the end), and if it's OK to just change them to use
the exact byte count.

Also, if we've detected that the first requested buffer has been
evicted, is there any value in continuing the loop to see if more
recent buffers are available? For example, if the requested LSNs range
over buffers 4, 5, and 6, and 4 has already been evicted, should we try
to return LSN data from 5 and 6 at the proper offset in the dest
buffer? If so, we'd need to adjust the API so the caller knows what
parts of the dest buffer were filled in.

Regards,
        Jeff Davis

From 34d89d6b869a454fd15097d8a0f0b6d3997a74da Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Mon, 22 Jan 2024 16:21:53 -0800
Subject: [PATCH v19j 1/2] Add XLogReadFromBuffers().

Allows reading directly from WAL buffers without a lock, avoiding the
need to wait for WAL flushing and read from the filesystem.

For now, the only caller is physical replication, but we can consider
expanding it to other callers as needed.

Author: Bharath Rupireddy
---
 src/backend/access/transam/xlog.c   | 145 ++++++++++++++++++++++++++--
 src/backend/replication/walsender.c |  14 +++
 src/include/access/xlog.h           |   2 +
 3 files changed, 152 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 478377c4a2..c9619139af 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -698,7 +698,7 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 							  XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto, bool emitLog);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -1494,7 +1494,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
  * to make room for a new one, which in turn requires WALWriteLock.
  */
 static XLogRecPtr
-WaitXLogInsertionsToFinish(XLogRecPtr upto)
+WaitXLogInsertionsToFinish(XLogRecPtr upto, bool emitLog)
 {
 	uint64		bytepos;
 	XLogRecPtr	reservedUpto;
@@ -1521,9 +1521,10 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	 */
 	if (upto > reservedUpto)
 	{
-		ereport(LOG,
-				(errmsg("request to flush past end of generated WAL; request %X/%X, current position %X/%X",
-						LSN_FORMAT_ARGS(upto), LSN_FORMAT_ARGS(reservedUpto))));
+		if (emitLog)
+			ereport(LOG,
+					(errmsg("request to flush past end of generated WAL; request %X/%X, current position %X/%X",
+							LSN_FORMAT_ARGS(upto), LSN_FORMAT_ARGS(reservedUpto))));
 		upto = reservedUpto;
 	}
 
@@ -1705,6 +1706,132 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 	return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Read WAL directly from WAL buffers, if available.
+ *
+ * This function reads 'count' bytes of WAL from WAL buffers into 'buf'
+ * starting at location 'startptr' and returns total bytes read.
+ *
+ * The bytes read may be fewer than requested if any of the WAL buffers in the
+ * requested range have been evicted, or if the last requested byte is beyond
+ * the Insert pointer.
+ *
+ * If reading beyond the Write pointer, this function will wait for concurent
+ * inserters to finish. Otherwise, it does not wait at all.
+ *
+ * The caller must ensure that it's reasonable to read from the WAL buffers,
+ * i.e. that the requested data is from the current timeline, that we're not
+ * in recovery, etc.
+ */
+Size
+XLogReadFromBuffers(char *buf, XLogRecPtr startptr, Size count)
+{
+	XLogRecPtr	 ptr	= startptr;
+	XLogRecPtr	 upto	= startptr + count;
+	Size		 nbytes = count;
+	Size		 ntotal = 0;
+	char		*dst	= buf;
+
+	Assert(!RecoveryInProgress());
+	Assert(!XLogRecPtrIsInvalid(startptr));
+
+	/*
+	 * Caller requested very recent WAL data. Wait for any in-progress WAL
+	 * insertions to WAL buffers to finish.
+	 *
+	 * Most callers will have already updated LogwrtResult when determining
+	 * how far to read, but it's OK if it's out of date. (XXX: is it worth
+	 * taking a spinlock to update LogwrtResult and check again before calling
+	 * WaitXLogInsertionsToFinish()?)
+	 */
+	if (upto > LogwrtResult.Write)
+	{
+		XLogRecPtr writtenUpto = WaitXLogInsertionsToFinish(upto, false);
+
+		upto = Min(upto, writtenUpto);
+		nbytes = upto - startptr;
+	}
+
+	/*
+	 * Loop through the buffers without a lock. For each buffer, atomically
+	 * read and verify the end pointer, then copy the data out, and finally
+	 * re-read and re-verify the end pointer.
+	 *
+	 * Once a page is evicted, it never returns to the WAL buffers, so if the
+	 * end pointer matches the expected end pointer before and after we copy
+	 * the data, then the right page must have been present during the data
+	 * copy. Read barriers are necessary to ensure that the data copy actually
+	 * happens between the two verification steps.
+	 *
+	 * If the verification fails, we simply terminate the loop and return the
+	 * data had been already copied out successfully.
+	 */
+	while (nbytes > 0)
+	{
+		XLogRecPtr	 expectedEndPtr;
+		XLogRecPtr	 endptr;
+		int			 idx;
+		const char	*page;
+		const char	*data;
+		Size		 nread;
+
+		idx = XLogRecPtrToBufIdx(ptr);
+		expectedEndPtr = ptr;
+		expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+		/*
+		 * First verification step: check that the correct page is present in
+		 * the WAL buffers
+		 */
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+		if (expectedEndPtr != endptr)
+			break;
+
+		/*
+		 * We found WAL buffer page containing given XLogRecPtr. Get starting
+		 * address of the page and a pointer to the right location of given
+		 * XLogRecPtr in that page.
+		 */
+		page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+		data = page + ptr % XLOG_BLCKSZ;
+
+		/*
+		 * Ensure that the data copy and the first verification step are not
+		 * reordered
+		 */
+		pg_read_barrier();
+
+		/* how much is available on this page to read? */
+		nread = Min(nbytes, XLOG_BLCKSZ - (data - page));
+
+		/* data copy */
+		memcpy(dst, data, nread);
+
+		/*
+		 * Ensure that the data copy and the second verification step are not
+		 * reordered.
+		 */
+		pg_read_barrier();
+
+		/*
+		 * Second verification step: check that the page we read from wasn't
+		 * evicted while we were copying the data.
+		 */
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+		if (expectedEndPtr != endptr)
+			break;
+
+		dst += nread;
+		ptr += nread;
+		ntotal += nread;
+		nbytes -= nread;
+	}
+
+	Assert(ntotal <= count);
+
+	return ntotal;
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
@@ -1895,7 +2022,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 				 */
 				LWLockRelease(WALBufMappingLock);
 
-				WaitXLogInsertionsToFinish(OldPageRqstPtr);
+				WaitXLogInsertionsToFinish(OldPageRqstPtr, true);
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
@@ -2689,7 +2816,7 @@ XLogFlush(XLogRecPtr record)
 		 * Before actually performing the write, wait for all in-flight
 		 * insertions to the pages we're about to write to finish.
 		 */
-		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr, true);
 
 		/*
 		 * Try to get the write lock. If we can't get it immediately, wait
@@ -2740,7 +2867,7 @@ XLogFlush(XLogRecPtr record)
 			 * We're only calling it again to allow insertpos to be moved
 			 * further forward, not to actually wait for anyone.
 			 */
-			insertpos = WaitXLogInsertionsToFinish(insertpos);
+			insertpos = WaitXLogInsertionsToFinish(insertpos, true);
 		}
 
 		/* try to write/flush later additions to XLOG as well */
@@ -2919,7 +3046,7 @@ XLogBackgroundFlush(void)
 	START_CRIT_SECTION();
 
 	/* now wait for any in-progress insertions to finish and get write lock */
-	WaitXLogInsertionsToFinish(WriteRqst.Write);
+	WaitXLogInsertionsToFinish(WriteRqst.Write, true);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 	LogwrtResult = XLogCtl->LogwrtResult;
 	if (WriteRqst.Write > LogwrtResult.Write ||
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 087031e9dc..b06f5e75d6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3125,6 +3125,20 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
+	/*
+	 * Read from WAL buffers, if available.
+	 */
+	if (!RecoveryInProgress() &&
+		xlogreader->seg.ws_tli == GetWALInsertionTimeLine())
+	{
+		Size rbytes = XLogReadFromBuffers(
+			&output_message.data[output_message.len],
+			startptr, nbytes);
+		output_message.len += rbytes;
+		startptr += rbytes;
+		nbytes -= rbytes;
+	}
+
 	if (!WALRead(xlogreader,
 				 &output_message.data[output_message.len],
 				 startptr,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 301c5fa11f..5f2621b0b9 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,8 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern Size XLogReadFromBuffers(char *buf, XLogRecPtr startptr, Size count);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
-- 
2.34.1

From fc9bbbc28c59f5c0a88d658f76bfc421f9b9e34d Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com>
Date: Wed, 10 Jan 2024 13:36:15 +0000
Subject: [PATCH v19j 2/2] Add test module for verifying WAL read from WAL
 buffers

This commit adds a test module to verify WAL read from WAL
buffers.

Author: Bharath Rupireddy
Reviewed-by: Dilip Kumar
Discussion: https://www.postgresql.org/message-id/CALj2ACXKKK%3DwbiG5_t6dGao5GoecMwRkhr7GjVBM_jg54%2BNa%3DQ%40mail.gmail.com
---
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 .../test_wal_read_from_buffers/.gitignore     |  4 ++
 .../test_wal_read_from_buffers/Makefile       | 23 ++++++++
 .../test_wal_read_from_buffers/meson.build    | 33 ++++++++++++
 .../test_wal_read_from_buffers/t/001_basic.pl | 54 +++++++++++++++++++
 .../test_wal_read_from_buffers--1.0.sql       | 16 ++++++
 .../test_wal_read_from_buffers.c              | 37 +++++++++++++
 .../test_wal_read_from_buffers.control        |  4 ++
 9 files changed, 173 insertions(+)
 create mode 100644 src/test/modules/test_wal_read_from_buffers/.gitignore
 create mode 100644 src/test/modules/test_wal_read_from_buffers/Makefile
 create mode 100644 src/test/modules/test_wal_read_from_buffers/meson.build
 create mode 100644 src/test/modules/test_wal_read_from_buffers/t/001_basic.pl
 create mode 100644 src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers--1.0.sql
 create mode 100644 src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers.c
 create mode 100644 src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers.control

diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index e32c8925f6..c6e1f01dca 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -34,6 +34,7 @@ SUBDIRS = \
 		  test_rls_hooks \
 		  test_shm_mq \
 		  test_slru \
+		  test_wal_read_from_buffers \
 		  unsafe_tests \
 		  worker_spi \
 		  xid_wraparound
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 397e0906e6..9595bbc342 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -32,6 +32,7 @@ subdir('test_resowner')
 subdir('test_rls_hooks')
 subdir('test_shm_mq')
 subdir('test_slru')
+subdir('test_wal_read_from_buffers')
 subdir('unsafe_tests')
 subdir('worker_spi')
 subdir('xid_wraparound')
diff --git a/src/test/modules/test_wal_read_from_buffers/.gitignore b/src/test/modules/test_wal_read_from_buffers/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/test_wal_read_from_buffers/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_wal_read_from_buffers/Makefile b/src/test/modules/test_wal_read_from_buffers/Makefile
new file mode 100644
index 0000000000..7472494501
--- /dev/null
+++ b/src/test/modules/test_wal_read_from_buffers/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_wal_read_from_buffers/Makefile
+
+MODULE_big = test_wal_read_from_buffers
+OBJS = \
+	$(WIN32RES) \
+	test_wal_read_from_buffers.o
+PGFILEDESC = "test_wal_read_from_buffers - test module to read WAL from WAL buffers"
+
+EXTENSION = test_wal_read_from_buffers
+DATA = test_wal_read_from_buffers--1.0.sql
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_wal_read_from_buffers
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_wal_read_from_buffers/meson.build b/src/test/modules/test_wal_read_from_buffers/meson.build
new file mode 100644
index 0000000000..40bd5dcd33
--- /dev/null
+++ b/src/test/modules/test_wal_read_from_buffers/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+test_wal_read_from_buffers_sources = files(
+  'test_wal_read_from_buffers.c',
+)
+
+if host_system == 'windows'
+  test_wal_read_from_buffers_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_wal_read_from_buffers',
+    '--FILEDESC', 'test_wal_read_from_buffers - test module to read WAL from WAL buffers',])
+endif
+
+test_wal_read_from_buffers = shared_module('test_wal_read_from_buffers',
+  test_wal_read_from_buffers_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_wal_read_from_buffers
+
+test_install_data += files(
+  'test_wal_read_from_buffers.control',
+  'test_wal_read_from_buffers--1.0.sql',
+)
+
+tests += {
+  'name': 'test_wal_read_from_buffers',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_basic.pl',
+    ],
+  },
+}
diff --git a/src/test/modules/test_wal_read_from_buffers/t/001_basic.pl b/src/test/modules/test_wal_read_from_buffers/t/001_basic.pl
new file mode 100644
index 0000000000..1d842bb02e
--- /dev/null
+++ b/src/test/modules/test_wal_read_from_buffers/t/001_basic.pl
@@ -0,0 +1,54 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+
+$node->init;
+
+# Ensure nobody interferes with us so that the WAL in WAL buffers don't get
+# overwritten while running tests.
+$node->append_conf(
+	'postgresql.conf', qq(
+autovacuum = off
+checkpoint_timeout = 1h
+wal_writer_delay = 10000ms
+wal_writer_flush_after = 1GB
+));
+$node->start;
+
+# Setup.
+$node->safe_psql('postgres', 'CREATE EXTENSION test_wal_read_from_buffers;');
+
+# Get current insert LSN. After this, we generate some WAL which is guranteed
+# to be in WAL buffers as there is no other WAL generating activity is
+# happening on the server. We then verify if we can read the WAL from WAL
+# buffers using this LSN.
+my $lsn = $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+
+# Generate minimal WAL so that WAL buffers don't get overwritten.
+$node->safe_psql('postgres',
+	"CREATE TABLE t (c int); INSERT INTO t VALUES (1);");
+
+# Check if WAL is successfully read from WAL buffers.
+my $result = $node->safe_psql('postgres',
+	qq{SELECT test_wal_read_from_buffers('$lsn');});
+is($result, 't', "WAL is successfully read from WAL buffers");
+
+# Check with a WAL that doesn't yet exist.
+$lsn = $node->safe_psql('postgres', 'SELECT pg_current_wal_flush_lsn()+8192;');
+$result = $node->safe_psql('postgres',
+	qq{SELECT test_wal_read_from_buffers('$lsn');});
+is($result, 'f', "WAL that doesn't yet exist is not read from WAL buffers");
+
+# Check with invalid input.
+$result = $node->safe_psql('postgres',
+	qq{SELECT test_wal_read_from_buffers('0/0');});
+is($result, 'f', "WAL is not read from WAL buffers with invalid input");
+
+done_testing();
diff --git a/src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers--1.0.sql b/src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers--1.0.sql
new file mode 100644
index 0000000000..c6ffb3fa65
--- /dev/null
+++ b/src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers--1.0.sql
@@ -0,0 +1,16 @@
+/* src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_wal_read_from_buffers" to load this file. \quit
+
+--
+-- test_wal_read_from_buffers()
+--
+-- Returns true if WAL data at a given LSN can be read from WAL buffers.
+-- Otherwise returns false.
+--
+CREATE FUNCTION test_wal_read_from_buffers(IN lsn pg_lsn,
+    read_successful OUT boolean
+)
+AS 'MODULE_PATHNAME', 'test_wal_read_from_buffers'
+LANGUAGE C STRICT;
diff --git a/src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers.c b/src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers.c
new file mode 100644
index 0000000000..5368f59b16
--- /dev/null
+++ b/src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers.c
@@ -0,0 +1,37 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_wal_read_from_buffers.c
+ *		Test module to read WAL from WAL buffers.
+ *
+ * Portions Copyright (c) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers.c
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "fmgr.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+/*
+ * SQL function for verifying that WAL data at a given LSN can be read from WAL
+ * buffers. Returns true if read from WAL buffers, otherwise false.
+ */
+PG_FUNCTION_INFO_V1(test_wal_read_from_buffers);
+Datum
+test_wal_read_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	startptr		  = PG_GETARG_LSN(0);
+	char		data[XLOG_BLCKSZ] = {0};
+	Size		nread			  = 0;
+
+	if (!XLogRecPtrIsInvalid(startptr))
+		nread = XLogReadFromBuffers(data, startptr, XLOG_BLCKSZ);
+
+	PG_RETURN_BOOL(nread > 0);
+}
diff --git a/src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers.control b/src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers.control
new file mode 100644
index 0000000000..eda8d47954
--- /dev/null
+++ b/src/test/modules/test_wal_read_from_buffers/test_wal_read_from_buffers.control
@@ -0,0 +1,4 @@
+comment = 'Test module to read WAL from WAL buffers'
+default_version = '1.0'
+module_pathname = '$libdir/test_wal_read_from_buffers'
+relocatable = true
-- 
2.34.1

Reply via email to