On Thu, Mar 21, 2024 at 11:33 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> Please find the v26 patches after rebasing.

Commit f3ff7bf83b added a check in WALReadFromBuffers to ensure the
requested WAL is not past the WAL that's copied to WAL buffers. So,
I've dropped v26-0001 patch.

I've attached v27 patches for further review.

0001 adds a test module that demonstrates patterns for reading from WAL
buffers: the caller ensuring that the requested WAL is fully copied to
WAL buffers using WaitXLogInsertionsToFinish, and an implementation of
the xlogreader page_read callback that reads unflushed/not-yet-flushed
WAL directly from WAL buffers.

0002 uses WALReadFromBuffers in more places, such as logical
walsenders, logical decoding functions, and backends reading WAL.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 77c4cc3320a9c2982b5fdac9bd51a892ca644fae Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 8 Apr 2024 05:02:07 +0000
Subject: [PATCH v27 1/2] Add test module to demonstrate reading from WAL 
 buffers patterns

This commit adds a test module to demonstrate a few patterns for
reading from WAL buffers using WALReadFromBuffers added by commit
91f2cae7a4e.

1. This module contains a test function to read the WAL that's
fully copied to WAL buffers. Whether or not the WAL is fully
copied to WAL buffers is ensured by WaitXLogInsertionsToFinish
before WALReadFromBuffers.

2. This module contains an implementation of xlogreader page_read
callback to read unflushed/not-yet-flushed WAL directly from WAL
buffers.
---
 src/backend/access/transam/xlog.c             |   3 +-
 src/backend/access/transam/xlogreader.c       |   3 +-
 src/include/access/xlog.h                     |   1 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 .../modules/read_wal_from_buffers/.gitignore  |   4 +
 .../modules/read_wal_from_buffers/Makefile    |  23 ++
 .../modules/read_wal_from_buffers/meson.build |  33 ++
 .../read_wal_from_buffers--1.0.sql            |  37 ++
 .../read_wal_from_buffers.c                   | 318 ++++++++++++++++++
 .../read_wal_from_buffers.control             |   4 +
 .../read_wal_from_buffers/t/001_basic.pl      | 111 ++++++
 12 files changed, 536 insertions(+), 3 deletions(-)
 create mode 100644 src/test/modules/read_wal_from_buffers/.gitignore
 create mode 100644 src/test/modules/read_wal_from_buffers/Makefile
 create mode 100644 src/test/modules/read_wal_from_buffers/meson.build
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
 create mode 100644 src/test/modules/read_wal_from_buffers/t/001_basic.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e3fb26f5ab..4d4a840c8e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -700,7 +700,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 							  XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -1496,7 +1495,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
  * uninitialized page), and the inserter might need to evict an old WAL buffer
  * to make room for a new one, which in turn requires WALWriteLock.
  */
-static XLogRecPtr
+XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 37d2a57961..12dddf64cc 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1033,7 +1033,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
 	 */
-	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+	if (state->seg.ws_segno != 0 &&
+		targetSegNo != state->seg.ws_segno && targetPageOff != 0)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 76787a8267..74606a6846 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,7 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 							   TimeLineID tli);
 
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 256799f520..c39b407e5b 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -12,6 +12,7 @@ SUBDIRS = \
 		  dummy_seclabel \
 		  libpq_pipeline \
 		  plsample \
+		  read_wal_from_buffers \
 		  spgist_name_ops \
 		  test_bloomfilter \
 		  test_copy_callbacks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index d8fe059d23..222fa1cd72 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -10,6 +10,7 @@ subdir('injection_points')
 subdir('ldap_password_func')
 subdir('libpq_pipeline')
 subdir('plsample')
+subdir('read_wal_from_buffers')
 subdir('spgist_name_ops')
 subdir('ssl_passphrase_callback')
 subdir('test_bloomfilter')
diff --git a/src/test/modules/read_wal_from_buffers/.gitignore b/src/test/modules/read_wal_from_buffers/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/read_wal_from_buffers/Makefile b/src/test/modules/read_wal_from_buffers/Makefile
new file mode 100644
index 0000000000..9e57a837f9
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/read_wal_from_buffers/Makefile
+
+MODULE_big = read_wal_from_buffers
+OBJS = \
+	$(WIN32RES) \
+	read_wal_from_buffers.o
+PGFILEDESC = "read_wal_from_buffers - test module to read WAL from WAL buffers"
+
+EXTENSION = read_wal_from_buffers
+DATA = read_wal_from_buffers--1.0.sql
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/read_wal_from_buffers
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/read_wal_from_buffers/meson.build b/src/test/modules/read_wal_from_buffers/meson.build
new file mode 100644
index 0000000000..3fac00d616
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+read_wal_from_buffers_sources = files(
+  'read_wal_from_buffers.c',
+)
+
+if host_system == 'windows'
+  read_wal_from_buffers_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'read_wal_from_buffers',
+    '--FILEDESC', 'read_wal_from_buffers - test module to read WAL from WAL buffers',])
+endif
+
+read_wal_from_buffers = shared_module('read_wal_from_buffers',
+  read_wal_from_buffers_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += read_wal_from_buffers
+
+test_install_data += files(
+  'read_wal_from_buffers.control',
+  'read_wal_from_buffers--1.0.sql',
+)
+
+tests += {
+  'name': 'read_wal_from_buffers',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_basic.pl',
+    ],
+  },
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
new file mode 100644
index 0000000000..72d05522fc
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -0,0 +1,37 @@
+/* src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION read_wal_from_buffers" to load this file. \quit
+
+--
+-- read_wal_from_buffers()
+--
+-- SQL function to read WAL from WAL buffers. Returns number of bytes read.
+--
+CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
+    bytes_read OUT int)
+AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
+LANGUAGE C STRICT;
+
+--
+-- get_wal_records_info_from_buffers()
+--
+-- SQL function to get info of WAL records available in WAL buffers.
+--
+CREATE FUNCTION get_wal_records_info_from_buffers(IN start_lsn pg_lsn,
+    IN end_lsn pg_lsn,
+    OUT start_lsn pg_lsn,
+    OUT end_lsn pg_lsn,
+    OUT prev_lsn pg_lsn,
+    OUT xid xid,
+    OUT resource_manager text,
+    OUT record_type text,
+    OUT record_length int4,
+    OUT main_data_length int4,
+    OUT fpi_length int4,
+    OUT description text,
+    OUT block_ref text
+)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'get_wal_records_info_from_buffers'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
new file mode 100644
index 0000000000..ed33a14127
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -0,0 +1,318 @@
+/*--------------------------------------------------------------------------
+ *
+ * read_wal_from_buffers.c
+ *		Test module to read WAL from WAL buffers.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+static int	read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+								  int reqLen, XLogRecPtr targetRecPtr,
+								  char *cur_page);
+
+static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
+							 bool *nulls, uint32 ncols);
+static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
+							  XLogRecPtr start_lsn,
+							  XLogRecPtr end_lsn);
+
+/*
+ * SQL function to read WAL from WAL buffers. Returns number of bytes read.
+ */
+PG_FUNCTION_INFO_V1(read_wal_from_buffers);
+Datum
+read_wal_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	startptr = PG_GETARG_LSN(0);
+	int32		count = PG_GETARG_INT32(1);
+	Size		read = 0;
+	char	   *data = palloc0(count);
+	XLogRecPtr	upto = startptr + count;
+	XLogRecPtr	insert_pos = GetXLogInsertRecPtr();
+	TimeLineID	tli = GetWALInsertionTimeLine();
+
+	/*
+	 * The requested WAL may be very recent, so wait for any in-progress WAL
+	 * insertions to finish and read no further than the position returned.
+	 */
+	if (upto > insert_pos)
+	{
+		XLogRecPtr	writtenUpto = WaitXLogInsertionsToFinish(upto);
+
+		/* startptr may lie past the written WAL; don't let count go negative */
+		count = (writtenUpto > startptr) ? Min(upto, writtenUpto) - startptr : 0;
+	}
+
+	if (count > 0)
+		read = WALReadFromBuffers(data, startptr, count, tli);
+	pfree(data);
+
+	PG_RETURN_INT32(read);
+}
+
+/*
+ * XLogReaderRoutine->page_read callback for reading WAL from WAL buffers.
+ */
+static int
+read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+					  int reqLen, XLogRecPtr targetRecPtr,
+					  char *cur_page)
+{
+	XLogRecPtr	read_upto,
+				loc;
+	TimeLineID	tli = GetWALInsertionTimeLine();
+	Size		count;
+	Size		read = 0;
+
+	loc = targetPagePtr + reqLen;	/* end of the range the caller needs */
+
+	/* Loop waiting for xlog to be available if necessary */
+	while (1)
+	{
+		read_upto = GetXLogInsertRecPtr();
+
+		if (loc <= read_upto)
+			break;
+
+		WaitXLogInsertionsToFinish(loc);
+
+		CHECK_FOR_INTERRUPTS();
+		pg_usleep(1000L);		/* back off briefly before rechecking */
+	}
+
+	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+	{
+		/*
+		 * more than one block available; read only that block, have caller
+		 * come back if they need more.
+		 */
+		count = XLOG_BLCKSZ;
+	}
+	else if (targetPagePtr + reqLen > read_upto)
+	{
+		/* not enough data there */
+		return -1;
+	}
+	else
+	{
+		/* enough bytes available to satisfy the request */
+		count = read_upto - targetPagePtr;
+	}
+
+	/* read WAL from WAL buffers; a short read is treated as an error */
+	read = WALReadFromBuffers(cur_page, targetPagePtr, count, tli);
+
+	if (read != count)
+		ereport(ERROR,
+				errmsg("could not read fully from WAL buffers; expected %zu, read %zu",
+					   count, read));
+
+	return count;
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ *
+ * This function and its helpers below are similar to pg_walinspect's
+ * pg_get_wal_records_info() except that it will get info of WAL records
+ * available in WAL buffers.
+ */
+PG_FUNCTION_INFO_V1(get_wal_records_info_from_buffers);
+Datum
+get_wal_records_info_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	start_lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	end_lsn = PG_GETARG_LSN(1);
+
+	/*
+	 * Validate start and end LSNs coming from the function inputs.
+	 *
+	 * Reading WAL below the first page of the first segment isn't allowed.
+	 * This is a bootstrap WAL page and the page_read callback fails to read
+	 * it.
+	 */
+	if (start_lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+
+	if (start_lsn > end_lsn)	/* equal start and end LSNs are accepted */
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("WAL start LSN must be less than end LSN")));
+
+	GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
+
+	PG_RETURN_VOID();			/* rows were stored by GetWALRecordsInfo */
+}
+
+/*
+ * Read next WAL record, raising an ERROR if no record could be read.
+ */
+static XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	XLogRecord *record;
+	char	   *errormsg;
+
+	record = XLogReadRecord(xlogreader, &errormsg);
+
+	if (record == NULL)
+	{
+		/* include the reader's own message in the error when it set one */
+		if (errormsg)
+			ereport(ERROR,
+					errmsg("could not read WAL at %X/%X: %s",
+						   LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg));
+		else
+			ereport(ERROR,
+					errmsg("could not read WAL at %X/%X",
+						   LSN_FORMAT_ARGS(xlogreader->EndRecPtr)));
+	}
+
+	return record;
+}
+
+/*
+ * Fill values[]/nulls[] with one output row describing caller's WAL record.
+ */
+static void
+GetWALRecordInfo(XLogReaderState *record, Datum *values,
+				 bool *nulls, uint32 ncols)
+{
+	const char *record_type;
+	RmgrData	desc;
+	uint32		fpi_len = 0;
+	StringInfoData rec_desc;
+	StringInfoData rec_blk_ref;
+	int			i = 0;			/* index of the next output column */
+
+	desc = GetRmgr(XLogRecGetRmid(record));
+	record_type = desc.rm_identify(XLogRecGetInfo(record));
+
+	if (record_type == NULL)	/* rm_identify returned no name */
+		record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+	initStringInfo(&rec_desc);
+	desc.rm_desc(&rec_desc, record);
+
+	if (XLogRecHasAnyBlockRefs(record))
+	{
+		initStringInfo(&rec_blk_ref);
+		XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
+	}
+
+	values[i++] = LSNGetDatum(record->ReadRecPtr);
+	values[i++] = LSNGetDatum(record->EndRecPtr);
+	values[i++] = LSNGetDatum(XLogRecGetPrev(record));
+	values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
+	values[i++] = CStringGetTextDatum(desc.rm_name);
+	values[i++] = CStringGetTextDatum(record_type);
+	values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+	values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
+	values[i++] = UInt32GetDatum(fpi_len);
+
+	if (rec_desc.len > 0)
+		values[i++] = CStringGetTextDatum(rec_desc.data);
+	else
+		nulls[i++] = true;		/* empty description is output as NULL */
+
+	if (XLogRecHasAnyBlockRefs(record))
+		values[i++] = CStringGetTextDatum(rec_blk_ref.data);
+	else
+		nulls[i++] = true;		/* no block references: output NULL */
+
+	Assert(i == ncols);			/* every column must be accounted for */
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ */
+static void
+GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
+				  XLogRecPtr end_lsn)
+{
+#define GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS 11
+	XLogReaderState *xlogreader;
+	XLogRecPtr	first_valid_record;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	MemoryContext old_cxt;
+	MemoryContext tmp_cxt;
+
+	Assert(start_lsn <= end_lsn);
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_from_wal_buffers,
+											   .segment_open = NULL,
+											   .segment_close = NULL),
+									NULL);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from */
+	first_valid_record = XLogFindNextRecord(xlogreader, start_lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+	{
+		ereport(LOG,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+		XLogReaderFree(xlogreader); /* release the reader on early exit too */
+		return;
+	}
+
+	tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+									"GetWALRecordsInfo temporary cxt",
+									ALLOCSET_DEFAULT_SIZES);
+
+	while (ReadNextXLogRecord(xlogreader) &&
+		   xlogreader->EndRecPtr <= end_lsn)
+	{
+		Datum		values[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+		bool		nulls[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+
+		/* Use the tmp context so we can clean up after each tuple is done */
+		old_cxt = MemoryContextSwitchTo(tmp_cxt);
+
+		GetWALRecordInfo(xlogreader, values, nulls,
+						 GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+							 values, nulls);
+
+		/* clean up and switch back */
+		MemoryContextSwitchTo(old_cxt);
+		MemoryContextReset(tmp_cxt);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	MemoryContextDelete(tmp_cxt);
+	XLogReaderFree(xlogreader);
+
+#undef GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
new file mode 100644
index 0000000000..b14d24751c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
@@ -0,0 +1,4 @@
+comment = 'Test module to read WAL from WAL buffers'
+default_version = '1.0'
+module_pathname = '$libdir/read_wal_from_buffers'
+relocatable = true
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
new file mode 100644
index 0000000000..15ef550c8c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -0,0 +1,111 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Setup a new node.  The configuration chosen here minimizes the number
+# of arbitrary records that could get generated in a cluster.  Enlarging
+# checkpoint_timeout avoids noise with checkpoint activity.  wal_level
+# set to "minimal" avoids random standby snapshot records.  Autovacuum
+# could also trigger randomly, generating random WAL activity of its own.
+# Enlarging wal_writer_delay and wal_writer_flush_after avoids background
+# WAL flushes by walwriter.
+my $node = PostgreSQL::Test::Cluster->new("node");
+$node->init;
+$node->append_conf(
+	'postgresql.conf',
+	q[wal_level = minimal
+	  autovacuum = off
+	  checkpoint_timeout = '30min'
+	  wal_writer_delay = 10000ms
+	  wal_writer_flush_after = 1GB
+]);
+$node->start;
+
+# Setup.
+$node->safe_psql('postgres', 'CREATE EXTENSION read_wal_from_buffers;');
+
+$node->safe_psql('postgres', 'CREATE TABLE t (c int);');
+
+my $result = 0;
+my $lsn;
+my $to_read;
+
+# Wait until we read from WAL buffers
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	# Get current insert LSN. After this, we generate some WAL which is
+	# guaranteed to be in WAL buffers as there is no other WAL generating
+	# activity happening on the server. We then verify if we can read the WAL
+	# from WAL buffers using this LSN.
+	$lsn =
+	  $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+
+	my $logstart = -s $node->logfile;
+
+	# Generate minimal WAL so that WAL buffers don't get overwritten.
+	$node->safe_psql('postgres', "INSERT INTO t VALUES ($i);");
+
+	$to_read = 8192;
+
+	my $res = $node->safe_psql('postgres',
+		qq{SELECT read_wal_from_buffers(lsn := '$lsn', bytes_to_read := $to_read) > 0;}
+	);
+
+	my $log = $node->log_contains(
+		"request to flush past end of generated WAL; request .*, current position .*",
+		$logstart);
+
+	if ($res eq 't' && $log > 0)
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result, 'waited until WAL is successfully read from WAL buffers');
+
+$result = 0;
+
+# Wait until we get info of WAL records available in WAL buffers.
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$node->safe_psql('postgres', "DROP TABLE IF EXISTS foo, bar;");
+	$node->safe_psql('postgres',
+		"CREATE TABLE foo AS SELECT * FROM generate_series(1, 2);");
+	my $start_lsn =
+	  $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+	my $tbl_oid = $node->safe_psql('postgres',
+		"SELECT oid FROM pg_class WHERE relname = 'foo';");
+	$node->safe_psql('postgres',
+		"INSERT INTO foo SELECT * FROM generate_series(1, 10);");
+	my $end_lsn =
+	  $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+	$node->safe_psql('postgres',
+		"CREATE TABLE bar AS SELECT * FROM generate_series(1, 2);");
+
+	my $res = $node->safe_psql(
+		'postgres',
+		"SELECT count(*) FROM get_wal_records_info_from_buffers('$start_lsn', '$end_lsn')
+					WHERE block_ref LIKE concat('%', '$tbl_oid', '%') AND
+						resource_manager = 'Heap' AND
+						record_type = 'INSERT';");
+
+	if ($res eq 10)
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result,
+	'waited until we get info of WAL records available in WAL buffers.');
+
+done_testing();
-- 
2.34.1

From 620695a36afdc30d27227a4864131c0fec80f3ba Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 8 Apr 2024 05:02:46 +0000
Subject: [PATCH v27 2/2] Use WALReadFromBuffers in more places.

---
 src/backend/access/transam/xlogutils.c | 13 ++++++++++++-
 src/backend/replication/walsender.c    | 16 +++++++++++++---
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5295b85fe0..1e1f5b5306 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -892,6 +892,8 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int			count;
 	WALReadError errinfo;
 	TimeLineID	currTLI;
+	Size		nbytes;
+	Size		rbytes;
 
 	loc = targetPagePtr + reqLen;
 
@@ -1004,7 +1006,16 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		count = read_upto - targetPagePtr;
 	}
 
-	if (!WALRead(state, cur_page, targetPagePtr, count, tli,
+	/* attempt to read WAL from WAL buffers first */
+	nbytes = count;
+	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
+	cur_page += rbytes;
+	targetPagePtr += rbytes;
+	nbytes -= rbytes;
+
+	/* now read the remaining WAL from WAL file */
+	if (nbytes > 0 &&
+		!WALRead(state, cur_page, targetPagePtr, nbytes, tli,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc40c454de..19cf1d5ce7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1056,6 +1056,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	WALReadError errinfo;
 	XLogSegNo	segno;
 	TimeLineID	currTLI;
+	Size		nbytes;
+	Size		rbytes;
 
 	/*
 	 * Make sure we have enough WAL available before retrieving the current
@@ -1092,11 +1094,19 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	else
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
-	/* now actually read the data, we know it's there */
-	if (!WALRead(state,
+	/* attempt to read WAL from WAL buffers first */
+	nbytes = count;
+	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
+	cur_page += rbytes;
+	targetPagePtr += rbytes;
+	nbytes -= rbytes;
+
+	/* now read the remaining WAL from WAL file */
+	if (nbytes > 0 &&
+		!WALRead(state,
 				 cur_page,
 				 targetPagePtr,
-				 count,
+				 nbytes,
 				 currTLI,		/* Pass the current TLI because only
 								 * WalSndSegmentOpen controls whether new TLI
 								 * is needed. */
-- 
2.34.1

Reply via email to