Thanks for keeping this moving forward.  I gave your proposed patches a
look.   One thing I didn't like much is that we're adding a new member
(Copy) to XLogwrtAtomic -- but this struct is supposed to be a mirror of
XLogwrtResult for use with atomic access.  Since this new member is not
added to XLogwrtResult (because it's not needed there), the whole idea
of there being symmetry between those two structs crumbles.  And since
a later patch stops using struct assignment anyway, the structs no
longer need to match, so we can instead spell out the members directly
in XLogCtl and call it a day.
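
To make the shape of the change concrete, here is roughly the before
and after in XLogCtlData (paraphrasing 0001 below, not new code):

	/* before: struct-assigned, mirroring the local XLogwrtResult */
	XLogwrtResult LogwrtResult;

	/* after: the members spelled out individually */
	XLogRecPtr	logWriteResult; /* last byte + 1 written out */
	XLogRecPtr	logFlushResult; /* last byte + 1 flushed */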

So what I do in the attached 0001 is stop using the XLogwrtResult struct
in XLogCtl and replace it with separate Write and Flush values, and add
the macro XLogUpdateLocalLogwrtResult() that copies the values of Write
and Flush from the shared XLogCtl to the local variable given as macro
argument.  (I also added our idiomatic do {} while(0) to the macro
definition, for safety).  The new members are XLogCtl->logWriteResult
and XLogCtl->logFlushResult; as of 0001 they are plain XLogRecPtr, so
the semantics are essentially identical to the previous code.  No
atomic access yet!
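
(In case the do {} while (0) looks like noise: without it, a
two-statement macro misbehaves under an unbraced if/else.  A minimal
sketch of the hazard -- BAD_UPDATE is a hypothetical macro, not code
from the patch:

	#define BAD_UPDATE(_target) \
		{ \
			(_target).Write = XLogCtl->logWriteResult; \
			(_target).Flush = XLogCtl->logFlushResult; \
		}

	if (update_needed)
		BAD_UPDATE(LogwrtResult);	/* expands to "{ ... };" -- the */
	else							/* stray semicolon makes this */
		do_something_else();		/* "else" a syntax error */

With do {} while (0), the trailing semicolon belongs to the while and
the macro behaves like a single statement.)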

0002 then adds pg_atomic_monotonic_advance_u64.  (I don't add the _u32
variant, because I don't think it's a great idea to add dead code.  If
later we see a need for it we can put it in.)  It also changes the two
new members to be atomics, changes the macro to use atomic reads, and
makes XLogWrite use the monotonic advance.  In a couple of other
places, the macro calls can now move outside the spinlock.  Also,
XLogWrite gains the invariant checking that involves Write and Flush.
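
(For readers unfamiliar with the new primitive:
pg_atomic_monotonic_advance_u64 is a compare-and-exchange loop that
only ever moves the value forward, so a slow updater can never drag
the shared pointer backwards.  A hypothetical usage sketch, values
invented:

	pg_atomic_uint64 pos;

	pg_atomic_init_u64(&pos, 100);
	pg_atomic_monotonic_advance_u64(&pos, 200);	/* pos is now 200 */
	pg_atomic_monotonic_advance_u64(&pos, 150);	/* no-op: 150 < 200 */
	Assert(pg_atomic_read_u64(&pos) == 200);

See the atomics.h hunk in 0002 for the real definition.)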

Finally, 0003 adds the Copy pointer to XLogCtl alongside Write and
Flush, updates WALReadFromBuffers to test it instead of the Write
pointer, and adds to XLogWrite the invariant checks that involve the
Copy pointer.
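
Spelled out, the invariant chain those assertions encode (quoting the
effect of 0002 plus 0003) is:

	/* WAL copied to buffers is always ahead of WAL written */
	Assert(Copy >= Write);
	/* WAL written to disk is always ahead of WAL flushed */
	Assert(Write >= Flush);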

I haven't rerun Bharath's test loop yet; will do so shortly.

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/
"You're _really_ hosed if the person doing the hiring doesn't understand
relational systems: you end up with a whole raft of programmers, none of
whom has had a Date with the clue stick."              (Andrew Sullivan)
https://postgr.es/m/20050809113420.gd2...@phlogiston.dyndns.org
From e6ddbe87d598c6a1090e39845a4a308060f0af1e Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 3 Apr 2024 11:23:12 +0200
Subject: [PATCH v15 1/3] split XLogCtl->LogwrtResult into separate struct
 members

---
 src/backend/access/transam/xlog.c | 59 ++++++++++++++++++-------------
 1 file changed, 35 insertions(+), 24 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 87ea03954b..6213d99561 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -291,16 +291,15 @@ static bool doPageWrites;
  *
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
- * LogwrtResult indicates the byte positions we have already written/fsynced.
- * These structs are identical but are declared separately to indicate their
- * slightly different functions.
+ * The positions already written/fsynced are maintained in logWriteResult
+ * and logFlushResult.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
+ * info_lck or WALWriteLock.  To update them, you need to hold both locks.
+ * The point of this arrangement is that the value can be examined by code
+ * that already holds WALWriteLock without needing to grab info_lck as well.
+ * In addition to the shared variable, each backend has a private copy of
+ * both in LogwrtResult, which is updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -478,7 +477,8 @@ typedef struct XLogCtlData
 	 * Protected by info_lck and WALWriteLock (you must hold either lock to
 	 * read it, but both to update)
 	 */
-	XLogwrtResult LogwrtResult;
+	XLogRecPtr	logWriteResult; /* last byte + 1 written out */
+	XLogRecPtr	logFlushResult; /* last byte + 1 flushed */
 
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
@@ -614,6 +614,15 @@ static int	UsableBytesInSegment;
  */
 static XLogwrtResult LogwrtResult = {0, 0};
 
+/*
+ * Update local copy of shared XLogCtl->log{Write,Flush}Result
+ */
+#define XLogUpdateLocalLogwrtResult(_target) \
+	do { \
+		_target.Write = XLogCtl->logWriteResult; \
+		_target.Flush = XLogCtl->logFlushResult; \
+	} while (0)
+
 /*
  * openLogFile is -1 or a kernel FD for an open log file segment.
  * openLogSegNo identifies the segment, and openLogTLI the corresponding TLI.
@@ -960,7 +969,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
 		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		XLogUpdateLocalLogwrtResult(LogwrtResult);
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
@@ -1984,7 +1993,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
+			XLogUpdateLocalLogwrtResult(LogwrtResult);
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
@@ -2005,7 +2014,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				XLogUpdateLocalLogwrtResult(LogwrtResult);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2289,7 +2298,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2549,7 +2558,8 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
+		XLogCtl->logWriteResult = LogwrtResult.Write;
+		XLogCtl->logFlushResult = LogwrtResult.Flush;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
@@ -2572,7 +2582,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	prevAsyncXactLSN;
 
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
@@ -2784,7 +2794,7 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
+		XLogUpdateLocalLogwrtResult(LogwrtResult);
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
@@ -2815,7 +2825,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		XLogUpdateLocalLogwrtResult(LogwrtResult);
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -2939,7 +2949,7 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -3027,7 +3037,7 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3116,7 +3126,7 @@ XLogNeedsFlush(XLogRecPtr record)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	SpinLockRelease(&XLogCtl->info_lck);
 
 	/* check again */
@@ -5953,7 +5963,8 @@ StartupXLOG(void)
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+	XLogCtl->logWriteResult = LogwrtResult.Write;
+	XLogCtl->logFlushResult = LogwrtResult.Flush;
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -6400,7 +6411,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
@@ -9316,7 +9327,7 @@ XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	SpinLockRelease(&XLogCtl->info_lck);
 
 	return LogwrtResult.Write;
-- 
2.39.2

From 908f21617940264d165ec9f2216778043fc876f1 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 3 Apr 2024 11:48:27 +0200
Subject: [PATCH v15 2/3] Make XLogCtl->log{Write,Flush}Result accessible with
 atomics

This removes the need to hold both the info_lck spinlock and
WALWriteLock to update them; we now use atomic monotonic increment.
---
 src/backend/access/transam/xlog.c | 64 +++++++++++++++++--------------
 src/include/port/atomics.h        | 33 ++++++++++++++++
 2 files changed, 68 insertions(+), 29 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6213d99561..e2cd0e8f52 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -292,12 +292,7 @@ static bool doPageWrites;
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
  * The positions already written/fsynced are maintained in logWriteResult
- * and logFlushResult.
- *
- * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
- * info_lck or WALWriteLock.  To update them, you need to hold both locks.
- * The point of this arrangement is that the value can be examined by code
- * that already holds WALWriteLock without needing to grab info_lck as well.
+ * and logFlushResult using atomic access.
  * In addition to the shared variable, each backend has a private copy of
  * both in LogwrtResult, which is updated when convenient.
  *
@@ -473,12 +468,9 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogRecPtr	logWriteResult; /* last byte + 1 written out */
-	XLogRecPtr	logFlushResult; /* last byte + 1 flushed */
+	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logWriteResult;	/* last byte + 1 written out */
+	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
@@ -619,8 +611,8 @@ static XLogwrtResult LogwrtResult = {0, 0};
  */
 #define XLogUpdateLocalLogwrtResult(_target) \
 	do { \
-		_target.Write = XLogCtl->logWriteResult; \
-		_target.Flush = XLogCtl->logFlushResult; \
+		_target.Write = pg_atomic_read_membarrier_u64(&XLogCtl->logWriteResult); \
+		_target.Flush = pg_atomic_read_membarrier_u64(&XLogCtl->logFlushResult); \
 	} while (0)
 
 /*
@@ -1989,12 +1981,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			XLogUpdateLocalLogwrtResult(LogwrtResult);
 			SpinLockRelease(&XLogCtl->info_lck);
+			XLogUpdateLocalLogwrtResult(LogwrtResult);
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
@@ -2558,13 +2550,26 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->logWriteResult = LogwrtResult.Write;
-		XLogCtl->logFlushResult = LogwrtResult.Flush;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
 			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
 		SpinLockRelease(&XLogCtl->info_lck);
+
+		pg_atomic_monotonic_advance_u64(&XLogCtl->logWriteResult,
+										LogwrtResult.Write);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->logFlushResult,
+										LogwrtResult.Flush);
+
+#ifdef USE_ASSERT_CHECKING
+		{
+			XLogRecPtr	Write = pg_atomic_read_membarrier_u64(&XLogCtl->logWriteResult);
+			XLogRecPtr	Flush = pg_atomic_read_membarrier_u64(&XLogCtl->logFlushResult);
+
+			/* WAL written to disk is always ahead of WAL flushed */
+			Assert(Write >= Flush);
+		}
+#endif
 	}
 }
 
@@ -2582,12 +2587,12 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	prevAsyncXactLSN;
 
 	SpinLockAcquire(&XLogCtl->info_lck);
-	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 
 	/*
 	 * If somebody else already called this function with a more aggressive
@@ -2794,8 +2799,8 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		XLogUpdateLocalLogwrtResult(LogwrtResult);
 		SpinLockRelease(&XLogCtl->info_lck);
+		XLogUpdateLocalLogwrtResult(LogwrtResult);
 
 		/* done already? */
 		if (record <= LogwrtResult.Flush)
@@ -2949,9 +2954,9 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 
 	/* back off to last completed page boundary */
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
@@ -3125,9 +3130,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
 	XLogUpdateLocalLogwrtResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -4938,6 +4941,9 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
+	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
+	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
+
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5961,10 +5967,14 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
+	/*
+	 * Update local and shared status.  This is OK to do without any locks
+	 * because no other process can be reading or writing WAL yet.
+	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->logWriteResult = LogwrtResult.Write;
-	XLogCtl->logFlushResult = LogwrtResult.Flush;
+	pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -6410,9 +6420,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
 	XLogUpdateLocalLogwrtResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9326,9 +9334,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
 	XLogUpdateLocalLogwrtResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	return LogwrtResult.Write;
 }
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index ff47782cdb..e962b9f6d3 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -570,6 +570,39 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64_impl(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(&currval, 8);
+#endif
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
-- 
2.39.2

From 50bf1ce5dc97508ad74f18011ffc6391e9b3e290 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 3 Apr 2024 12:20:01 +0200
Subject: [PATCH v15 3/3] Add Copy pointer to track data copied to WAL buffers

---
 src/backend/access/transam/xlog.c | 37 +++++++++++++++++++++++++++++--
 1 file changed, 35 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e2cd0e8f52..52651be48b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -469,6 +469,7 @@ typedef struct XLogCtlData
 	XLogRecPtr	lastSegSwitchLSN;
 
 	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logCopyResult; /* last byte + 1 copied to WAL buffers */
 	pg_atomic_uint64 logWriteResult;	/* last byte + 1 written out */
 	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
@@ -1496,6 +1497,7 @@ static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
+	XLogRecPtr	copyptr;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1504,6 +1506,11 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	if (MyProc == NULL)
 		elog(PANIC, "cannot wait without a PGPROC structure");
 
+	/* check if there's any work to do */
+	copyptr = pg_atomic_read_membarrier_u64(&XLogCtl->logCopyResult);
+	if (upto <= copyptr)
+		return copyptr;
+
 	/* Read the current insert position */
 	SpinLockAcquire(&Insert->insertpos_lck);
 	bytepos = Insert->CurrBytePos;
@@ -1583,6 +1590,9 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
 			finishedUpto = insertingat;
 	}
+
+	pg_atomic_monotonic_advance_u64(&XLogCtl->logCopyResult, finishedUpto);
+
 	return finishedUpto;
 }
 
@@ -1724,13 +1734,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char	   *pdst = dstbuf;
 	XLogRecPtr	recptr = startptr;
+	XLogRecPtr	copyptr;
 	Size		nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+	/*
+	 * Caller should ensure that the requested data has been copied to WAL
+	 * buffers before we try to read it.
+	 */
+	copyptr = pg_atomic_read_membarrier_u64(&XLogCtl->logCopyResult);
+	if (startptr + count > copyptr)
+		ereport(ERROR,
+				errmsg("request to read past end of generated WAL; requested %X/%X, current position %X/%X",
+					   LSN_FORMAT_ARGS(startptr + count),
+					   LSN_FORMAT_ARGS(copyptr)));
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
@@ -2563,10 +2584,20 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 
 #ifdef USE_ASSERT_CHECKING
 		{
+			XLogRecPtr	Copy = pg_atomic_read_membarrier_u64(&XLogCtl->logCopyResult);
 			XLogRecPtr	Write = pg_atomic_read_membarrier_u64(&XLogCtl->logWriteResult);
 			XLogRecPtr	Flush = pg_atomic_read_membarrier_u64(&XLogCtl->logFlushResult);
 
-			/* WAL written to disk is always ahead of WAL flushed */
+			/*
+			 * WaitXLogInsertionsToFinish must have been called before us, so
+			 * Copy cannot be invalid.
+			 */
+			Assert(!XLogRecPtrIsInvalid(Copy));
+
+			/* WAL copied to buffers is always ahead of WAL written */
+			Assert(Copy >= Write);
+
+			/* WAL written to disk is always ahead of WAL flushed */
 			Assert(Write >= Flush);
 		}
 #endif
@@ -4941,6 +4972,7 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
+	pg_atomic_init_u64(&XLogCtl->logCopyResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 
@@ -5973,6 +6005,7 @@ StartupXLOG(void)
 	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
+	pg_atomic_write_u64(&XLogCtl->logCopyResult, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
 
-- 
2.39.2
