On Fri, Feb 16, 2024 at 11:01 PM Jeff Davis <pg...@j-davis.com> wrote: > > > Here, I'm with v23 patch set: > > Thank you, I'll look at these.
Thanks. Here's the v24 patch set after rebasing. -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com
From d0317ed91b1483a5556c87388e0186462711e022 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Sat, 17 Feb 2024 04:41:29 +0000 Subject: [PATCH v24 4/4] Demonstrate reading unflushed WAL directly from WAL buffers --- src/backend/access/transam/xlogreader.c | 3 +- .../read_wal_from_buffers--1.0.sql | 23 ++ .../read_wal_from_buffers.c | 266 +++++++++++++++++- .../read_wal_from_buffers/t/001_basic.pl | 35 +++ 4 files changed, 325 insertions(+), 2 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index ae9904e7e4..4658a86997 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1035,7 +1035,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * record is. This is so that we can check the additional identification * info that is present in the first page's "long" header. */ - if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) + if (state->seg.ws_segno != 0 && + targetSegNo != state->seg.ws_segno && targetPageOff != 0) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql index 82fa097d10..72d05522fc 100644 --- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql +++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql @@ -12,3 +12,26 @@ CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int, bytes_read OUT int) AS 'MODULE_PATHNAME', 'read_wal_from_buffers' LANGUAGE C STRICT; + +-- +-- get_wal_records_info_from_buffers() +-- +-- SQL function to get info of WAL records available in WAL buffers. 
+-- +CREATE FUNCTION get_wal_records_info_from_buffers(IN start_lsn pg_lsn, + IN end_lsn pg_lsn, + OUT start_lsn pg_lsn, + OUT end_lsn pg_lsn, + OUT prev_lsn pg_lsn, + OUT xid xid, + OUT resource_manager text, + OUT record_type text, + OUT record_length int4, + OUT main_data_length int4, + OUT fpi_length int4, + OUT description text, + OUT block_ref text +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'get_wal_records_info_from_buffers' +LANGUAGE C STRICT PARALLEL SAFE; diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c index 9df5c07b4b..ed33a14127 100644 --- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c +++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c @@ -14,11 +14,27 @@ #include "postgres.h" #include "access/xlog.h" -#include "fmgr.h" +#include "access/xlog_internal.h" +#include "access/xlogreader.h" +#include "access/xlogrecovery.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "utils/builtins.h" #include "utils/pg_lsn.h" PG_MODULE_MAGIC; +static int read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr, + int reqLen, XLogRecPtr targetRecPtr, + char *cur_page); + +static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader); +static void GetWALRecordInfo(XLogReaderState *record, Datum *values, + bool *nulls, uint32 ncols); +static void GetWALRecordsInfo(FunctionCallInfo fcinfo, + XLogRecPtr start_lsn, + XLogRecPtr end_lsn); + /* * SQL function to read WAL from WAL buffers. Returns number of bytes read. */ @@ -52,3 +68,251 @@ read_wal_from_buffers(PG_FUNCTION_ARGS) PG_RETURN_INT32(read); } + +/* + * XLogReaderRoutine->page_read callback for reading WAL from WAL buffers. 
+ */ +static int +read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr, + int reqLen, XLogRecPtr targetRecPtr, + char *cur_page) +{ + XLogRecPtr read_upto, + loc; + TimeLineID tli = GetWALInsertionTimeLine(); + Size count; + Size read = 0; + + loc = targetPagePtr + reqLen; + + /* Loop waiting for xlog to be available if necessary */ + while (1) + { + read_upto = GetXLogInsertRecPtr(); + + if (loc <= read_upto) + break; + + WaitXLogInsertionsToFinish(loc); + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000L); + } + + if (targetPagePtr + XLOG_BLCKSZ <= read_upto) + { + /* + * more than one block available; read only that block, have caller + * come back if they need more. + */ + count = XLOG_BLCKSZ; + } + else if (targetPagePtr + reqLen > read_upto) + { + /* not enough data there */ + return -1; + } + else + { + /* enough bytes available to satisfy the request */ + count = read_upto - targetPagePtr; + } + + /* read WAL from WAL buffers */ + read = WALReadFromBuffers(cur_page, targetPagePtr, count, tli); + + if (read != count) + ereport(ERROR, + errmsg("could not read fully from WAL buffers; expected %lu, read %lu", + count, read)); + + return count; +} + +/* + * Get info of all WAL records between start LSN and end LSN. + * + * This function and its helpers below are similar to pg_walinspect's + * pg_get_wal_records_info() except that it will get info of WAL records + * available in WAL buffers. + */ +PG_FUNCTION_INFO_V1(get_wal_records_info_from_buffers); +Datum +get_wal_records_info_from_buffers(PG_FUNCTION_ARGS) +{ + XLogRecPtr start_lsn = PG_GETARG_LSN(0); + XLogRecPtr end_lsn = PG_GETARG_LSN(1); + + /* + * Validate start and end LSNs coming from the function inputs. + * + * Reading WAL below the first page of the first segments isn't allowed. + * This is a bootstrap WAL page and the page_read callback fails to read + * it. 
+ */ + if (start_lsn < XLOG_BLCKSZ) + ereport(ERROR, + (errmsg("could not read WAL at LSN %X/%X", + LSN_FORMAT_ARGS(start_lsn)))); + + if (start_lsn > end_lsn) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("WAL start LSN must be less than end LSN"))); + + GetWALRecordsInfo(fcinfo, start_lsn, end_lsn); + + PG_RETURN_VOID(); +} + +/* + * Read next WAL record. + */ +static XLogRecord * +ReadNextXLogRecord(XLogReaderState *xlogreader) +{ + XLogRecord *record; + char *errormsg; + + record = XLogReadRecord(xlogreader, &errormsg); + + if (record == NULL) + { + if (errormsg) + ereport(ERROR, + errmsg("could not read WAL at %X/%X: %s", + LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)); + else + ereport(ERROR, + errmsg("could not read WAL at %X/%X", + LSN_FORMAT_ARGS(xlogreader->EndRecPtr))); + } + + return record; +} + +/* + * Output values that make up a row describing caller's WAL record. + */ +static void +GetWALRecordInfo(XLogReaderState *record, Datum *values, + bool *nulls, uint32 ncols) +{ + const char *record_type; + RmgrData desc; + uint32 fpi_len = 0; + StringInfoData rec_desc; + StringInfoData rec_blk_ref; + int i = 0; + + desc = GetRmgr(XLogRecGetRmid(record)); + record_type = desc.rm_identify(XLogRecGetInfo(record)); + + if (record_type == NULL) + record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK); + + initStringInfo(&rec_desc); + desc.rm_desc(&rec_desc, record); + + if (XLogRecHasAnyBlockRefs(record)) + { + initStringInfo(&rec_blk_ref); + XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len); + } + + values[i++] = LSNGetDatum(record->ReadRecPtr); + values[i++] = LSNGetDatum(record->EndRecPtr); + values[i++] = LSNGetDatum(XLogRecGetPrev(record)); + values[i++] = TransactionIdGetDatum(XLogRecGetXid(record)); + values[i++] = CStringGetTextDatum(desc.rm_name); + values[i++] = CStringGetTextDatum(record_type); + values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record)); + values[i++] = 
UInt32GetDatum(XLogRecGetDataLen(record)); + values[i++] = UInt32GetDatum(fpi_len); + + if (rec_desc.len > 0) + values[i++] = CStringGetTextDatum(rec_desc.data); + else + nulls[i++] = true; + + if (XLogRecHasAnyBlockRefs(record)) + values[i++] = CStringGetTextDatum(rec_blk_ref.data); + else + nulls[i++] = true; + + Assert(i == ncols); +} + +/* + * Get info of all WAL records between start LSN and end LSN. + */ +static void +GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn, + XLogRecPtr end_lsn) +{ +#define GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS 11 + XLogReaderState *xlogreader; + XLogRecPtr first_valid_record; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + MemoryContext old_cxt; + MemoryContext tmp_cxt; + + Assert(start_lsn <= end_lsn); + + InitMaterializedSRF(fcinfo, 0); + + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &read_from_wal_buffers, + .segment_open = NULL, + .segment_close = NULL), + NULL); + + if (xlogreader == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating a WAL reading processor."))); + + /* first find a valid recptr to start from */ + first_valid_record = XLogFindNextRecord(xlogreader, start_lsn); + + if (XLogRecPtrIsInvalid(first_valid_record)) + { + ereport(LOG, + (errmsg("could not find a valid record after %X/%X", + LSN_FORMAT_ARGS(start_lsn)))); + + return; + } + + tmp_cxt = AllocSetContextCreate(CurrentMemoryContext, + "GetWALRecordsInfo temporary cxt", + ALLOCSET_DEFAULT_SIZES); + + while (ReadNextXLogRecord(xlogreader) && + xlogreader->EndRecPtr <= end_lsn) + { + Datum values[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0}; + bool nulls[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0}; + + /* Use the tmp context so we can clean up after each tuple is done */ + old_cxt = MemoryContextSwitchTo(tmp_cxt); + + GetWALRecordInfo(xlogreader, values, nulls, + GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS); + + 
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, + values, nulls); + + /* clean up and switch back */ + MemoryContextSwitchTo(old_cxt); + MemoryContextReset(tmp_cxt); + + CHECK_FOR_INTERRUPTS(); + } + + MemoryContextDelete(tmp_cxt); + XLogReaderFree(xlogreader); + +#undef GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS +} diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl index f985e49a27..fcdcdb001e 100644 --- a/src/test/modules/read_wal_from_buffers/t/001_basic.pl +++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl @@ -69,4 +69,39 @@ for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) } ok($result, 'waited until WAL is successfully read from WAL buffers'); +$result = 0; + +# Wait until we get info of WAL records available in WAL buffers. +for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $node->safe_psql('postgres', "DROP TABLE IF EXISTS foo, bar;"); + $node->safe_psql('postgres', + "CREATE TABLE foo AS SELECT * FROM generate_series(1, 2);"); + my $start_lsn = $node->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn();"); + my $tbl_oid = $node->safe_psql('postgres', + "SELECT oid FROM pg_class WHERE relname = 'foo';"); + $node->safe_psql('postgres', + "INSERT INTO foo SELECT * FROM generate_series(1, 10);"); + my $end_lsn = $node->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn();"); + $node->safe_psql('postgres', + "CREATE TABLE bar AS SELECT * FROM generate_series(1, 2);"); + + my $res = $node->safe_psql('postgres', + "SELECT count(*) FROM get_wal_records_info_from_buffers('$start_lsn', '$end_lsn') + WHERE block_ref LIKE concat('%', '$tbl_oid', '%') AND + resource_manager = 'Heap' AND + record_type = 'INSERT';"); + + if ($res eq 10) + { + $result = 1; + last; + } + + usleep(100_000); +} +ok($result, 'waited until we get info of WAL records available in WAL buffers.'); + done_testing(); -- 2.34.1
From 648e261505ac819c85112276e7b6054105f22e13 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Sat, 17 Feb 2024 04:32:09 +0000 Subject: [PATCH v24 1/4] Add check in WALReadFromBuffers against requested WAL --- src/backend/access/transam/xlog.c | 26 ++++++++++++++++++------- src/backend/access/transam/xlogreader.c | 3 +++ src/include/access/xlog.h | 1 + 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 50c347a679..b01a3b4ed1 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -698,7 +698,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); -static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli); static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos); static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos); @@ -1493,7 +1492,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt) * uninitialized page), and the inserter might need to evict an old WAL buffer * to make room for a new one, which in turn requires WALWriteLock. */ -static XLogRecPtr +XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto) { uint64 bytepos; @@ -1710,13 +1709,14 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli) * of bytes read successfully. * * Fewer than 'count' bytes may be read if some of the requested WAL data has - * already been evicted. + * already been evicted from the WAL buffers. * * No locks are taken. * - * Caller should ensure that it reads no further than LogwrtResult.Write - * (which should have been updated by the caller when determining how far to - * read). 
The 'tli' argument is only used as a convenient safety check so that + * Caller should ensure that it reads no further than current insert position + * with the help of WaitXLogInsertionsToFinish(). + * + * The 'tli' argument is only used as a convenient safety check so that * callers do not read from WAL buffers on a historical timeline. */ Size @@ -1731,7 +1731,19 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, return 0; Assert(!XLogRecPtrIsInvalid(startptr)); - Assert(startptr + count <= LogwrtResult.Write); + +#ifdef USE_ASSERT_CHECKING + { + XLogRecPtr upto = startptr + count; + XLogRecPtr insert_pos = GetXLogInsertRecPtr(); + + if (upto > insert_pos) + ereport(ERROR, + (errmsg("cannot read past end of current insert position; request %X/%X, insert position %X/%X", + LSN_FORMAT_ARGS(upto), + LSN_FORMAT_ARGS(insert_pos)))); + } +#endif /* * Loop through the buffers without a lock. For each buffer, atomically diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 74a6b11866..ae9904e7e4 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1500,6 +1500,9 @@ err: * * Returns true if succeeded, false if an error occurs, in which case * 'errinfo' receives error details. + * + * Note: It is the caller's responsibility to ensure requested WAL is written + * to disk, that is 'startptr'+'count' <= LogwrtResult.Write. */ bool WALRead(XLogReaderState *state, diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 76787a8267..74606a6846 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -252,6 +252,7 @@ extern XLogRecPtr GetLastImportantRecPtr(void); extern void SetWalWriterSleeping(bool sleeping); +extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli); -- 2.34.1
From 6f3b49ef70f0cab1f9191423db0d419ff3771211 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Sat, 17 Feb 2024 04:32:33 +0000 Subject: [PATCH v24 2/4] Add test module for verifying read from WAL buffers --- src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + .../modules/read_wal_from_buffers/.gitignore | 4 ++ .../modules/read_wal_from_buffers/Makefile | 23 ++++++ .../modules/read_wal_from_buffers/meson.build | 33 +++++++++ .../read_wal_from_buffers--1.0.sql | 14 ++++ .../read_wal_from_buffers.c | 54 ++++++++++++++ .../read_wal_from_buffers.control | 4 ++ .../read_wal_from_buffers/t/001_basic.pl | 72 +++++++++++++++++++ 9 files changed, 206 insertions(+) create mode 100644 src/test/modules/read_wal_from_buffers/.gitignore create mode 100644 src/test/modules/read_wal_from_buffers/Makefile create mode 100644 src/test/modules/read_wal_from_buffers/meson.build create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control create mode 100644 src/test/modules/read_wal_from_buffers/t/001_basic.pl diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 89aa41b5e3..864a3dd72b 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -12,6 +12,7 @@ SUBDIRS = \ dummy_seclabel \ libpq_pipeline \ plsample \ + read_wal_from_buffers \ spgist_name_ops \ test_bloomfilter \ test_copy_callbacks \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index 8fbe742d38..4f3dd69e58 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -33,6 +33,7 @@ subdir('test_resowner') subdir('test_rls_hooks') subdir('test_shm_mq') subdir('test_slru') +subdir('read_wal_from_buffers') subdir('unsafe_tests') subdir('worker_spi') subdir('xid_wraparound') diff --git 
a/src/test/modules/read_wal_from_buffers/.gitignore b/src/test/modules/read_wal_from_buffers/.gitignore new file mode 100644 index 0000000000..5dcb3ff972 --- /dev/null +++ b/src/test/modules/read_wal_from_buffers/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/read_wal_from_buffers/Makefile b/src/test/modules/read_wal_from_buffers/Makefile new file mode 100644 index 0000000000..9e57a837f9 --- /dev/null +++ b/src/test/modules/read_wal_from_buffers/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/read_wal_from_buffers/Makefile + +MODULE_big = read_wal_from_buffers +OBJS = \ + $(WIN32RES) \ + read_wal_from_buffers.o +PGFILEDESC = "read_wal_from_buffers - test module to read WAL from WAL buffers" + +EXTENSION = read_wal_from_buffers +DATA = read_wal_from_buffers--1.0.sql + +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/read_wal_from_buffers +top_builddir = ../../../.. 
+include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/read_wal_from_buffers/meson.build b/src/test/modules/read_wal_from_buffers/meson.build new file mode 100644 index 0000000000..3fac00d616 --- /dev/null +++ b/src/test/modules/read_wal_from_buffers/meson.build @@ -0,0 +1,33 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +read_wal_from_buffers_sources = files( + 'read_wal_from_buffers.c', +) + +if host_system == 'windows' + read_wal_from_buffers_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'read_wal_from_buffers', + '--FILEDESC', 'read_wal_from_buffers - test module to read WAL from WAL buffers',]) +endif + +read_wal_from_buffers = shared_module('read_wal_from_buffers', + read_wal_from_buffers_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += read_wal_from_buffers + +test_install_data += files( + 'read_wal_from_buffers.control', + 'read_wal_from_buffers--1.0.sql', +) + +tests += { + 'name': 'read_wal_from_buffers', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'tap': { + 'tests': [ + 't/001_basic.pl', + ], + }, +} diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql new file mode 100644 index 0000000000..82fa097d10 --- /dev/null +++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql @@ -0,0 +1,14 @@ +/* src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION read_wal_from_buffers" to load this file. \quit + +-- +-- read_wal_from_buffers() +-- +-- SQL function to read WAL from WAL buffers. Returns number of bytes read. 
+-- +CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int, + bytes_read OUT int) +AS 'MODULE_PATHNAME', 'read_wal_from_buffers' +LANGUAGE C STRICT; diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c new file mode 100644 index 0000000000..9df5c07b4b --- /dev/null +++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c @@ -0,0 +1,54 @@ +/*-------------------------------------------------------------------------- + * + * read_wal_from_buffers.c + * Test module to read WAL from WAL buffers. + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog.h" +#include "fmgr.h" +#include "utils/pg_lsn.h" + +PG_MODULE_MAGIC; + +/* + * SQL function to read WAL from WAL buffers. Returns number of bytes read. + */ +PG_FUNCTION_INFO_V1(read_wal_from_buffers); +Datum +read_wal_from_buffers(PG_FUNCTION_ARGS) +{ + XLogRecPtr startptr = PG_GETARG_LSN(0); + int32 count = PG_GETARG_INT32(1); + Size read; + char *data = palloc0(count); + XLogRecPtr upto = startptr + count; + XLogRecPtr insert_pos = GetXLogInsertRecPtr(); + TimeLineID tli = GetWALInsertionTimeLine(); + + /* + * The requested WAL may be very recent, so wait for any in-progress WAL + * insertions to WAL buffers to finish. 
+ */ + if (upto > insert_pos) + { + XLogRecPtr writtenUpto = WaitXLogInsertionsToFinish(upto); + + upto = Min(upto, writtenUpto); + count = upto - startptr; + } + + read = WALReadFromBuffers(data, startptr, count, tli); + + pfree(data); + + PG_RETURN_INT32(read); +} diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control new file mode 100644 index 0000000000..b14d24751c --- /dev/null +++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control @@ -0,0 +1,4 @@ +comment = 'Test module to read WAL from WAL buffers' +default_version = '1.0' +module_pathname = '$libdir/read_wal_from_buffers' +relocatable = true diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl new file mode 100644 index 0000000000..f985e49a27 --- /dev/null +++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl @@ -0,0 +1,72 @@ +# Copyright (c) 2021-2023, PostgreSQL Global Development Group + +use strict; +use warnings; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; +use Time::HiRes qw(usleep); + +# Setup a new node. The configuration chosen here minimizes the number +# of arbitrary records that could get generated in a cluster. Enlarging +# checkpoint_timeout avoids noise with checkpoint activity. wal_level +# set to "minimal" avoids random standby snapshot records. Autovacuum +# could also trigger randomly, generating random WAL activity of its own. +# Enlarging wal_writer_delay and wal_writer_flush_after avoid background +# wal flush by walwriter. +my $node = PostgreSQL::Test::Cluster->new("node"); +$node->init; +$node->append_conf( + 'postgresql.conf', + q[wal_level = minimal + autovacuum = off + checkpoint_timeout = '30min' + wal_writer_delay = 10000ms + wal_writer_flush_after = 1GB +]); +$node->start; + +# Setup. 
+$node->safe_psql('postgres', 'CREATE EXTENSION read_wal_from_buffers;'); + +$node->safe_psql('postgres', 'CREATE TABLE t (c int);'); + +my $result = 0; +my $lsn; +my $to_read; + +# Wait until we read from WAL buffers +for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + # Get current insert LSN. After this, we generate some WAL which is guaranteed + # to be in WAL buffers as no other WAL generating activity is + # happening on the server. We then verify if we can read the WAL from WAL + # buffers using this LSN. + $lsn = $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();'); + + my $logstart = -s $node->logfile; + + # Generate minimal WAL so that WAL buffers don't get overwritten. + $node->safe_psql('postgres', "INSERT INTO t VALUES ($i);"); + + $to_read = 8192; + + my $res = $node->safe_psql('postgres', + qq{SELECT read_wal_from_buffers(lsn := '$lsn', bytes_to_read := $to_read) > 0;}); + + my $log = $node->log_contains( + "request to flush past end of generated WAL; request .*, current position .*", + $logstart); + + if ($res eq 't' && $log > 0) + { + $result = 1; + last; + } + + usleep(100_000); +} +ok($result, 'waited until WAL is successfully read from WAL buffers'); + +done_testing(); -- 2.34.1
From c590fc71291514d761e1d2e8d865fa348d0c206a Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Sat, 17 Feb 2024 04:40:05 +0000 Subject: [PATCH v24 3/4] Use WALReadFromBuffers in more places --- src/backend/access/transam/xlogutils.c | 13 ++++++++++++- src/backend/replication/walsender.c | 16 +++++++++++++--- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index ad93035d50..8fb2e68e85 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -895,6 +895,8 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, int count; WALReadError errinfo; TimeLineID currTLI; + Size nbytes; + Size rbytes; loc = targetPagePtr + reqLen; @@ -1007,7 +1009,16 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, count = read_upto - targetPagePtr; } - if (!WALRead(state, cur_page, targetPagePtr, count, tli, + /* attempt to read WAL from WAL buffers first */ + nbytes = count; + rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI); + cur_page += rbytes; + targetPagePtr += rbytes; + nbytes -= rbytes; + + /* now read the remaining WAL from WAL file */ + if (nbytes > 0 && + !WALRead(state, cur_page, targetPagePtr, nbytes, tli, &errinfo)) WALReadRaiseError(&errinfo); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 631d1e0c9f..7ecc7174a0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1059,6 +1059,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req WALReadError errinfo; XLogSegNo segno; TimeLineID currTLI; + Size nbytes; + Size rbytes; /* * Make sure we have enough WAL available before retrieving the current @@ -1095,11 +1097,19 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req else count = 
flushptr - targetPagePtr; /* part of the page available */ - /* now actually read the data, we know it's there */ - if (!WALRead(state, + /* attempt to read WAL from WAL buffers first */ + nbytes = count; + rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI); + cur_page += rbytes; + targetPagePtr += rbytes; + nbytes -= rbytes; + + /* now read the remaining WAL from WAL file */ + if (nbytes > 0 && + !WALRead(state, cur_page, targetPagePtr, - count, + nbytes, currTLI, /* Pass the current TLI because only * WalSndSegmentOpen controls whether new TLI * is needed. */ -- 2.34.1