I've noticed a few things here, v16 attached with some rather largish
changes.

1. Using pg_atomic_write_membarrier_u64 is useless and it imposes more
barriers than we actually need.  So I switched back to
pg_atomic_write_u64 and added one barrier between the two writes.  Same
for reads.

2. Using monotonic_advance for Write and Flush is useless.  We can use a
simple atomic_write with a write barrier in between.  The reason is
that, as Andres said[1], we only write those with WALWriteLock held, so
it's not possible for them to move forward while we aren't looking.  All
callers of XLogWrite do RefreshXLogWriteResult() with the WALWriteLock
held.  Therefore we can just use pg_atomic_write_u64.  Consequently I
moved the addition of the monotonic advance function to the patch that
adds Copy.

3. Testing the invariant that the Copy pointer cannot be 0 is useless,
because we initialize that pointer to EndOfLog during StartupXLOG.
So, removed.

4. If we're not modifying any callers of WALReadFromBuffers(), then
AFAICS the added check that the request is not past the Copy pointer is
useless.  In a quick look at that code, I think we only try to read data
that's been flushed, not written, so the stricter check that we don't
read data that hasn't been Copied does nothing.  (Honestly I'm not sure
that we want XLogSendPhysical to be reading data that has not been
written, or do we?)  Please argue why we need this patch.

5. The existing weird useless-looking block at the end of XLogWrite is
there because we used to have it to declare a volatile pointer to
XLogCtl (cf.  commit 6ba4ecbf477e).  That's no longer needed, so we
could remove it.  Or we could leave it alone (just because it's ancient
and it doesn't hurt anything), but there's no reason to have the new
invariant-testing block inside the same block.  So I added another weird
useless-looking block, except that this one does have two variable
declarations at its top.

6. In a few places, we read both Write and Flush to only use one of
them.  This is wasteful and we could dial this back to reading only the
one we need.  Andres suggested as much in [1].  I didn't touch this in
the current patch, and I don't necessarily think we need to address it
right now.  Addressing this should probably be done similarly to what I
posted in [2]'s 0002.

[1] https://postgr.es/m/20210130023011.n545o54j65t4k...@alap3.anarazel.de
[2] https://postgr.es/m/202203221611.hqbjdinzsbu2@alvherre.pgsql

-- 
Álvaro Herrera               48°01'N 7°57'E  —  https://www.EnterpriseDB.com/
>From f791c7ee9815871a1843116febffc0077d29a9a3 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 3 Apr 2024 11:48:27 +0200
Subject: [PATCH v16 1/2] Make XLogCtl->log{Write,Flush}Result accessible with
 atomics

This removes the need to hold both the info_lck spinlock and
WALWriteLock to update them.
---
 src/backend/access/transam/xlog.c | 91 +++++++++++++++++--------------
 1 file changed, 49 insertions(+), 42 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f26533ec01..9b68afd400 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -292,12 +292,7 @@ static bool doPageWrites;
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
  * The positions already written/fsynced are maintained in logWriteResult
- * and logFlushResult.
- *
- * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
- * info_lck or WALWriteLock.  To update them, you need to hold both locks.
- * The point of this arrangement is that the value can be examined by code
- * that already holds WALWriteLock without needing to grab info_lck as well.
+ * and logFlushResult using atomic access.
  * In addition to the shared variable, each backend has a private copy of
  * both in LogwrtResult, which is updated when convenient.
  *
@@ -473,12 +468,9 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogRecPtr	logWriteResult; /* last byte + 1 written out */
-	XLogRecPtr	logFlushResult; /* last byte + 1 flushed */
+	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logWriteResult;	/* last byte + 1 written out */
+	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
@@ -616,11 +608,15 @@ static XLogwrtResult LogwrtResult = {0, 0};
 
 /*
  * Update local copy of shared XLogCtl->log{Write,Flush}Result
+ *
+ * It's critical that Flush always trails Write, so the order of the reads is
+ * important, as is the barrier.
  */
 #define RefreshXLogWriteResult(_target) \
 	do { \
-		_target.Write = XLogCtl->logWriteResult; \
-		_target.Flush = XLogCtl->logFlushResult; \
+		_target.Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); \
+		pg_read_barrier(); \
+		_target.Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); \
 	} while (0)
 
 /*
@@ -968,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		RefreshXLogWriteResult(LogwrtResult);
 		SpinLockRelease(&XLogCtl->info_lck);
+		RefreshXLogWriteResult(LogwrtResult);
 	}
 
 	/*
@@ -1989,17 +1984,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			RefreshXLogWriteResult(LogwrtResult);
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
+			 * Acquire an up-to-date LogwrtResult value and see if we still
+			 * need to write it or if someone else already did.
 			 */
+			RefreshXLogWriteResult(LogwrtResult);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
 				/*
@@ -2559,14 +2554,31 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->logWriteResult = LogwrtResult.Write;
-		XLogCtl->logFlushResult = LogwrtResult.Flush;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
 			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
 		SpinLockRelease(&XLogCtl->info_lck);
+
+		pg_atomic_write_u64(&XLogCtl->logWriteResult, LogwrtResult.Write);
+		pg_write_barrier();
+		pg_atomic_write_u64(&XLogCtl->logFlushResult, LogwrtResult.Flush);
 	}
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		/* See RefreshXLogWriteResult about ordering */
+		XLogRecPtr	Flush;
+		XLogRecPtr	Write;
+
+		Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
+		pg_read_barrier();
+		Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
+
+	/* WAL written to disk is always ahead of WAL flushed */
+		Assert(Write >= Flush);
+	}
+#endif
 }
 
 /*
@@ -2583,7 +2595,6 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	prevAsyncXactLSN;
 
 	SpinLockAcquire(&XLogCtl->info_lck);
-	RefreshXLogWriteResult(LogwrtResult);
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
@@ -2609,6 +2620,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	{
 		int			flushblocks;
 
+		RefreshXLogWriteResult(LogwrtResult);
+
 		flushblocks =
 			WriteRqstPtr / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
 
@@ -2791,14 +2804,8 @@ XLogFlush(XLogRecPtr record)
 	{
 		XLogRecPtr	insertpos;
 
-		/* read LogwrtResult and update local state */
-		SpinLockAcquire(&XLogCtl->info_lck);
-		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
-			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		RefreshXLogWriteResult(LogwrtResult);
-		SpinLockRelease(&XLogCtl->info_lck);
-
 		/* done already? */
+		RefreshXLogWriteResult(LogwrtResult);
 		if (record <= LogwrtResult.Flush)
 			break;
 
@@ -2806,6 +2813,10 @@ XLogFlush(XLogRecPtr record)
 		 * Before actually performing the write, wait for all in-flight
 		 * insertions to the pages we're about to write to finish.
 		 */
+		SpinLockAcquire(&XLogCtl->info_lck);
+		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
+			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+		SpinLockRelease(&XLogCtl->info_lck);
 		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
 
 		/*
@@ -2948,9 +2959,8 @@ XLogBackgroundFlush(void)
 	 */
 	insertTLI = XLogCtl->InsertTimeLineID;
 
-	/* read LogwrtResult and update local state */
+	/* read updated LogwrtRqst */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	RefreshXLogWriteResult(LogwrtResult);
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -2958,6 +2968,7 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
+	RefreshXLogWriteResult(LogwrtResult);
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
@@ -3126,9 +3137,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
 	RefreshXLogWriteResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -5962,11 +5971,13 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
+	/*
+	 * Update local and shared status.  This is OK to do without any locks
+	 * because no other process can be reading or writing WAL yet.
+	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
-
-	XLogCtl->logWriteResult = LogwrtResult.Write;
-	XLogCtl->logFlushResult = LogwrtResult.Flush;
-
+	pg_atomic_init_u64(&XLogCtl->logWriteResult, EndOfLog);
+	pg_atomic_init_u64(&XLogCtl->logFlushResult, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
@@ -6411,9 +6422,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
 	RefreshXLogWriteResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9327,9 +9336,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
 	RefreshXLogWriteResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	return LogwrtResult.Write;
 }
-- 
2.39.2

>From e99ce2263cbbe10556bdac07e18e5eadd8c3966e Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 4 Apr 2024 16:18:13 +0200
Subject: [PATCH v16 2/2] Add Copy pointer to track data copied to WAL buffers

---
 src/backend/access/transam/xlog.c | 30 +++++++++++++++++++++++++++-
 src/include/port/atomics.h        | 33 +++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9b68afd400..4e918d3923 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -469,6 +469,7 @@ typedef struct XLogCtlData
 	XLogRecPtr	lastSegSwitchLSN;
 
 	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logCopyResult; /* last byte + 1 copied to WAL buffers */
 	pg_atomic_uint64 logWriteResult;	/* last byte + 1 written out */
 	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
@@ -1499,6 +1500,7 @@ static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
+	XLogRecPtr	copyptr;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1507,6 +1509,11 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	if (MyProc == NULL)
 		elog(PANIC, "cannot wait without a PGPROC structure");
 
+	/* check if there's any work to do */
+	copyptr = pg_atomic_read_u64(&XLogCtl->logCopyResult);
+	if (upto <= copyptr)
+		return copyptr;
+
 	/* Read the current insert position */
 	SpinLockAcquire(&Insert->insertpos_lck);
 	bytepos = Insert->CurrBytePos;
@@ -1586,6 +1593,9 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
 			finishedUpto = insertingat;
 	}
+
+	pg_atomic_monotonic_advance_u64(&XLogCtl->logCopyResult, finishedUpto);
+
 	return finishedUpto;
 }
 
@@ -1727,13 +1737,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char	   *pdst = dstbuf;
 	XLogRecPtr	recptr = startptr;
+	XLogRecPtr	copyptr;
 	Size		nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+	/*
+	 * Caller should ensure that the requested data has been copied to WAL
+	 * buffers before we try to read it.
+	 */
+	copyptr = pg_atomic_read_u64(&XLogCtl->logCopyResult);
+	if (startptr + count > copyptr)
+		ereport(ERROR,
+				errmsg("request to read past end of generated WAL; requested %X/%X, current position %X/%X",
+					   LSN_FORMAT_ARGS(startptr + count),
+					   LSN_FORMAT_ARGS(copyptr)));
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
@@ -2570,13 +2591,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		/* See RefreshXLogWriteResult about ordering */
 		XLogRecPtr	Flush;
 		XLogRecPtr	Write;
+		XLogRecPtr	Copy;
 
 		Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
 		pg_read_barrier();
 		Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
+		pg_read_barrier();
+		Copy = pg_atomic_read_u64(&XLogCtl->logCopyResult);
 
 	/* WAL written to disk is always ahead of WAL flushed */
 		Assert(Write >= Flush);
+
+		/* WAL copied to buffers is always ahead of WAL written */
+		Assert(Copy >= Write);
 	}
 #endif
 }
@@ -5976,6 +6003,7 @@ StartupXLOG(void)
 	 * because no other process can be reading or writing WAL yet.
 	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	pg_atomic_init_u64(&XLogCtl->logCopyResult, EndOfLog);
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, EndOfLog);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index ff47782cdb..e962b9f6d3 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -570,6 +570,39 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64_impl(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(&currval, 8);
+#endif
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
-- 
2.39.2

Reply via email to