Pushed 0001. Here, once more, is the patch that adds the Copy position, this time with the monotonic_advance function returning the value.
-- Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
From 3f5c860576245b92701e7bfc517947c418c68510 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 5 Apr 2024 13:52:44 +0200
Subject: [PATCH v17] Add logCopyResult

Updated using monotonic increment
---
 src/backend/access/transam/xlog.c | 32 ++++++++++++++++++++++++++-
 src/include/port/atomics.h        | 36 +++++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b4499cda7b..a40c1fb79e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -469,6 +469,7 @@ typedef struct XLogCtlData
 	XLogRecPtr lastSegSwitchLSN;
 
 	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logCopyResult; /* last byte + 1 copied to WAL buffers */
 	pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */
 	pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */
 
@@ -1499,6 +1500,7 @@ static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64 bytepos;
+	XLogRecPtr copyptr;
 	XLogRecPtr reservedUpto;
 	XLogRecPtr finishedUpto;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1507,6 +1509,11 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	if (MyProc == NULL)
 		elog(PANIC, "cannot wait without a PGPROC structure");
 
+	/* check if there's any work to do */
+	copyptr = pg_atomic_read_u64(&XLogCtl->logCopyResult);
+	if (upto <= copyptr)
+		return copyptr;
+
 	/* Read the current insert position */
 	SpinLockAcquire(&Insert->insertpos_lck);
 	bytepos = Insert->CurrBytePos;
@@ -1586,6 +1593,10 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
 			finishedUpto = insertingat;
 	}
+
+	finishedUpto =
+		pg_atomic_monotonic_advance_u64(&XLogCtl->logCopyResult, finishedUpto);
+
 	return finishedUpto;
 }
 
@@ -1727,13 +1738,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char *pdst = dstbuf;
 	XLogRecPtr recptr = startptr;
+	XLogRecPtr copyptr;
 	Size nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+	/*
+	 * Caller should ensure that the requested data has been copied to WAL
+	 * buffers before we try to read it.
+	 */
+	copyptr = pg_atomic_read_u64(&XLogCtl->logCopyResult);
+	if (startptr + count > copyptr)
+		ereport(ERROR,
+				errmsg("request to read past end of generated WAL; requested %X/%X, current position %X/%X",
+					   LSN_FORMAT_ARGS(startptr + count),
+					   LSN_FORMAT_ARGS(copyptr)));
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
@@ -2571,13 +2593,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	{
 		XLogRecPtr Flush;
 		XLogRecPtr Write;
+		XLogRecPtr Copy;
 
 		Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
 		pg_read_barrier();
 		Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
+		pg_read_barrier();
+		Copy = pg_atomic_read_u64(&XLogCtl->logCopyResult);
 
 		/* WAL written to disk is always ahead of WAL flushed */
 		Assert(Write >= Flush);
+
+		/* WAL copied to buffers is always ahead of WAL written */
+		Assert(Copy >= Write);
 	}
 #endif
 }
@@ -4951,6 +4979,7 @@ XLOGShmemInit(void)
 
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
+	pg_atomic_init_u64(&XLogCtl->logCopyResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5979,6 +6008,7 @@ StartupXLOG(void)
 	 * because no other process can be reading or writing WAL yet.
 	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	pg_atomic_write_u64(&XLogCtl->logCopyResult, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index ff47782cdb..78987f3154 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -570,6 +570,42 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value. Returns the latest value observed, which
+ * may or may not be the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline uint64
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64 currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64_impl(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return currval;
+	}
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(&currval, 8);
+#endif
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_))
+			break;
+	}
+
+	return Max(target_, currval);
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif /* ATOMICS_H */
-- 
2.39.2