Couldn't push: I tested with --disable-atomics --disable-spinlocks and the tests fail because the semaphore for the atomic variables is not always initialized. This is weird — it looks as if a client process is running at a time when StartupXLOG has not yet initialized the variable ... so the initialization in the other place was not completely wrong. I'll investigate after lunch. Here's v16.
-- Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/ "XML!" Exclaimed C++. "What are you doing here? You're not a programming language." "Tell that to the people who use me," said XML. https://burningbird.net/the-parable-of-the-languages/
>From 2164778b74681910544a64ba6c2ae36e5204ed9e Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Fri, 5 Apr 2024 13:21:39 +0200 Subject: [PATCH v16 1/2] Make XLogCtl->log{Write,Flush}Result accessible with atomics This removes the need to hold both the info_lck spinlock and WALWriteLock to update them. We use stock atomic write instead, with WALWriteLock held. Readers can use atomic read, without any locking. This allows for some code to be reordered: some places were a bit contorted to avoid repeated spinlock acquisition, but that's no longer a concern, so we can turn them to more natural coding. Some further changes are likely possible with a positive performance impact, but in this commit I did rather minimal ones only, to avoid increasing the blast radius. Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com> Reviewed-by: Jeff Davis <pg...@j-davis.com> Reviewed-by: Andres Freund <and...@anarazel.de> (earlier versions) Discussion: https://postgr.es/m/20200831182156.GA3983@alvherre.pgsql --- src/backend/access/transam/xlog.c | 105 ++++++++++++++++-------------- 1 file changed, 57 insertions(+), 48 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3bdd9a2ddd..748ed1f2ef 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -292,12 +292,7 @@ static bool doPageWrites; * LogwrtRqst indicates a byte position that we need to write and/or fsync * the log up to (all records before that point must be written or fsynced). * The positions already written/fsynced are maintained in logWriteResult - * and logFlushResult. - * - * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either - * info_lck or WALWriteLock. To update them, you need to hold both locks. - * The point of this arrangement is that the value can be examined by code - * that already holds WALWriteLock without needing to grab info_lck as well. 
+ * and logFlushResult using atomic access. * In addition to the shared variable, each backend has a private copy of * both in LogwrtResult, which is updated when convenient. * @@ -473,12 +468,9 @@ typedef struct XLogCtlData pg_time_t lastSegSwitchTime; XLogRecPtr lastSegSwitchLSN; - /* - * Protected by info_lck and WALWriteLock (you must hold either lock to - * read it, but both to update) - */ - XLogRecPtr logWriteResult; /* last byte + 1 written out */ - XLogRecPtr logFlushResult; /* last byte + 1 flushed */ + /* These are accessed using atomics -- info_lck not needed */ + pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */ + pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */ /* * Latest initialized page in the cache (last byte position + 1). @@ -616,11 +608,15 @@ static XLogwrtResult LogwrtResult = {0, 0}; /* * Update local copy of shared XLogCtl->log{Write,Flush}Result + * + * It's critical that Flush always trails Write, so the order of the reads is + * important, as is the barrier. See also XLogWrite. 
*/ #define RefreshXLogWriteResult(_target) \ do { \ - _target.Write = XLogCtl->logWriteResult; \ - _target.Flush = XLogCtl->logFlushResult; \ + _target.Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); \ + pg_read_barrier(); \ + _target.Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); \ } while (0) /* @@ -968,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata, /* advance global request to include new block(s) */ if (XLogCtl->LogwrtRqst.Write < EndPos) XLogCtl->LogwrtRqst.Write = EndPos; - /* update local result copy while I have the chance */ - RefreshXLogWriteResult(LogwrtResult); SpinLockRelease(&XLogCtl->info_lck); + RefreshXLogWriteResult(LogwrtResult); } /* @@ -1989,17 +1984,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) if (opportunistic) break; - /* Before waiting, get info_lck and update LogwrtResult */ + /* Advance shared memory write request position */ SpinLockAcquire(&XLogCtl->info_lck); if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr) XLogCtl->LogwrtRqst.Write = OldPageRqstPtr; - RefreshXLogWriteResult(LogwrtResult); SpinLockRelease(&XLogCtl->info_lck); /* - * Now that we have an up-to-date LogwrtResult value, see if we - * still need to write it or if someone else already did. + * Acquire an up-to-date LogwrtResult value and see if we still + * need to write it or if someone else already did. */ + RefreshXLogWriteResult(LogwrtResult); if (LogwrtResult.Write < OldPageRqstPtr) { /* @@ -2556,16 +2551,35 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) * 'result' values. This is not absolutely essential, but it saves some * code in a couple of places. */ + SpinLockAcquire(&XLogCtl->info_lck); + if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write) + XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; + if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush) + XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; + SpinLockRelease(&XLogCtl->info_lck); + + /* + * We write Write first, bar, then Flush. 
When reading, the opposite must + * be done (with a matching barrier in between), so that we always see a + * Flush value that trails behind the Write value seen. + */ + pg_atomic_write_u64(&XLogCtl->logWriteResult, LogwrtResult.Write); + pg_write_barrier(); + pg_atomic_write_u64(&XLogCtl->logFlushResult, LogwrtResult.Flush); + +#ifdef USE_ASSERT_CHECKING { - SpinLockAcquire(&XLogCtl->info_lck); - XLogCtl->logWriteResult = LogwrtResult.Write; - XLogCtl->logFlushResult = LogwrtResult.Flush; - if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write) - XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; - if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush) - XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; - SpinLockRelease(&XLogCtl->info_lck); + XLogRecPtr Flush; + XLogRecPtr Write; + + Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); + pg_read_barrier(); + Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); + + /* WAL written to disk is always ahead of WAL flushed */ + Assert(Write >= Flush); } +#endif } /* @@ -2582,7 +2596,6 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) XLogRecPtr prevAsyncXactLSN; SpinLockAcquire(&XLogCtl->info_lck); - RefreshXLogWriteResult(LogwrtResult); sleeping = XLogCtl->WalWriterSleeping; prevAsyncXactLSN = XLogCtl->asyncXactLSN; if (XLogCtl->asyncXactLSN < asyncXactLSN) @@ -2608,6 +2621,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) { int flushblocks; + RefreshXLogWriteResult(LogwrtResult); + flushblocks = WriteRqstPtr / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ; @@ -2790,14 +2805,8 @@ XLogFlush(XLogRecPtr record) { XLogRecPtr insertpos; - /* read LogwrtResult and update local state */ - SpinLockAcquire(&XLogCtl->info_lck); - if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write) - WriteRqstPtr = XLogCtl->LogwrtRqst.Write; - RefreshXLogWriteResult(LogwrtResult); - SpinLockRelease(&XLogCtl->info_lck); - /* done already? 
*/ + RefreshXLogWriteResult(LogwrtResult); if (record <= LogwrtResult.Flush) break; @@ -2805,6 +2814,10 @@ XLogFlush(XLogRecPtr record) * Before actually performing the write, wait for all in-flight * insertions to the pages we're about to write to finish. */ + SpinLockAcquire(&XLogCtl->info_lck); + if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write) + WriteRqstPtr = XLogCtl->LogwrtRqst.Write; + SpinLockRelease(&XLogCtl->info_lck); insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr); /* @@ -2947,9 +2960,8 @@ XLogBackgroundFlush(void) */ insertTLI = XLogCtl->InsertTimeLineID; - /* read LogwrtResult and update local state */ + /* read updated LogwrtRqst */ SpinLockAcquire(&XLogCtl->info_lck); - RefreshXLogWriteResult(LogwrtResult); WriteRqst = XLogCtl->LogwrtRqst; SpinLockRelease(&XLogCtl->info_lck); @@ -2957,6 +2969,7 @@ XLogBackgroundFlush(void) WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ; /* if we have already flushed that far, consider async commit records */ + RefreshXLogWriteResult(LogwrtResult); if (WriteRqst.Write <= LogwrtResult.Flush) { SpinLockAcquire(&XLogCtl->info_lck); @@ -3125,9 +3138,7 @@ XLogNeedsFlush(XLogRecPtr record) return false; /* read LogwrtResult and update local state */ - SpinLockAcquire(&XLogCtl->info_lck); RefreshXLogWriteResult(LogwrtResult); - SpinLockRelease(&XLogCtl->info_lck); /* check again */ if (record <= LogwrtResult.Flush) @@ -5961,11 +5972,13 @@ StartupXLOG(void) XLogCtl->InitializedUpTo = EndOfLog; } + /* + * Update local and shared status. This is OK to do without any locks + * because no other process can be reading or writing WAL yet. 
+ */ LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; - - XLogCtl->logWriteResult = LogwrtResult.Write; - XLogCtl->logFlushResult = LogwrtResult.Flush; - + pg_atomic_init_u64(&XLogCtl->logWriteResult, EndOfLog); + pg_atomic_init_u64(&XLogCtl->logFlushResult, EndOfLog); XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; @@ -6410,9 +6423,7 @@ GetFlushRecPtr(TimeLineID *insertTLI) { Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE); - SpinLockAcquire(&XLogCtl->info_lck); RefreshXLogWriteResult(LogwrtResult); - SpinLockRelease(&XLogCtl->info_lck); /* * If we're writing and flushing WAL, the time line can't be changing, so @@ -9326,9 +9337,7 @@ GetXLogInsertRecPtr(void) XLogRecPtr GetXLogWriteRecPtr(void) { - SpinLockAcquire(&XLogCtl->info_lck); RefreshXLogWriteResult(LogwrtResult); - SpinLockRelease(&XLogCtl->info_lck); return LogwrtResult.Write; } -- 2.39.2
>From 82a26b21c3597661bf8dc70cb1b6f14248972cc4 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Fri, 5 Apr 2024 13:52:44 +0200 Subject: [PATCH v16 2/2] Add logCopyResult Updated using monotonic increment --- src/backend/access/transam/xlog.c | 31 +++++++++++++++++++++++++- src/include/port/atomics.h | 36 +++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 748ed1f2ef..036550f5d4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -469,6 +469,7 @@ typedef struct XLogCtlData XLogRecPtr lastSegSwitchLSN; /* These are accessed using atomics -- info_lck not needed */ + pg_atomic_uint64 logCopyResult; /* last byte + 1 copied to WAL buffers */ pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */ pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */ @@ -1499,6 +1500,7 @@ static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto) { uint64 bytepos; + XLogRecPtr copyptr; XLogRecPtr reservedUpto; XLogRecPtr finishedUpto; XLogCtlInsert *Insert = &XLogCtl->Insert; @@ -1507,6 +1509,11 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) if (MyProc == NULL) elog(PANIC, "cannot wait without a PGPROC structure"); + /* check if there's any work to do */ + copyptr = pg_atomic_read_u64(&XLogCtl->logCopyResult); + if (upto <= copyptr) + return copyptr; + /* Read the current insert position */ SpinLockAcquire(&Insert->insertpos_lck); bytepos = Insert->CurrBytePos; @@ -1586,6 +1593,10 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto) finishedUpto = insertingat; } + + finishedUpto = + pg_atomic_monotonic_advance_u64(&XLogCtl->logCopyResult, finishedUpto); + return finishedUpto; } @@ -1727,13 +1738,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, { char *pdst = dstbuf; XLogRecPtr recptr = startptr; + 
XLogRecPtr copyptr; Size nbytes = count; if (RecoveryInProgress() || tli != GetWALInsertionTimeLine()) return 0; Assert(!XLogRecPtrIsInvalid(startptr)); - Assert(startptr + count <= LogwrtResult.Write); + + /* + * Caller should ensure that the requested data has been copied to WAL + * buffers before we try to read it. + */ + copyptr = pg_atomic_read_u64(&XLogCtl->logCopyResult); + if (startptr + count > copyptr) + ereport(ERROR, + errmsg("request to read past end of generated WAL; requested %X/%X, current position %X/%X", + LSN_FORMAT_ARGS(startptr + count), + LSN_FORMAT_ARGS(copyptr))); /* * Loop through the buffers without a lock. For each buffer, atomically @@ -2571,13 +2593,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) { XLogRecPtr Flush; XLogRecPtr Write; + XLogRecPtr Copy; Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); pg_read_barrier(); Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); + pg_read_barrier(); + Copy = pg_atomic_read_u64(&XLogCtl->logCopyResult); /* WAL written to disk is always ahead of WAL flushed */ Assert(Write >= Flush); + + /* WAL copied to buffers is always ahead of WAL written */ + Assert(Copy >= Write); } #endif } @@ -5977,6 +6005,7 @@ StartupXLOG(void) * because no other process can be reading or writing WAL yet. */ LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; + pg_atomic_init_u64(&XLogCtl->logCopyResult, EndOfLog); pg_atomic_init_u64(&XLogCtl->logWriteResult, EndOfLog); pg_atomic_init_u64(&XLogCtl->logFlushResult, EndOfLog); XLogCtl->LogwrtRqst.Write = EndOfLog; diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h index ff47782cdb..78987f3154 100644 --- a/src/include/port/atomics.h +++ b/src/include/port/atomics.h @@ -570,6 +570,42 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_) return pg_atomic_sub_fetch_u64_impl(ptr, sub_); } +/* + * Monotonically advance the given variable using only atomic operations until + * it's at least the target value. 
Returns the latest value observed, which + * may or may not be the target value. + * + * Full barrier semantics (even when value is unchanged). + */ +static inline uint64 +pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_) +{ + uint64 currval; + +#ifndef PG_HAVE_ATOMIC_U64_SIMULATION + AssertPointerAlignment(ptr, 8); +#endif + + currval = pg_atomic_read_u64_impl(ptr); + if (currval >= target_) + { + pg_memory_barrier(); + return currval; + } + +#ifndef PG_HAVE_ATOMIC_U64_SIMULATION + AssertPointerAlignment(&currval, 8); +#endif + + while (currval < target_) + { + if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_)) + break; + } + + return Max(target_, currval); +} + #undef INSIDE_ATOMICS_H #endif /* ATOMICS_H */ -- 2.39.2