On Wed, Apr 8, 2020 at 12:52 PM Thomas Munro <thomas.mu...@gmail.com> wrote: > * he gave some feedback on the read_local_xlog_page() modifications: I > probably need to reconsider the change to logical.c that passes NULL > instead of cxt to the read_page callback; and the switch statement in > read_local_xlog_page() probably should have a case for the preexisting > mode
So... logical.c wants to give its LogicalDecodingContext to any XLogPageReadCB you give it, via "private_data"; that is, it really only accepts XLogPageReadCB implementations that understand that (or ignore it). What I want to do is give every XLogPageReadCB the chance to have its own state that it is in control of (to receive settings specific to the implementation, or whatever), that you supply along with it. We can't do both kinds of things with private_data, so I have added a second member read_page_data to XLogReaderState. If you pass in read_local_xlog_page as read_page, then you can optionally install a pointer to XLogReadLocalOptions as reader->read_page_data, to activate the new behaviours I added for prefetching purposes. While working on that, I realised the readahead XLogReader was breaking a rule expressed in XLogReadDetermineTimeline(). Timelines are really confusing and there were probably several subtle or not so subtle bugs there. So I added an option to skip all of that logic, and just say "I command you to read only from TLI X". It reads the same TLI as recovery is reading, until it hits the end of readable data and that causes prefetching to shut down. Then the main recovery loop resets the prefetching module when it sees a TLI switch, so then it starts up again. This seems to work reliably, but I've obviously had limited time to test. Does this scheme sound sane? I think this is basically committable (though of course I wish I had more time to test and review). Ugh. Feature freeze in half an hour.
From 8654ea7f2ed61de7ab3f0b305e37d190932ad97c Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 17 Mar 2020 15:28:08 +1300 Subject: [PATCH v7 1/4] Rationalize GetWalRcv{Write,Flush}RecPtr(). GetWalRcvWriteRecPtr() previously reported the latest *flushed* location. Adopt the conventional terminology used elsewhere in the tree by renaming it to GetWalRcvFlushRecPtr(), and likewise for some related variables that used the term "received". Add a new definition of GetWalRcvWriteRecPtr(), which returns the latest *written* value. This will allow later patches to use the value for non-data-integrity purposes, without having to wait for the flush pointer to advance. Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com> Reviewed-by: Andres Freund <and...@anarazel.de> Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/access/transam/xlog.c | 20 +++++++++--------- src/backend/access/transam/xlogfuncs.c | 2 +- src/backend/replication/README | 2 +- src/backend/replication/walreceiver.c | 15 +++++++++----- src/backend/replication/walreceiverfuncs.c | 24 ++++++++++++++++------ src/backend/replication/walsender.c | 2 +- src/include/replication/walreceiver.h | 18 ++++++++++++---- 7 files changed, 55 insertions(+), 28 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8a4c1743e5..c60842ea03 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -209,8 +209,8 @@ HotStandbyState standbyState = STANDBY_DISABLED; static XLogRecPtr LastRec; -/* Local copy of WalRcv->receivedUpto */ -static XLogRecPtr receivedUpto = 0; +/* Local copy of WalRcv->flushedUpto */ +static XLogRecPtr flushedUpto = 0; static TimeLineID receiveTLI = 0; /* @@ -9376,7 +9376,7 @@ CreateRestartPoint(int flags) * Retreat _logSegNo using the current end of xlog replayed or received, * whichever is later. 
*/ - receivePtr = GetWalRcvWriteRecPtr(NULL, NULL); + receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); @@ -11869,7 +11869,7 @@ retry: /* See if we need to retrieve more data */ if (readFile < 0 || (readSource == XLOG_FROM_STREAM && - receivedUpto < targetPagePtr + reqLen)) + flushedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, private->randAccess, @@ -11900,10 +11900,10 @@ retry: */ if (readSource == XLOG_FROM_STREAM) { - if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ)) + if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ)) readLen = XLOG_BLCKSZ; else - readLen = XLogSegmentOffset(receivedUpto, wal_segment_size) - + readLen = XLogSegmentOffset(flushedUpto, wal_segment_size) - targetPageOff; } else @@ -12318,7 +12318,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, RequestXLogStreaming(tli, ptr, PrimaryConnInfo, PrimarySlotName, wal_receiver_create_temp_slot); - receivedUpto = 0; + flushedUpto = 0; } /* @@ -12342,14 +12342,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * XLogReceiptTime will not advance, so the grace time * allotted to conflicting queries will decrease. 
*/ - if (RecPtr < receivedUpto) + if (RecPtr < flushedUpto) havedata = true; else { XLogRecPtr latestChunkStart; - receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); - if (RecPtr < receivedUpto && receiveTLI == curFileTLI) + flushedUpto = GetWalRcvFlushRecPtr(&latestChunkStart, &receiveTLI); + if (RecPtr < flushedUpto && receiveTLI == curFileTLI) { havedata = true; if (latestChunkStart <= RecPtr) diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index b84ba57259..00e1b33ed5 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS) { XLogRecPtr recptr; - recptr = GetWalRcvWriteRecPtr(NULL, NULL); + recptr = GetWalRcvFlushRecPtr(NULL, NULL); if (recptr == 0) PG_RETURN_NULL(); diff --git a/src/backend/replication/README b/src/backend/replication/README index 0cbb990613..8ccdd86e74 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in WalRcvData->receiveStart. As walreceiver receives WAL from the master server, and writes and flushes -it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals +it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals the startup process to know how far WAL replay can advance. Walreceiver sends information about replication progress to the master server diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index aee67c61aa..d69fb90132 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -12,7 +12,7 @@ * in the primary server), and then keeps receiving XLOG records and * writing them to the disk as long as the connection is alive. 
As XLOG * records are received and flushed to disk, it updates the - * WalRcv->receivedUpto variable in shared memory, to inform the startup + * WalRcv->flushedUpto variable in shared memory, to inform the startup * process of how far it can proceed with XLOG replay. * * A WAL receiver cannot directly load GUC parameters used when establishing @@ -261,6 +261,8 @@ WalReceiverMain(void) SpinLockRelease(&walrcv->mutex); + pg_atomic_init_u64(&WalRcv->writtenUpto, 0); + /* Arrange to clean up at walreceiver exit */ on_shmem_exit(WalRcvDie, 0); @@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) LogstreamResult.Write = recptr; } + + /* Update shared-memory status */ + pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); } /* @@ -1005,10 +1010,10 @@ XLogWalRcvFlush(bool dying) /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); - if (walrcv->receivedUpto < LogstreamResult.Flush) + if (walrcv->flushedUpto < LogstreamResult.Flush) { - walrcv->latestChunkStart = walrcv->receivedUpto; - walrcv->receivedUpto = LogstreamResult.Flush; + walrcv->latestChunkStart = walrcv->flushedUpto; + walrcv->flushedUpto = LogstreamResult.Flush; walrcv->receivedTLI = ThisTimeLineID; } SpinLockRelease(&walrcv->mutex); @@ -1361,7 +1366,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) state = WalRcv->walRcvState; receive_start_lsn = WalRcv->receiveStart; receive_start_tli = WalRcv->receiveStartTLI; - received_lsn = WalRcv->receivedUpto; + received_lsn = WalRcv->flushedUpto; received_tli = WalRcv->receivedTLI; last_send_time = WalRcv->lastMsgSendTime; last_receipt_time = WalRcv->lastMsgReceiptTime; diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 21d1823607..4afad83539 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -282,11 +282,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, /* * If this is 
the first startup of walreceiver (on this timeline), - * initialize receivedUpto and latestChunkStart to the starting point. + * initialize flushedUpto and latestChunkStart to the starting point. */ if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) { - walrcv->receivedUpto = recptr; + walrcv->flushedUpto = recptr; walrcv->receivedTLI = tli; walrcv->latestChunkStart = recptr; } @@ -304,7 +304,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, } /* - * Returns the last+1 byte position that walreceiver has written. + * Returns the last+1 byte position that walreceiver has flushed. * * Optionally, returns the previous chunk start, that is the first byte * written in the most recent walreceiver flush cycle. Callers not @@ -312,13 +312,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, * receiveTLI. */ XLogRecPtr -GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) +GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) { WalRcvData *walrcv = WalRcv; XLogRecPtr recptr; SpinLockAcquire(&walrcv->mutex); - recptr = walrcv->receivedUpto; + recptr = walrcv->flushedUpto; if (latestChunkStart) *latestChunkStart = walrcv->latestChunkStart; if (receiveTLI) @@ -328,6 +328,18 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) return recptr; } +/* + * Returns the last+1 byte position that walreceiver has written. + * This returns a recently written value without taking a lock. 
+ */ +XLogRecPtr +GetWalRcvWriteRecPtr(void) +{ + WalRcvData *walrcv = WalRcv; + + return pg_atomic_read_u64(&walrcv->writtenUpto); +} + /* * Returns the replication apply delay in ms or -1 * if the apply delay info is not available @@ -345,7 +357,7 @@ GetReplicationApplyDelay(void) TimestampTz chunkReplayStartTime; SpinLockAcquire(&walrcv->mutex); - receivePtr = walrcv->receivedUpto; + receivePtr = walrcv->flushedUpto; SpinLockRelease(&walrcv->mutex); replayPtr = GetXLogReplayRecPtr(NULL); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 06e8b79036..122d884f3e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2949,7 +2949,7 @@ GetStandbyFlushRecPtr(void) * has streamed, but hasn't been replayed yet. */ - receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI); + receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); ThisTimeLineID = replayTLI; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index cf3e43128c..f1aa6e9977 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -16,6 +16,7 @@ #include "access/xlogdefs.h" #include "getaddrinfo.h" /* for NI_MAXHOST */ #include "pgtime.h" +#include "port/atomics.h" #include "replication/logicalproto.h" #include "replication/walsender.h" #include "storage/latch.h" @@ -73,19 +74,19 @@ typedef struct TimeLineID receiveStartTLI; /* - * receivedUpto-1 is the last byte position that has already been + * flushedUpto-1 is the last byte position that has already been * received, and receivedTLI is the timeline it came from. At the first * startup of walreceiver, these are set to receiveStart and * receiveStartTLI. After that, walreceiver updates these whenever it * flushes the received WAL to disk. 
*/ - XLogRecPtr receivedUpto; + XLogRecPtr flushedUpto; TimeLineID receivedTLI; /* * latestChunkStart is the starting byte position of the current "batch" * of received WAL. It's actually the same as the previous value of - * receivedUpto before the last flush to disk. Startup process can use + * flushedUpto before the last flush to disk. Startup process can use * this to detect whether it's keeping up or not. */ XLogRecPtr latestChunkStart; @@ -141,6 +142,14 @@ typedef struct slock_t mutex; /* locks shared variables shown above */ + /* + * Like flushedUpto, but advanced after writing and before flushing, + * without the need to acquire the spin lock. Data can be read by another + * process up to this point, but shouldn't be used for data integrity + * purposes. + */ + pg_atomic_uint64 writtenUpto; + /* * force walreceiver reply? This doesn't need to be locked; memory * barriers for ordering are sufficient. But we do need atomic fetch and @@ -322,7 +331,8 @@ extern bool WalRcvRunning(void); extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot); -extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvWriteRecPtr(void); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); extern void WalRcvForceReply(void); -- 2.20.1
From d0a1b60cbe589a4023b94db35ce3b830f5cbde04 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 28 Mar 2020 11:42:59 +1300 Subject: [PATCH v7 2/4] Add pg_atomic_unlocked_add_fetch_XXX(). Add a variant of pg_atomic_add_fetch_XXX with no barrier semantics, for cases where you only want to avoid the possibility that a concurrent pg_atomic_read_XXX() sees a torn/partial value. Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/include/port/atomics.h | 24 ++++++++++++++++++++++ src/include/port/atomics/generic.h | 33 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h index 4956ec55cb..2abb852893 100644 --- a/src/include/port/atomics.h +++ b/src/include/port/atomics.h @@ -389,6 +389,21 @@ pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_) return pg_atomic_add_fetch_u32_impl(ptr, add_); } +/* + * pg_atomic_unlocked_add_fetch_u32 - atomically add to variable + * + * Like pg_atomic_unlocked_write_u32, guarantees only that partial values + * cannot be observed. + * + * No barrier semantics. 
+ */ +static inline uint32 +pg_atomic_unlocked_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_) +{ + AssertPointerAlignment(ptr, 4); + return pg_atomic_unlocked_add_fetch_u32_impl(ptr, add_); +} + /* * pg_atomic_sub_fetch_u32 - atomically subtract from variable * @@ -519,6 +534,15 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_) return pg_atomic_sub_fetch_u64_impl(ptr, sub_); } +static inline uint64 +pg_atomic_unlocked_add_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 add_) +{ +#ifndef PG_HAVE_ATOMIC_U64_SIMULATION + AssertPointerAlignment(ptr, 8); +#endif + return pg_atomic_unlocked_add_fetch_u64_impl(ptr, add_); +} + #undef INSIDE_ATOMICS_H #endif /* ATOMICS_H */ diff --git a/src/include/port/atomics/generic.h b/src/include/port/atomics/generic.h index d3ba89a58f..1683653ca6 100644 --- a/src/include/port/atomics/generic.h +++ b/src/include/port/atomics/generic.h @@ -234,6 +234,16 @@ pg_atomic_add_fetch_u32_impl(volatile pg_atomic_uint32 *ptr, int32 add_) } #endif +#if !defined(PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U32) +#define PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U32 +static inline uint32 +pg_atomic_unlocked_add_fetch_u32_impl(volatile pg_atomic_uint32 *ptr, int32 add_) +{ + ptr->value += add_; + return ptr->value; +} +#endif + #if !defined(PG_HAVE_ATOMIC_SUB_FETCH_U32) && defined(PG_HAVE_ATOMIC_FETCH_SUB_U32) #define PG_HAVE_ATOMIC_SUB_FETCH_U32 static inline uint32 @@ -399,3 +409,26 @@ pg_atomic_sub_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, int64 sub_) return pg_atomic_fetch_sub_u64_impl(ptr, sub_) - sub_; } #endif + +#if defined(PG_HAVE_8BYTE_SINGLE_COPY_ATOMICITY) && \ + !defined(PG_HAVE_ATOMIC_U64_SIMULATION) + +#ifndef PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U64 +#define PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U64 +static inline uint64 +pg_atomic_unlocked_add_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 val) +{ + ptr->value += val; + return ptr->value; +} +#endif + +#else + +static inline uint64 
+pg_atomic_unlocked_add_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 val) +{ + return pg_atomic_add_fetch_u64_impl(ptr, val); +} + +#endif -- 2.20.1
From dea9a3c46d35b12bbea8469e44223f73e4004d22 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 7 Apr 2020 22:56:27 +1200 Subject: [PATCH v7 3/4] Allow XLogReadRecord() to be non-blocking. Extend read_local_xlog_page() to support non-blocking modes: 1. Reading as far as the WAL receiver has written so far. 2. Reading all the way to the end, when the end LSN is unknown. Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/access/transam/xlogreader.c | 37 ++++-- src/backend/access/transam/xlogutils.c | 151 +++++++++++++++++------- src/backend/replication/walsender.c | 2 +- src/include/access/xlogreader.h | 20 +++- src/include/access/xlogutils.h | 23 ++++ 5 files changed, 178 insertions(+), 55 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 79ff976474..554b2029da 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -257,6 +257,9 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * If the reading fails for some other reason, NULL is also returned, and * *errormsg is set to a string with details of the failure. * + * If the read_page callback is one that returns XLOGPAGEREAD_WOULDBLOCK rather + * than waiting for WAL to arrive, NULL is also returned in that case. + * * The returned pointer (or *errormsg) points to an internal buffer that's * valid until the next call to XLogReadRecord. */ @@ -546,10 +549,11 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) err: /* - * Invalidate the read state. We might read from a different source after - * failure. + * Invalidate the read state, if this was an error. We might read from a + * different source after failure. 
*/ - XLogReaderInvalReadState(state); + if (readOff != XLOGPAGEREAD_WOULDBLOCK) + XLogReaderInvalReadState(state); if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; @@ -561,8 +565,9 @@ err: * Read a single xlog page including at least [pageptr, reqLen] of valid data * via the read_page() callback. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the read_page callback). + * Returns XLOGPAGEREAD_ERROR or XLOGPAGEREAD_WOULDBLOCK if the required page + * cannot be read for some reason; errormsg_buf is set in the former case + * (unless the error occurs in the read_page callback). * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -659,8 +664,11 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) return readLen; err: + if (readLen == XLOGPAGEREAD_WOULDBLOCK) + return XLOGPAGEREAD_WOULDBLOCK; + XLogReaderInvalReadState(state); - return -1; + return XLOGPAGEREAD_ERROR; } /* @@ -939,6 +947,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; char *errormsg; + int readLen; Assert(!XLogRecPtrIsInvalid(RecPtr)); @@ -952,7 +961,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr targetPagePtr; int targetRecOff; uint32 pageHeaderSize; - int readLen; /* * Compute targetRecOff. 
It should typically be equal or greater than @@ -1033,7 +1041,8 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) } err: - XLogReaderInvalReadState(state); + if (readLen != XLOGPAGEREAD_WOULDBLOCK) + XLogReaderInvalReadState(state); return InvalidXLogRecPtr; } @@ -1084,13 +1093,23 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, tli != seg->ws_tli) { XLogSegNo nextSegNo; - if (seg->ws_file >= 0) close(seg->ws_file); XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); seg->ws_file = openSegment(nextSegNo, segcxt, &tli); + /* callback reported that there was no such file */ + if (seg->ws_file < 0) + { + errinfo->wre_errno = errno; + errinfo->wre_req = 0; + errinfo->wre_read = 0; + errinfo->wre_off = startoff; + errinfo->wre_seg = *seg; + return false; + } + /* Update the current segment info. */ seg->ws_tli = tli; seg->ws_segno = nextSegNo; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 6cb143e161..2d702437dd 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -25,6 +25,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walreceiver.h" #include "storage/smgr.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -783,6 +784,30 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } } +/* openSegment callback for WALRead */ +static int +wal_segment_try_open(XLogSegNo nextSegNo, + WALSegmentContext *segcxt, + TimeLineID *tli_p) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + int fd; + + XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize); + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (fd >= 0) + return fd; + + if (errno != ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + + return -1; /* keep compiler quiet */ +} + /* openSegment callback for WALRead */ static int 
wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, @@ -831,58 +856,92 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, TimeLineID tli; int count; WALReadError errinfo; + bool try_read = false; + XLogReadLocalOptions *options = + (XLogReadLocalOptions *) state->read_page_data; loc = targetPagePtr + reqLen; /* Loop waiting for xlog to be available if necessary */ while (1) { - /* - * Determine the limit of xlog we can currently read to, and what the - * most recent timeline is. - * - * RecoveryInProgress() will update ThisTimeLineID when it first - * notices recovery finishes, so we only have to maintain it for the - * local process until recovery ends. - */ - if (!RecoveryInProgress()) - read_upto = GetFlushRecPtr(); - else - read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - tli = ThisTimeLineID; + switch (options ? options->read_upto_policy : -1) + { + case XLRO_WALRCV_WRITTEN: + /* + * We'll try to read as far as has been written by the WAL + * receiver, on the requested timeline. When we run out of valid + * data, we'll return an error. This is used by xlogprefetch.c + * while streaming. + */ + read_upto = GetWalRcvWriteRecPtr(); + try_read = true; + state->currTLI = tli = options->tli; + break; - /* - * Check which timeline to get the record from. - * - * We have to do it each time through the loop because if we're in - * recovery as a cascading standby, the current timeline might've - * become historical. We can't rely on RecoveryInProgress() because in - * a standby configuration like - * - * A => B => C - * - * if we're a logical decoding session on C, and B gets promoted, our - * timeline will change while we remain in recovery. - * - * We can't just keep reading from the old timeline as the last WAL - * archive in the timeline will get renamed to .partial by - * StartupXLOG(). 
- * - * If that happens after our caller updated ThisTimeLineID but before - * we actually read the xlog page, we might still try to read from the - * old (now renamed) segment and fail. There's not much we can do - * about this, but it can only happen when we're a leaf of a cascading - * standby whose master gets promoted while we're decoding, so a - * one-off ERROR isn't too bad. - */ - XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + case XLRO_END: + /* + * We'll try to read as far as we can on one timeline. This is + * used by xlogprefetch.c for crash recovery. + */ + read_upto = (XLogRecPtr) -1; + try_read = true; + state->currTLI = tli = options->tli; + break; + + default: + /* + * Determine the limit of xlog we can currently read to, and what the + * most recent timeline is. + * + * RecoveryInProgress() will update ThisTimeLineID when it first + * notices recovery finishes, so we only have to maintain it for + * the local process until recovery ends. + */ + if (!RecoveryInProgress()) + read_upto = GetFlushRecPtr(); + else + read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); + tli = ThisTimeLineID; + + /* + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. We can't rely on RecoveryInProgress() + * because in a standby configuration like + * + * A => B => C + * + * if we're a logical decoding session on C, and B gets promoted, + * our timeline will change while we remain in recovery. + * + * We can't just keep reading from the old timeline as the last + * WAL archive in the timeline will get renamed to .partial by + * StartupXLOG(). + * + * If that happens after our caller updated ThisTimeLineID but + * before we actually read the xlog page, we might still try to + * read from the old (now renamed) segment and fail. 
There's not + * much we can do about this, but it can only happen when we're a + * leaf of a cascading standby whose master gets promoted while + * we're decoding, so a one-off ERROR isn't too bad. + */ + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + break; + } - if (state->currTLI == ThisTimeLineID) + if (state->currTLI == tli) { if (loc <= read_upto) break; + /* not enough data there, but we were asked not to wait */ + if (options && options->nowait) + return XLOGPAGEREAD_WOULDBLOCK; + CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); } @@ -924,7 +983,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - return -1; + return XLOGPAGEREAD_ERROR; } else { @@ -938,8 +997,18 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * zero-padded up to the page boundary if it's incomplete. */ if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg, - &state->segcxt, wal_segment_open, &errinfo)) + &state->segcxt, + try_read ? wal_segment_try_open : wal_segment_open, + &errinfo)) + { + /* + * When on one single timeline, we may read past the end of available + * segments. Report lack of file as an error. 
+ */ + if (try_read) + return XLOGPAGEREAD_ERROR; WALReadRaiseError(&errinfo); + } /* number of valid bytes in the buffer */ return count; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 122d884f3e..15ff3d35e4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -818,7 +818,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) - return -1; + return XLOGPAGEREAD_ERROR; if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 4582196e18..a3ac7f414b 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -50,6 +50,10 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; +/* Special negative return values for XLogPageReadCB functions */ +#define XLOGPAGEREAD_ERROR -1 +#define XLOGPAGEREAD_WOULDBLOCK -2 + /* Function type definition for the read_page callback */ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, @@ -99,10 +103,13 @@ struct XLogReaderState * This callback shall read at least reqLen valid bytes of the xlog page * starting at targetPagePtr, and store them in readBuf. The callback * shall return the number of bytes read (never more than XLOG_BLCKSZ), or - * -1 on failure. The callback shall sleep, if necessary, to wait for the - * requested bytes to become available. The callback will not be invoked - * again for the same page unless more than the returned number of bytes - * are needed. + * XLOGPAGEREAD_ERROR on failure. The callback may either sleep or return + * XLOGPAGEREAD_WOULDBLOCK, if necessary, to wait for the requested bytes + * to become available. 
If a callback that can return + * XLOGPAGEREAD_WOULDBLOCK is installed, the reader client must expect to + * fail to read when there is not enough data. The callback will not be + * invoked again for the same page unless more than the returned number of + * bytes are needed. * * targetRecPtr is the position of the WAL record we're reading. Usually * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs @@ -126,6 +133,11 @@ struct XLogReaderState */ void *private_data; + /* + * Opaque data for callbacks to use. Not used by XLogReader. + */ + void *read_page_data; + /* * Start and end point of last record read. EndRecPtr is also used as the * position to read next. Calling XLogBeginRead() sets EndRecPtr to the diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 5181a077d9..89c9ce90f8 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,6 +47,29 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); +/* + * A pointer to an XLogReadLocalOptions struct can supplied as the private data + * for an XLogReader, causing read_local_xlog_page() to modify its behavior. + */ +typedef struct XLogReadLocalOptions +{ + /* Don't block waiting for new WAL to arrive. */ + bool nowait; + + /* + * For XLRO_WALRCV_WRITTEN and XLRO_END modes, the timeline ID must be + * provided. + */ + TimeLineID tli; + + /* How far to read. */ + enum { + XLRO_STANDARD, + XLRO_WALRCV_WRITTEN, + XLRO_END + } read_upto_policy; +} XLogReadLocalOptions; + extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); -- 2.20.1
From 85c2ea245c03c6a859e652cc2d9df3b2ca323bb4 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Wed, 8 Apr 2020 23:04:51 +1200 Subject: [PATCH v7 4/4] Prefetch referenced blocks during recovery. Introduce a new GUC max_recovery_prefetch_distance. If it is set to a positive number of bytes, then read ahead in the WAL at most that distance, and initiate asynchronous reading of referenced blocks. The goal is to avoid I/O stalls and benefit from concurrent I/O. The number of concurrent asynchronous reads is capped by the existing maintenance_io_concurrency GUC. The feature is enabled by default for now, but we might reconsider that before release. Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com> Reviewed-by: Andres Freund <and...@anarazel.de> Reviewed-by: Tomas Vondra <tomas.von...@2ndquadrant.com> Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- doc/src/sgml/config.sgml | 45 + doc/src/sgml/monitoring.sgml | 81 ++ doc/src/sgml/wal.sgml | 13 + src/backend/access/transam/Makefile | 1 + src/backend/access/transam/xlog.c | 16 + src/backend/access/transam/xlogprefetch.c | 905 ++++++++++++++++++ src/backend/catalog/system_views.sql | 14 + src/backend/postmaster/pgstat.c | 96 +- src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/misc/guc.c | 47 +- src/backend/utils/misc/postgresql.conf.sample | 5 + src/include/access/xlogprefetch.h | 85 ++ src/include/catalog/pg_proc.dat | 8 + src/include/pgstat.h | 27 + src/include/utils/guc.h | 4 + src/test/regress/expected/rules.out | 11 + 16 files changed, 1359 insertions(+), 2 deletions(-) create mode 100644 src/backend/access/transam/xlogprefetch.c create mode 100644 src/include/access/xlogprefetch.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index a0da4aabac..18979d0496 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3121,6 +3121,51 @@ include_dir 'conf.d' </listitem> </varlistentry> + 
<varlistentry id="guc-max-recovery-prefetch-distance" xreflabel="max_recovery_prefetch_distance"> + <term><varname>max_recovery_prefetch_distance</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_recovery_prefetch_distance</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + The maximum distance to look ahead in the WAL during recovery, to find + blocks to prefetch. Prefetching blocks that will soon be needed can + reduce I/O wait times. The number of concurrent prefetches is limited + by this setting as well as + <xref linkend="guc-maintenance-io-concurrency"/>. Setting it too high + might be counterproductive, if it means that data falls out of the + kernel cache before it is needed. If this value is specified without + units, it is taken as bytes. A setting of -1 disables prefetching + during recovery. + The default is 256kB on systems that support + <function>posix_fadvise</function>, and otherwise -1. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-recovery-prefetch-fpw" xreflabel="recovery_prefetch_fpw"> + <term><varname>recovery_prefetch_fpw</varname> (<type>boolean</type>) + <indexterm> + <primary><varname>recovery_prefetch_fpw</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Whether to prefetch blocks that were logged with full page images, + during recovery. Often this doesn't help, since such blocks will not + be read the first time they are needed and might remain in the buffer + pool after that. However, on file systems with a block size larger + than + <productname>PostgreSQL</productname>'s, prefetching can avoid a + costly read-before-write when blocks are later written. This + setting has no effect unless + <xref linkend="guc-max-recovery-prefetch-distance"/> is set to a positive + number. The default is off. 
+ </para> + </listitem> + </varlistentry> + </variablelist> </sect2> <sect2 id="runtime-config-wal-archiving"> diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index c50b72137f..ddf2ee1f96 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -320,6 +320,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser </entry> </row> + <row> + <entry><structname>pg_stat_prefetch_recovery</structname><indexterm><primary>pg_stat_prefetch_recovery</primary></indexterm></entry> + <entry>Only one row, showing statistics about blocks prefetched during recovery. + See <xref linkend="pg-stat-prefetch-recovery-view"/> for details. + </entry> + </row> + <row> <entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry> <entry>At least one row per subscription, showing information about @@ -2223,6 +2230,78 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i connected server. 
</para> + <table id="pg-stat-prefetch-recovery-view" xreflabel="pg_stat_prefetch_recovery"> + <title><structname>pg_stat_prefetch_recovery</structname> View</title> + <tgroup cols="3"> + <thead> + <row> + <entry>Column</entry> + <entry>Type</entry> + <entry>Description</entry> + </row> + </thead> + + <tbody> + <row> + <entry><structfield>prefetch</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks prefetched because they were not in the buffer pool</entry> + </row> + <row> + <entry><structfield>skip_hit</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they were already in the buffer pool</entry> + </row> + <row> + <entry><structfield>skip_new</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they were new (usually relation extension)</entry> + </row> + <row> + <entry><structfield>skip_fpw</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because a full page image was included in the WAL and <xref linkend="guc-recovery-prefetch-fpw"/> was set to <literal>off</literal></entry> + </row> + <row> + <entry><structfield>skip_seq</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because of repeated or sequential access</entry> + </row> + <row> + <entry><structfield>distance</structfield></entry> + <entry><type>integer</type></entry> + <entry>How far ahead of recovery the prefetcher is currently reading, in bytes</entry> + </row> + <row> + <entry><structfield>queue_depth</structfield></entry> + <entry><type>integer</type></entry> + <entry>How many prefetches have been initiated but are not yet known to have completed</entry> + </row> + <row> + <entry><structfield>avg_distance</structfield></entry> + <entry><type>float4</type></entry> + <entry>How far ahead of recovery the prefetcher is on average, while recovery is not 
idle</entry> + </row> + <row> + <entry><structfield>avg_queue_depth</structfield></entry> + <entry><type>float4</type></entry> + <entry>Average number of prefetches in flight while recovery is not idle</entry> + </row> + </tbody> + </tgroup> + </table> + + <para> + The <structname>pg_stat_prefetch_recovery</structname> view will contain only + one row. It is filled with nulls if recovery is not running or WAL + prefetching is not enabled. See <xref linkend="guc-max-recovery-prefetch-distance"/> + for more information. The counters in this view are reset whenever the + <xref linkend="guc-max-recovery-prefetch-distance"/>, + <xref linkend="guc-recovery-prefetch-fpw"/> or + <xref linkend="guc-maintenance-io-concurrency"/> setting is changed and + the server configuration is reloaded. + </para> + <table id="pg-stat-subscription" xreflabel="pg_stat_subscription"> <title><structname>pg_stat_subscription</structname> View</title> <tgroup cols="3"> @@ -3446,6 +3525,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i counters shown in the <structname>pg_stat_bgwriter</structname> view. Calling <literal>pg_stat_reset_shared('archiver')</literal> will zero all the counters shown in the <structname>pg_stat_archiver</structname> view. + Calling <literal>pg_stat_reset_shared('prefetch_recovery')</literal> will zero all the + counters shown in the <structname>pg_stat_prefetch_recovery</structname> view. </entry> </row> diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml index bd9fae544c..38fc8149a8 100644 --- a/doc/src/sgml/wal.sgml +++ b/doc/src/sgml/wal.sgml @@ -719,6 +719,19 @@ <acronym>WAL</acronym> call being logged to the server log. This option might be replaced by a more general mechanism in the future. 
</para> + + <para> + The <xref linkend="guc-max-recovery-prefetch-distance"/> parameter can + be used to improve I/O performance during recovery by instructing + <productname>PostgreSQL</productname> to initiate reads + of disk blocks that will soon be needed, in combination with the + <xref linkend="guc-maintenance-io-concurrency"/> parameter. The + prefetching mechanism is most likely to be effective on systems + with <varname>full_page_writes</varname> set to + <varname>off</varname> (where that is safe), and where the working + set is larger than RAM. By default, prefetching in recovery is enabled, + but it can be disabled by setting the distance to -1. + </para> </sect1> <sect1 id="wal-internals"> diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..39f9d4e77d 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -31,6 +31,7 @@ OBJS = \ xlogarchive.o \ xlogfuncs.o \ xloginsert.o \ + xlogprefetch.o \ xlogreader.o \ xlogutils.o diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c60842ea03..6b2e95c06c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -35,6 +35,7 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xloginsert.h" +#include "access/xlogprefetch.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -7144,6 +7145,7 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; + XLogPrefetchState prefetch; InRedo = true; @@ -7151,6 +7153,9 @@ StartupXLOG(void) (errmsg("redo starts at %X/%X", (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); + /* Prepare to prefetch, if configured. 
*/ + XLogPrefetchBegin(&prefetch); + /* * main redo apply loop */ @@ -7181,6 +7186,12 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); + /* Perform WAL prefetching, if enabled. */ + XLogPrefetch(&prefetch, + ThisTimeLineID, + xlogreader->ReadRecPtr, + currentSource == XLOG_FROM_STREAM); + /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -7352,6 +7363,9 @@ StartupXLOG(void) */ if (switchedTLI && AllowCascadeReplication()) WalSndWakeup(); + + /* Reset the prefetcher. */ + XLogPrefetchReconfigure(); } /* Exit loop if we reached inclusive recovery target */ @@ -7379,6 +7393,7 @@ StartupXLOG(void) /* * end of main redo apply loop */ + XLogPrefetchEnd(&prefetch); if (reachedRecoveryTarget) { @@ -12107,6 +12122,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ currentSource = XLOG_FROM_STREAM; startWalReceiver = true; + XLogPrefetchReconfigure(); break; case XLOG_FROM_STREAM: diff --git a/src/backend/access/transam/xlogprefetch.c b/src/backend/access/transam/xlogprefetch.c new file mode 100644 index 0000000000..7d3aea53f7 --- /dev/null +++ b/src/backend/access/transam/xlogprefetch.c @@ -0,0 +1,905 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetch.c + * Prefetching support for recovery. + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/transam/xlogprefetch.c + * + * The goal of this module is to read future WAL records and issue + * PrefetchSharedBuffer() calls for referenced blocks, so that we avoid I/O + * stalls in the main recovery loop. Currently, this is achieved by using a + * separate XLogReader to read ahead. In future, we should find a way to + * avoid reading and decoding each record twice. 
+ * + * When examining a WAL record from the future, we need to consider that a + * referenced block or segment file might not exist on disk until this record + * or some earlier record has been replayed. After a crash, a file might also + * be missing because it was dropped by a later WAL record; in that case, it + * will be recreated when this record is replayed. These cases are handled by + * recognizing them and adding a "filter" that prevents all prefetching of a + * certain block range until the present WAL record has been replayed. Blocks + * skipped for these reasons are counted as "skip_new" (that is, cases where we + * didn't try to prefetch "new" blocks). + * + * Blocks found in the buffer pool already are counted as "skip_hit". + * Repeated access to the same buffer is detected and skipped, and this is + * counted with "skip_seq". Blocks that were logged with FPWs are skipped if + * recovery_prefetch_fpw is off, since on most systems there will be no I/O + * stall; this is counted with "skip_fpw". + * + * The only way we currently have to know that an I/O initiated with + * PrefetchSharedBuffer() has completed is to call ReadBuffer(). Therefore, + * we track the number of potentially in-flight I/Os by using a circular + * buffer of LSNs. When it's full, we have to wait for recovery to replay + * records so that the queue depth can be reduced, before we can do any more + * prefetching. Ideally, this keeps us the right distance ahead to respect + * maintenance_io_concurrency. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog.h" +#include "access/xlogprefetch.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "catalog/storage_xlog.h" +#include "utils/fmgrprotos.h" +#include "utils/timestamp.h" +#include "funcapi.h" +#include "pgstat.h" +#include "miscadmin.h" +#include "port/atomics.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" +#include "storage/smgr.h" +#include "utils/guc.h" +#include "utils/hsearch.h" + +/* + * Sample the queue depth and distance every time we replay this much WAL. + * This is used to compute avg_queue_depth and avg_distance for the log + * message that appears at the end of crash recovery. It's also used to send + * messages periodically to the stats collector, to save the counters on disk. + */ +#define XLOGPREFETCHER_SAMPLE_DISTANCE 0x40000 + +/* GUCs */ +int max_recovery_prefetch_distance = -1; +bool recovery_prefetch_fpw = false; + +int XLogPrefetchReconfigureCount; + +/* + * A prefetcher object. There is at most one of these in existence at a time, + * recreated whenever there is a configuration change. + */ +struct XLogPrefetcher +{ + /* Reader and current reading state. */ + XLogReaderState *reader; + XLogReadLocalOptions options; + bool have_record; + bool shutdown; + int next_block_id; + + /* Details of last prefetch to skip repeats and seq scans. */ + SMgrRelation last_reln; + RelFileNode last_rnode; + BlockNumber last_blkno; + + /* Online averages. */ + uint64 samples; + double avg_queue_depth; + double avg_distance; + XLogRecPtr next_sample_lsn; + + /* Book-keeping required to avoid accessing non-existing blocks. */ + HTAB *filter_table; + dlist_head filter_queue; + + /* Book-keeping required to limit concurrent prefetches. 
*/ + int prefetch_head; + int prefetch_tail; + int prefetch_queue_size; + XLogRecPtr prefetch_queue[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* + * A temporary filter used to track block ranges that haven't been created + * yet, whole relations that haven't been created yet, and whole relations + * that we must assume have already been dropped. + */ +typedef struct XLogPrefetcherFilter +{ + RelFileNode rnode; + XLogRecPtr filter_until_replayed; + BlockNumber filter_from_block; + dlist_node link; +} XLogPrefetcherFilter; + +/* + * Counters exposed in shared memory for pg_stat_prefetch_recovery. + */ +typedef struct XLogPrefetchStats +{ + pg_atomic_uint64 reset_time; /* Time of last reset. */ + pg_atomic_uint64 prefetch; /* Prefetches initiated. */ + pg_atomic_uint64 skip_hit; /* Blocks already buffered. */ + pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */ + pg_atomic_uint64 skip_fpw; /* FPWs skipped. */ + pg_atomic_uint64 skip_seq; /* Repeat blocks skipped. */ + float avg_distance; + float avg_queue_depth; + + /* Reset counters */ + pg_atomic_uint32 reset_request; + uint32 reset_handled; + + /* Dynamic values */ + int distance; /* Number of bytes ahead in the WAL. */ + int queue_depth; /* Number of I/Os possibly in progress. 
*/ +} XLogPrefetchStats; + +static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno, + XLogRecPtr lsn); +static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno); +static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn); +static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher); +static void XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static bool XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher); +static void XLogPrefetchSaveStats(void); +static void XLogPrefetchRestoreStats(void); + +static XLogPrefetchStats *Stats; + +size_t +XLogPrefetchShmemSize(void) +{ + return sizeof(XLogPrefetchStats); +} + +static void +XLogPrefetchResetStats(void) +{ + pg_atomic_write_u64(&Stats->reset_time, GetCurrentTimestamp()); + pg_atomic_write_u64(&Stats->prefetch, 0); + pg_atomic_write_u64(&Stats->skip_hit, 0); + pg_atomic_write_u64(&Stats->skip_new, 0); + pg_atomic_write_u64(&Stats->skip_fpw, 0); + pg_atomic_write_u64(&Stats->skip_seq, 0); + Stats->avg_distance = 0; + Stats->avg_queue_depth = 0; +} + +void +XLogPrefetchShmemInit(void) +{ + bool found; + + Stats = (XLogPrefetchStats *) + ShmemInitStruct("XLogPrefetchStats", + sizeof(XLogPrefetchStats), + &found); + if (!found) + { + pg_atomic_init_u32(&Stats->reset_request, 0); + Stats->reset_handled = 0; + pg_atomic_init_u64(&Stats->reset_time, GetCurrentTimestamp()); + pg_atomic_init_u64(&Stats->prefetch, 0); + pg_atomic_init_u64(&Stats->skip_hit, 0); + pg_atomic_init_u64(&Stats->skip_new, 0); + pg_atomic_init_u64(&Stats->skip_fpw, 0); + pg_atomic_init_u64(&Stats->skip_seq, 0); + Stats->avg_distance = 0; + 
Stats->avg_queue_depth = 0; + Stats->distance = 0; + Stats->queue_depth = 0; + } +} + +/* + * Called when any GUC is changed that affects prefetching. + */ +void +XLogPrefetchReconfigure(void) +{ + XLogPrefetchReconfigureCount++; +} + +/* + * Called by any backend to request that the stats be reset. + */ +void +XLogPrefetchRequestResetStats(void) +{ + pg_atomic_fetch_add_u32(&Stats->reset_request, 1); +} + +/* + * Tell the stats collector to serialize the shared memory counters into the + * stats file. + */ +static void +XLogPrefetchSaveStats(void) +{ + PgStat_RecoveryPrefetchStats serialized = { + .prefetch = pg_atomic_read_u64(&Stats->prefetch), + .skip_hit = pg_atomic_read_u64(&Stats->skip_hit), + .skip_new = pg_atomic_read_u64(&Stats->skip_new), + .skip_fpw = pg_atomic_read_u64(&Stats->skip_fpw), + .skip_seq = pg_atomic_read_u64(&Stats->skip_seq), + .stat_reset_timestamp = pg_atomic_read_u64(&Stats->reset_time) + }; + + pgstat_send_recoveryprefetch(&serialized); +} + +/* + * Try to restore the shared memory counters from the stats file. + */ +static void +XLogPrefetchRestoreStats(void) +{ + PgStat_RecoveryPrefetchStats *serialized = pgstat_fetch_recoveryprefetch(); + + if (serialized->stat_reset_timestamp != 0) + { + pg_atomic_write_u64(&Stats->prefetch, serialized->prefetch); + pg_atomic_write_u64(&Stats->skip_hit, serialized->skip_hit); + pg_atomic_write_u64(&Stats->skip_new, serialized->skip_new); + pg_atomic_write_u64(&Stats->skip_fpw, serialized->skip_fpw); + pg_atomic_write_u64(&Stats->skip_seq, serialized->skip_seq); + pg_atomic_write_u64(&Stats->reset_time, serialized->stat_reset_timestamp); + } +} + +/* + * Initialize an XLogPrefetchState object and restore the last saved + * statistics from disk. + */ +void +XLogPrefetchBegin(XLogPrefetchState *state) +{ + XLogPrefetchRestoreStats(); + + /* We'll reconfigure on the first call to XLogPrefetch(). 
*/ + state->prefetcher = NULL; + state->reconfigure_count = XLogPrefetchReconfigureCount - 1; +} + +/* + * Shut down the prefetching infrastructure, if configured. + */ +void +XLogPrefetchEnd(XLogPrefetchState *state) +{ + XLogPrefetchSaveStats(); + + if (state->prefetcher) + XLogPrefetcherFree(state->prefetcher); + state->prefetcher = NULL; + + Stats->queue_depth = 0; + Stats->distance = 0; +} + +/* + * Create a prefetcher that is ready to begin prefetching blocks referenced by + * WAL that is ahead of the given lsn. + */ +XLogPrefetcher * +XLogPrefetcherAllocate(TimeLineID tli, XLogRecPtr lsn, bool streaming) +{ + XLogPrefetcher *prefetcher; + static HASHCTL hash_table_ctl = { + .keysize = sizeof(RelFileNode), + .entrysize = sizeof(XLogPrefetcherFilter) + }; + + /* + * The size of the queue is based on the maintenance_io_concurrency + * setting. In theory we might have a separate queue for each tablespace, + * but it's not clear how that should work, so for now we'll just use the + * general GUC to rate-limit all prefetching. We add one to the size + * because our circular buffer has a gap between head and tail when full. + */ + prefetcher = palloc0(offsetof(XLogPrefetcher, prefetch_queue) + + sizeof(XLogRecPtr) * (maintenance_io_concurrency + 1)); + prefetcher->prefetch_queue_size = maintenance_io_concurrency + 1; + prefetcher->options.tli = tli; + prefetcher->options.nowait = true; + if (streaming) + { + /* + * We're only allowed to read as far as the WAL receiver has written. + * We don't have to wait for it to be flushed, though, as recovery + * does, so that gives us a chance to get a bit further ahead. + */ + prefetcher->options.read_upto_policy = XLRO_WALRCV_WRITTEN; + } + else + { + /* Read as far as we can. 
*/ + prefetcher->options.read_upto_policy = XLRO_END; + } + prefetcher->reader = XLogReaderAllocate(wal_segment_size, + NULL, + read_local_xlog_page, + NULL); + prefetcher->reader->read_page_data = &prefetcher->options; + prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024, + &hash_table_ctl, + HASH_ELEM | HASH_BLOBS); + dlist_init(&prefetcher->filter_queue); + + /* Prepare to read at the given LSN. */ + ereport(LOG, + (errmsg("recovery started prefetching on timeline %u at %X/%X", + tli, + (uint32) (lsn << 32), (uint32) lsn))); + XLogBeginRead(prefetcher->reader, lsn); + + Stats->queue_depth = 0; + Stats->distance = 0; + + return prefetcher; +} + +/* + * Destroy a prefetcher and release all resources. + */ +void +XLogPrefetcherFree(XLogPrefetcher *prefetcher) +{ + /* Log final statistics. */ + ereport(LOG, + (errmsg("recovery finished prefetching at %X/%X; " + "prefetch = " UINT64_FORMAT ", " + "skip_hit = " UINT64_FORMAT ", " + "skip_new = " UINT64_FORMAT ", " + "skip_fpw = " UINT64_FORMAT ", " + "skip_seq = " UINT64_FORMAT ", " + "avg_distance = %f, " + "avg_queue_depth = %f", + (uint32) (prefetcher->reader->EndRecPtr << 32), + (uint32) (prefetcher->reader->EndRecPtr), + pg_atomic_read_u64(&Stats->prefetch), + pg_atomic_read_u64(&Stats->skip_hit), + pg_atomic_read_u64(&Stats->skip_new), + pg_atomic_read_u64(&Stats->skip_fpw), + pg_atomic_read_u64(&Stats->skip_seq), + Stats->avg_distance, + Stats->avg_queue_depth))); + XLogReaderFree(prefetcher->reader); + hash_destroy(prefetcher->filter_table); + pfree(prefetcher); +} + +/* + * Called when recovery is replaying a new LSN, to check if we can read ahead. + */ +void +XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + uint32 reset_request; + + /* If an error has occurred or we've hit the end of the WAL, do nothing. 
*/ + if (prefetcher->shutdown) + return; + + /* + * Have any in-flight prefetches definitely completed, judging by the LSN + * that is currently being replayed? + */ + XLogPrefetcherCompletedIO(prefetcher, replaying_lsn); + + /* + * Do we already have the maximum permitted number of I/Os running + * (according to the information we have)? If so, we have to wait for at + * least one to complete, so give up early and let recovery catch up. + */ + if (XLogPrefetcherSaturated(prefetcher)) + return; + + /* + * Can we drop any filters yet? This happens when the LSN that is + * currently being replayed has moved past a record that prevents + * prefetching of a block range, such as relation extension. + */ + XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn); + + /* + * Have we been asked to reset our stats counters? This is checked with + * an unsynchronized memory read, but we'll see it eventually and we'll be + * accessing that cache line anyway. + */ + reset_request = pg_atomic_read_u32(&Stats->reset_request); + if (reset_request != Stats->reset_handled) + { + XLogPrefetchResetStats(); + Stats->reset_handled = reset_request; + prefetcher->avg_distance = 0; + prefetcher->avg_queue_depth = 0; + prefetcher->samples = 0; + } + + /* OK, we can now try reading ahead. */ + XLogPrefetcherScanRecords(prefetcher, replaying_lsn); +} + +/* + * Read ahead as far as we are allowed to, considering the LSN that recovery + * is currently replaying. + */ +static void +XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + XLogReaderState *reader = prefetcher->reader; + + Assert(!XLogPrefetcherSaturated(prefetcher)); + + for (;;) + { + char *error; + int64 distance; + + /* If we don't already have a record, then try to read one. */ + if (!prefetcher->have_record) + { + if (!XLogReadRecord(reader, &error)) + { + /* If we got an error, log it and give up. 
*/ + if (error) + { + ereport(LOG, (errmsg("recovery no longer prefetching: %s", error))); + prefetcher->shutdown = true; + Stats->queue_depth = 0; + Stats->distance = 0; + } + /* Otherwise, we'll try again later when more data is here. */ + return; + } + prefetcher->have_record = true; + prefetcher->next_block_id = 0; + } + + /* How far ahead of replay are we now? */ + distance = prefetcher->reader->ReadRecPtr - replaying_lsn; + + /* Update distance shown in shm. */ + Stats->distance = distance; + + /* Periodically recompute some statistics. */ + if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn)) + { + /* Compute online averages. */ + prefetcher->samples++; + if (prefetcher->samples == 1) + { + prefetcher->avg_distance = Stats->distance; + prefetcher->avg_queue_depth = Stats->queue_depth; + } + else + { + prefetcher->avg_distance += + (Stats->distance - prefetcher->avg_distance) / + prefetcher->samples; + prefetcher->avg_queue_depth += + (Stats->queue_depth - prefetcher->avg_queue_depth) / + prefetcher->samples; + } + + /* Expose it in shared memory. */ + Stats->avg_distance = prefetcher->avg_distance; + Stats->avg_queue_depth = prefetcher->avg_queue_depth; + + /* Also periodically save the simple counters. */ + XLogPrefetchSaveStats(); + + prefetcher->next_sample_lsn = + replaying_lsn + XLOGPREFETCHER_SAMPLE_DISTANCE; + } + + /* Are we too far ahead of replay? */ + if (distance >= max_recovery_prefetch_distance) + break; + + /* Are we not far enough ahead? */ + if (distance <= 0) + { + prefetcher->have_record = false; /* skip this record */ + continue; + } + + /* + * If this is a record that creates a new SMGR relation, we'll avoid + * prefetching anything from that rnode until it has been replayed. 
+ */ + if (replaying_lsn < reader->ReadRecPtr && + XLogRecGetRmid(reader) == RM_SMGR_ID && + (XLogRecGetInfo(reader) & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(reader); + + XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, + reader->ReadRecPtr); + } + + /* Scan the record's block references. */ + if (!XLogPrefetcherScanBlocks(prefetcher)) + return; + + /* Advance to the next record. */ + prefetcher->have_record = false; + } +} + +/* + * Scan the current record for block references, and consider prefetching. + * + * Return true if we processed the current record to completion and still have + * queue space to process a new record, and false if we saturated the I/O + * queue and need to wait for recovery to advance before we continue. + */ +static bool +XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher) +{ + XLogReaderState *reader = prefetcher->reader; + + Assert(!XLogPrefetcherSaturated(prefetcher)); + + /* + * We might already have been partway through processing this record when + * our queue became saturated, so we need to start where we left off. + */ + for (int block_id = prefetcher->next_block_id; + block_id <= reader->max_block_id; + ++block_id) + { + PrefetchBufferResult prefetch; + DecodedBkpBlock *block = &reader->blocks[block_id]; + SMgrRelation reln; + + /* Ignore everything but the main fork for now. */ + if (block->forknum != MAIN_FORKNUM) + continue; + + /* + * If there is a full page image attached, we won't be reading the + * page, so you might think we should skip it. However, if the + * underlying filesystem uses larger logical blocks than us, it + * might still need to perform a read-before-write some time later. + * Therefore, only prefetch if configured to do so. + */ + if (block->has_image && !recovery_prefetch_fpw) + { + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_fpw, 1); + continue; + } + + /* + * If this block will initialize a new page then it's probably an + * extension. 
Since it might create a new segment, we can't try + * to prefetch this block until the record has been replayed, or we + * might try to open a file that doesn't exist yet. + */ + if (block->flags & BKPBLOCK_WILL_INIT) + { + XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno, + reader->ReadRecPtr); + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1); + continue; + } + + /* Should we skip this block due to a filter? */ + if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno)) + { + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1); + continue; + } + + /* Fast path for repeated references to the same relation. */ + if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode)) + { + /* + * If this is a repeat access to the same block, then skip it. + * + * XXX We could also check for last_blkno + 1 too, and also update + * last_blkno; it's not clear if the kernel would do a better job + * of sequential prefetching. + */ + if (block->blkno == prefetcher->last_blkno) + { + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_seq, 1); + continue; + } + + /* We can avoid calling smgropen(). */ + reln = prefetcher->last_reln; + } + else + { + /* Otherwise we have to open it. */ + reln = smgropen(block->rnode, InvalidBackendId); + prefetcher->last_rnode = block->rnode; + prefetcher->last_reln = reln; + } + prefetcher->last_blkno = block->blkno; + + /* Try to prefetch this block! */ + prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno); + if (BufferIsValid(prefetch.recent_buffer)) + { + /* + * It was already cached, so do nothing. Perhaps in future we + * could remember the buffer so that recovery doesn't have to look + * it up again. + */ + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_hit, 1); + } + else if (prefetch.initiated_io) + { + /* + * I/O has possibly been initiated (though we don't know if it + * was already cached by the kernel, so we just have to assume + * that it has due to lack of better information). 
Record + * this as an I/O in progress until eventually we replay this + * LSN. + */ + pg_atomic_unlocked_add_fetch_u64(&Stats->prefetch, 1); + XLogPrefetcherInitiatedIO(prefetcher, reader->ReadRecPtr); + /* + * If the queue is now full, we'll have to wait before processing + * any more blocks from this record, or move to a new record if + * that was the last block. + */ + if (XLogPrefetcherSaturated(prefetcher)) + { + prefetcher->next_block_id = block_id + 1; + return false; + } + } + else + { + /* + * Neither cached nor initiated. The underlying segment file + * doesn't exist. Presumably it will be unlinked by a later WAL + * record. When recovery reads this block, it will use the + * EXTENSION_CREATE_RECOVERY flag. We certainly don't want to do + * that sort of thing while merely prefetching, so let's just + * ignore references to this relation until this record is + * replayed, and let recovery create the dummy file or complain if + * something is wrong. + */ + XLogPrefetcherAddFilter(prefetcher, block->rnode, 0, + reader->ReadRecPtr); + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1); + } + } + + return true; +} + +/* + * Expose statistics about recovery prefetching. 
+ */ +Datum +pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + Datum values[PG_STAT_GET_PREFETCH_RECOVERY_COLS]; + bool nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS]; + + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + if (pg_atomic_read_u32(&Stats->reset_request) != Stats->reset_handled) + { + /* There's an unhandled reset request, so just show NULLs */ + for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i) + nulls[i] = true; + } + else + { + for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i) + nulls[i] = false; + } + + values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&Stats->reset_time)); + values[1] = Int64GetDatum(pg_atomic_read_u64(&Stats->prefetch)); + values[2] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_hit)); + values[3] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_new)); + values[4] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_fpw)); + values[5] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_seq)); + values[6] = Int32GetDatum(Stats->distance); +
values[7] = Int32GetDatum(Stats->queue_depth); + values[8] = Float4GetDatum(Stats->avg_distance); + values[9] = Float4GetDatum(Stats->avg_queue_depth); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + +/* + * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn' + * has been replayed. + */ +static inline void +XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno, XLogRecPtr lsn) +{ + XLogPrefetcherFilter *filter; + bool found; + + filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found); + if (!found) + { + /* + * Don't allow any prefetching of this block or higher until replayed. + */ + filter->filter_until_replayed = lsn; + filter->filter_from_block = blockno; + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } + else + { + /* + * We were already filtering this rnode. Extend the filter's lifetime + * to cover this WAL record, but leave the (presumably lower) block + * number there because we don't want to have to track individual + * blocks. + */ + filter->filter_until_replayed = lsn; + dlist_delete(&filter->link); + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } +} + +/* + * Have we replayed the records that caused us to begin filtering a block + * range? That means that relations should have been created, extended or + * dropped as required, so we can drop relevant filters.
+ */ +static inline void +XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter, + link, + &prefetcher->filter_queue); + + if (filter->filter_until_replayed >= replaying_lsn) + break; + dlist_delete(&filter->link); + hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL); + } +} + +/* + * Check if a given block should be skipped due to a filter. + */ +static inline bool +XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno) +{ + /* + * Test for empty queue first, because we expect it to be empty most of the + * time and we can avoid the hash table lookup in that case. + */ + if (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode, + HASH_FIND, NULL); + + if (filter && filter->filter_from_block <= blockno) + return true; + } + + return false; +} + +/* + * Insert an LSN into the queue. The queue must not be full already. This + * tracks the fact that we have (to the best of our knowledge) initiated an + * I/O, so that we can impose a cap on concurrent prefetching. + */ +static inline void +XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn) +{ + Assert(!XLogPrefetcherSaturated(prefetcher)); + prefetcher->prefetch_queue[prefetcher->prefetch_head++] = prefetching_lsn; + prefetcher->prefetch_head %= prefetcher->prefetch_queue_size; + Stats->queue_depth++; + Assert(Stats->queue_depth <= prefetcher->prefetch_queue_size); +} + +/* + * Have we replayed the records that caused us to initiate the oldest + * prefetches yet? That means that they're definitely finished, so we can + * forget about them and allow ourselves to initiate more prefetches. For now + * we don't have any awareness of when I/O really completes.
+ */ +static inline void +XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (prefetcher->prefetch_head != prefetcher->prefetch_tail && + prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn) + { + prefetcher->prefetch_tail++; + prefetcher->prefetch_tail %= prefetcher->prefetch_queue_size; + Stats->queue_depth--; + Assert(Stats->queue_depth >= 0); + } +} + +/* + * Check if the maximum allowed number of I/Os is already in flight. + */ +static inline bool +XLogPrefetcherSaturated(XLogPrefetcher *prefetcher) +{ + return (prefetcher->prefetch_head + 1) % prefetcher->prefetch_queue_size == + prefetcher->prefetch_tail; +} + +void +assign_max_recovery_prefetch_distance(int new_value, void *extra) +{ + /* Reconfigure prefetching, because a setting it depends on changed. */ + max_recovery_prefetch_distance = new_value; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +} + +void +assign_recovery_prefetch_fpw(bool new_value, void *extra) +{ + /* Reconfigure prefetching, because a setting it depends on changed. 
*/ + recovery_prefetch_fpw = new_value; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index d406ea8118..3b15f5ef8e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -825,6 +825,20 @@ CREATE VIEW pg_stat_wal_receiver AS FROM pg_stat_get_wal_receiver() s WHERE s.pid IS NOT NULL; +CREATE VIEW pg_stat_prefetch_recovery AS + SELECT + s.stats_reset, + s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth, + s.avg_distance, + s.avg_queue_depth + FROM pg_stat_get_prefetch_recovery() s; + CREATE VIEW pg_stat_subscription AS SELECT su.oid AS subid, diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 50eea2e8a8..6c9ac5b29b 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -38,6 +38,7 @@ #include "access/transam.h" #include "access/twophase_rmgr.h" #include "access/xact.h" +#include "access/xlogprefetch.h" #include "catalog/pg_database.h" #include "catalog/pg_proc.h" #include "common/ip.h" @@ -276,6 +277,7 @@ static int localNumBackends = 0; static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; +static PgStat_RecoveryPrefetchStats recoveryPrefetchStats; /* * List of OIDs of databases we need to write out. 
If an entry is InvalidOid, @@ -348,6 +350,7 @@ static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len); static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len); static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len); +static void pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len); static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len); static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len); static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len); @@ -1364,11 +1367,20 @@ pgstat_reset_shared_counters(const char *target) msg.m_resettarget = RESET_ARCHIVER; else if (strcmp(target, "bgwriter") == 0) msg.m_resettarget = RESET_BGWRITER; + else if (strcmp(target, "prefetch_recovery") == 0) + { + /* + * We can't ask the stats collector to do this for us as it is not + * attached to shared memory. + */ + XLogPrefetchRequestResetStats(); + return; + } else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized reset target: \"%s\"", target), - errhint("Target must be \"archiver\" or \"bgwriter\"."))); + errhint("Target must be \"archiver\", \"bgwriter\" or \"prefetch_recovery\"."))); pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER); pgstat_send(&msg, sizeof(msg)); @@ -2690,6 +2702,22 @@ pgstat_fetch_slru(void) } +/* + * --------- + * pgstat_fetch_recoveryprefetch() - + * + * Support function for restoring the counters managed by xlogprefetch.c. 
+ * --------- + */ +PgStat_RecoveryPrefetchStats * +pgstat_fetch_recoveryprefetch(void) +{ + backend_read_statsfile(); + + return &recoveryPrefetchStats; +} + + /* ------------------------------------------------------------ * Functions for management of the shared-memory PgBackendStatus array * ------------------------------------------------------------ @@ -4440,6 +4468,23 @@ pgstat_send_slru(void) } +/* ---------- + * pgstat_send_recoveryprefetch() - + * + * Send recovery prefetch statistics to the collector + * ---------- + */ +void +pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats) +{ + PgStat_MsgRecoveryPrefetch msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYPREFETCH); + msg.m_stats = *stats; + pgstat_send(&msg, sizeof(msg)); +} + + /* ---------- * PgstatCollectorMain() - * @@ -4636,6 +4681,10 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_slru(&msg.msg_slru, len); break; + case PGSTAT_MTYPE_RECOVERYPREFETCH: + pgstat_recv_recoveryprefetch(&msg.msg_recoveryprefetch, len); + break; + case PGSTAT_MTYPE_FUNCSTAT: pgstat_recv_funcstat(&msg.msg_funcstat, len); break; @@ -4911,6 +4960,13 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) rc = fwrite(slruStats, sizeof(slruStats), 1, fpout); (void) rc; /* we'll check for error with ferror */ + /* + * Write recovery prefetch stats struct + */ + rc = fwrite(&recoveryPrefetchStats, sizeof(recoveryPrefetchStats), 1, + fpout); + (void) rc; /* we'll check for error with ferror */ + /* * Walk through the database table. 
*/ @@ -5170,6 +5226,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) memset(&globalStats, 0, sizeof(globalStats)); memset(&archiverStats, 0, sizeof(archiverStats)); memset(&slruStats, 0, sizeof(slruStats)); + memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats)); /* * Set the current timestamp (will be kept only in case we can't load an @@ -5257,6 +5314,18 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) goto done; } + /* + * Read recoveryPrefetchStats struct + */ + if (fread(&recoveryPrefetchStats, 1, sizeof(recoveryPrefetchStats), + fpin) != sizeof(recoveryPrefetchStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", statfile))); + memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats)); + goto done; + } + /* * We found an existing collector stats file. Read it and put all the * hashtable entries into place. @@ -5556,6 +5625,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_GlobalStats myGlobalStats; PgStat_ArchiverStats myArchiverStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; + PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -5621,6 +5691,18 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, return false; } + /* + * Read recovery prefetch stats struct + */ + if (fread(&myRecoveryPrefetchStats, 1, sizeof(myRecoveryPrefetchStats), + fpin) != sizeof(myRecoveryPrefetchStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", statfile))); + FreeFile(fpin); + return false; + } + /* By default, we're going to return the timestamp of the global file. 
*/ *ts = myGlobalStats.stats_timestamp; @@ -6420,6 +6502,18 @@ pgstat_recv_slru(PgStat_MsgSLRU *msg, int len) slruStats[msg->m_index].truncate += msg->m_truncate; } +/* ---------- + * pgstat_recv_recoveryprefetch() - + * + * Process a recovery prefetch message. + * ---------- + */ +static void +pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len) +{ + recoveryPrefetchStats = msg->m_stats; +} + /* ---------- * pgstat_recv_recoveryconflict() - * diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 417840a8f1..a965ab9d35 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -21,6 +21,7 @@ #include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" +#include "access/xlogprefetch.h" #include "commands/async.h" #include "commands/wait.h" #include "miscadmin.h" @@ -125,6 +126,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, PredicateLockShmemSize()); size = add_size(size, ProcGlobalShmemSize()); size = add_size(size, XLOGShmemSize()); + size = add_size(size, XLogPrefetchShmemSize()); size = add_size(size, CLOGShmemSize()); size = add_size(size, CommitTsShmemSize()); size = add_size(size, SUBTRANSShmemSize()); @@ -214,6 +216,7 @@ CreateSharedMemoryAndSemaphores(void) * Set up xlog, clog, and buffers */ XLOGShmemInit(); + XLogPrefetchShmemInit(); CLOGShmemInit(); CommitTsShmemInit(); SUBTRANSShmemInit(); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 5bdc02fce2..5ed7ed13e8 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -34,6 +34,7 @@ #include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogprefetch.h" #include "catalog/namespace.h" #include "catalog/pg_authid.h" #include "catalog/storage.h" @@ -198,6 +199,7 @@ static bool check_max_wal_senders(int *newval, void **extra, GucSource source); static bool check_autovacuum_work_mem(int *newval, 
void **extra, GucSource source); static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source); static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source); +static void assign_maintenance_io_concurrency(int newval, void *extra); static void assign_pgstat_temp_directory(const char *newval, void *extra); static bool check_application_name(char **newval, void **extra, GucSource source); static void assign_application_name(const char *newval, void *extra); @@ -1272,6 +1274,18 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"recovery_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Prefetch blocks that have full page images in the WAL"), + gettext_noop("On some systems, there is no benefit to prefetching pages that will be " + "entirely overwritten, but if the logical page size of the filesystem is " + "larger than PostgreSQL's, this can be beneficial. This option has no " + "effect unless max_recovery_prefetch_distance is set to a positive number.") + }, + &recovery_prefetch_fpw, + false, + NULL, assign_recovery_prefetch_fpw, NULL + }, { {"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS, @@ -2649,6 +2663,22 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_recovery_prefetch_distance", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, + gettext_noop("Maximum number of bytes to read ahead in the WAL to prefetch referenced blocks."), + gettext_noop("Set to -1 to disable prefetching during recovery."), + GUC_UNIT_BYTE + }, + &max_recovery_prefetch_distance, +#ifdef USE_PREFETCH + 256 * 1024, +#else + -1, +#endif + -1, INT_MAX, + NULL, assign_max_recovery_prefetch_distance, NULL + }, + { {"wal_keep_segments", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the number of WAL files held for standby servers."), @@ -2968,7 +2998,8 @@ static struct config_int ConfigureNamesInt[] = 0, #endif 0, MAX_IO_CONCURRENCY, - check_maintenance_io_concurrency, NULL, NULL + 
check_maintenance_io_concurrency, assign_maintenance_io_concurrency, + NULL }, { @@ -11586,6 +11617,20 @@ check_maintenance_io_concurrency(int *newval, void **extra, GucSource source) return true; } +static void +assign_maintenance_io_concurrency(int newval, void *extra) +{ +#ifdef USE_PREFETCH + /* + * Reconfigure recovery prefetching, because a setting it depends on + * changed. + */ + maintenance_io_concurrency = newval; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +#endif +} + static void assign_pgstat_temp_directory(const char *newval, void *extra) { diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 995b6ca155..55cce90763 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -230,6 +230,11 @@ #checkpoint_flush_after = 0 # measured in pages, 0 disables #checkpoint_warning = 30s # 0 disables +# - Prefetching during recovery - + +#max_recovery_prefetch_distance = 256kB # -1 disables prefetching +#recovery_prefetch_fpw = off # whether to prefetch pages logged with FPW + # - Archiving - #archive_mode = off # enables archiving; off, on, or always diff --git a/src/include/access/xlogprefetch.h b/src/include/access/xlogprefetch.h new file mode 100644 index 0000000000..d8e2e1ca50 --- /dev/null +++ b/src/include/access/xlogprefetch.h @@ -0,0 +1,85 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetch.h + * Declarations for the recovery prefetching module. 
+ * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/access/xlogprefetch.h + *------------------------------------------------------------------------- + */ +#ifndef XLOGPREFETCH_H +#define XLOGPREFETCH_H + +#include "access/xlogdefs.h" + +/* GUCs */ +extern int max_recovery_prefetch_distance; +extern bool recovery_prefetch_fpw; + +struct XLogPrefetcher; +typedef struct XLogPrefetcher XLogPrefetcher; + +extern int XLogPrefetchReconfigureCount; + +typedef struct XLogPrefetchState +{ + XLogPrefetcher *prefetcher; + int reconfigure_count; +} XLogPrefetchState; + +extern size_t XLogPrefetchShmemSize(void); +extern void XLogPrefetchShmemInit(void); + +extern void XLogPrefetchReconfigure(void); +extern void XLogPrefetchRequestResetStats(void); + +extern void XLogPrefetchBegin(XLogPrefetchState *state); +extern void XLogPrefetchEnd(XLogPrefetchState *state); + +/* Functions exposed only for the use of XLogPrefetch(). */ +extern XLogPrefetcher *XLogPrefetcherAllocate(TimeLineID tli, + XLogRecPtr lsn, + bool streaming); +extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher); +extern void XLogPrefetcherReadAhead(XLogPrefetcher *prefetch, + XLogRecPtr replaying_lsn); + +/* + * Tell the prefetching module that we are now replaying a given LSN, so that + * it can decide how far ahead to read in the WAL, if configured. + */ +static inline void +XLogPrefetch(XLogPrefetchState *state, + TimeLineID replaying_tli, + XLogRecPtr replaying_lsn, + bool from_stream) +{ + /* + * Handle any configuration changes. Rather than trying to deal with + * various parameter changes, we just tear down and set up a new + * prefetcher if anything we depend on changes. + */ + if (unlikely(state->reconfigure_count != XLogPrefetchReconfigureCount)) + { + /* If we had a prefetcher, tear it down. 
*/ + if (state->prefetcher) + { + XLogPrefetcherFree(state->prefetcher); + state->prefetcher = NULL; + } + /* If we want a prefetcher, set it up. */ + if (max_recovery_prefetch_distance > 0) + state->prefetcher = XLogPrefetcherAllocate(replaying_tli, + replaying_lsn, + from_stream); + state->reconfigure_count = XLogPrefetchReconfigureCount; + } + + if (state->prefetcher) + XLogPrefetcherReadAhead(state->prefetcher, replaying_lsn); +} + +#endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 4bce3ad8de..9f5f0ed4c8 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6138,6 +6138,14 @@ prorettype => 'bool', proargtypes => '', prosrc => 'pg_is_wal_replay_paused' }, +{ oid => '9085', descr => 'statistics: information about WAL prefetching', + proname => 'pg_stat_get_prefetch_recovery', prorows => '1', provolatile => 'v', + proretset => 't', prorettype => 'record', proargtypes => '', + proallargtypes => '{timestamptz,int8,int8,int8,int8,int8,int4,int4,float4,float4}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o}', + proargnames => '{stats_reset,prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth,avg_distance,avg_queue_depth}', + prosrc => 'pg_stat_get_prefetch_recovery' }, + { oid => '2621', descr => 'reload configuration files', proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool', proargtypes => '', prosrc => 'pg_reload_conf' }, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index b8041d9988..105c2e77d2 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -63,6 +63,7 @@ typedef enum StatMsgType PGSTAT_MTYPE_ARCHIVER, PGSTAT_MTYPE_BGWRITER, PGSTAT_MTYPE_SLRU, + PGSTAT_MTYPE_RECOVERYPREFETCH, PGSTAT_MTYPE_FUNCSTAT, PGSTAT_MTYPE_FUNCPURGE, PGSTAT_MTYPE_RECOVERYCONFLICT, @@ -183,6 +184,19 @@ typedef struct PgStat_TableXactStatus struct PgStat_TableXactStatus *next; /* next of same subxact */ } PgStat_TableXactStatus; +/* + * Recovery prefetching statistics 
persisted on disk by pgstat.c, but kept in + * shared memory by xlogprefetch.c. + */ +typedef struct PgStat_RecoveryPrefetchStats +{ + PgStat_Counter prefetch; + PgStat_Counter skip_hit; + PgStat_Counter skip_new; + PgStat_Counter skip_fpw; + PgStat_Counter skip_seq; + TimestampTz stat_reset_timestamp; +} PgStat_RecoveryPrefetchStats; /* ------------------------------------------------------------ * Message formats follow @@ -454,6 +468,16 @@ typedef struct PgStat_MsgSLRU PgStat_Counter m_truncate; } PgStat_MsgSLRU; +/* ---------- + * PgStat_MsgRecoveryPrefetch Sent by XLogPrefetch to save statistics. + * ---------- + */ +typedef struct PgStat_MsgRecoveryPrefetch +{ + PgStat_MsgHdr m_hdr; + PgStat_RecoveryPrefetchStats m_stats; +} PgStat_MsgRecoveryPrefetch; + /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict * ---------- @@ -598,6 +622,7 @@ typedef union PgStat_Msg PgStat_MsgArchiver msg_archiver; PgStat_MsgBgWriter msg_bgwriter; PgStat_MsgSLRU msg_slru; + PgStat_MsgRecoveryPrefetch msg_recoveryprefetch; PgStat_MsgFuncstat msg_funcstat; PgStat_MsgFuncpurge msg_funcpurge; PgStat_MsgRecoveryConflict msg_recoveryconflict; @@ -1464,6 +1489,7 @@ extern void pgstat_twophase_postabort(TransactionId xid, uint16 info, extern void pgstat_send_archiver(const char *xlog, bool failed); extern void pgstat_send_bgwriter(void); +extern void pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats); /* ---------- * Support functions for the SQL-callable functions to @@ -1479,6 +1505,7 @@ extern int pgstat_fetch_stat_numbackends(void); extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); +extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void); extern void pgstat_count_slru_page_zeroed(SlruCtl ctl); extern void pgstat_count_slru_page_hit(SlruCtl ctl); diff --git a/src/include/utils/guc.h 
b/src/include/utils/guc.h index 2819282181..976cf8b116 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -440,4 +440,8 @@ extern void assign_search_path(const char *newval, void *extra); extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +/* in access/transam/xlogprefetch.c */ +extern void assign_max_recovery_prefetch_distance(int new_value, void *extra); +extern void assign_recovery_prefetch_fpw(bool new_value, void *extra); + #endif /* GUC_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index ac31840739..942a07ffee 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1857,6 +1857,17 @@ pg_stat_gssapi| SELECT s.pid, s.gss_enc AS encrypted FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) WHERE (s.client_port IS NOT NULL); +pg_stat_prefetch_recovery| SELECT s.stats_reset, + s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth, + s.avg_distance, + s.avg_queue_depth + FROM pg_stat_get_prefetch_recovery() s(stats_reset, prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth, avg_distance, avg_queue_depth); pg_stat_progress_analyze| SELECT s.pid, s.datid, d.datname, -- 2.20.1