Hi,

While working on WaitEventSet-ifying various codepaths, I found it strange that walreceiver wakes up 10 times per second while idle. Here's a draft patch to compute the correct sleep time.
From 553d2dae8f8e7bb46ac73ca739aac03862f473cc Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 27 Jan 2022 21:43:17 +1300
Subject: [PATCH] Suppress useless wakeups in walreceiver.
Instead of waking up 10 times per second to check for various timeout conditions, keep track of when we next have periodic work to do. --- src/backend/replication/walreceiver.c | 258 ++++++++++++++++---------- 1 file changed, 158 insertions(+), 100 deletions(-) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b39fce8c23..55ac67f35a 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -94,8 +94,6 @@ bool hot_standby_feedback; static WalReceiverConn *wrconn = NULL; WalReceiverFunctionsType *WalReceiverFunctions = NULL; -#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ - /* * These variables are used similarly to openLogFile/SegNo, * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID @@ -115,6 +113,28 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; +/* + * Reasons to wake up and perform periodic tasks. + */ +typedef enum WalRcvWakeupReason +{ + WALRCV_WAKEUP_TIMEOUT, + WALRCV_WAKEUP_PING, + WALRCV_WAKEUP_REPLY, + WALRCV_WAKEUP_HSFEEDBACK, + NUM_WALRCV_WAKEUPS +} WalRcvWakeupReason; + +/* + * A struct to keep track of non-shared state. 
+ */ +typedef struct WalRcvInfo +{ + TimeLineID startpointTLI; + bool primary_has_standby_xmin; + TimestampTz wakeup[NUM_WALRCV_WAKEUPS]; +} WalRcvInfo; + static StringInfoData reply_message; static StringInfoData incoming_message; @@ -122,15 +142,24 @@ static StringInfoData incoming_message; static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); static void WalRcvDie(int code, Datum arg); -static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, +static void XLogWalRcvProcessMsg(WalRcvInfo *state, + unsigned char type, char *buf, Size len, TimeLineID tli); -static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, - TimeLineID tli); -static void XLogWalRcvFlush(bool dying, TimeLineID tli); -static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli); -static void XLogWalRcvSendReply(bool force, bool requestReply); -static void XLogWalRcvSendHSFeedback(bool immed); +static void XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes, + XLogRecPtr recptr, TimeLineID tli); +static void XLogWalRcvFlush(WalRcvInfo *state, bool dying, TimeLineID tli); +static void XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr, TimeLineID tli); +static void XLogWalRcvSendReply(WalRcvInfo *state, + TimestampTz now, + bool force, + bool requestReply); +static void XLogWalRcvSendHSFeedback(WalRcvInfo *state, + TimestampTz now, + bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void WalRcvComputeNextWakeup(WalRcvInfo *state, + WalRcvWakeupReason reason, + TimestampTz now); /* * Process any interrupts the walreceiver process may have received. 
@@ -164,7 +193,6 @@ ProcessWalRcvInterrupts(void) } } - /* Main entry point for walreceiver process */ void WalReceiverMain(void) @@ -174,16 +202,20 @@ WalReceiverMain(void) char slotname[NAMEDATALEN]; bool is_temp_slot; XLogRecPtr startpoint; - TimeLineID startpointTLI; TimeLineID primaryTLI; bool first_stream; WalRcvData *walrcv = WalRcv; TimestampTz last_recv_timestamp; TimestampTz now; - bool ping_sent; char *err; char *sender_host = NULL; int sender_port = 0; + WalRcvInfo state = {0}; + + now = GetCurrentTimestamp(); + + /* initially true so we always send at least one feedback message */ + state.primary_has_standby_xmin = true; /* * WalRcv should be set up already (if we are a backend, we inherit this @@ -191,7 +223,6 @@ WalReceiverMain(void) */ Assert(walrcv != NULL); - now = GetCurrentTimestamp(); /* * Mark walreceiver as running in shared memory. @@ -237,7 +268,7 @@ WalReceiverMain(void) strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); is_temp_slot = walrcv->is_temp_slot; startpoint = walrcv->receiveStart; - startpointTLI = walrcv->receiveStartTLI; + state.startpointTLI = walrcv->receiveStartTLI; /* * At most one of is_temp_slot and slotname can be set; otherwise, @@ -257,7 +288,7 @@ WalReceiverMain(void) pg_atomic_write_u64(&WalRcv->writtenUpto, 0); /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI)); + on_shmem_exit(WalRcvDie, PointerGetDatum(&state)); /* Properly accept or ignore signals the postmaster might send us */ pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config @@ -344,11 +375,11 @@ WalReceiverMain(void) * Confirm that the current timeline of the primary is the same or * ahead of ours. 
*/ - if (primaryTLI < startpointTLI) + if (primaryTLI < state.startpointTLI) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("highest timeline %u of the primary is behind recovery timeline %u", - primaryTLI, startpointTLI))); + primaryTLI, state.startpointTLI))); /* * Get any missing history files. We do this always, even when we're @@ -360,7 +391,7 @@ WalReceiverMain(void) * but let's avoid the confusion of timeline id collisions where we * can. */ - WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); + WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI); /* * Create temporary replication slot if requested, and update slot @@ -395,17 +426,17 @@ WalReceiverMain(void) options.logical = false; options.startpoint = startpoint; options.slotname = slotname[0] != '\0' ? slotname : NULL; - options.proto.physical.startpointTLI = startpointTLI; + options.proto.physical.startpointTLI = state.startpointTLI; if (walrcv_startstreaming(wrconn, &options)) { if (first_stream) ereport(LOG, (errmsg("started streaming WAL from primary at %X/%X on timeline %u", - LSN_FORMAT_ARGS(startpoint), startpointTLI))); + LSN_FORMAT_ARGS(startpoint), state.startpointTLI))); else ereport(LOG, (errmsg("restarted WAL streaming at %X/%X on timeline %u", - LSN_FORMAT_ARGS(startpoint), startpointTLI))); + LSN_FORMAT_ARGS(startpoint), state.startpointTLI))); first_stream = false; /* Initialize LogstreamResult and buffers for processing messages */ @@ -415,7 +446,10 @@ WalReceiverMain(void) /* Initialize the last recv timestamp */ last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; + + /* Initialize nap wakeup times. 
*/ + for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) + WalRcvComputeNextWakeup(&state, i, last_recv_timestamp); /* Loop until end-of-streaming or error */ for (;;) @@ -425,6 +459,8 @@ WalReceiverMain(void) bool endofwal = false; pgsocket wait_fd = PGINVALID_SOCKET; int rc; + TimestampTz nextWakeup; + int nap; /* * Exit walreceiver if we're not in recovery. This should not @@ -442,7 +478,11 @@ WalReceiverMain(void) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); - XLogWalRcvSendHSFeedback(true); + for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) + WalRcvComputeNextWakeup(&state, i, now); + XLogWalRcvSendHSFeedback(&state, + GetCurrentTimestamp(), + true); } /* See if we can read data immediately */ @@ -458,13 +498,19 @@ WalReceiverMain(void) if (len > 0) { /* - * Something was received from primary, so reset - * timeout + * Something was received from primary, so adjust + * the ping and timeout wakeup times. */ last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; - XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1, - startpointTLI); + WalRcvComputeNextWakeup(&state, + WALRCV_WAKEUP_TIMEOUT, + last_recv_timestamp); + WalRcvComputeNextWakeup(&state, + WALRCV_WAKEUP_PING, + last_recv_timestamp); + XLogWalRcvProcessMsg(&state, + buf[0], &buf[1], len - 1, + state.startpointTLI); } else if (len == 0) break; @@ -473,7 +519,7 @@ WalReceiverMain(void) ereport(LOG, (errmsg("replication terminated by primary server"), errdetail("End of WAL reached on timeline %u at %X/%X.", - startpointTLI, + state.startpointTLI, LSN_FORMAT_ARGS(LogstreamResult.Write)))); endofwal = true; break; @@ -482,20 +528,28 @@ WalReceiverMain(void) } /* Let the primary know that we received some data. */ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(&state, GetCurrentTimestamp(), + false, false); /* * If we've written some records, flush them to disk and * let the startup process and primary server know about * them. 
*/ - XLogWalRcvFlush(false, startpointTLI); + XLogWalRcvFlush(&state, false, state.startpointTLI); } /* Check if we need to exit the streaming loop. */ if (endofwal) break; + /* Find the soonest wakeup time, to limit our nap. */ + now = GetCurrentTimestamp(); + nextWakeup = INT64_MAX; + for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) + nextWakeup = Min(state.wakeup[i], nextWakeup); + nap = Max(0, (nextWakeup - now) / 1000); + /* * Ideally we would reuse a WaitEventSet object repeatedly * here to avoid the overheads of WaitLatchOrSocket on epoll @@ -512,7 +566,7 @@ WalReceiverMain(void) WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET, wait_fd, - NAPTIME_PER_CYCLE, + nap, WAIT_EVENT_WAL_RECEIVER_MAIN); if (rc & WL_LATCH_SET) { @@ -529,7 +583,7 @@ WalReceiverMain(void) */ walrcv->force_reply = false; pg_memory_barrier(); - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(&state, now, true, false); } } if (rc & WL_TIMEOUT) @@ -551,14 +605,7 @@ WalReceiverMain(void) */ if (wal_receiver_timeout > 0) { - TimestampTz now = GetCurrentTimestamp(); - TimestampTz timeout; - - timeout = - TimestampTzPlusMilliseconds(last_recv_timestamp, - wal_receiver_timeout); - - if (now >= timeout) + if (now >= state.wakeup[WALRCV_WAKEUP_TIMEOUT]) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("terminating walreceiver due to timeout"))); @@ -567,20 +614,15 @@ WalReceiverMain(void) * We didn't receive anything new, for half of * receiver replication timeout. Ping the server. 
*/ - if (!ping_sent) + if (now >= state.wakeup[WALRCV_WAKEUP_PING]) { - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout / 2)); - if (now >= timeout) - { - requestReply = true; - ping_sent = true; - } + requestReply = true; + state.wakeup[WALRCV_WAKEUP_PING] = INT64_MAX; } } - XLogWalRcvSendReply(requestReply, requestReply); - XLogWalRcvSendHSFeedback(false); + XLogWalRcvSendReply(&state, now, requestReply, requestReply); + XLogWalRcvSendHSFeedback(&state, now, false); } } @@ -595,12 +637,12 @@ WalReceiverMain(void) * know about when we began streaming, fetch its timeline history * file now. */ - WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); + WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI); } else ereport(LOG, (errmsg("primary server contains no more WAL on requested timeline %u", - startpointTLI))); + state.startpointTLI))); /* * End of WAL reached on the requested timeline. Close the last @@ -610,7 +652,7 @@ WalReceiverMain(void) { char xlogfname[MAXFNAMELEN]; - XLogWalRcvFlush(false, startpointTLI); + XLogWalRcvFlush(&state, false, state.startpointTLI); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); if (close(recvFile) != 0) ereport(PANIC, @@ -630,7 +672,7 @@ WalReceiverMain(void) recvFile = -1; elog(DEBUG1, "walreceiver ended streaming and awaits new instructions"); - WalRcvWaitForStartPosition(&startpoint, &startpointTLI); + WalRcvWaitForStartPosition(&startpoint, &state.startpointTLI); } /* not reached */ } @@ -778,12 +820,10 @@ static void WalRcvDie(int code, Datum arg) { WalRcvData *walrcv = WalRcv; - TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg); - - Assert(*startpointTLI_p != 0); + WalRcvInfo *state = (WalRcvInfo *) DatumGetPointer(arg); /* Ensure that all WAL records received are flushed to disk */ - XLogWalRcvFlush(true, *startpointTLI_p); + XLogWalRcvFlush(state, true, state->startpointTLI); /* Mark ourselves inactive in shared memory */ 
SpinLockAcquire(&walrcv->mutex); @@ -813,7 +853,8 @@ WalRcvDie(int code, Datum arg) * Accept the message from XLOG stream, and process it. */ static void -XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) +XLogWalRcvProcessMsg(WalRcvInfo *state, + unsigned char type, char *buf, Size len, TimeLineID tli) { int hdrlen; XLogRecPtr dataStart; @@ -843,7 +884,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) buf += hdrlen; len -= hdrlen; - XLogWalRcvWrite(buf, len, dataStart, tli); + XLogWalRcvWrite(state, buf, len, dataStart, tli); break; } case 'k': /* Keepalive */ @@ -865,7 +906,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) /* If the primary requested a reply, send one immediately */ if (replyRequested) - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(state, GetCurrentTimestamp(), true, false); break; } default: @@ -880,7 +921,8 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) * Write XLOG data to disk. */ static void -XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) +XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes, XLogRecPtr recptr, + TimeLineID tli) { int startoff; int byteswritten; @@ -893,7 +935,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) /* Close the current segment if it's completed */ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) - XLogWalRcvClose(recptr, tli); + XLogWalRcvClose(state, recptr, tli); if (recvFile < 0) { @@ -953,7 +995,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) * segment is received and written. 
*/ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) - XLogWalRcvClose(recptr, tli); + XLogWalRcvClose(state, recptr, tli); } /* @@ -963,7 +1005,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) * an error, so we skip sending a reply in that case. */ static void -XLogWalRcvFlush(bool dying, TimeLineID tli) +XLogWalRcvFlush(WalRcvInfo *state, bool dying, TimeLineID tli) { Assert(tli != 0); @@ -1003,8 +1045,8 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) /* Also let the primary know that we made some progress */ if (!dying) { - XLogWalRcvSendReply(false, false); - XLogWalRcvSendHSFeedback(false); + XLogWalRcvSendReply(state, GetCurrentTimestamp(), false, false); + XLogWalRcvSendHSFeedback(state, GetCurrentTimestamp(), false); } } } @@ -1018,7 +1060,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) * Create an archive notification file since the segment is known completed. */ static void -XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli) +XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr, TimeLineID tli) { char xlogfname[MAXFNAMELEN]; @@ -1029,7 +1071,7 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli) * fsync() and close current file before we switch to next one. We would * otherwise have to reopen this file to fsync it later */ - XLogWalRcvFlush(false, tli); + XLogWalRcvFlush(state, false, tli); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); @@ -1070,13 +1112,12 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli) * wal_receiver_timeout. 
*/ static void -XLogWalRcvSendReply(bool force, bool requestReply) +XLogWalRcvSendReply(WalRcvInfo *state, TimestampTz now, bool force, + bool requestReply) { static XLogRecPtr writePtr = 0; static XLogRecPtr flushPtr = 0; XLogRecPtr applyPtr; - static TimestampTz sendTime = 0; - TimestampTz now; /* * If the user doesn't want status to be reported to the primary, be sure @@ -1085,9 +1126,6 @@ XLogWalRcvSendReply(bool force, bool requestReply) if (!force && wal_receiver_status_interval <= 0) return; - /* Get current timestamp. */ - now = GetCurrentTimestamp(); - /* * We can compare the write and flush positions to the last message we * sent without taking any lock, but the apply position requires a spin @@ -1100,10 +1138,11 @@ XLogWalRcvSendReply(bool force, bool requestReply) if (!force && writePtr == LogstreamResult.Write && flushPtr == LogstreamResult.Flush - && !TimestampDifferenceExceeds(sendTime, now, - wal_receiver_status_interval * 1000)) + && now < state->wakeup[WALRCV_WAKEUP_REPLY]) return; - sendTime = now; + + /* Make sure we wake up when it's time to send another reply. */ + WalRcvComputeNextWakeup(state, WALRCV_WAKEUP_REPLY, now); /* Construct a new message */ writePtr = LogstreamResult.Write; @@ -1139,41 +1178,29 @@ XLogWalRcvSendReply(bool force, bool requestReply) * send a feedback message explicitly setting InvalidTransactionId). */ static void -XLogWalRcvSendHSFeedback(bool immed) +XLogWalRcvSendHSFeedback(WalRcvInfo *state, TimestampTz now, bool immed) { - TimestampTz now; FullTransactionId nextFullXid; TransactionId nextXid; uint32 xmin_epoch, catalog_xmin_epoch; TransactionId xmin, catalog_xmin; - static TimestampTz sendTime = 0; - - /* initially true so we always send at least one feedback message */ - static bool primary_has_standby_xmin = true; /* * If the user doesn't want status to be reported to the primary, be sure * to exit before doing anything at all. 
*/ if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) && - !primary_has_standby_xmin) + !state->primary_has_standby_xmin) return; - /* Get current timestamp. */ - now = GetCurrentTimestamp(); + /* Send feedback at most once per wal_receiver_status_interval. */ + if (!immed && now < state->wakeup[WALRCV_WAKEUP_HSFEEDBACK]) + return; - if (!immed) - { - /* - * Send feedback at most once per wal_receiver_status_interval. - */ - if (!TimestampDifferenceExceeds(sendTime, now, - wal_receiver_status_interval * 1000)) - return; - sendTime = now; - } + /* Make sure we wake up when it's time to send feedback again. */ + WalRcvComputeNextWakeup(state, WALRCV_WAKEUP_HSFEEDBACK, now); /* * If Hot Standby is not yet accepting connections there is nothing to @@ -1228,9 +1255,9 @@ XLogWalRcvSendHSFeedback(bool immed) pq_sendint32(&reply_message, catalog_xmin_epoch); walrcv_send(wrconn, reply_message.data, reply_message.len); if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin)) - primary_has_standby_xmin = true; + state->primary_has_standby_xmin = true; else - primary_has_standby_xmin = false; + state->primary_has_standby_xmin = false; } /* @@ -1284,6 +1311,37 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) } } +static void +WalRcvComputeNextWakeup(WalRcvInfo *state, + WalRcvWakeupReason reason, + TimestampTz now) +{ + switch (reason) + { + case WALRCV_WAKEUP_TIMEOUT: + state->wakeup[reason] = now + wal_receiver_timeout * 1000; + break; + case WALRCV_WAKEUP_PING: + state->wakeup[reason] = now + (wal_receiver_timeout / 2) * 1000; + break; + case WALRCV_WAKEUP_HSFEEDBACK: + if (!hot_standby_feedback) + { + state->wakeup[reason] = INT64_MAX; + break; + } + /* Fall through */ + case WALRCV_WAKEUP_REPLY: + if (wal_receiver_status_interval <= 0) + state->wakeup[reason] = INT64_MAX; + else + state->wakeup[reason] = now + wal_receiver_status_interval * 1000000; + break; + default: + ; + } +} + /* * Wake up the walreceiver main loop. 
* -- 2.30.2