Pushed 0001.  Here's the patch that adds the Copy position one more
time, now with the monotonic_advance function returning the latest
value it observed.
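
To illustrate the semantics outside the patch context: the pattern is a
compare-and-swap loop that only ever moves the counter forward and
reports what it last saw.  Here's a standalone sketch using plain C11
atomics (illustrative only; it is not the atomics.h implementation
below, which goes through the pg_atomic_*_impl layer):

#include <stdatomic.h>
#include <stdint.h>

/*
 * Sketch of the monotonic-advance pattern: raise *ptr to at least
 * "target" and return the latest value observed, which may or may not
 * be the target value.
 */
static inline uint64_t
monotonic_advance(_Atomic uint64_t *ptr, uint64_t target)
{
	uint64_t	cur = atomic_load(ptr);

	while (cur < target)
	{
		/* On failure, "cur" is refreshed with the value just seen. */
		if (atomic_compare_exchange_strong(ptr, &cur, target))
			return target;
	}
	return cur;			/* already at or past target */
}

(C11 atomics default to seq_cst ordering, which roughly corresponds to
the full-barrier requirement documented in the real function.)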

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/
From 3f5c860576245b92701e7bfc517947c418c68510 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 5 Apr 2024 13:52:44 +0200
Subject: [PATCH v17] Add logCopyResult

Updated to use a monotonic increment of the Copy position.
---
 src/backend/access/transam/xlog.c | 32 ++++++++++++++++++++++++++-
 src/include/port/atomics.h        | 36 +++++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b4499cda7b..a40c1fb79e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -469,6 +469,7 @@ typedef struct XLogCtlData
 	XLogRecPtr	lastSegSwitchLSN;
 
 	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logCopyResult; /* last byte + 1 copied to WAL buffers */
 	pg_atomic_uint64 logWriteResult;	/* last byte + 1 written out */
 	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
@@ -1499,6 +1500,7 @@ static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
+	XLogRecPtr	copyptr;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1507,6 +1509,11 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	if (MyProc == NULL)
 		elog(PANIC, "cannot wait without a PGPROC structure");
 
+	/* check if there's any work to do */
+	copyptr = pg_atomic_read_u64(&XLogCtl->logCopyResult);
+	if (upto <= copyptr)
+		return copyptr;
+
 	/* Read the current insert position */
 	SpinLockAcquire(&Insert->insertpos_lck);
 	bytepos = Insert->CurrBytePos;
@@ -1586,6 +1593,10 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
 			finishedUpto = insertingat;
 	}
+
+	finishedUpto =
+		pg_atomic_monotonic_advance_u64(&XLogCtl->logCopyResult, finishedUpto);
+
 	return finishedUpto;
 }
 
@@ -1727,13 +1738,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char	   *pdst = dstbuf;
 	XLogRecPtr	recptr = startptr;
+	XLogRecPtr	copyptr;
 	Size		nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+	/*
+	 * Caller should ensure that the requested data has been copied to WAL
+	 * buffers before we try to read it.
+	 */
+	copyptr = pg_atomic_read_u64(&XLogCtl->logCopyResult);
+	if (startptr + count > copyptr)
+		ereport(ERROR,
+				errmsg("request to read past end of generated WAL; requested %X/%X, current position %X/%X",
+					   LSN_FORMAT_ARGS(startptr + count),
+					   LSN_FORMAT_ARGS(copyptr)));
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
@@ -2571,13 +2593,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	{
 		XLogRecPtr	Flush;
 		XLogRecPtr	Write;
+		XLogRecPtr	Copy;
 
 		Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
 		pg_read_barrier();
 		Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
+		pg_read_barrier();
+		Copy = pg_atomic_read_u64(&XLogCtl->logCopyResult);
 
 		/* WAL written to disk is always ahead of WAL flushed */
 		Assert(Write >= Flush);
+
+		/* WAL copied to buffers is always ahead of WAL written */
+		Assert(Copy >= Write);
 	}
 #endif
 }
@@ -4951,6 +4979,7 @@ XLOGShmemInit(void)
 
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
+	pg_atomic_init_u64(&XLogCtl->logCopyResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5979,6 +6008,7 @@ StartupXLOG(void)
 	 * because no other process can be reading or writing WAL yet.
 	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	pg_atomic_write_u64(&XLogCtl->logCopyResult, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index ff47782cdb..78987f3154 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -570,6 +570,42 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.  Returns the latest value observed, which
+ * may or may not be the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline uint64
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64_impl(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return currval;
+	}
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(&currval, 8);
+#endif
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_))
+			break;
+	}
+
+	return Max(target_, currval);
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
-- 
2.39.2
