Hi,

On Sat, Jun 8, 2024 at 5:24 PM Nitin Jadhav <nitinjadhavpostg...@gmail.com>
wrote:
>
> I spent some time examining the patch. Here are my observations from the
> review.

Thanks.

> - I believe there’s no need for an extra variable ‘nbytes’ in this
> context. We can repurpose the ‘count’ variable for the same function.
> If necessary, we might think about renaming ‘count’ to ‘nbytes’.

The 'count' variable can't be altered once it is determined, because the
page_read callbacks need to return the total number of bytes read. However,
I ended up removing 'nbytes', as in the attached v2 patch.
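
To illustrate why 'count' has to stay intact, here is a minimal standalone
sketch of the read pattern (not the patch itself). The stub functions, the
fake WAL array and the 'buffered' parameter are made up for illustration and
only stand in for WALReadFromBuffers() and WALRead(); the real functions have
different signatures.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define FAKE_WAL_SIZE 16

/* Hypothetical fake WAL contents, 16 bytes plus a terminating NUL. */
static const char fake_wal[FAKE_WAL_SIZE + 1] = "0123456789ABCDEF";

/*
 * Hypothetical stand-in for WALReadFromBuffers(): copies at most 'buffered'
 * bytes and returns how many bytes it copied (possibly fewer than 'count').
 */
static size_t
read_from_buffers_stub(char *dst, size_t start, size_t count, size_t buffered)
{
	size_t		n = (count < buffered) ? count : buffered;

	assert(start + count <= FAKE_WAL_SIZE);
	memcpy(dst, fake_wal + start, n);
	return n;
}

/* Hypothetical stand-in for WALRead(): always succeeds in this sketch. */
static int
read_from_file_stub(char *dst, size_t start, size_t count)
{
	assert(start + count <= FAKE_WAL_SIZE);
	memcpy(dst, fake_wal + start, count);
	return 1;
}

/*
 * Same shape as the patched callbacks: try the buffers first, read whatever
 * is left from the file, and return the untouched 'count'.
 */
static int
page_read_sketch(char *cur_page, size_t target, int count, size_t buffered)
{
	size_t		rbytes;
	size_t		remaining;

	rbytes = read_from_buffers_stub(cur_page, target, (size_t) count, buffered);
	cur_page += rbytes;
	target += rbytes;

	remaining = (size_t) count - rbytes;
	if (remaining > 0 && !read_from_file_stub(cur_page, target, remaining))
		return -1;

	/* The caller expects the total number of bytes read, not the remainder. */
	return count;
}
```

For example, page_read_sketch(page, 4, 8, 3) takes 3 bytes from the stub
buffers and 5 bytes from the stub file, and still returns 8.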

> - The operations performed by logical_read_xlog_page() and
> read_local_xlog_page_guts() are identical. It might be beneficial to
> create a shared function to minimize code repetition.

IMO, creating another function just to wrap two other functions doesn't
seem like a good idea.

I've attached the v2 patches for further review. There are no changes in
the 0002 patch.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 6902b207cf7497396493aef369a9e275900a86e7 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 12 Jun 2024 14:46:16 +0000
Subject: [PATCH v2 1/2] Use WALReadFromBuffers in more places

Commit 91f2cae introduced WALReadFromBuffers, and used it only for
physical replication walsenders. There are a couple of other callers
that use the read_local_xlog_page page_read callback, and logical
replication walsenders can also benefit from reading WAL from WAL
buffers using the new function. This commit uses the new function
for these callers.

Author: Bharath Rupireddy
Reviewed-by: Jingtang Zhang, Nitin Jadhav
Discussion: https://www.postgresql.org/message-id/CALj2ACVfF2Uj9NoFy-5m98HNtjHpuD17EDE9twVeJng-jTAe7A%40mail.gmail.com
---
 src/backend/access/transam/xlogutils.c | 10 +++++++++-
 src/backend/replication/walsender.c    | 13 ++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5295b85fe0..24a7ef0479 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -892,6 +892,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int			count;
 	WALReadError errinfo;
 	TimeLineID	currTLI;
+	Size		rbytes;
 
 	loc = targetPagePtr + reqLen;
 
@@ -1004,7 +1005,14 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		count = read_upto - targetPagePtr;
 	}
 
-	if (!WALRead(state, cur_page, targetPagePtr, count, tli,
+	/* attempt to read WAL from WAL buffers first */
+	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, count, currTLI);
+	cur_page += rbytes;
+	targetPagePtr += rbytes;
+
+	/* now read the remaining WAL from WAL file */
+	if ((count - rbytes) > 0 &&
+		!WALRead(state, cur_page, targetPagePtr, (count - rbytes), tli,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c623b07cf0..bd0decef3d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1056,6 +1056,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	WALReadError errinfo;
 	XLogSegNo	segno;
 	TimeLineID	currTLI;
+	Size		rbytes;
 
 	/*
 	 * Make sure we have enough WAL available before retrieving the current
@@ -1092,11 +1093,17 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	else
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
-	/* now actually read the data, we know it's there */
-	if (!WALRead(state,
+	/* attempt to read WAL from WAL buffers first */
+	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, count, currTLI);
+	cur_page += rbytes;
+	targetPagePtr += rbytes;
+
+	/* now read the remaining WAL from WAL file */
+	if ((count - rbytes) > 0 &&
+		!WALRead(state,
 				 cur_page,
 				 targetPagePtr,
-				 count,
+				 (count - rbytes),
 				 currTLI,		/* Pass the current TLI because only
 								 * WalSndSegmentOpen controls whether new TLI
 								 * is needed. */
-- 
2.34.1

From 6aca7856208907d2fbde58d507e4a48319a614ad Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 12 Jun 2024 14:55:31 +0000
Subject: [PATCH v2 2/2] Add test module to demonstrate reading from WAL
 buffers patterns.

This commit adds a test module to demonstrate a few patterns for
reading from WAL buffers using WALReadFromBuffers added by commit
91f2cae7a4e.

1. This module contains a test function to read the WAL that's
fully copied to WAL buffers. Whether or not the WAL is fully
copied to WAL buffers is ensured by WaitXLogInsertionsToFinish
before WALReadFromBuffers.

2. This module contains an implementation of xlogreader page_read
callback to read unflushed/not-yet-flushed WAL directly from WAL
buffers.

Author: Bharath Rupireddy
Discussion: https://www.postgresql.org/message-id/CAFiTN-sE7CJn-ZFj%2B-0Wv6TNytv_fp4n%2BeCszspxJ3mt77t5ig%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CALj2ACVfF2Uj9NoFy-5m98HNtjHpuD17EDE9twVeJng-jTAe7A%40mail.gmail.com
---
 src/backend/access/transam/xlog.c             |   3 +-
 src/backend/access/transam/xlogreader.c       |   3 +-
 src/include/access/xlog.h                     |   1 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 .../modules/read_wal_from_buffers/.gitignore  |   4 +
 .../modules/read_wal_from_buffers/Makefile    |  23 ++
 .../modules/read_wal_from_buffers/meson.build |  33 ++
 .../read_wal_from_buffers--1.0.sql            |  37 ++
 .../read_wal_from_buffers.c                   | 318 ++++++++++++++++++
 .../read_wal_from_buffers.control             |   4 +
 .../read_wal_from_buffers/t/001_basic.pl      | 111 ++++++
 12 files changed, 536 insertions(+), 3 deletions(-)
 create mode 100644 src/test/modules/read_wal_from_buffers/.gitignore
 create mode 100644 src/test/modules/read_wal_from_buffers/Makefile
 create mode 100644 src/test/modules/read_wal_from_buffers/meson.build
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
 create mode 100644 src/test/modules/read_wal_from_buffers/t/001_basic.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 330e058c5f..7e2f787743 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -699,7 +699,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 							  XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -1495,7 +1494,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
  * uninitialized page), and the inserter might need to evict an old WAL buffer
  * to make room for a new one, which in turn requires WALWriteLock.
  */
-static XLogRecPtr
+XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 37d2a57961..12dddf64cc 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1033,7 +1033,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
 	 */
-	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+	if (state->seg.ws_segno != 0 &&
+		targetSegNo != state->seg.ws_segno && targetPageOff != 0)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1a1f11a943..36bf90cf58 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,7 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 							   TimeLineID tli);
 
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 256799f520..c39b407e5b 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -12,6 +12,7 @@ SUBDIRS = \
 		  dummy_seclabel \
 		  libpq_pipeline \
 		  plsample \
+		  read_wal_from_buffers \
 		  spgist_name_ops \
 		  test_bloomfilter \
 		  test_copy_callbacks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index d8fe059d23..222fa1cd72 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -10,6 +10,7 @@ subdir('injection_points')
 subdir('ldap_password_func')
 subdir('libpq_pipeline')
 subdir('plsample')
+subdir('read_wal_from_buffers')
 subdir('spgist_name_ops')
 subdir('ssl_passphrase_callback')
 subdir('test_bloomfilter')
diff --git a/src/test/modules/read_wal_from_buffers/.gitignore b/src/test/modules/read_wal_from_buffers/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/read_wal_from_buffers/Makefile b/src/test/modules/read_wal_from_buffers/Makefile
new file mode 100644
index 0000000000..9e57a837f9
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/read_wal_from_buffers/Makefile
+
+MODULE_big = read_wal_from_buffers
+OBJS = \
+	$(WIN32RES) \
+	read_wal_from_buffers.o
+PGFILEDESC = "read_wal_from_buffers - test module to read WAL from WAL buffers"
+
+EXTENSION = read_wal_from_buffers
+DATA = read_wal_from_buffers--1.0.sql
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/read_wal_from_buffers
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/read_wal_from_buffers/meson.build b/src/test/modules/read_wal_from_buffers/meson.build
new file mode 100644
index 0000000000..3fac00d616
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+read_wal_from_buffers_sources = files(
+  'read_wal_from_buffers.c',
+)
+
+if host_system == 'windows'
+  read_wal_from_buffers_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'read_wal_from_buffers',
+    '--FILEDESC', 'read_wal_from_buffers - test module to read WAL from WAL buffers',])
+endif
+
+read_wal_from_buffers = shared_module('read_wal_from_buffers',
+  read_wal_from_buffers_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += read_wal_from_buffers
+
+test_install_data += files(
+  'read_wal_from_buffers.control',
+  'read_wal_from_buffers--1.0.sql',
+)
+
+tests += {
+  'name': 'read_wal_from_buffers',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_basic.pl',
+    ],
+  },
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
new file mode 100644
index 0000000000..72d05522fc
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -0,0 +1,37 @@
+/* src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION read_wal_from_buffers" to load this file. \quit
+
+--
+-- read_wal_from_buffers()
+--
+-- SQL function to read WAL from WAL buffers. Returns number of bytes read.
+--
+CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
+    bytes_read OUT int)
+AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
+LANGUAGE C STRICT;
+
+--
+-- get_wal_records_info_from_buffers()
+--
+-- SQL function to get info of WAL records available in WAL buffers.
+--
+CREATE FUNCTION get_wal_records_info_from_buffers(IN start_lsn pg_lsn,
+    IN end_lsn pg_lsn,
+    OUT start_lsn pg_lsn,
+    OUT end_lsn pg_lsn,
+    OUT prev_lsn pg_lsn,
+    OUT xid xid,
+    OUT resource_manager text,
+    OUT record_type text,
+    OUT record_length int4,
+    OUT main_data_length int4,
+    OUT fpi_length int4,
+    OUT description text,
+    OUT block_ref text
+)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'get_wal_records_info_from_buffers'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
new file mode 100644
index 0000000000..ed33a14127
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -0,0 +1,318 @@
+/*--------------------------------------------------------------------------
+ *
+ * read_wal_from_buffers.c
+ *		Test module to read WAL from WAL buffers.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+static int	read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+								  int reqLen, XLogRecPtr targetRecPtr,
+								  char *cur_page);
+
+static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
+							 bool *nulls, uint32 ncols);
+static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
+							  XLogRecPtr start_lsn,
+							  XLogRecPtr end_lsn);
+
+/*
+ * SQL function to read WAL from WAL buffers. Returns number of bytes read.
+ */
+PG_FUNCTION_INFO_V1(read_wal_from_buffers);
+Datum
+read_wal_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	startptr = PG_GETARG_LSN(0);
+	int32		count = PG_GETARG_INT32(1);
+	Size		read;
+	char	   *data = palloc0(count);
+	XLogRecPtr	upto = startptr + count;
+	XLogRecPtr	insert_pos = GetXLogInsertRecPtr();
+	TimeLineID	tli = GetWALInsertionTimeLine();
+
+	/*
+	 * The requested WAL may be very recent, so wait for any in-progress WAL
+	 * insertions to WAL buffers to finish.
+	 */
+	if (upto > insert_pos)
+	{
+		XLogRecPtr	writtenUpto = WaitXLogInsertionsToFinish(upto);
+
+		upto = Min(upto, writtenUpto);
+		count = upto - startptr;
+	}
+
+	read = WALReadFromBuffers(data, startptr, count, tli);
+
+	pfree(data);
+
+	PG_RETURN_INT32(read);
+}
+
+/*
+ * XLogReaderRoutine->page_read callback for reading WAL from WAL buffers.
+ */
+static int
+read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+					  int reqLen, XLogRecPtr targetRecPtr,
+					  char *cur_page)
+{
+	XLogRecPtr	read_upto,
+				loc;
+	TimeLineID	tli = GetWALInsertionTimeLine();
+	Size		count;
+	Size		read = 0;
+
+	loc = targetPagePtr + reqLen;
+
+	/* Loop waiting for xlog to be available if necessary */
+	while (1)
+	{
+		read_upto = GetXLogInsertRecPtr();
+
+		if (loc <= read_upto)
+			break;
+
+		WaitXLogInsertionsToFinish(loc);
+
+		CHECK_FOR_INTERRUPTS();
+		pg_usleep(1000L);
+	}
+
+	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+	{
+		/*
+		 * more than one block available; read only that block, have caller
+		 * come back if they need more.
+		 */
+		count = XLOG_BLCKSZ;
+	}
+	else if (targetPagePtr + reqLen > read_upto)
+	{
+		/* not enough data there */
+		return -1;
+	}
+	else
+	{
+		/* enough bytes available to satisfy the request */
+		count = read_upto - targetPagePtr;
+	}
+
+	/* read WAL from WAL buffers */
+	read = WALReadFromBuffers(cur_page, targetPagePtr, count, tli);
+
+	if (read != count)
+		ereport(ERROR,
+				errmsg("could not read fully from WAL buffers; expected %zu, read %zu",
+					   count, read));
+
+	return count;
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ *
+ * This function and its helpers below are similar to pg_walinspect's
+ * pg_get_wal_records_info() except that it will get info of WAL records
+ * available in WAL buffers.
+ */
+PG_FUNCTION_INFO_V1(get_wal_records_info_from_buffers);
+Datum
+get_wal_records_info_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	start_lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	end_lsn = PG_GETARG_LSN(1);
+
+	/*
+	 * Validate start and end LSNs coming from the function inputs.
+	 *
+	 * Reading WAL below the first page of the first segments isn't allowed.
+	 * This is a bootstrap WAL page and the page_read callback fails to read
+	 * it.
+	 */
+	if (start_lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+
+	if (start_lsn > end_lsn)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("WAL start LSN must be less than end LSN")));
+
+	GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Read next WAL record.
+ */
+static XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	XLogRecord *record;
+	char	   *errormsg;
+
+	record = XLogReadRecord(xlogreader, &errormsg);
+
+	if (record == NULL)
+	{
+		if (errormsg)
+			ereport(ERROR,
+					errmsg("could not read WAL at %X/%X: %s",
+						   LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg));
+		else
+			ereport(ERROR,
+					errmsg("could not read WAL at %X/%X",
+						   LSN_FORMAT_ARGS(xlogreader->EndRecPtr)));
+	}
+
+	return record;
+}
+
+/*
+ * Output values that make up a row describing caller's WAL record.
+ */
+static void
+GetWALRecordInfo(XLogReaderState *record, Datum *values,
+				 bool *nulls, uint32 ncols)
+{
+	const char *record_type;
+	RmgrData	desc;
+	uint32		fpi_len = 0;
+	StringInfoData rec_desc;
+	StringInfoData rec_blk_ref;
+	int			i = 0;
+
+	desc = GetRmgr(XLogRecGetRmid(record));
+	record_type = desc.rm_identify(XLogRecGetInfo(record));
+
+	if (record_type == NULL)
+		record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+	initStringInfo(&rec_desc);
+	desc.rm_desc(&rec_desc, record);
+
+	if (XLogRecHasAnyBlockRefs(record))
+	{
+		initStringInfo(&rec_blk_ref);
+		XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
+	}
+
+	values[i++] = LSNGetDatum(record->ReadRecPtr);
+	values[i++] = LSNGetDatum(record->EndRecPtr);
+	values[i++] = LSNGetDatum(XLogRecGetPrev(record));
+	values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
+	values[i++] = CStringGetTextDatum(desc.rm_name);
+	values[i++] = CStringGetTextDatum(record_type);
+	values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+	values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
+	values[i++] = UInt32GetDatum(fpi_len);
+
+	if (rec_desc.len > 0)
+		values[i++] = CStringGetTextDatum(rec_desc.data);
+	else
+		nulls[i++] = true;
+
+	if (XLogRecHasAnyBlockRefs(record))
+		values[i++] = CStringGetTextDatum(rec_blk_ref.data);
+	else
+		nulls[i++] = true;
+
+	Assert(i == ncols);
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ */
+static void
+GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
+				  XLogRecPtr end_lsn)
+{
+#define GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS 11
+	XLogReaderState *xlogreader;
+	XLogRecPtr	first_valid_record;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	MemoryContext old_cxt;
+	MemoryContext tmp_cxt;
+
+	Assert(start_lsn <= end_lsn);
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_from_wal_buffers,
+											   .segment_open = NULL,
+											   .segment_close = NULL),
+									NULL);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from */
+	first_valid_record = XLogFindNextRecord(xlogreader, start_lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+	{
+		ereport(LOG,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+
+		return;
+	}
+
+	tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+									"GetWALRecordsInfo temporary cxt",
+									ALLOCSET_DEFAULT_SIZES);
+
+	while (ReadNextXLogRecord(xlogreader) &&
+		   xlogreader->EndRecPtr <= end_lsn)
+	{
+		Datum		values[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+		bool		nulls[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+
+		/* Use the tmp context so we can clean up after each tuple is done */
+		old_cxt = MemoryContextSwitchTo(tmp_cxt);
+
+		GetWALRecordInfo(xlogreader, values, nulls,
+						 GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+							 values, nulls);
+
+		/* clean up and switch back */
+		MemoryContextSwitchTo(old_cxt);
+		MemoryContextReset(tmp_cxt);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	MemoryContextDelete(tmp_cxt);
+	XLogReaderFree(xlogreader);
+
+#undef GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
new file mode 100644
index 0000000000..b14d24751c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
@@ -0,0 +1,4 @@
+comment = 'Test module to read WAL from WAL buffers'
+default_version = '1.0'
+module_pathname = '$libdir/read_wal_from_buffers'
+relocatable = true
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
new file mode 100644
index 0000000000..15ef550c8c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -0,0 +1,111 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Setup a new node.  The configuration chosen here minimizes the number
+# of arbitrary records that could get generated in a cluster.  Enlarging
+# checkpoint_timeout avoids noise with checkpoint activity.  wal_level
+# set to "minimal" avoids random standby snapshot records.  Autovacuum
+# could also trigger randomly, generating random WAL activity of its own.
+# Enlarging wal_writer_delay and wal_writer_flush_after avoids background
+# WAL flushes by walwriter.
+my $node = PostgreSQL::Test::Cluster->new("node");
+$node->init;
+$node->append_conf(
+	'postgresql.conf',
+	q[wal_level = minimal
+	  autovacuum = off
+	  checkpoint_timeout = '30min'
+	  wal_writer_delay = 10000ms
+	  wal_writer_flush_after = 1GB
+]);
+$node->start;
+
+# Setup.
+$node->safe_psql('postgres', 'CREATE EXTENSION read_wal_from_buffers;');
+
+$node->safe_psql('postgres', 'CREATE TABLE t (c int);');
+
+my $result = 0;
+my $lsn;
+my $to_read;
+
+# Wait until we read from WAL buffers
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	# Get current insert LSN. After this, we generate some WAL which is guaranteed
+	# to be in WAL buffers as there is no other WAL-generating activity
+	# happening on the server. We then verify if we can read the WAL from WAL
+	# buffers using this LSN.
+	$lsn =
+	  $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+
+	my $logstart = -s $node->logfile;
+
+	# Generate minimal WAL so that WAL buffers don't get overwritten.
+	$node->safe_psql('postgres', "INSERT INTO t VALUES ($i);");
+
+	$to_read = 8192;
+
+	my $res = $node->safe_psql('postgres',
+		qq{SELECT read_wal_from_buffers(lsn := '$lsn', bytes_to_read := $to_read) > 0;}
+	);
+
+	my $log = $node->log_contains(
+		"request to flush past end of generated WAL; request .*, current position .*",
+		$logstart);
+
+	if ($res eq 't' && $log > 0)
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result, 'waited until WAL is successfully read from WAL buffers');
+
+$result = 0;
+
+# Wait until we get info of WAL records available in WAL buffers.
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$node->safe_psql('postgres', "DROP TABLE IF EXISTS foo, bar;");
+	$node->safe_psql('postgres',
+		"CREATE TABLE foo AS SELECT * FROM generate_series(1, 2);");
+	my $start_lsn =
+	  $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+	my $tbl_oid = $node->safe_psql('postgres',
+		"SELECT oid FROM pg_class WHERE relname = 'foo';");
+	$node->safe_psql('postgres',
+		"INSERT INTO foo SELECT * FROM generate_series(1, 10);");
+	my $end_lsn =
+	  $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+	$node->safe_psql('postgres',
+		"CREATE TABLE bar AS SELECT * FROM generate_series(1, 2);");
+
+	my $res = $node->safe_psql(
+		'postgres',
+		"SELECT count(*) FROM get_wal_records_info_from_buffers('$start_lsn', '$end_lsn')
+					WHERE block_ref LIKE concat('%', '$tbl_oid', '%') AND
+						resource_manager = 'Heap' AND
+						record_type = 'INSERT';");
+
+	if ($res eq 10)
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result,
+	'waited until we get info of WAL records available in WAL buffers.');
+
+done_testing();
-- 
2.34.1
