rebased for cfbot
--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From 2466001a3ae6f94aac8eff45b488765e619bea1b Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Thu, 1 Dec 2022 20:50:21 -0800
Subject: [PATCH v2 1/1] suppress unnecessary wakeups in logical/worker.c
---
src/backend/replication/logical/tablesync.c | 20 +++
src/backend/replication/logical/worker.c | 156 +++++++++++++++-----
src/include/replication/worker_internal.h | 3 +
3 files changed, 142 insertions(+), 37 deletions(-)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 38dfce7129..88218e1fed 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Assert(!IsTransactionState());
+ /*
+ * If we've made it past our previously-stored special wakeup time, reset
+ * it so that it can be recalculated as needed.
+ */
+ if (next_sync_start <= GetCurrentTimestamp())
+ next_sync_start = PG_INT64_MAX;
+
/* We need up-to-date sync state info for subscription tables here. */
FetchTableStates(&started_tx);
@@ -592,6 +599,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
DSM_HANDLE_INVALID);
hentry->last_start_time = now;
}
+ else if (found)
+ {
+ TimestampTz retry_time = hentry->last_start_time +
+ (wal_retrieve_retry_interval *
+ INT64CONST(1000));
+
+ /*
+ * Store when we can start the sync worker so that we
+ * know how long to sleep.
+ */
+ if (retry_time < next_sync_start)
+ next_sync_start = retry_time;
+ }
}
}
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 79cda39445..284f11428c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -207,8 +207,6 @@
#include "utils/syscache.h"
#include "utils/timeout.h"
-#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
-
typedef struct FlushPosition
{
dlist_node node;
@@ -348,6 +346,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
+/*
+ * Reasons for the apply loop to wake up and perform periodic tasks.
+ */
+typedef enum LogRepWorkerWakeupReason
+{
+ LRW_WAKEUP_TERMINATE, /* error out if nothing is received within wal_receiver_timeout */
+ LRW_WAKEUP_PING, /* set requestReply in the next send_feedback() call */
+ LRW_WAKEUP_STATUS, /* send a periodic status update via send_feedback() */
+ NUM_LRW_WAKEUPS /* count of reasons (array size / loop bound); keep last */
+} LogRepWorkerWakeupReason;
+
+/*
+ * Next wakeup time per reason; PG_INT64_MAX means "not scheduled".
+ */
+static TimestampTz wakeup[NUM_LRW_WAKEUPS];
+TimestampTz next_sync_start; /* when tablesync may retry a sync worker start, or PG_INT64_MAX */
+
+static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason,
+ TimestampTz now);
+
typedef struct SubXactInfo
{
TransactionId xid; /* XID of the subxact */
@@ -3446,10 +3464,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
- TimestampTz last_recv_timestamp = GetCurrentTimestamp();
- bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ TimestampTz now;
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3479,6 +3496,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
error_context_stack = &errcallback;
apply_error_context_stack = error_context_stack;
+ /* Initialize nap wakeup times. */
+ now = GetCurrentTimestamp();
+ for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+ LogRepWorkerComputeNextWakeup(i, now);
+ next_sync_start = PG_INT64_MAX;
+
/* This outer loop iterates once per wait. */
for (;;)
{
@@ -3495,6 +3518,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+ now = GetCurrentTimestamp();
if (len != 0)
{
/* Loop to process all available data (without blocking). */
@@ -3518,9 +3542,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
int c;
StringInfoData s;
- /* Reset timeout. */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ /* Adjust the ping and terminate wakeup times. */
+ LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now);
+ LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now);
/* Ensure we are reading the data into our memory context. */
MemoryContextSwitchTo(ApplyMessageContext);
@@ -3574,6 +3598,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+ now = GetCurrentTimestamp();
}
}
@@ -3612,14 +3637,43 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (!dlist_is_empty(&lsn_mapping))
wait_time = WalWriterDelay;
else
- wait_time = NAPTIME_PER_CYCLE;
+ {
+ TimestampTz nextWakeup = PG_INT64_MAX;
+
+ /* Find soonest wakeup time, to limit our nap. */
+ now = GetCurrentTimestamp();
+ for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+ nextWakeup = Min(wakeup[i], nextWakeup);
+
+ /* Also consider special wakeup time for starting sync workers. */
+ if (next_sync_start < now)
+ {
+ /*
+ * Instead of spinning while we wait for the sync worker to
+ * start, wait a bit before retrying (unless there's an earlier
+ * wakeup time).
+ */
+ nextWakeup = Min(now + INT64CONST(100000), nextWakeup);
+ }
+ else
+ nextWakeup = Min(next_sync_start, nextWakeup);
+
+ /*
+ * Calculate the nap time. WaitLatchOrSocket() doesn't accept
+ * timeouts longer than INT_MAX milliseconds, so we limit the
+ * result accordingly. Also, we round up to the next millisecond
+ * to avoid waking up too early and spinning until one of the
+ * wakeup times.
+ */
+ wait_time = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+ }
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
fd, wait_time,
WAIT_EVENT_LOGICAL_APPLY_MAIN);
-
+ now = GetCurrentTimestamp();
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
@@ -3630,6 +3684,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
+ now = GetCurrentTimestamp();
+ for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+ LogRepWorkerComputeNextWakeup(i, now);
+
+ /*
+ * If a wakeup time for starting sync workers was set, just set it
+ * to right now. It will be recalculated as needed.
+ */
+ if (next_sync_start != PG_INT64_MAX)
+ next_sync_start = now;
}
if (rc & WL_TIMEOUT)
@@ -3648,31 +3712,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
* Check if time since last receive from primary has reached the
* configured limit.
*/
- if (wal_receiver_timeout > 0)
- {
- TimestampTz now = GetCurrentTimestamp();
- TimestampTz timeout;
+ if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating logical replication worker due to timeout")));
- timeout =
- TimestampTzPlusMilliseconds(last_recv_timestamp,
- wal_receiver_timeout);
-
- if (now >= timeout)
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("terminating logical replication worker due to timeout")));
-
- /* Check to see if it's time for a ping. */
- if (!ping_sent)
- {
- timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- (wal_receiver_timeout / 2));
- if (now >= timeout)
- {
- requestReply = true;
- ping_sent = true;
- }
- }
+ /* Check to see if it's time for a ping. */
+ if (now >= wakeup[LRW_WAKEUP_PING])
+ {
+ requestReply = true;
+ wakeup[LRW_WAKEUP_PING] = PG_INT64_MAX;
}
send_feedback(last_received, requestReply, requestReply);
@@ -3708,7 +3757,6 @@ static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
static StringInfo reply_message = NULL;
- static TimestampTz send_time = 0;
static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
static XLogRecPtr last_writepos = InvalidXLogRecPtr;
@@ -3751,10 +3799,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
if (!force &&
writepos == last_writepos &&
flushpos == last_flushpos &&
- !TimestampDifferenceExceeds(send_time, now,
- wal_receiver_status_interval * 1000))
+ now < wakeup[LRW_WAKEUP_STATUS])
return;
- send_time = now;
+
+ /* Make sure we wake up when it's time to send another status update. */
+ LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now);
if (!reply_message)
{
@@ -5034,3 +5083,36 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
return TRANS_LEADER_SEND_TO_PARALLEL;
}
}
+
+/*
+ * Compute the next wakeup time for a given wakeup reason.  Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.  Unhandled reasons raise an ERROR.
+ */
+static void
+LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now)
+{
+	switch (reason)
+	{
+		case LRW_WAKEUP_TERMINATE:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
+			break;
+		case LRW_WAKEUP_PING:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
+			break;
+		case LRW_WAKEUP_STATUS:
+			if (wal_receiver_status_interval <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_status_interval * INT64CONST(1000));
+			break;
+		default:	/* a stale wakeup[] entry could make the main loop spin */
+			elog(ERROR, "unrecognized wakeup reason: %d", (int) reason);
+	}
+}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index db891eea8a..ec39711ed1 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,9 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
extern PGDLLIMPORT bool in_remote_transaction;
+/* Next time to try starting sync workers; PG_INT64_MAX if none is pending. */
+extern PGDLLIMPORT TimestampTz next_sync_start;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
--
2.25.1