Hi,
While working on WaitEventSet-ifying various codepaths, I found it
strange that walreceiver wakes up 10 times per second while idle.
Here's a draft patch to compute the correct sleep time.
From 553d2dae8f8e7bb46ac73ca739aac03862f473cc Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Thu, 27 Jan 2022 21:43:17 +1300
Subject: [PATCH] Suppress useless wakeups in walreceiver.
Instead of waking up 10 times per second to check for various timeout
conditions, keep track of when we next have periodic work to do.
---
src/backend/replication/walreceiver.c | 258 ++++++++++++++++----------
1 file changed, 158 insertions(+), 100 deletions(-)
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b39fce8c23..55ac67f35a 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -94,8 +94,6 @@ bool hot_standby_feedback;
static WalReceiverConn *wrconn = NULL;
WalReceiverFunctionsType *WalReceiverFunctions = NULL;
-#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
-
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -115,6 +113,28 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
+/*
+ * Reasons to wake up and perform periodic tasks.
+ *
+ * Each value indexes WalRcvInfo.wakeup[], which stores the time at which the
+ * corresponding task is next due.
+ */
+typedef enum WalRcvWakeupReason
+{
+ WALRCV_WAKEUP_TIMEOUT,	/* give up: wal_receiver_timeout elapsed since last receipt */
+ WALRCV_WAKEUP_PING,	/* request a reply from the primary (half of the timeout) */
+ WALRCV_WAKEUP_REPLY,	/* send the next status reply to the primary */
+ WALRCV_WAKEUP_HSFEEDBACK,	/* send the next hot standby feedback message */
+ NUM_WALRCV_WAKEUPS	/* number of reasons; sizes the wakeup[] array */
+} WalRcvWakeupReason;
+
+/*
+ * A struct to keep track of non-shared state.
+ *
+ * One instance lives in WalReceiverMain() and is passed by pointer to the
+ * helper routines and to the WalRcvDie() exit callback.
+ */
+typedef struct WalRcvInfo
+{
+ /* Timeline being streamed; set from walrcv->receiveStartTLI and by WalRcvWaitForStartPosition(). */
+ TimeLineID startpointTLI;
+ /* True if the last feedback sent carried a valid xmin or catalog_xmin; starts true so one message is always sent. */
+ bool primary_has_standby_xmin;
+ /* Next due time for each WalRcvWakeupReason; INT64_MAX means nothing is scheduled. */
+ TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
+} WalRcvInfo;
+
static StringInfoData reply_message;
static StringInfoData incoming_message;
@@ -122,15 +142,24 @@ static StringInfoData incoming_message;
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
+static void XLogWalRcvProcessMsg(WalRcvInfo *state,
+ unsigned char type, char *buf, Size len,
TimeLineID tli);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
- TimeLineID tli);
-static void XLogWalRcvFlush(bool dying, TimeLineID tli);
-static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
-static void XLogWalRcvSendHSFeedback(bool immed);
+static void XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes,
+ XLogRecPtr recptr, TimeLineID tli);
+static void XLogWalRcvFlush(WalRcvInfo *state, bool dying, TimeLineID tli);
+static void XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr, TimeLineID tli);
+static void XLogWalRcvSendReply(WalRcvInfo *state,
+ TimestampTz now,
+ bool force,
+ bool requestReply);
+static void XLogWalRcvSendHSFeedback(WalRcvInfo *state,
+ TimestampTz now,
+ bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void WalRcvComputeNextWakeup(WalRcvInfo *state,
+ WalRcvWakeupReason reason,
+ TimestampTz now);
/*
* Process any interrupts the walreceiver process may have received.
@@ -164,7 +193,6 @@ ProcessWalRcvInterrupts(void)
}
}
-
/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
@@ -174,16 +202,20 @@ WalReceiverMain(void)
char slotname[NAMEDATALEN];
bool is_temp_slot;
XLogRecPtr startpoint;
- TimeLineID startpointTLI;
TimeLineID primaryTLI;
bool first_stream;
WalRcvData *walrcv = WalRcv;
TimestampTz last_recv_timestamp;
TimestampTz now;
- bool ping_sent;
char *err;
char *sender_host = NULL;
int sender_port = 0;
+ WalRcvInfo state = {0};
+
+ now = GetCurrentTimestamp();
+
+ /* initially true so we always send at least one feedback message */
+ state.primary_has_standby_xmin = true;
/*
* WalRcv should be set up already (if we are a backend, we inherit this
@@ -191,7 +223,6 @@ WalReceiverMain(void)
*/
Assert(walrcv != NULL);
- now = GetCurrentTimestamp();
/*
* Mark walreceiver as running in shared memory.
@@ -237,7 +268,7 @@ WalReceiverMain(void)
strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
is_temp_slot = walrcv->is_temp_slot;
startpoint = walrcv->receiveStart;
- startpointTLI = walrcv->receiveStartTLI;
+ state.startpointTLI = walrcv->receiveStartTLI;
/*
* At most one of is_temp_slot and slotname can be set; otherwise,
@@ -257,7 +288,7 @@ WalReceiverMain(void)
pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
/* Arrange to clean up at walreceiver exit */
- on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
+ on_shmem_exit(WalRcvDie, PointerGetDatum(&state));
/* Properly accept or ignore signals the postmaster might send us */
pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
@@ -344,11 +375,11 @@ WalReceiverMain(void)
* Confirm that the current timeline of the primary is the same or
* ahead of ours.
*/
- if (primaryTLI < startpointTLI)
+ if (primaryTLI < state.startpointTLI)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("highest timeline %u of the primary is behind recovery timeline %u",
- primaryTLI, startpointTLI)));
+ primaryTLI, state.startpointTLI)));
/*
* Get any missing history files. We do this always, even when we're
@@ -360,7 +391,7 @@ WalReceiverMain(void)
* but let's avoid the confusion of timeline id collisions where we
* can.
*/
- WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+ WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
/*
* Create temporary replication slot if requested, and update slot
@@ -395,17 +426,17 @@ WalReceiverMain(void)
options.logical = false;
options.startpoint = startpoint;
options.slotname = slotname[0] != '\0' ? slotname : NULL;
- options.proto.physical.startpointTLI = startpointTLI;
+ options.proto.physical.startpointTLI = state.startpointTLI;
if (walrcv_startstreaming(wrconn, &options))
{
if (first_stream)
ereport(LOG,
(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
- LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+ LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
else
ereport(LOG,
(errmsg("restarted WAL streaming at %X/%X on timeline %u",
- LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+ LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
first_stream = false;
/* Initialize LogstreamResult and buffers for processing messages */
@@ -415,7 +446,10 @@ WalReceiverMain(void)
/* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+
+ /* Initialize nap wakeup times. */
+ for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+ WalRcvComputeNextWakeup(&state, i, last_recv_timestamp);
/* Loop until end-of-streaming or error */
for (;;)
@@ -425,6 +459,8 @@ WalReceiverMain(void)
bool endofwal = false;
pgsocket wait_fd = PGINVALID_SOCKET;
int rc;
+ TimestampTz nextWakeup;
+ int nap;
/*
* Exit walreceiver if we're not in recovery. This should not
@@ -442,7 +478,11 @@ WalReceiverMain(void)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
- XLogWalRcvSendHSFeedback(true);
+ for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+ WalRcvComputeNextWakeup(&state, i, now);
+ XLogWalRcvSendHSFeedback(&state,
+ GetCurrentTimestamp(),
+ true);
}
/* See if we can read data immediately */
@@ -458,13 +498,19 @@ WalReceiverMain(void)
if (len > 0)
{
/*
- * Something was received from primary, so reset
- * timeout
+ * Something was received from primary, so adjust
+ * the ping and timeout wakeup times.
*/
last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
- XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
- startpointTLI);
+ WalRcvComputeNextWakeup(&state,
+ WALRCV_WAKEUP_TIMEOUT,
+ last_recv_timestamp);
+ WalRcvComputeNextWakeup(&state,
+ WALRCV_WAKEUP_PING,
+ last_recv_timestamp);
+ XLogWalRcvProcessMsg(&state,
+ buf[0], &buf[1], len - 1,
+ state.startpointTLI);
}
else if (len == 0)
break;
@@ -473,7 +519,7 @@ WalReceiverMain(void)
ereport(LOG,
(errmsg("replication terminated by primary server"),
errdetail("End of WAL reached on timeline %u at %X/%X.",
- startpointTLI,
+ state.startpointTLI,
LSN_FORMAT_ARGS(LogstreamResult.Write))));
endofwal = true;
break;
@@ -482,20 +528,28 @@ WalReceiverMain(void)
}
/* Let the primary know that we received some data. */
- XLogWalRcvSendReply(false, false);
+ XLogWalRcvSendReply(&state, GetCurrentTimestamp(),
+ false, false);
/*
* If we've written some records, flush them to disk and
* let the startup process and primary server know about
* them.
*/
- XLogWalRcvFlush(false, startpointTLI);
+ XLogWalRcvFlush(&state, false, state.startpointTLI);
}
/* Check if we need to exit the streaming loop. */
if (endofwal)
break;
+				/*
+				 * Find the soonest wakeup time, to limit our nap.
+				 *
+				 * Wakeups that are not scheduled hold INT64_MAX, so the
+				 * distance to the soonest one, in milliseconds, may not fit
+				 * in an int.  Clamp it to [0, INT32_MAX] before narrowing;
+				 * an unclamped conversion could yield a negative or
+				 * arbitrary nap.
+				 */
+				now = GetCurrentTimestamp();
+				nextWakeup = INT64_MAX;
+				for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+					nextWakeup = Min(state.wakeup[i], nextWakeup);
+				nap = (int) Min(INT32_MAX, Max(0, (nextWakeup - now) / 1000));
+
/*
* Ideally we would reuse a WaitEventSet object repeatedly
* here to avoid the overheads of WaitLatchOrSocket on epoll
@@ -512,7 +566,7 @@ WalReceiverMain(void)
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET,
wait_fd,
- NAPTIME_PER_CYCLE,
+ nap,
WAIT_EVENT_WAL_RECEIVER_MAIN);
if (rc & WL_LATCH_SET)
{
@@ -529,7 +583,7 @@ WalReceiverMain(void)
*/
walrcv->force_reply = false;
pg_memory_barrier();
- XLogWalRcvSendReply(true, false);
+ XLogWalRcvSendReply(&state, now, true, false);
}
}
if (rc & WL_TIMEOUT)
@@ -551,14 +605,7 @@ WalReceiverMain(void)
*/
if (wal_receiver_timeout > 0)
{
- TimestampTz now = GetCurrentTimestamp();
- TimestampTz timeout;
-
- timeout =
- TimestampTzPlusMilliseconds(last_recv_timestamp,
- wal_receiver_timeout);
-
- if (now >= timeout)
+ if (now >= state.wakeup[WALRCV_WAKEUP_TIMEOUT])
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("terminating walreceiver due to timeout")));
@@ -567,20 +614,15 @@ WalReceiverMain(void)
* We didn't receive anything new, for half of
* receiver replication timeout. Ping the server.
*/
- if (!ping_sent)
+ if (now >= state.wakeup[WALRCV_WAKEUP_PING])
{
- timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- (wal_receiver_timeout / 2));
- if (now >= timeout)
- {
- requestReply = true;
- ping_sent = true;
- }
+ requestReply = true;
+ state.wakeup[WALRCV_WAKEUP_PING] = INT64_MAX;
}
}
- XLogWalRcvSendReply(requestReply, requestReply);
- XLogWalRcvSendHSFeedback(false);
+ XLogWalRcvSendReply(&state, now, requestReply, requestReply);
+ XLogWalRcvSendHSFeedback(&state, now, false);
}
}
@@ -595,12 +637,12 @@ WalReceiverMain(void)
* know about when we began streaming, fetch its timeline history
* file now.
*/
- WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+ WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
}
else
ereport(LOG,
(errmsg("primary server contains no more WAL on requested timeline %u",
- startpointTLI)));
+ state.startpointTLI)));
/*
* End of WAL reached on the requested timeline. Close the last
@@ -610,7 +652,7 @@ WalReceiverMain(void)
{
char xlogfname[MAXFNAMELEN];
- XLogWalRcvFlush(false, startpointTLI);
+ XLogWalRcvFlush(&state, false, state.startpointTLI);
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
if (close(recvFile) != 0)
ereport(PANIC,
@@ -630,7 +672,7 @@ WalReceiverMain(void)
recvFile = -1;
elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
- WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
+ WalRcvWaitForStartPosition(&startpoint, &state.startpointTLI);
}
/* not reached */
}
@@ -778,12 +820,10 @@ static void
WalRcvDie(int code, Datum arg)
{
WalRcvData *walrcv = WalRcv;
- TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
-
- Assert(*startpointTLI_p != 0);
+ WalRcvInfo *state = (WalRcvInfo *) DatumGetPointer(arg);
/* Ensure that all WAL records received are flushed to disk */
- XLogWalRcvFlush(true, *startpointTLI_p);
+ XLogWalRcvFlush(state, true, state->startpointTLI);
/* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
@@ -813,7 +853,8 @@ WalRcvDie(int code, Datum arg)
* Accept the message from XLOG stream, and process it.
*/
static void
-XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
+XLogWalRcvProcessMsg(WalRcvInfo *state,
+ unsigned char type, char *buf, Size len, TimeLineID tli)
{
int hdrlen;
XLogRecPtr dataStart;
@@ -843,7 +884,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
buf += hdrlen;
len -= hdrlen;
- XLogWalRcvWrite(buf, len, dataStart, tli);
+ XLogWalRcvWrite(state, buf, len, dataStart, tli);
break;
}
case 'k': /* Keepalive */
@@ -865,7 +906,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
/* If the primary requested a reply, send one immediately */
if (replyRequested)
- XLogWalRcvSendReply(true, false);
+ XLogWalRcvSendReply(state, GetCurrentTimestamp(), true, false);
break;
}
default:
@@ -880,7 +921,8 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
* Write XLOG data to disk.
*/
static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes, XLogRecPtr recptr,
+ TimeLineID tli)
{
int startoff;
int byteswritten;
@@ -893,7 +935,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
/* Close the current segment if it's completed */
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
- XLogWalRcvClose(recptr, tli);
+ XLogWalRcvClose(state, recptr, tli);
if (recvFile < 0)
{
@@ -953,7 +995,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
* segment is received and written.
*/
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
- XLogWalRcvClose(recptr, tli);
+ XLogWalRcvClose(state, recptr, tli);
}
/*
@@ -963,7 +1005,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
* an error, so we skip sending a reply in that case.
*/
static void
-XLogWalRcvFlush(bool dying, TimeLineID tli)
+XLogWalRcvFlush(WalRcvInfo *state, bool dying, TimeLineID tli)
{
Assert(tli != 0);
@@ -1003,8 +1045,8 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
/* Also let the primary know that we made some progress */
if (!dying)
{
- XLogWalRcvSendReply(false, false);
- XLogWalRcvSendHSFeedback(false);
+ XLogWalRcvSendReply(state, GetCurrentTimestamp(), false, false);
+ XLogWalRcvSendHSFeedback(state, GetCurrentTimestamp(), false);
}
}
}
@@ -1018,7 +1060,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
* Create an archive notification file since the segment is known completed.
*/
static void
-XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr, TimeLineID tli)
{
char xlogfname[MAXFNAMELEN];
@@ -1029,7 +1071,7 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
* fsync() and close current file before we switch to next one. We would
* otherwise have to reopen this file to fsync it later
*/
- XLogWalRcvFlush(false, tli);
+ XLogWalRcvFlush(state, false, tli);
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
@@ -1070,13 +1112,12 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
* wal_receiver_timeout.
*/
static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(WalRcvInfo *state, TimestampTz now, bool force,
+ bool requestReply)
{
static XLogRecPtr writePtr = 0;
static XLogRecPtr flushPtr = 0;
XLogRecPtr applyPtr;
- static TimestampTz sendTime = 0;
- TimestampTz now;
/*
* If the user doesn't want status to be reported to the primary, be sure
@@ -1085,9 +1126,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
if (!force && wal_receiver_status_interval <= 0)
return;
- /* Get current timestamp. */
- now = GetCurrentTimestamp();
-
/*
* We can compare the write and flush positions to the last message we
* sent without taking any lock, but the apply position requires a spin
@@ -1100,10 +1138,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
if (!force
&& writePtr == LogstreamResult.Write
&& flushPtr == LogstreamResult.Flush
- && !TimestampDifferenceExceeds(sendTime, now,
- wal_receiver_status_interval * 1000))
+ && now < state->wakeup[WALRCV_WAKEUP_REPLY])
return;
- sendTime = now;
+
+ /* Make sure we wake up when it's time to send another reply. */
+ WalRcvComputeNextWakeup(state, WALRCV_WAKEUP_REPLY, now);
/* Construct a new message */
writePtr = LogstreamResult.Write;
@@ -1139,41 +1178,29 @@ XLogWalRcvSendReply(bool force, bool requestReply)
* send a feedback message explicitly setting InvalidTransactionId).
*/
static void
-XLogWalRcvSendHSFeedback(bool immed)
+XLogWalRcvSendHSFeedback(WalRcvInfo *state, TimestampTz now, bool immed)
{
- TimestampTz now;
FullTransactionId nextFullXid;
TransactionId nextXid;
uint32 xmin_epoch,
catalog_xmin_epoch;
TransactionId xmin,
catalog_xmin;
- static TimestampTz sendTime = 0;
-
- /* initially true so we always send at least one feedback message */
- static bool primary_has_standby_xmin = true;
/*
* If the user doesn't want status to be reported to the primary, be sure
* to exit before doing anything at all.
*/
if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
- !primary_has_standby_xmin)
+ !state->primary_has_standby_xmin)
return;
- /* Get current timestamp. */
- now = GetCurrentTimestamp();
+ /* Send feedback at most once per wal_receiver_status_interval. */
+ if (!immed && now < state->wakeup[WALRCV_WAKEUP_HSFEEDBACK])
+ return;
- if (!immed)
- {
- /*
- * Send feedback at most once per wal_receiver_status_interval.
- */
- if (!TimestampDifferenceExceeds(sendTime, now,
- wal_receiver_status_interval * 1000))
- return;
- sendTime = now;
- }
+ /* Make sure we wake up when it's time to send feedback again. */
+ WalRcvComputeNextWakeup(state, WALRCV_WAKEUP_HSFEEDBACK, now);
/*
* If Hot Standby is not yet accepting connections there is nothing to
@@ -1228,9 +1255,9 @@ XLogWalRcvSendHSFeedback(bool immed)
pq_sendint32(&reply_message, catalog_xmin_epoch);
walrcv_send(wrconn, reply_message.data, reply_message.len);
if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
- primary_has_standby_xmin = true;
+ state->primary_has_standby_xmin = true;
else
- primary_has_standby_xmin = false;
+ state->primary_has_standby_xmin = false;
}
/*
@@ -1284,6 +1311,37 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
}
}
+/*
+ * Compute the next wakeup time for the given reason, measured from "now",
+ * and store it in state->wakeup[reason].
+ *
+ * wal_receiver_timeout is scaled from milliseconds and
+ * wal_receiver_status_interval from seconds into TimestampTz microseconds.
+ * Both settings are plain ints, so they are widened to int64 before the
+ * multiplication; large settings would otherwise overflow int arithmetic.
+ *
+ * A wakeup whose controlling setting is disabled (<= 0, or
+ * hot_standby_feedback off for HSFEEDBACK) is set to INT64_MAX so that it is
+ * never the soonest deadline.  In particular, wal_receiver_timeout = 0 must
+ * not produce a deadline of "now": that would make the main loop compute a
+ * zero-length nap on every iteration and spin.
+ *
+ * NOTE(review): when every reason is disabled the soonest deadline is
+ * INT64_MAX, so code converting it to an int millisecond nap has to clamp.
+ *
+ * An unrecognized reason leaves the array untouched.
+ */
+static void
+WalRcvComputeNextWakeup(WalRcvInfo *state,
+						WalRcvWakeupReason reason,
+						TimestampTz now)
+{
+	switch (reason)
+	{
+		case WALRCV_WAKEUP_TIMEOUT:
+			if (wal_receiver_timeout <= 0)
+				state->wakeup[reason] = INT64_MAX;
+			else
+				state->wakeup[reason] =
+					now + (int64) wal_receiver_timeout * 1000;
+			break;
+		case WALRCV_WAKEUP_PING:
+			if (wal_receiver_timeout <= 0)
+				state->wakeup[reason] = INT64_MAX;
+			else
+				state->wakeup[reason] =
+					now + (int64) (wal_receiver_timeout / 2) * 1000;
+			break;
+		case WALRCV_WAKEUP_HSFEEDBACK:
+			if (!hot_standby_feedback)
+			{
+				state->wakeup[reason] = INT64_MAX;
+				break;
+			}
+			/* Fall through: feedback shares the status interval */
+		case WALRCV_WAKEUP_REPLY:
+			if (wal_receiver_status_interval <= 0)
+				state->wakeup[reason] = INT64_MAX;
+			else
+				state->wakeup[reason] =
+					now + (int64) wal_receiver_status_interval * 1000000;
+			break;
+		default:
+			break;
+	}
+}
+
/*
* Wake up the walreceiver main loop.
*
--
2.30.2