On Fri, Feb 16, 2024 at 11:01 PM Jeff Davis <pg...@j-davis.com> wrote:
>
> > Here, I'm with v23 patch set:
>
> Thank you, I'll look at these.

Thanks. Here's the v24 patch set after rebasing.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From d0317ed91b1483a5556c87388e0186462711e022 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 17 Feb 2024 04:41:29 +0000
Subject: [PATCH v24 4/4] Demonstrate reading unflushed WAL directly from WAL
 buffers

---
 src/backend/access/transam/xlogreader.c       |   3 +-
 .../read_wal_from_buffers--1.0.sql            |  23 ++
 .../read_wal_from_buffers.c                   | 266 +++++++++++++++++-
 .../read_wal_from_buffers/t/001_basic.pl      |  35 +++
 4 files changed, 325 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index ae9904e7e4..4658a86997 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1035,7 +1035,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
 	 */
-	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+	if (state->seg.ws_segno != 0 &&
+		targetSegNo != state->seg.ws_segno && targetPageOff != 0)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
index 82fa097d10..72d05522fc 100644
--- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -12,3 +12,26 @@ CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
     bytes_read OUT int)
 AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
 LANGUAGE C STRICT;
+
+--
+-- get_wal_records_info_from_buffers()
+--
+-- SQL function to get info of WAL records available in WAL buffers.
+--
+CREATE FUNCTION get_wal_records_info_from_buffers(IN start_lsn pg_lsn,
+    IN end_lsn pg_lsn,
+    OUT start_lsn pg_lsn,
+    OUT end_lsn pg_lsn,
+    OUT prev_lsn pg_lsn,
+    OUT xid xid,
+    OUT resource_manager text,
+    OUT record_type text,
+    OUT record_length int4,
+    OUT main_data_length int4,
+    OUT fpi_length int4,
+    OUT description text,
+    OUT block_ref text
+)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'get_wal_records_info_from_buffers'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
index 9df5c07b4b..ed33a14127 100644
--- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -14,11 +14,27 @@
 #include "postgres.h"
 
 #include "access/xlog.h"
-#include "fmgr.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
 #include "utils/pg_lsn.h"
 
 PG_MODULE_MAGIC;
 
+static int	read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+								  int reqLen, XLogRecPtr targetRecPtr,
+								  char *cur_page);
+
+static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
+							 bool *nulls, uint32 ncols);
+static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
+							  XLogRecPtr start_lsn,
+							  XLogRecPtr end_lsn);
+
 /*
  * SQL function to read WAL from WAL buffers. Returns number of bytes read.
  */
@@ -52,3 +68,251 @@ read_wal_from_buffers(PG_FUNCTION_ARGS)
 
 	PG_RETURN_INT32(read);
 }
+
+/*
+ * XLogReaderRoutine->page_read callback that serves pages from WAL buffers.
+ *
+ * Waits until the WAL insert position has reached targetPagePtr + reqLen, then
+ * copies at most one XLOG_BLCKSZ page starting at targetPagePtr into cur_page.
+ * Returns the number of bytes copied, or -1 if the request cannot be satisfied;
+ * raises ERROR if WALReadFromBuffers() returns fewer bytes than asked for.
+ */
+static int
+read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+					  int reqLen, XLogRecPtr targetRecPtr,
+					  char *cur_page)
+{
+	XLogRecPtr	loc = targetPagePtr + reqLen;
+	XLogRecPtr	read_upto;
+	TimeLineID	tli = GetWALInsertionTimeLine();
+	Size		count;
+	Size		read;
+
+	/* Loop waiting for xlog to be available if necessary */
+	while (1)
+	{
+		read_upto = GetXLogInsertRecPtr();
+
+		if (loc <= read_upto)
+			break;
+
+		WaitXLogInsertionsToFinish(loc);
+
+		CHECK_FOR_INTERRUPTS();
+		pg_usleep(1000L);
+	}
+
+	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+	{
+		/* a full page is available; read only that page, caller asks for more */
+		count = XLOG_BLCKSZ;
+	}
+	else if (targetPagePtr + reqLen > read_upto)
+	{
+		/* not enough data there */
+		return -1;
+	}
+	else
+	{
+		/* enough bytes available to satisfy the request */
+		count = read_upto - targetPagePtr;
+	}
+
+	/* read WAL from WAL buffers */
+	read = WALReadFromBuffers(cur_page, targetPagePtr, count, tli);
+
+	if (read != count)
+		ereport(ERROR,
+				errmsg("could not read fully from WAL buffers; expected %zu, read %zu",
+					   count, read));
+
+	return (int) count;
+}
+
+/*
+ * SQL function returning info of all WAL records between start and end LSN.
+ *
+ * This function and its helpers below are similar to pg_walinspect's
+ * pg_get_wal_records_info() except that it will get info of WAL records
+ * available in WAL buffers.
+ */
+PG_FUNCTION_INFO_V1(get_wal_records_info_from_buffers);
+Datum
+get_wal_records_info_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	start_lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	end_lsn = PG_GETARG_LSN(1);
+
+	/*
+	 * Validate start and end LSNs coming from the function inputs.
+	 *
+	 * Reading WAL below the first page of the first segment isn't allowed.
+	 * This is a bootstrap WAL page and the page_read callback fails to read
+	 * it.
+	 */
+	if (start_lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+
+	if (start_lsn > end_lsn)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("WAL start LSN must be less than end LSN")));
+
+	GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Fetch the next WAL record from the reader, raising ERROR if none is returned.
+ */
+static XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	char	   *err = NULL;
+	XLogRecord *rec = XLogReadRecord(xlogreader, &err);
+
+	/* Common case: a record was decoded, hand it straight back. */
+	if (rec != NULL)
+		return rec;
+
+	/* Append the reader's own message only when it supplied one. */
+	if (err == NULL)
+		ereport(ERROR,
+				errmsg("could not read WAL at %X/%X",
+					   LSN_FORMAT_ARGS(xlogreader->EndRecPtr)));
+
+	ereport(ERROR,
+			errmsg("could not read WAL at %X/%X: %s",
+				   LSN_FORMAT_ARGS(xlogreader->EndRecPtr), err));
+
+	return NULL;				/* not reached; keeps compilers quiet */
+}
+
+/*
+ * Fill values[] and nulls[] with the row describing the caller's WAL record.
+ */
+static void
+GetWALRecordInfo(XLogReaderState *record, Datum *values,
+				 bool *nulls, uint32 ncols)
+{
+	const char *record_type;
+	RmgrData	desc;
+	uint32		fpi_len = 0;
+	StringInfoData rec_desc;
+	StringInfoData rec_blk_ref;
+	int			i = 0;
+
+	desc = GetRmgr(XLogRecGetRmid(record));
+	record_type = desc.rm_identify(XLogRecGetInfo(record));
+
+	if (record_type == NULL)
+		record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+	initStringInfo(&rec_desc);
+	desc.rm_desc(&rec_desc, record);
+
+	if (XLogRecHasAnyBlockRefs(record))
+	{
+		initStringInfo(&rec_blk_ref);
+		XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
+	}
+	/* Keep this order in sync with the SQL function's OUT parameters. */
+	values[i++] = LSNGetDatum(record->ReadRecPtr);
+	values[i++] = LSNGetDatum(record->EndRecPtr);
+	values[i++] = LSNGetDatum(XLogRecGetPrev(record));
+	values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
+	values[i++] = CStringGetTextDatum(desc.rm_name);
+	values[i++] = CStringGetTextDatum(record_type);
+	values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+	values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
+	values[i++] = UInt32GetDatum(fpi_len);
+
+	if (rec_desc.len > 0)
+		values[i++] = CStringGetTextDatum(rec_desc.data);
+	else
+		nulls[i++] = true;
+
+	if (XLogRecHasAnyBlockRefs(record))
+		values[i++] = CStringGetTextDatum(rec_blk_ref.data);
+	else
+		nulls[i++] = true;
+
+	Assert(i == ncols);
+}
+
+/*
+ * Add one result row for each WAL record that lies between start LSN and end
+ * LSN; reading stops at end LSN so page_read is never asked for WAL past it.
+ */
+static void
+GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
+				  XLogRecPtr end_lsn)
+{
+#define GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS 11
+	XLogReaderState *xlogreader;
+	XLogRecPtr	first_valid_record;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	MemoryContext old_cxt;
+	MemoryContext tmp_cxt;
+
+	Assert(start_lsn <= end_lsn);
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_from_wal_buffers,
+											   .segment_open = NULL,
+											   .segment_close = NULL),
+									NULL);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from */
+	first_valid_record = XLogFindNextRecord(xlogreader, start_lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+	{
+		ereport(LOG,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+		XLogReaderFree(xlogreader);	/* release the reader on early exit too */
+		return;
+	}
+
+	tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+									"GetWALRecordsInfo temporary cxt",
+									ALLOCSET_DEFAULT_SIZES);
+
+	while (xlogreader->EndRecPtr < end_lsn &&
+		   ReadNextXLogRecord(xlogreader) &&
+		   xlogreader->EndRecPtr <= end_lsn)
+	{
+		Datum		values[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+		bool		nulls[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+
+		/* Use the tmp context so we can clean up after each tuple is done */
+		old_cxt = MemoryContextSwitchTo(tmp_cxt);
+
+		GetWALRecordInfo(xlogreader, values, nulls,
+						 GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS);
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		/* clean up and switch back */
+		MemoryContextSwitchTo(old_cxt);
+		MemoryContextReset(tmp_cxt);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	MemoryContextDelete(tmp_cxt);
+	XLogReaderFree(xlogreader);
+
+#undef GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS
+}
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
index f985e49a27..fcdcdb001e 100644
--- a/src/test/modules/read_wal_from_buffers/t/001_basic.pl
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -69,4 +69,39 @@ for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 }
 ok($result, 'waited until WAL is successfully read from WAL buffers');
 
+$result = 0;
+
+# Wait until we get info of WAL records available in WAL buffers.
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$node->safe_psql('postgres', "DROP TABLE IF EXISTS foo, bar;");
+	$node->safe_psql('postgres',
+		"CREATE TABLE foo AS SELECT * FROM generate_series(1, 2);");
+	my $start_lsn = $node->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn();");
+	my $tbl_oid = $node->safe_psql('postgres',
+		"SELECT oid FROM pg_class WHERE relname = 'foo';");
+	$node->safe_psql('postgres',
+		"INSERT INTO foo SELECT * FROM generate_series(1, 10);");
+	my $end_lsn = $node->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn();");
+	$node->safe_psql('postgres',
+		"CREATE TABLE bar AS SELECT * FROM generate_series(1, 2);");
+
+	my $res = $node->safe_psql('postgres',
+				"SELECT count(*) FROM get_wal_records_info_from_buffers('$start_lsn', '$end_lsn')
+					WHERE block_ref LIKE concat('%', '$tbl_oid', '%') AND
+						resource_manager = 'Heap' AND
+						record_type = 'INSERT';");
+
+	if ($res eq 10)
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result, 'waited until we get info of WAL records available in WAL buffers.');
+
 done_testing();
-- 
2.34.1

From 648e261505ac819c85112276e7b6054105f22e13 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 17 Feb 2024 04:32:09 +0000
Subject: [PATCH v24 1/4] Add check in WALReadFromBuffers against requested WAL

---
 src/backend/access/transam/xlog.c       | 26 ++++++++++++++++++-------
 src/backend/access/transam/xlogreader.c |  3 +++
 src/include/access/xlog.h               |  1 +
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 50c347a679..b01a3b4ed1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -698,7 +698,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 							  XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -1493,7 +1492,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
  * uninitialized page), and the inserter might need to evict an old WAL buffer
  * to make room for a new one, which in turn requires WALWriteLock.
  */
-static XLogRecPtr
+XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
@@ -1710,13 +1709,14 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
  * of bytes read successfully.
  *
  * Fewer than 'count' bytes may be read if some of the requested WAL data has
- * already been evicted.
+ * already been evicted from the WAL buffers.
  *
  * No locks are taken.
  *
- * Caller should ensure that it reads no further than LogwrtResult.Write
- * (which should have been updated by the caller when determining how far to
- * read). The 'tli' argument is only used as a convenient safety check so that
+ * Caller should ensure that it reads no further than current insert position
+ * with the help of WaitXLogInsertionsToFinish().
+ *
+ * The 'tli' argument is only used as a convenient safety check so that
  * callers do not read from WAL buffers on a historical timeline.
  */
 Size
@@ -1731,7 +1731,19 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		XLogRecPtr	upto = startptr + count;
+		XLogRecPtr	insert_pos = GetXLogInsertRecPtr();
+
+		if (upto > insert_pos)
+			ereport(ERROR,
+					(errmsg("cannot read past end of current insert position; request %X/%X, insert position %X/%X",
+							LSN_FORMAT_ARGS(upto),
+							LSN_FORMAT_ARGS(insert_pos))));
+	}
+#endif
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 74a6b11866..ae9904e7e4 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1500,6 +1500,9 @@ err:
  *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
+ *
+ * Note: It is the caller's responsibility to ensure requested WAL is written
+ * to disk, that is 'startptr'+'count' <= LogwrtResult.Write.
  */
 bool
 WALRead(XLogReaderState *state,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 76787a8267..74606a6846 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,7 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 							   TimeLineID tli);
 
-- 
2.34.1

From 6f3b49ef70f0cab1f9191423db0d419ff3771211 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 17 Feb 2024 04:32:33 +0000
Subject: [PATCH v24 2/4] Add test module for verifying read from WAL buffers

---
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 .../modules/read_wal_from_buffers/.gitignore  |  4 ++
 .../modules/read_wal_from_buffers/Makefile    | 23 ++++++
 .../modules/read_wal_from_buffers/meson.build | 33 +++++++++
 .../read_wal_from_buffers--1.0.sql            | 14 ++++
 .../read_wal_from_buffers.c                   | 54 ++++++++++++++
 .../read_wal_from_buffers.control             |  4 ++
 .../read_wal_from_buffers/t/001_basic.pl      | 72 +++++++++++++++++++
 9 files changed, 206 insertions(+)
 create mode 100644 src/test/modules/read_wal_from_buffers/.gitignore
 create mode 100644 src/test/modules/read_wal_from_buffers/Makefile
 create mode 100644 src/test/modules/read_wal_from_buffers/meson.build
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
 create mode 100644 src/test/modules/read_wal_from_buffers/t/001_basic.pl

diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 89aa41b5e3..864a3dd72b 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -12,6 +12,7 @@ SUBDIRS = \
 		  dummy_seclabel \
 		  libpq_pipeline \
 		  plsample \
+		  read_wal_from_buffers \
 		  spgist_name_ops \
 		  test_bloomfilter \
 		  test_copy_callbacks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 8fbe742d38..4f3dd69e58 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -33,6 +33,7 @@ subdir('test_resowner')
 subdir('test_rls_hooks')
 subdir('test_shm_mq')
 subdir('test_slru')
+subdir('read_wal_from_buffers')
 subdir('unsafe_tests')
 subdir('worker_spi')
 subdir('xid_wraparound')
diff --git a/src/test/modules/read_wal_from_buffers/.gitignore b/src/test/modules/read_wal_from_buffers/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/read_wal_from_buffers/Makefile b/src/test/modules/read_wal_from_buffers/Makefile
new file mode 100644
index 0000000000..9e57a837f9
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/read_wal_from_buffers/Makefile
+
+MODULE_big = read_wal_from_buffers
+OBJS = \
+	$(WIN32RES) \
+	read_wal_from_buffers.o
+PGFILEDESC = "read_wal_from_buffers - test module to read WAL from WAL buffers"
+
+EXTENSION = read_wal_from_buffers
+DATA = read_wal_from_buffers--1.0.sql
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/read_wal_from_buffers
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/read_wal_from_buffers/meson.build b/src/test/modules/read_wal_from_buffers/meson.build
new file mode 100644
index 0000000000..3fac00d616
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+read_wal_from_buffers_sources = files(
+  'read_wal_from_buffers.c',
+)
+
+if host_system == 'windows'
+  read_wal_from_buffers_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'read_wal_from_buffers',
+    '--FILEDESC', 'read_wal_from_buffers - test module to read WAL from WAL buffers',])
+endif
+
+read_wal_from_buffers = shared_module('read_wal_from_buffers',
+  read_wal_from_buffers_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += read_wal_from_buffers
+
+test_install_data += files(
+  'read_wal_from_buffers.control',
+  'read_wal_from_buffers--1.0.sql',
+)
+
+tests += {
+  'name': 'read_wal_from_buffers',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_basic.pl',
+    ],
+  },
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
new file mode 100644
index 0000000000..82fa097d10
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -0,0 +1,14 @@
+/* src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION read_wal_from_buffers" to load this file. \quit
+
+--
+-- read_wal_from_buffers()
+--
+-- SQL function to read WAL from WAL buffers. Returns number of bytes read.
+--
+CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
+    bytes_read OUT int)
+AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
+LANGUAGE C STRICT;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
new file mode 100644
index 0000000000..9df5c07b4b
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -0,0 +1,54 @@
+/*--------------------------------------------------------------------------
+ *
+ * read_wal_from_buffers.c
+ *		Test module to read WAL from WAL buffers.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "fmgr.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+/*
+ * SQL function to read WAL from WAL buffers. Returns number of bytes read.
+ */
+PG_FUNCTION_INFO_V1(read_wal_from_buffers);
+Datum
+read_wal_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	startptr = PG_GETARG_LSN(0);
+	int32		count = PG_GETARG_INT32(1);
+	Size		read;
+	char	   *data = palloc0(count);
+	XLogRecPtr	upto = startptr + count;
+	XLogRecPtr	insert_pos = GetXLogInsertRecPtr();
+	TimeLineID	tli = GetWALInsertionTimeLine();
+
+	/*
+	 * The requested WAL may be very recent, so wait for any in-progress WAL
+	 * insertions to WAL buffers to finish.
+	 */
+	if (upto > insert_pos)
+	{
+		XLogRecPtr	writtenUpto = WaitXLogInsertionsToFinish(upto);
+
+		upto = Min(upto, writtenUpto);
+		count = upto - startptr;
+	}
+
+	read = WALReadFromBuffers(data, startptr, count, tli);
+
+	pfree(data);
+
+	PG_RETURN_INT32(read);
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
new file mode 100644
index 0000000000..b14d24751c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
@@ -0,0 +1,4 @@
+comment = 'Test module to read WAL from WAL buffers'
+default_version = '1.0'
+module_pathname = '$libdir/read_wal_from_buffers'
+relocatable = true
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
new file mode 100644
index 0000000000..f985e49a27
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -0,0 +1,72 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Setup a new node.  The configuration chosen here minimizes the number
+# of arbitrary records that could get generated in a cluster.  Enlarging
+# checkpoint_timeout avoids noise with checkpoint activity.  wal_level
+# set to "minimal" avoids random standby snapshot records.  Autovacuum
+# could also trigger randomly, generating random WAL activity of its own.
+# Enlarging wal_writer_delay and wal_writer_flush_after avoid background
+# wal flush by walwriter.
+my $node = PostgreSQL::Test::Cluster->new("node");
+$node->init;
+$node->append_conf(
+	'postgresql.conf',
+	q[wal_level = minimal
+	  autovacuum = off
+	  checkpoint_timeout = '30min'
+	  wal_writer_delay = 10000ms
+	  wal_writer_flush_after = 1GB
+]);
+$node->start;
+
+# Setup.
+$node->safe_psql('postgres', 'CREATE EXTENSION read_wal_from_buffers;');
+
+$node->safe_psql('postgres', 'CREATE TABLE t (c int);');
+
+my $result = 0;
+my $lsn;
+my $to_read;
+
+# Wait until we read from WAL buffers
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	# Get current insert LSN. After this, we generate some WAL which is guaranteed
+	# to be in WAL buffers as there is no other WAL generating activity
+	# happening on the server. We then verify if we can read the WAL from WAL
+	# buffers using this LSN.
+	$lsn = $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+
+	my $logstart = -s $node->logfile;
+
+	# Generate minimal WAL so that WAL buffers don't get overwritten.
+	$node->safe_psql('postgres', "INSERT INTO t VALUES ($i);");
+
+	$to_read = 8192;
+
+	my $res = $node->safe_psql('postgres',
+				qq{SELECT read_wal_from_buffers(lsn := '$lsn', bytes_to_read := $to_read) > 0;});
+
+	my $log = $node->log_contains(
+				"request to flush past end of generated WAL; request .*, current position .*",
+				$logstart);
+
+	if ($res eq 't' && $log > 0)
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result, 'waited until WAL is successfully read from WAL buffers');
+
+done_testing();
-- 
2.34.1

From c590fc71291514d761e1d2e8d865fa348d0c206a Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 17 Feb 2024 04:40:05 +0000
Subject: [PATCH v24 3/4] Use WALReadFromBuffers in more places

---
 src/backend/access/transam/xlogutils.c | 13 ++++++++++++-
 src/backend/replication/walsender.c    | 16 +++++++++++++---
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index ad93035d50..8fb2e68e85 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -895,6 +895,8 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int			count;
 	WALReadError errinfo;
 	TimeLineID	currTLI;
+	Size		nbytes;
+	Size		rbytes;
 
 	loc = targetPagePtr + reqLen;
 
@@ -1007,7 +1009,16 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		count = read_upto - targetPagePtr;
 	}
 
-	if (!WALRead(state, cur_page, targetPagePtr, count, tli,
+	/* attempt to read WAL from WAL buffers first */
+	nbytes = count;
+	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
+	cur_page += rbytes;
+	targetPagePtr += rbytes;
+	nbytes -= rbytes;
+
+	/* now read the remaining WAL from WAL file */
+	if (nbytes > 0 &&
+		!WALRead(state, cur_page, targetPagePtr, nbytes, tli,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 631d1e0c9f..7ecc7174a0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1059,6 +1059,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	WALReadError errinfo;
 	XLogSegNo	segno;
 	TimeLineID	currTLI;
+	Size		nbytes;
+	Size		rbytes;
 
 	/*
 	 * Make sure we have enough WAL available before retrieving the current
@@ -1095,11 +1097,19 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	else
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
-	/* now actually read the data, we know it's there */
-	if (!WALRead(state,
+	/* attempt to read WAL from WAL buffers first */
+	nbytes = count;
+	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
+	cur_page += rbytes;
+	targetPagePtr += rbytes;
+	nbytes -= rbytes;
+
+	/* now read the remaining WAL from WAL file */
+	if (nbytes > 0 &&
+		!WALRead(state,
 				 cur_page,
 				 targetPagePtr,
-				 count,
+				 nbytes,
 				 currTLI,		/* Pass the current TLI because only
 								 * WalSndSegmentOpen controls whether new TLI
 								 * is needed. */
-- 
2.34.1

Reply via email to