Hi,

While working on WaitEventSet-ifying various codepaths, I found it
strange that walreceiver wakes up 10 times per second while idle.
Here's a draft patch to compute the correct sleep time.
From 553d2dae8f8e7bb46ac73ca739aac03862f473cc Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 27 Jan 2022 21:43:17 +1300
Subject: [PATCH] Suppress useless wakeups in walreceiver.

Instead of waking up 10 times per second to check for various timeout
conditions, keep track of when we next have periodic work to do.
---
 src/backend/replication/walreceiver.c | 258 ++++++++++++++++----------
 1 file changed, 158 insertions(+), 100 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b39fce8c23..55ac67f35a 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -94,8 +94,6 @@ bool		hot_standby_feedback;
 static WalReceiverConn *wrconn = NULL;
 WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 
-#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
-
 /*
  * These variables are used similarly to openLogFile/SegNo,
  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -115,6 +113,28 @@ static struct
 	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
 }			LogstreamResult;
 
+/*
+ * Reasons to wake up and perform periodic tasks; indexes WalRcvInfo.wakeup[].
+ */
+typedef enum WalRcvWakeupReason
+{
+	WALRCV_WAKEUP_TIMEOUT,		/* give up: wal_receiver_timeout has elapsed */
+	WALRCV_WAKEUP_PING,			/* ask the primary to reply (half the timeout) */
+	WALRCV_WAKEUP_REPLY,		/* next periodic status reply is due */
+	WALRCV_WAKEUP_HSFEEDBACK,	/* next hot standby feedback is due */
+	NUM_WALRCV_WAKEUPS			/* number of reasons; sizes the array */
+} WalRcvWakeupReason;
+
+/*
+ * Walreceiver-local (non-shared) state, passed to the helper functions.
+ */
+typedef struct WalRcvInfo
+{
+	TimeLineID	startpointTLI;	/* timeline being streamed */
+	bool		primary_has_standby_xmin;	/* sent a valid (catalog_)xmin */
+	TimestampTz	wakeup[NUM_WALRCV_WAKEUPS];	/* next due time, per reason */
+} WalRcvInfo;
+
 static StringInfoData reply_message;
 static StringInfoData incoming_message;
 
@@ -122,15 +142,24 @@ static StringInfoData incoming_message;
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
 static void WalRcvDie(int code, Datum arg);
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
+static void XLogWalRcvProcessMsg(WalRcvInfo *state,
+								 unsigned char type, char *buf, Size len,
 								 TimeLineID tli);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
-							TimeLineID tli);
-static void XLogWalRcvFlush(bool dying, TimeLineID tli);
-static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
-static void XLogWalRcvSendHSFeedback(bool immed);
+static void XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes,
+							XLogRecPtr recptr, TimeLineID tli);
+static void XLogWalRcvFlush(WalRcvInfo *state, bool dying, TimeLineID tli);
+static void XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr, TimeLineID tli);
+static void XLogWalRcvSendReply(WalRcvInfo *state,
+								TimestampTz now,
+								bool force,
+								bool requestReply);
+static void XLogWalRcvSendHSFeedback(WalRcvInfo *state,
+									 TimestampTz now,
+									 bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void WalRcvComputeNextWakeup(WalRcvInfo *state,
+									WalRcvWakeupReason reason,
+									TimestampTz now);
 
 /*
  * Process any interrupts the walreceiver process may have received.
@@ -164,7 +193,6 @@ ProcessWalRcvInterrupts(void)
 	}
 }
 
-
 /* Main entry point for walreceiver process */
 void
 WalReceiverMain(void)
@@ -174,16 +202,20 @@ WalReceiverMain(void)
 	char		slotname[NAMEDATALEN];
 	bool		is_temp_slot;
 	XLogRecPtr	startpoint;
-	TimeLineID	startpointTLI;
 	TimeLineID	primaryTLI;
 	bool		first_stream;
 	WalRcvData *walrcv = WalRcv;
 	TimestampTz last_recv_timestamp;
 	TimestampTz now;
-	bool		ping_sent;
 	char	   *err;
 	char	   *sender_host = NULL;
 	int			sender_port = 0;
+	WalRcvInfo state = {0};
+
+	now = GetCurrentTimestamp();
+
+	/* initially true so we always send at least one feedback message */
+	state.primary_has_standby_xmin = true;
 
 	/*
 	 * WalRcv should be set up already (if we are a backend, we inherit this
@@ -191,7 +223,6 @@ WalReceiverMain(void)
 	 */
 	Assert(walrcv != NULL);
 
-	now = GetCurrentTimestamp();
 
 	/*
 	 * Mark walreceiver as running in shared memory.
@@ -237,7 +268,7 @@ WalReceiverMain(void)
 	strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
 	is_temp_slot = walrcv->is_temp_slot;
 	startpoint = walrcv->receiveStart;
-	startpointTLI = walrcv->receiveStartTLI;
+	state.startpointTLI = walrcv->receiveStartTLI;
 
 	/*
 	 * At most one of is_temp_slot and slotname can be set; otherwise,
@@ -257,7 +288,7 @@ WalReceiverMain(void)
 	pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
 
 	/* Arrange to clean up at walreceiver exit */
-	on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
+	on_shmem_exit(WalRcvDie, PointerGetDatum(&state));
 
 	/* Properly accept or ignore signals the postmaster might send us */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
@@ -344,11 +375,11 @@ WalReceiverMain(void)
 		 * Confirm that the current timeline of the primary is the same or
 		 * ahead of ours.
 		 */
-		if (primaryTLI < startpointTLI)
+		if (primaryTLI < state.startpointTLI)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
-							primaryTLI, startpointTLI)));
+							primaryTLI, state.startpointTLI)));
 
 		/*
 		 * Get any missing history files. We do this always, even when we're
@@ -360,7 +391,7 @@ WalReceiverMain(void)
 		 * but let's avoid the confusion of timeline id collisions where we
 		 * can.
 		 */
-		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+		WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
 
 		/*
 		 * Create temporary replication slot if requested, and update slot
@@ -395,17 +426,17 @@ WalReceiverMain(void)
 		options.logical = false;
 		options.startpoint = startpoint;
 		options.slotname = slotname[0] != '\0' ? slotname : NULL;
-		options.proto.physical.startpointTLI = startpointTLI;
+		options.proto.physical.startpointTLI = state.startpointTLI;
 		if (walrcv_startstreaming(wrconn, &options))
 		{
 			if (first_stream)
 				ereport(LOG,
 						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
-								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+								LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
 			else
 				ereport(LOG,
 						(errmsg("restarted WAL streaming at %X/%X on timeline %u",
-								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+								LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
 			first_stream = false;
 
 			/* Initialize LogstreamResult and buffers for processing messages */
@@ -415,7 +446,10 @@ WalReceiverMain(void)
 
 			/* Initialize the last recv timestamp */
 			last_recv_timestamp = GetCurrentTimestamp();
-			ping_sent = false;
+
+			/* Initialize nap wakeup times. */
+			for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+				WalRcvComputeNextWakeup(&state, i, last_recv_timestamp);
 
 			/* Loop until end-of-streaming or error */
 			for (;;)
@@ -425,6 +459,8 @@ WalReceiverMain(void)
 				bool		endofwal = false;
 				pgsocket	wait_fd = PGINVALID_SOCKET;
 				int			rc;
+				TimestampTz	nextWakeup;
+				int			nap;
 
 				/*
 				 * Exit walreceiver if we're not in recovery. This should not
@@ -442,7 +478,11 @@ WalReceiverMain(void)
 				{
 					ConfigReloadPending = false;
 					ProcessConfigFile(PGC_SIGHUP);
-					XLogWalRcvSendHSFeedback(true);
+					/* Settings may have changed: recompute wakeup times. */
+					now = GetCurrentTimestamp();
+					for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+						WalRcvComputeNextWakeup(&state, i, now);
+					XLogWalRcvSendHSFeedback(&state, now, true);
 				}
 
 				/* See if we can read data immediately */
@@ -458,13 +498,19 @@ WalReceiverMain(void)
 						if (len > 0)
 						{
 							/*
-							 * Something was received from primary, so reset
-							 * timeout
+							 * Something was received from primary, so adjust
+							 * the ping and timeout wakeup times.
 							 */
 							last_recv_timestamp = GetCurrentTimestamp();
-							ping_sent = false;
-							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
-												 startpointTLI);
+							WalRcvComputeNextWakeup(&state,
+													WALRCV_WAKEUP_TIMEOUT,
+													last_recv_timestamp);
+							WalRcvComputeNextWakeup(&state,
+													WALRCV_WAKEUP_PING,
+													last_recv_timestamp);
+							XLogWalRcvProcessMsg(&state,
+												 buf[0], &buf[1], len - 1,
+												 state.startpointTLI);
 						}
 						else if (len == 0)
 							break;
@@ -473,7 +519,7 @@ WalReceiverMain(void)
 							ereport(LOG,
 									(errmsg("replication terminated by primary server"),
 									 errdetail("End of WAL reached on timeline %u at %X/%X.",
-											   startpointTLI,
+											   state.startpointTLI,
 											   LSN_FORMAT_ARGS(LogstreamResult.Write))));
 							endofwal = true;
 							break;
@@ -482,20 +528,28 @@ WalReceiverMain(void)
 					}
 
 					/* Let the primary know that we received some data. */
-					XLogWalRcvSendReply(false, false);
+					XLogWalRcvSendReply(&state, GetCurrentTimestamp(),
+										false, false);
 
 					/*
 					 * If we've written some records, flush them to disk and
 					 * let the startup process and primary server know about
 					 * them.
 					 */
-					XLogWalRcvFlush(false, startpointTLI);
+					XLogWalRcvFlush(&state, false, state.startpointTLI);
 				}
 
 				/* Check if we need to exit the streaming loop. */
 				if (endofwal)
 					break;
 
+				/* Nap until the soonest wakeup, clamped to [0, INT_MAX] ms. */
+				now = GetCurrentTimestamp();
+				nextWakeup = INT64_MAX;
+				for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+					nextWakeup = Min(state.wakeup[i], nextWakeup);
+				nap = (int) Min(Max((nextWakeup - now) / 1000, 0), INT_MAX);
+
 				/*
 				 * Ideally we would reuse a WaitEventSet object repeatedly
 				 * here to avoid the overheads of WaitLatchOrSocket on epoll
@@ -512,7 +566,7 @@ WalReceiverMain(void)
 									   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
 									   WL_TIMEOUT | WL_LATCH_SET,
 									   wait_fd,
-									   NAPTIME_PER_CYCLE,
+									   nap,
 									   WAIT_EVENT_WAL_RECEIVER_MAIN);
 				if (rc & WL_LATCH_SET)
 				{
@@ -529,7 +583,7 @@ WalReceiverMain(void)
 						 */
 						walrcv->force_reply = false;
 						pg_memory_barrier();
-						XLogWalRcvSendReply(true, false);
+						XLogWalRcvSendReply(&state, now, true, false);
 					}
 				}
 				if (rc & WL_TIMEOUT)
@@ -551,14 +605,7 @@ WalReceiverMain(void)
 					 */
 					if (wal_receiver_timeout > 0)
 					{
-						TimestampTz now = GetCurrentTimestamp();
-						TimestampTz timeout;
-
-						timeout =
-							TimestampTzPlusMilliseconds(last_recv_timestamp,
-														wal_receiver_timeout);
-
-						if (now >= timeout)
+						if (now >= state.wakeup[WALRCV_WAKEUP_TIMEOUT])
 							ereport(ERROR,
 									(errcode(ERRCODE_CONNECTION_FAILURE),
 									 errmsg("terminating walreceiver due to timeout")));
@@ -567,20 +614,15 @@ WalReceiverMain(void)
 						 * We didn't receive anything new, for half of
 						 * receiver replication timeout. Ping the server.
 						 */
-						if (!ping_sent)
+						if (now >= state.wakeup[WALRCV_WAKEUP_PING])
 						{
-							timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-																  (wal_receiver_timeout / 2));
-							if (now >= timeout)
-							{
-								requestReply = true;
-								ping_sent = true;
-							}
+							requestReply = true;
+							state.wakeup[WALRCV_WAKEUP_PING] = INT64_MAX;
 						}
 					}
 
-					XLogWalRcvSendReply(requestReply, requestReply);
-					XLogWalRcvSendHSFeedback(false);
+					XLogWalRcvSendReply(&state, now, requestReply, requestReply);
+					XLogWalRcvSendHSFeedback(&state, now, false);
 				}
 			}
 
@@ -595,12 +637,12 @@ WalReceiverMain(void)
 			 * know about when we began streaming, fetch its timeline history
 			 * file now.
 			 */
-			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+			WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
 		}
 		else
 			ereport(LOG,
 					(errmsg("primary server contains no more WAL on requested timeline %u",
-							startpointTLI)));
+							state.startpointTLI)));
 
 		/*
 		 * End of WAL reached on the requested timeline. Close the last
@@ -610,7 +652,7 @@ WalReceiverMain(void)
 		{
 			char		xlogfname[MAXFNAMELEN];
 
-			XLogWalRcvFlush(false, startpointTLI);
+			XLogWalRcvFlush(&state, false, state.startpointTLI);
 			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 			if (close(recvFile) != 0)
 				ereport(PANIC,
@@ -630,7 +672,7 @@ WalReceiverMain(void)
 		recvFile = -1;
 
 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
-		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
+		WalRcvWaitForStartPosition(&startpoint, &state.startpointTLI);
 	}
 	/* not reached */
 }
@@ -778,12 +820,10 @@ static void
 WalRcvDie(int code, Datum arg)
 {
 	WalRcvData *walrcv = WalRcv;
-	TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
-
-	Assert(*startpointTLI_p != 0);
+	WalRcvInfo *state = (WalRcvInfo *) DatumGetPointer(arg);
 
 	/* Ensure that all WAL records received are flushed to disk */
-	XLogWalRcvFlush(true, *startpointTLI_p);
+	XLogWalRcvFlush(state, true, state->startpointTLI);
 
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
@@ -813,7 +853,8 @@ WalRcvDie(int code, Datum arg)
  * Accept the message from XLOG stream, and process it.
  */
 static void
-XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
+XLogWalRcvProcessMsg(WalRcvInfo *state,
+					 unsigned char type, char *buf, Size len, TimeLineID tli)
 {
 	int			hdrlen;
 	XLogRecPtr	dataStart;
@@ -843,7 +884,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
 
 				buf += hdrlen;
 				len -= hdrlen;
-				XLogWalRcvWrite(buf, len, dataStart, tli);
+				XLogWalRcvWrite(state, buf, len, dataStart, tli);
 				break;
 			}
 		case 'k':				/* Keepalive */
@@ -865,7 +906,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
-					XLogWalRcvSendReply(true, false);
+					XLogWalRcvSendReply(state, GetCurrentTimestamp(), true, false);
 				break;
 			}
 		default:
@@ -880,7 +921,8 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
  * Write XLOG data to disk.
  */
 static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes, XLogRecPtr recptr,
+				TimeLineID tli)
 {
 	int			startoff;
 	int			byteswritten;
@@ -893,7 +935,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
 		/* Close the current segment if it's completed */
 		if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-			XLogWalRcvClose(recptr, tli);
+			XLogWalRcvClose(state, recptr, tli);
 
 		if (recvFile < 0)
 		{
@@ -953,7 +995,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 	 * segment is received and written.
 	 */
 	if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-		XLogWalRcvClose(recptr, tli);
+		XLogWalRcvClose(state, recptr, tli);
 }
 
 /*
@@ -963,7 +1005,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
  * an error, so we skip sending a reply in that case.
  */
 static void
-XLogWalRcvFlush(bool dying, TimeLineID tli)
+XLogWalRcvFlush(WalRcvInfo *state, bool dying, TimeLineID tli)
 {
 	Assert(tli != 0);
 
@@ -1003,8 +1045,8 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		/* Also let the primary know that we made some progress */
 		if (!dying)
 		{
-			XLogWalRcvSendReply(false, false);
-			XLogWalRcvSendHSFeedback(false);
+			XLogWalRcvSendReply(state, GetCurrentTimestamp(), false, false);
+			XLogWalRcvSendHSFeedback(state, GetCurrentTimestamp(), false);
 		}
 	}
 }
@@ -1018,7 +1060,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
  * Create an archive notification file since the segment is known completed.
  */
 static void
-XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr, TimeLineID tli)
 {
 	char		xlogfname[MAXFNAMELEN];
 
@@ -1029,7 +1071,7 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
 	 * fsync() and close current file before we switch to next one. We would
 	 * otherwise have to reopen this file to fsync it later
 	 */
-	XLogWalRcvFlush(false, tli);
+	XLogWalRcvFlush(state, false, tli);
 
 	XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 
@@ -1070,13 +1112,12 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
  * wal_receiver_timeout.
  */
 static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(WalRcvInfo *state, TimestampTz now, bool force,
+					bool requestReply)
 {
 	static XLogRecPtr writePtr = 0;
 	static XLogRecPtr flushPtr = 0;
 	XLogRecPtr	applyPtr;
-	static TimestampTz sendTime = 0;
-	TimestampTz now;
 
 	/*
 	 * If the user doesn't want status to be reported to the primary, be sure
@@ -1085,9 +1126,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	if (!force && wal_receiver_status_interval <= 0)
 		return;
 
-	/* Get current timestamp. */
-	now = GetCurrentTimestamp();
-
 	/*
 	 * We can compare the write and flush positions to the last message we
 	 * sent without taking any lock, but the apply position requires a spin
@@ -1100,10 +1138,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	if (!force
 		&& writePtr == LogstreamResult.Write
 		&& flushPtr == LogstreamResult.Flush
-		&& !TimestampDifferenceExceeds(sendTime, now,
-									   wal_receiver_status_interval * 1000))
+		&& now < state->wakeup[WALRCV_WAKEUP_REPLY])
 		return;
-	sendTime = now;
+
+	/* Make sure we wake up when it's time to send another reply. */
+	WalRcvComputeNextWakeup(state, WALRCV_WAKEUP_REPLY, now);
 
 	/* Construct a new message */
 	writePtr = LogstreamResult.Write;
@@ -1139,41 +1178,29 @@ XLogWalRcvSendReply(bool force, bool requestReply)
  * send a feedback message explicitly setting InvalidTransactionId).
  */
 static void
-XLogWalRcvSendHSFeedback(bool immed)
+XLogWalRcvSendHSFeedback(WalRcvInfo *state, TimestampTz now, bool immed)
 {
-	TimestampTz now;
 	FullTransactionId nextFullXid;
 	TransactionId nextXid;
 	uint32		xmin_epoch,
 				catalog_xmin_epoch;
 	TransactionId xmin,
 				catalog_xmin;
-	static TimestampTz sendTime = 0;
-
-	/* initially true so we always send at least one feedback message */
-	static bool primary_has_standby_xmin = true;
 
 	/*
 	 * If the user doesn't want status to be reported to the primary, be sure
 	 * to exit before doing anything at all.
 	 */
 	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
-		!primary_has_standby_xmin)
+		!state->primary_has_standby_xmin)
 		return;
 
-	/* Get current timestamp. */
-	now = GetCurrentTimestamp();
+	/* Send feedback at most once per wal_receiver_status_interval. */
+	if (!immed && now < state->wakeup[WALRCV_WAKEUP_HSFEEDBACK])
+		return;
 
-	if (!immed)
-	{
-		/*
-		 * Send feedback at most once per wal_receiver_status_interval.
-		 */
-		if (!TimestampDifferenceExceeds(sendTime, now,
-										wal_receiver_status_interval * 1000))
-			return;
-		sendTime = now;
-	}
+	/* Make sure we wake up when it's time to send feedback again. */
+	WalRcvComputeNextWakeup(state, WALRCV_WAKEUP_HSFEEDBACK, now);
 
 	/*
 	 * If Hot Standby is not yet accepting connections there is nothing to
@@ -1228,9 +1255,9 @@ XLogWalRcvSendHSFeedback(bool immed)
 	pq_sendint32(&reply_message, catalog_xmin_epoch);
 	walrcv_send(wrconn, reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
-		primary_has_standby_xmin = true;
+		state->primary_has_standby_xmin = true;
 	else
-		primary_has_standby_xmin = false;
+		state->primary_has_standby_xmin = false;
 }
 
 /*
@@ -1284,6 +1311,48 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 	}
 }
 
+/*
+ * Compute the next time at which the periodic task 'reason' is due, counting
+ * from 'now', and store it in state->wakeup[reason].
+ *
+ * A task that is disabled by its GUC (wal_receiver_timeout <= 0,
+ * wal_receiver_status_interval <= 0, or hot_standby_feedback off) is
+ * scheduled for INT64_MAX, i.e. never; scheduling it for 'now' would make
+ * the main loop compute a zero nap time and spin.  The GUCs are plain ints,
+ * so widen them before scaling to microseconds to avoid integer overflow.
+ */
+static void
+WalRcvComputeNextWakeup(WalRcvInfo *state,
+						WalRcvWakeupReason reason,
+						TimestampTz now)
+{
+	int64		delay = -1;		/* microseconds; negative means never */
+
+	switch (reason)
+	{
+	case WALRCV_WAKEUP_TIMEOUT:
+		if (wal_receiver_timeout > 0)
+			delay = (int64) wal_receiver_timeout * 1000;
+		break;
+	case WALRCV_WAKEUP_PING:
+		if (wal_receiver_timeout > 0)
+			delay = (int64) (wal_receiver_timeout / 2) * 1000;
+		break;
+	case WALRCV_WAKEUP_HSFEEDBACK:
+		if (!hot_standby_feedback)
+			break;
+		/* Fall through */
+	case WALRCV_WAKEUP_REPLY:
+		if (wal_receiver_status_interval > 0)
+			delay = (int64) wal_receiver_status_interval * 1000000;
+		break;
+	default:
+		return;			/* unknown reason: leave the array untouched */
+	}
+
+	state->wakeup[reason] = (delay < 0) ? INT64_MAX : now + delay;
+}
+
 /*
  * Wake up the walreceiver main loop.
  *
-- 
2.30.2

Reply via email to