Hi hackers,

I've attached an attempt at porting a change similar to 05a7be9 [0] to
logical/worker.c.  The bulk of the patch is lifted from the walreceiver
patch, but I did need to add a hack so that the apply worker wakes up after
wal_retrieve_retry_interval to start sync workers.  This hack involves a
new wakeup variable, next_sync_start, that process_syncing_tables_for_apply()
sets.

For best results, this patch should be applied on top of [1], which is an
attempt at fixing the code paths that only complete within a reasonable
timeframe because logical worker processes currently wake up at least once
a second.  With the attached patch applied, those periodic wakeups are
gone, so we need to make sure the logical workers are woken up explicitly
when needed.

[0] https://postgr.es/m/CA%2BhUKGJGhX4r2LPUE3Oy9BX71Eum6PBcS8L3sJpScR9oKaTVaA%40mail.gmail.com
[1] https://postgr.es/m/20221122004119.GA132961%40nathanxps13

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From d1af5b4b3073984fba1e5e86b9c034a96b9e235a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Thu, 1 Dec 2022 20:50:21 -0800
Subject: [PATCH v1 1/1] suppress unnecessary wakeups in logical/worker.c

---
 src/backend/replication/logical/tablesync.c |  20 +++
 src/backend/replication/logical/worker.c    | 156 +++++++++++++++-----
 src/include/replication/worker_internal.h   |   3 +
 3 files changed, 142 insertions(+), 37 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..168a2da3a1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -418,6 +418,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/*
+	 * If the stored time for starting sync workers has been reached, reset it
+	 * to PG_INT64_MAX ("none pending") so that it can be recalculated below.
+	 */
+	if (next_sync_start <= GetCurrentTimestamp())
+		next_sync_start = PG_INT64_MAX;
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	FetchTableStates(&started_tx);
 
@@ -612,6 +619,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 rstate->relid);
 						hentry->last_start_time = now;
 					}
+					else if (found)
+					{
+						TimestampTz retry_time = hentry->last_start_time +
+												 (wal_retrieve_retry_interval *
+												  INT64CONST(1000));
+
+						/*
+						 * Remember the earliest time a sync worker may be
+						 * started, so the apply loop knows how long to sleep.
+						 */
+						if (retry_time < next_sync_start)
+							next_sync_start = retry_time;
+					}
 				}
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f9efe6c4c6..03dc42ceee 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -196,8 +196,6 @@
 #include "utils/syscache.h"
 #include "utils/timeout.h"
 
-#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
-
 typedef struct FlushPosition
 {
 	dlist_node	node;
@@ -279,6 +277,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
+/*
+ * Reasons to wake up: connection timeout, ping, and status update.
+ */
+typedef enum LogRepWorkerWakeupReason
+{
+	LRW_WAKEUP_TERMINATE,
+	LRW_WAKEUP_PING,
+	LRW_WAKEUP_STATUS,
+	NUM_LRW_WAKEUPS
+} LogRepWorkerWakeupReason;
+
+/*
+ * Wakeup times, indexed by reason; PG_INT64_MAX means none is scheduled.
+ */
+static TimestampTz wakeup[NUM_LRW_WAKEUPS];
+TimestampTz next_sync_start;
+
+static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason,
+										  TimestampTz now);
+
 typedef struct SubXactInfo
 {
 	TransactionId xid;			/* XID of the subxact */
@@ -2708,10 +2726,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
-	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	TimestampTz now;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -2740,6 +2757,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
+	/* Initialize wakeup times; no sync-worker start is pending yet. */
+	now = GetCurrentTimestamp();
+	for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+		LogRepWorkerComputeNextWakeup(i, now);
+	next_sync_start = PG_INT64_MAX;
+
 	/* This outer loop iterates once per wait. */
 	for (;;)
 	{
@@ -2756,6 +2779,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
+		now = GetCurrentTimestamp();
 		if (len != 0)
 		{
 			/* Loop to process all available data (without blocking). */
@@ -2779,9 +2803,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					int			c;
 					StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+					/* A message arrived: push back terminate and ping wakeups. */
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now);
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now);
 
 					/* Ensure we are reading the data into our memory context. */
 					MemoryContextSwitchTo(ApplyMessageContext);
@@ -2835,6 +2859,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				now = GetCurrentTimestamp();
 			}
 		}
 
@@ -2873,14 +2898,43 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (!dlist_is_empty(&lsn_mapping))
 			wait_time = WalWriterDelay;
 		else
-			wait_time = NAPTIME_PER_CYCLE;
+		{
+			TimestampTz nextWakeup = PG_INT64_MAX;
+
+			/* Find the soonest periodic-task wakeup time, to limit our nap. */
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				nextWakeup = Min(wakeup[i], nextWakeup);
+
+			/* Also consider the wakeup time for starting sync workers. */
+			if (next_sync_start < now)
+			{
+				/*
+				 * The sync-worker start time has passed but hasn't been reset
+				 * yet; rather than spinning, nap for 100ms (unless there's an
+				 * earlier wakeup time).
+				 */
+				nextWakeup = Min(now + INT64CONST(100000), nextWakeup);
+			}
+			else
+				nextWakeup = Min(next_sync_start, nextWakeup);
+
+			/*
+			 * Calculate the nap time in milliseconds.  WaitLatchOrSocket()
+			 * doesn't accept timeouts longer than INT_MAX milliseconds, so we
+			 * limit the result accordingly.  Also, we round up to the next
+			 * millisecond to avoid waking up too early and spinning until one
+			 * of the wakeup times.
+			 */
+			wait_time = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+		}
 
 		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
 							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 							   fd, wait_time,
 							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
-
+		now = GetCurrentTimestamp();
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
@@ -2891,6 +2945,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				LogRepWorkerComputeNextWakeup(i, now);
+
+			/*
+			 * If a sync-worker start time was set, pull it in to now: the
+			 * retry interval may have changed, and it will be recalculated.
+			 */
+			if (next_sync_start != PG_INT64_MAX)
+				next_sync_start = now;
 		}
 
 		if (rc & WL_TIMEOUT)
@@ -2909,31 +2973,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * Check if time since last receive from primary has reached the
 			 * configured limit.
 			 */
-			if (wal_receiver_timeout > 0)
-			{
-				TimestampTz now = GetCurrentTimestamp();
-				TimestampTz timeout;
+			if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating logical replication worker due to timeout")));
 
-				timeout =
-					TimestampTzPlusMilliseconds(last_recv_timestamp,
-												wal_receiver_timeout);
-
-				if (now >= timeout)
-					ereport(ERROR,
-							(errcode(ERRCODE_CONNECTION_FAILURE),
-							 errmsg("terminating logical replication worker due to timeout")));
-
-				/* Check to see if it's time for a ping. */
-				if (!ping_sent)
-				{
-					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-														  (wal_receiver_timeout / 2));
-					if (now >= timeout)
-					{
-						requestReply = true;
-						ping_sent = true;
-					}
-				}
+			/* Time to ping?  Request a reply and disarm until data arrives. */
+			if (now >= wakeup[LRW_WAKEUP_PING])
+			{
+				requestReply = true;
+				wakeup[LRW_WAKEUP_PING] = PG_INT64_MAX;
 			}
 
 			send_feedback(last_received, requestReply, requestReply);
@@ -2968,7 +3017,6 @@ static void
 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 {
 	static StringInfo reply_message = NULL;
-	static TimestampTz send_time = 0;
 
 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
@@ -3011,10 +3059,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	if (!force &&
 		writepos == last_writepos &&
 		flushpos == last_flushpos &&
-		!TimestampDifferenceExceeds(send_time, now,
-									wal_receiver_status_interval * 1000))
+		now < wakeup[LRW_WAKEUP_STATUS])
 		return;
-	send_time = now;
+
+	/* Schedule the next status update (never, if the interval is <= 0). */
+	LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now);
 
 	if (!reply_message)
 	{
@@ -4092,3 +4141,36 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Compute the next wakeup time for the given reason, relative to 'now'.
+ * Used to initialize, advance, or (after a GUC reload) reset a wakeup time.
+ * PG_INT64_MAX means "never"; it is used when the governing GUC is <= 0.
+ */
+static void
+LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now)
+{
+	switch (reason)
+	{
+		case LRW_WAKEUP_TERMINATE:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
+			break;
+		case LRW_WAKEUP_PING:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
+			break;
+		case LRW_WAKEUP_STATUS:
+			if (wal_receiver_status_interval <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_status_interval * 1000);
+			break;
+		default:
+			elog(ERROR, "unrecognized wakeup reason: %d", (int) reason);
+	}
+}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 2b7114ff6d..dfb7b06796 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -80,6 +80,9 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+/* Next time to try starting sync workers; PG_INT64_MAX if none pending. */
+extern PGDLLIMPORT TimestampTz next_sync_start;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
-- 
2.25.1

Reply via email to