On 2022-Mar-21, Andres Freund wrote:

> This patch currently doesn't apply: http://cfbot.cputube.org/patch_37_2716.log
Updated.

> Are you aiming this for v15? Otherwise I'd like to move the entry to the next
> CF. Marked as waiting-on-author.

I'd like to get 0001 pushed to pg15, yes.  I'll let 0002 sit here for
discussion, but I haven't seen any evidence that we need it.  If others
vouch for it, I can push that one too, but I'd rather have it be a
separate thing.

-- 
Álvaro Herrera               48°01'N 7°57'E  —  https://www.EnterpriseDB.com/
"El miedo atento y previsor es la madre de la seguridad"
("Early and provident fear is the mother of safety", E. Burke)
>From 4d5a8d915b497b79bbe62e18352f7b9d8c1b9bca Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 2 Feb 2021 14:03:43 -0300
Subject: [PATCH v6 1/2] Change XLogCtl->LogwrtResult to use atomic ops
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Currently, access to LogwrtResult is protected by a spinlock.  This
becomes severely contended in some scenarios, such as with a largish
replication flock: walsenders all calling GetFlushRecPtr repeatedly
cause the processor to heat up to the point where eggs can be fried on
top.

This can be reduced to a non-problem by replacing XLogCtl->LogwrtResult
with a struct containing a pair of atomically accessed variables.

In addition, this commit splits the process-local copy of these values
(kept in the freestanding LogwrtResult struct) into two separate
variables, LogWriteResult and LogFlushResult.  This is not strictly
necessary, but it makes it clearer that these are updated separately,
each on their own schedule.

Author: Álvaro Herrera <alvhe...@alvh.no-ip.org>
Discussion: https://postgr.es/m/20200831182156.GA3983@alvherre.pgsql
---
 src/backend/access/transam/xlog.c | 195 +++++++++++++++---------------
 src/include/port/atomics.h        |  29 +++++
 src/tools/pgindent/typedefs.list  |   1 +
 3 files changed, 125 insertions(+), 100 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4ac3871c74..311a8a1192 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -285,16 +285,14 @@ static bool doPageWrites;
  *
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
- * LogwrtResult indicates the byte positions we have already written/fsynced.
- * These structs are identical but are declared separately to indicate their
- * slightly different functions.
+ * LogwrtResult indicates the byte positions we have already written/fsynced.
+ * These structs are similar but are declared separately to indicate their
+ * slightly different functions; in addition, the latter is read and written
+ * using atomic operations.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * In addition to the shared variable, each backend has a private copy of
+ * each member of LogwrtResult (LogWriteResult and LogFlushResult), each of
+ * which is separately updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -317,18 +315,18 @@ static bool doPageWrites;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;		/* last byte + 1 of write position */
+	pg_atomic_uint64 Flush;		/* last byte + 1 of flush position */
+} XLogwrtAtomic;
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
 	XLogRecPtr	Flush;			/* last byte + 1 to flush */
 } XLogwrtRqst;
 
-typedef struct XLogwrtResult
-{
-	XLogRecPtr	Write;			/* last byte + 1 written out */
-	XLogRecPtr	Flush;			/* last byte + 1 flushed */
-} XLogwrtResult;
-
 /*
  * Inserting to WAL is protected by a small fixed number of WAL insertion
  * locks.  To insert to the WAL, you must hold one of the locks - it doesn't
@@ -480,6 +478,7 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult; /* uses atomics */
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -497,12 +496,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -622,10 +615,11 @@ static ControlFileData *ControlFile = NULL;
 static int	UsableBytesInSegment;
 
 /*
- * Private, possibly out-of-date copy of shared LogwrtResult.
+ * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult.
  * See discussion above.
  */
-static XLogwrtResult LogwrtResult = {0, 0};
+static XLogRecPtr LogWriteResult = 0;
+static XLogRecPtr LogFlushResult = 0;
 
 /*
  * openLogFile is -1 or a kernel FD for an open log file segment.
@@ -932,8 +926,6 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
@@ -1811,6 +1803,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	 * Now that we have the lock, check if someone initialized the page
 	 * already.
 	 */
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
 	{
 		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
@@ -1821,7 +1814,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 * already written out.
 		 */
 		OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
-		if (LogwrtResult.Write < OldPageRqstPtr)
+		if (LogWriteResult < OldPageRqstPtr)
 		{
 			/*
 			 * Nope, got work to do. If we just want to pre-initialize as much
@@ -1830,18 +1823,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
+			 * Before waiting, update LogWriteResult and see if we still need
+			 * to write it or if someone else already did.
 			 */
-			if (LogwrtResult.Write < OldPageRqstPtr)
+			LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+			if (LogWriteResult < OldPageRqstPtr)
 			{
 				/*
 				 * Must acquire write lock. Release WALBufMappingLock first,
@@ -1855,8 +1848,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
-				if (LogwrtResult.Write >= OldPageRqstPtr)
+				LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+				if (LogWriteResult >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
 					LWLockRelease(WALWriteLock);
@@ -2101,7 +2094,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2121,9 +2114,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 * consider writing.  Begin at the buffer containing the next unwritten
 	 * page, or last partially written page.
 	 */
-	curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
+	curridx = XLogRecPtrToBufIdx(LogWriteResult);
 
-	while (LogwrtResult.Write < WriteRqst.Write)
+	while (LogWriteResult < WriteRqst.Write)
 	{
 		/*
 		 * Make sure we're not ahead of the insert process.  This could happen
@@ -2132,16 +2125,16 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		 */
 		XLogRecPtr	EndPtr = XLogCtl->xlblocks[curridx];
 
-		if (LogwrtResult.Write >= EndPtr)
+		if (LogWriteResult >= EndPtr)
 			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
-				 LSN_FORMAT_ARGS(LogwrtResult.Write),
+				 LSN_FORMAT_ARGS(LogWriteResult),
 				 LSN_FORMAT_ARGS(EndPtr));
 
-		/* Advance LogwrtResult.Write to end of current buffer page */
-		LogwrtResult.Write = EndPtr;
-		ispartialpage = WriteRqst.Write < LogwrtResult.Write;
+		/* Advance LogWriteResult to end of current buffer page */
+		LogWriteResult = EndPtr;
+		ispartialpage = WriteRqst.Write < LogWriteResult;
 
-		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+		if (!XLByteInPrevSeg(LogWriteResult, openLogSegNo,
 							 wal_segment_size))
 		{
 			/*
@@ -2151,7 +2144,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 			Assert(npages == 0);
 			if (openLogFile >= 0)
 				XLogFileClose();
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+			XLByteToPrevSeg(LogWriteResult, openLogSegNo,
 							wal_segment_size);
 			openLogTLI = tli;
 
@@ -2163,7 +2156,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		/* Make sure we have the current logfile open */
 		if (openLogFile < 0)
 		{
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+			XLByteToPrevSeg(LogWriteResult, openLogSegNo,
 							wal_segment_size);
 			openLogTLI = tli;
 			openLogFile = XLogFileOpen(openLogSegNo, tli);
@@ -2175,7 +2168,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		{
 			/* first of group */
 			startidx = curridx;
-			startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ,
+			startoffset = XLogSegmentOffset(LogWriteResult - XLOG_BLCKSZ,
 											wal_segment_size);
 		}
 		npages++;
@@ -2186,7 +2179,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		 * contiguous in memory), or if we are at the end of the logfile
 		 * segment.
 		 */
-		last_iteration = WriteRqst.Write <= LogwrtResult.Write;
+		last_iteration = WriteRqst.Write <= LogWriteResult;
 
 		finishing_seg = !ispartialpage &&
 			(startoffset + npages * XLOG_BLCKSZ) >= wal_segment_size;
@@ -2277,13 +2270,13 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 				/* signal that we need to wakeup walsenders later */
 				WalSndWakeupRequest();
 
-				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
+				LogFlushResult = LogWriteResult;	/* end of page */
 
 				if (XLogArchivingActive())
 					XLogArchiveNotifySeg(openLogSegNo, tli);
 
 				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
-				XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
+				XLogCtl->lastSegSwitchLSN = LogFlushResult;
 
 				/*
 				 * Request a checkpoint if we've consumed too much xlog since
@@ -2304,7 +2297,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		if (ispartialpage)
 		{
 			/* Only asked to write a partial page */
-			LogwrtResult.Write = WriteRqst.Write;
+			LogWriteResult = WriteRqst.Write;
 			break;
 		}
 		curridx = NextBufIdx(curridx);
@@ -2316,11 +2309,14 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 
 	Assert(npages == 0);
 
+	/* Publish current write result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogWriteResult);
+
 	/*
 	 * If asked to flush, do so
 	 */
-	if (LogwrtResult.Flush < WriteRqst.Flush &&
-		LogwrtResult.Flush < LogwrtResult.Write)
+	if (LogFlushResult < WriteRqst.Flush &&
+		LogFlushResult < LogWriteResult)
 	{
 		/*
 		 * Could get here without iterating above loop, in which case we might
@@ -2331,12 +2327,12 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 			sync_method != SYNC_METHOD_OPEN_DSYNC)
 		{
 			if (openLogFile >= 0 &&
-				!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+				!XLByteInPrevSeg(LogWriteResult, openLogSegNo,
 								 wal_segment_size))
 				XLogFileClose();
 			if (openLogFile < 0)
 			{
-				XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+				XLByteToPrevSeg(LogWriteResult, openLogSegNo,
 								wal_segment_size);
 				openLogTLI = tli;
 				openLogFile = XLogFileOpen(openLogSegNo, tli);
@@ -2349,23 +2345,23 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		/* signal that we need to wakeup walsenders later */
 		WalSndWakeupRequest();
 
-		LogwrtResult.Flush = LogwrtResult.Write;
+		LogFlushResult = LogWriteResult;
 	}
 
+	/* Publish current flush result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogFlushResult);
+
 	/*
-	 * Update shared-memory status
-	 *
-	 * We make sure that the shared 'request' values do not fall behind the
+	 * Make sure that the shared 'request' values do not fall behind the
 	 * 'result' values.  This is not absolutely essential, but it saves some
 	 * code in a couple of places.
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
-		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
-			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
+		if (XLogCtl->LogwrtRqst.Write < LogWriteResult)
+			XLogCtl->LogwrtRqst.Write = LogWriteResult;
+		if (XLogCtl->LogwrtRqst.Flush < LogFlushResult)
+			XLogCtl->LogwrtRqst.Flush = LogFlushResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 }
@@ -2381,8 +2377,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
@@ -2399,7 +2395,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 		WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
 
 		/* if we have already flushed that far, we're done */
-		if (WriteRqstPtr <= LogwrtResult.Flush)
+		if (WriteRqstPtr <= LogFlushResult)
 			return;
 	}
 
@@ -2551,15 +2547,15 @@ XLogFlush(XLogRecPtr record)
 	}
 
 	/* Quick exit if already known flushed */
-	if (record <= LogwrtResult.Flush)
+	if (record <= LogFlushResult)
 		return;
 
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG)
 		elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
 			 LSN_FORMAT_ARGS(record),
-			 LSN_FORMAT_ARGS(LogwrtResult.Write),
-			 LSN_FORMAT_ARGS(LogwrtResult.Flush));
+			 LSN_FORMAT_ARGS(LogWriteResult),
+			 LSN_FORMAT_ARGS(LogFlushResult));
 #endif
 
 	START_CRIT_SECTION();
 
@@ -2568,8 +2564,8 @@ XLogFlush(XLogRecPtr record)
 	 * Since fsync is usually a horribly expensive operation, we try to
 	 * piggyback as much data as we can on each fsync: if we see any more data
 	 * entered into the xlog buffer, we'll write and fsync that too, so that
-	 * the final value of LogwrtResult.Flush is as large as possible. This
-	 * gives us some chance of avoiding another fsync immediately after.
+	 * the final value of LogFlushResult is as large as possible.  This gives
+	 * us some chance of avoiding another fsync immediately after.
	 */
 
 	/* initialize to given target; may increase below */
@@ -2587,11 +2583,11 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
-		if (record <= LogwrtResult.Flush)
+		LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		if (record <= LogFlushResult)
 			break;
 
 		/*
@@ -2618,8 +2614,8 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
-		if (record <= LogwrtResult.Flush)
+		LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		if (record <= LogFlushResult)
 		{
 			LWLockRelease(WALWriteLock);
 			break;
@@ -2689,11 +2685,11 @@ XLogFlush(XLogRecPtr record)
 	 * calls from bufmgr.c are not within critical sections and so we will not
 	 * force a restart for a bad LSN on a data page.
 	 */
-	if (LogwrtResult.Flush < record)
+	if (LogFlushResult < record)
 		elog(ERROR,
 			 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
 			 LSN_FORMAT_ARGS(record),
-			 LSN_FORMAT_ARGS(LogwrtResult.Flush));
+			 LSN_FORMAT_ARGS(LogFlushResult));
 }
 
 /*
@@ -2742,7 +2738,6 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -2750,8 +2745,10 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
-	if (WriteRqst.Write <= LogwrtResult.Flush)
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	if (WriteRqst.Write <= LogFlushResult)
 	{
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
 		WriteRqst.Write = XLogCtl->asyncXactLSN;
 		SpinLockRelease(&XLogCtl->info_lck);
@@ -2763,11 +2760,12 @@ XLogBackgroundFlush(void)
 	 * holding an open file handle to a logfile that's no longer in use,
 	 * preventing the file from being deleted.
 	 */
-	if (WriteRqst.Write <= LogwrtResult.Flush)
+	if (WriteRqst.Write <= LogFlushResult)
 	{
 		if (openLogFile >= 0)
 		{
-			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+			LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+			if (!XLByteInPrevSeg(LogWriteResult, openLogSegNo,
 								 wal_segment_size))
 			{
 				XLogFileClose();
@@ -2782,7 +2780,7 @@ XLogBackgroundFlush(void)
 	 */
 	now = GetCurrentTimestamp();
 	flushbytes =
-		WriteRqst.Write / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
+		WriteRqst.Write / XLOG_BLCKSZ - LogFlushResult / XLOG_BLCKSZ;
 
 	if (WalWriterFlushAfter == 0 || lastflush == 0)
 	{
@@ -2817,8 +2815,8 @@ XLogBackgroundFlush(void)
 		elog(LOG, "xlog bg flush request write %X/%X; flush: %X/%X, current is write %X/%X; flush %X/%X",
 			 LSN_FORMAT_ARGS(WriteRqst.Write),
 			 LSN_FORMAT_ARGS(WriteRqst.Flush),
-			 LSN_FORMAT_ARGS(LogwrtResult.Write),
-			 LSN_FORMAT_ARGS(LogwrtResult.Flush));
+			 LSN_FORMAT_ARGS(LogWriteResult),
+			 LSN_FORMAT_ARGS(LogFlushResult));
 #endif
 
 	START_CRIT_SECTION();
 
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	if (WriteRqst.Write > LogwrtResult.Write ||
-		WriteRqst.Flush > LogwrtResult.Flush)
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	if (WriteRqst.Write > LogWriteResult ||
+		WriteRqst.Flush > LogFlushResult)
 	{
 		XLogWrite(WriteRqst, insertTLI, flexible);
 	}
@@ -2910,16 +2909,14 @@ XLogNeedsFlush(XLogRecPtr record)
 	}
 
 	/* Quick exit if already known flushed */
-	if (record <= LogwrtResult.Flush)
+	if (record <= LogFlushResult)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
-	if (record <= LogwrtResult.Flush)
+	if (record <= LogFlushResult)
 		return false;
 
 	return true;
@@ -5487,9 +5484,11 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
-	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	LogWriteResult = LogFlushResult = EndOfLog;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+	/* XXX OK to write without WALWriteLock? */
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -5925,9 +5924,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -5936,7 +5933,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 	if (insertTLI)
 		*insertTLI = XLogCtl->InsertTimeLineID;
 
-	return LogwrtResult.Flush;
+	return LogFlushResult;
 }
 
 /*
@@ -9083,11 +9080,9 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
-	return LogwrtResult.Write;
+	return LogWriteResult;
 }
 
 /*
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index 9550e04aaa..7abed1785a 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -519,6 +519,35 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 93d5190508..5fed02537a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2959,6 +2959,7 @@ XLogRecoveryCtlData
 XLogRedoAction
 XLogSegNo
 XLogSource
+XLogwrtAtomic
 XLogwrtResult
 XLogwrtRqst
 XPVIV
-- 
2.30.2
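For anyone who wants to poke at the new primitive outside the tree, here is a
minimal standalone model of pg_atomic_monotonic_advance_u64 written against
C11 <stdatomic.h> rather than the pg_atomic layer.  All names in it are
invented for the sketch; note that the seq_cst defaults of <stdatomic.h> are
at least as strong as the full-barrier semantics the patch documents, so the
explicit pg_memory_barrier() on the early-exit path has no analogue here.

    /*
     * Standalone model of the monotonic-advance CAS loop; not the patch's
     * code.  Compile with: cc -std=c11 sketch.c
     */
    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    static _Atomic uint64_t shared_pos;     /* stands in for LogwrtResult.Write */

    static void
    monotonic_advance_u64(_Atomic uint64_t *ptr, uint64_t target)
    {
        uint64_t    cur = atomic_load(ptr);

        /*
         * Only ever move the value forward.  On CAS failure, 'cur' is
         * refreshed with the value a concurrent advancer installed, so a
         * larger concurrent advance simply terminates the loop.
         */
        while (cur < target)
        {
            if (atomic_compare_exchange_weak(ptr, &cur, target))
                break;
        }
    }

    int
    main(void)
    {
        monotonic_advance_u64(&shared_pos, 100);
        monotonic_advance_u64(&shared_pos, 50);     /* no-op: never regresses */
        printf("pos = %llu\n", (unsigned long long) atomic_load(&shared_pos));
        return 0;       /* prints: pos = 100 */
    }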
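The shape that keeps 0001 safe is that readers may now see a stale position
without holding anything, so XLogFlush still rechecks once it holds
WALWriteLock before doing any I/O.  A compile-on-its-own sketch of that
check/lock/recheck dance, with pthread_mutex_t standing in for WALWriteLock
and all names invented (link with -pthread):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdint.h>

    static _Atomic uint64_t flushed_upto;               /* ~ LogwrtResult.Flush */
    static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;

    static void
    flush_up_to(uint64_t record)
    {
        /* ... write and fsync WAL up to 'record' here ... */
        /* Safe as a plain store: only ever advanced while holding write_lock. */
        atomic_store(&flushed_upto, record);
    }

    void
    ensure_flushed(uint64_t record)
    {
        /* Quick exit, no lock, if already known flushed. */
        if (atomic_load(&flushed_upto) >= record)
            return;

        pthread_mutex_lock(&write_lock);
        /* Got the lock; recheck whether someone satisfied the request. */
        if (atomic_load(&flushed_upto) < record)
            flush_up_to(record);
        pthread_mutex_unlock(&write_lock);
    }

    int
    main(void)
    {
        ensure_flushed(42);     /* first caller does the work */
        ensure_flushed(42);     /* second caller takes the quick exit */
        return 0;
    }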
>From d0ed6ec6b6b52b3f5fe0cd6616b64a43764c37b1 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 22 Nov 2021 18:13:09 -0300
Subject: [PATCH v6 2/2] Change asyncXactLSN and WalWriterSleeping to atomics

Using atomics for these two members of XLogCtl further reduces use of
the info_lck spinlock.  There's no benchmark study to show that this
change is necessary, however.
---
 src/backend/access/transam/xlog.c | 31 ++++++++++++++-----------------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 311a8a1192..b6243bcee2 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -479,11 +479,12 @@ typedef struct XLogCtlData
 	XLogCtlInsert Insert;
 
 	XLogwrtAtomic LogwrtResult; /* uses atomics */
+	pg_atomic_uint64 asyncXactLSN;	/* LSN of newest async commit/abort */
+
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
 	FullTransactionId ckptFullXid;	/* nextXid of latest checkpoint */
-	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
 	XLogRecPtr	replicationSlotMinLSN;	/* oldest LSN needed by any slot */
 
 	XLogSegNo	lastRemovedSegNo;	/* latest removed/recycled XLOG segment */
@@ -547,9 +548,9 @@ typedef struct XLogCtlData
 	/*
 	 * WalWriterSleeping indicates whether the WAL writer is currently in
 	 * low-power mode (and hence should be nudged if an async commit occurs).
-	 * Protected by info_lck.
+	 * Uses atomics.
 	 */
-	bool		WalWriterSleeping;
+	pg_atomic_uint32 WalWriterSleeping;
 
 	/*
	 * During recovery, we keep a copy of the latest checkpoint record here.
@@ -2377,12 +2378,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
-	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
-	SpinLockAcquire(&XLogCtl->info_lck);
-	sleeping = XLogCtl->WalWriterSleeping;
-	if (XLogCtl->asyncXactLSN < asyncXactLSN)
-		XLogCtl->asyncXactLSN = asyncXactLSN;
-	SpinLockRelease(&XLogCtl->info_lck);
+	sleeping = (bool) pg_atomic_read_u32(&XLogCtl->WalWriterSleeping);
+	pg_atomic_monotonic_advance_u64(&XLogCtl->asyncXactLSN, asyncXactLSN);
 
 	/*
 	 * If the WALWriter is sleeping, we should kick it to make it come out of
@@ -2395,6 +2392,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 		WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
 
 		/* if we have already flushed that far, we're done */
+		LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (WriteRqstPtr <= LogFlushResult)
 			return;
 	}
@@ -2748,10 +2746,8 @@ XLogBackgroundFlush(void)
 	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write <= LogFlushResult)
 	{
-		pg_memory_barrier();
-		SpinLockAcquire(&XLogCtl->info_lck);
-		WriteRqst.Write = XLogCtl->asyncXactLSN;
-		SpinLockRelease(&XLogCtl->info_lck);
+		pg_read_barrier();
+		WriteRqst.Write = pg_atomic_read_u64(&XLogCtl->asyncXactLSN);
 		flexible = false;		/* ensure it all gets written */
 	}
 
@@ -2826,6 +2822,7 @@ XLogBackgroundFlush(void)
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	/* no barrier needed here */
 	if (WriteRqst.Write > LogWriteResult ||
 		WriteRqst.Flush > LogFlushResult)
 	{
@@ -4494,7 +4491,7 @@ XLOGShmemInit(void)
 	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
 	XLogCtl->SharedRecoveryState = RECOVERY_STATE_CRASH;
 	XLogCtl->InstallXLogFileSegmentActive = false;
-	XLogCtl->WalWriterSleeping = false;
+	pg_atomic_write_u32(&XLogCtl->WalWriterSleeping, (uint32) false);
 
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
@@ -9213,11 +9210,11 @@ IsInstallXLogFileSegmentActive(void)
 
 /*
  * Update the WalWriterSleeping flag.
+ *
+ * Note there is no memory barrier here.  Caller must insert one if needed.
 */
 void
 SetWalWriterSleeping(bool sleeping)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	XLogCtl->WalWriterSleeping = sleeping;
-	SpinLockRelease(&XLogCtl->info_lck);
+	pg_atomic_write_u32(&XLogCtl->WalWriterSleeping, (uint32) sleeping);
 }
-- 
2.30.2
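For what 0002 boils down to: the sleeping flag becomes a plain atomic that an
async committer reads without any lock, nudging the WAL writer only when it
claims to be asleep.  Here is a standalone model with C11 atomics; all names
are invented, kick_wal_writer() stands in for the latch kick, and atomic_bool
stands in for the pg_atomic_uint32 the patch uses (the pg_atomic API has no
bool type, hence the (uint32) casts above):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    static atomic_bool wal_writer_sleeping;

    static void kick_wal_writer(void) { puts("nudge WAL writer"); }

    /*
     * WAL writer side: announce low-power mode.  As the patch's comment on
     * SetWalWriterSleeping notes, no barrier is issued here; a caller that
     * needs ordering must add one.
     */
    static void
    set_wal_writer_sleeping(bool sleeping)
    {
        atomic_store(&wal_writer_sleeping, sleeping);
    }

    /* Async committer side: only nudge the writer if it said it sleeps. */
    static void
    log_async_commit(void)
    {
        if (atomic_load(&wal_writer_sleeping))
            kick_wal_writer();
    }

    int
    main(void)
    {
        set_wal_writer_sleeping(true);
        log_async_commit();             /* prints: nudge WAL writer */
        set_wal_writer_sleeping(false);
        log_async_commit();             /* silent */
        return 0;
    }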