Hi, Alexander!
Thank you very much for looking at the patch and providing valuable
feedback!
> This approach makes sense to me. Do you think it might have corner
cases? I suggest the test scenario might include some delay between
"UPDATE" queries. Then we can see how changing of this delay interacts
with cascade_replication_batch_delay.
The effect of 'cascade_replication_batch_delay' setting could be more
easily observed by manually changing a single row in the primary
database ('A' instance in the test) and then observing the delay before
such change became visible on the 'C' instance. Something like following:
On C instance:
select c0 where test_repli_test_t1 where id=0 \watch 1
On A instance, first set the initial value:
update test_repli_test_t1 set c0=0 where id=0;
... and then update the row and wait for it to became visible on C instance:
update test_repli_test_t1 set c0=c0+1 where id=0;
In my tests with enabled batching and without enabling delay limit (i.e.
by setting the 'cascade_replication_batch_delay' to 0), the change
became visible in about 5-6 seconds (as walsender on B instance seems to
wake up by itself anyway). With 'cascade_replication_batch_delay' set to
500 (ms) the value became visible almost immediately.
> This comment tells about logical walsenders, but they same will be
applied to physical walsenders, right?
Yes, this item probably needs some clarification. In this code path we
are dealing with logical walsenders, as physical walsenders are notified
in XLogWalRcvFlush. However, when TLI changes, this code will notify
both physical and logical walsenders. So, I've changed the comment now
to describe this behavior more clearly.
Another question is whether we really need to notify physical walsenders
at this point. This was the logic of the original code, so I kept it
when adding batching support. However, it seems that physical sender
should not be very interested in knowing that logical decoding has
discovered change in timeline ID, as it should be either already
notified by walreceiver or discover it by itself in the stored WAL data
if recovery was invoked at startup. So, maybe the better approach here
is just to keep notifications for logical walsenders only.
> I see these two GUCs are both PGC_POSTMASTER. Could they be PGC_SIGHUP?
This is a good suggestion. I've tried to implement support for
PGC_SIGHUP context in the new patch version. Now the current batch
should be flushed immediately as parameters are changed and then new
values will be used for processing once next WAL record is applied. This
also makes testing a little simpler: if we start test script for longer
interval (i.e. 300 seconds instead of 60), then it's possible to see how
CPU load is changed on the fly as batching is enabled or disabled.
> Also I think there is a typo in the the description of
cascade_replication_batch_size, it must say "0 disables".
Sure, thanks for catching this!
> I also think these GUCs should be in the sample file, possibly
disabled by default because it only make sense to set up them with high
number of cascaded walsenders.
Yes, it was my intention for having 'cascade_replication_batch_size'
disabled by default as it was described in the mail message, but I
forget to actually set it to '0' in the previous patch version. Thank
you for noticing this! The 'cascade_replication_batch_delay' is working
only if batching is enabled (i.e. batch size is set to value greater
than 1), so a value of 500 (ms) seems to be a reasonable default settings.
I've also added both values to the sample configuration in the new patch
version, as suggested.
The new patch version with changes described above and rebased on top of
current master is attached.
Thank you again for looking on this proposal!
Thanks,
Alexey
From e8154d48aa0f4a447f8bd519dd64f1da500d85a3 Mon Sep 17 00:00:00 2001
From: Rustam Khamidullin <[email protected]>
Date: Fri, 14 Mar 2025 18:18:34 +0700
Subject: [PATCH] Implement batching for WAL records notification during
cascade replication
Currently standby server notifies walsenders after applying of each WAL
record during cascade replication. This creates a bottleneck in case of large
number of sender processes during WalSndWakeup invocation. This change
introduces batching for such notifications, which are now sent either after
certain number of applied records or specified time interval (whichever comes
first).
Co-authored-by: Alexey Makhmutov <[email protected]>
---
src/backend/access/transam/xlogrecovery.c | 127 +++++++++++++++++-
src/backend/postmaster/startup.c | 5 +
src/backend/utils/misc/guc_parameters.dat | 21 +++
src/backend/utils/misc/postgresql.conf.sample | 6 +
src/include/access/xlogrecovery.h | 8 ++
src/include/utils/guc_hooks.h | 1 +
src/include/utils/timeout.h | 1 +
7 files changed, 167 insertions(+), 2 deletions(-)
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 346319338a0..eec35406e6e 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -64,6 +64,7 @@
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/pg_rusage.h"
+#include "utils/timeout.h"
/* Unsupported old recovery command file names (relative to $PGDATA) */
#define RECOVERY_COMMAND_FILE "recovery.conf"
@@ -147,6 +148,13 @@ bool InArchiveRecovery = false;
static bool StandbyModeRequested = false;
bool StandbyMode = false;
+/*
+ * Whether we are currently in process of processing recovery records while
+ * allowing downstream replication instances
+ */
+#define StandbyWithCascadeReplication() \
+ (AmStartupProcess() && StandbyMode && AllowCascadeReplication())
+
/* was a signal file present at startup? */
static bool standby_signal_file_found = false;
static bool recovery_signal_file_found = false;
@@ -303,6 +311,28 @@ bool reachedConsistency = false;
static char *replay_image_masked = NULL;
static char *primary_image_masked = NULL;
+/*
+ * Maximum number of applied records in batch before notifying walsender during
+ * cascade replication
+ */
+int cascadeReplicationMaxBatchSize;
+
+/*
+ * Maximum batching delay before notifying walsender during cascade replication
+ */
+int cascadeReplicationMaxBatchDelay;
+
+/* Current cascade replication batching delay used while enabling timer */
+static int cascadeDelayCurrent = 0;
+
+/* Counter for applied records which are not yet signaled to walsenders */
+static int appliedRecords = 0;
+
+/*
+ * True if downstream walsenders need to be notified about pending WAL records,
+ * set by timeout handler.
+ */
+volatile sig_atomic_t replicationNotificationPending = false;
/*
* Shared-memory state for WAL recovery.
@@ -1845,6 +1875,15 @@ PerformWalRecovery(void)
* end of main redo apply loop
*/
+ /* Send notification for batched messages once loop is ended */
+ if (StandbyWithCascadeReplication() && appliedRecords > 0)
+ {
+ if (cascadeDelayCurrent > 0)
+ disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+ appliedRecords = 0;
+ WalSndWakeup(false, true);
+ }
+
if (reachedRecoveryTarget)
{
if (!reachedConsistency)
@@ -2043,8 +2082,45 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
* be created otherwise)
* ------
*/
- if (AllowCascadeReplication())
- WalSndWakeup(switchedTLI, true);
+
+ if (StandbyWithCascadeReplication())
+ {
+ if (cascadeReplicationMaxBatchSize <= 1 && appliedRecords == 0)
+ WalSndWakeup(switchedTLI, true);
+ else
+ {
+ /*
+ * If time line has switched, then we will imediately notify both
+ * physical and logical downstream walsenders here, as we do not
+ * want to introduce additional delay in such case. Otherwise we
+ * will wait until we apply specified number of records before
+ * notifying downstream logical walsenders.
+ */
+ bool batchFlushRequired =
+ ++appliedRecords >= cascadeReplicationMaxBatchSize ||
+ replicationNotificationPending ||
+ switchedTLI;
+
+ if (batchFlushRequired)
+ {
+ if (cascadeDelayCurrent > 0)
+ disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+ appliedRecords = 0;
+ replicationNotificationPending = false;
+ }
+
+ WalSndWakeup(switchedTLI, batchFlushRequired);
+
+ /* Setup timeout to limit maximum delay for notifications */
+ if (appliedRecords == 1)
+ {
+ cascadeDelayCurrent = cascadeReplicationMaxBatchDelay;
+ if (cascadeDelayCurrent > 0)
+ enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
+ cascadeDelayCurrent);
+ }
+ }
+ }
/*
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
@@ -5083,3 +5159,50 @@ assign_recovery_target_xid(const char *newval, void *extra)
else
recoveryTarget = RECOVERY_TARGET_UNSET;
}
+
+/*
+ * GUC assign_hook for cascade_replication_batch_size and
+ * cascade_replication_batch_delay
+ */
+void
+assign_cascade_replication_batch_values(int new_value, void *extra)
+{
+ /*
+ * If either cascade_replication_batch_size or
+ * cascade_replication_batch_delay is changed, then we want to disable
+ * current timer (if any) and immediately flush current batch. New values
+ * will be picked once next WAL record is applied.
+ */
+ if (cascadeDelayCurrent > 0)
+ {
+ cascadeDelayCurrent = 0;
+ disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+ }
+ /* Will be processed by ProcessStartupProcInterrupts */
+ replicationNotificationPending = true;
+}
+
+/*
+ * Send notifications to downstream walsenders if there are batched records
+ */
+void
+StandbyWalCheckSendNotify(void)
+{
+ if (appliedRecords > 0)
+ {
+ WalSndWakeup(false, true);
+ appliedRecords = 0;
+ }
+ replicationNotificationPending = false;
+}
+
+/*
+ * Timer handler for batch notifications in cascade replication
+ */
+void
+StandbyWalSendTimeoutHandler(void)
+{
+ replicationNotificationPending = true;
+ /* Most likely process is waiting for arrival of WAL records */
+ WakeupRecovery();
+}
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index 27e86cf393f..555090c82a0 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -189,6 +189,10 @@ ProcessStartupProcInterrupts(void)
if (ProcSignalBarrierPending)
ProcessProcSignalBarrier();
+ /* Send a notification to downstream walsenders if required */
+ if (replicationNotificationPending)
+ StandbyWalCheckSendNotify();
+
/* Perform logging of memory contexts of this process */
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
@@ -246,6 +250,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len)
RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler);
RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler);
RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler);
+ RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler);
/*
* Unblock signals (they were blocked when the postmaster forked us)
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 6bc6be13d2a..9d4f2b5d91e 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1062,6 +1062,27 @@
max => 'INT_MAX',
},
+{ name => 'cascade_replication_batch_size', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
+ short_desc => 'Sets the maximum number of applied WAL records before cascade walsenders are notified on standby.',
+ long_desc => '0 disables records batching in cascade replication.',
+ variable => 'cascadeReplicationMaxBatchSize',
+ boot_val => '0',
+ min => '0',
+ max => 'INT_MAX',
+ assign_hook => 'assign_cascade_replication_batch_values',
+},
+
+{ name => 'cascade_replication_batch_delay', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
+ short_desc => 'Sets the maximum time before cascade walsenders are notified on standby about applied records.',
+ long_desc => '0 disables timed notifications. This option works only if cascade_replication_batch_size is greater than 0.',
+ flags => 'GUC_UNIT_MS',
+ variable => 'cascadeReplicationMaxBatchDelay',
+ boot_val => '500',
+ min => '0',
+ max => 'INT_MAX',
+ assign_hook => 'assign_cascade_replication_batch_values',
+},
+
{ name => 'max_connections', type => 'int', context => 'PGC_POSTMASTER', group => 'CONN_AUTH_SETTINGS',
short_desc => 'Sets the maximum number of concurrent connections.',
variable => 'MaxConnections',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c36fcb9ab61..9942f90c3e2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -376,6 +376,12 @@
# is not set
#wal_receiver_status_interval = 10s # send replies at least this often
# 0 disables
+#cascade_replication_batch_size = 0 # maximum number of applied WAL records
+ # before cascade walsenders are notified on standby
+ # 0 disables records batching in cascade replication
+#cascade_replication_batch_delay = 500ms # maximum time before cascade walsenders
+ # are notified about applied records if batching is enabled
+ # 0 disables timed notifications during batching
#hot_standby_feedback = off # send info from standby to prevent
# query conflicts
#wal_receiver_timeout = 60s # time that receiver waits for
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 8e475e266d1..75a09d4b9e3 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,8 @@
#ifndef XLOGRECOVERY_H
#define XLOGRECOVERY_H
+#include <signal.h>
+
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "lib/stringinfo.h"
@@ -67,6 +69,8 @@ extern PGDLLIMPORT char *PrimarySlotName;
extern PGDLLIMPORT char *recoveryRestoreCommand;
extern PGDLLIMPORT char *recoveryEndCommand;
extern PGDLLIMPORT char *archiveCleanupCommand;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchSize;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay;
/* indirectly set via GUC system */
extern PGDLLIMPORT TransactionId recoveryTargetXid;
@@ -165,4 +169,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
+extern PGDLLIMPORT volatile sig_atomic_t replicationNotificationPending;
+extern void StandbyWalCheckSendNotify(void);
+extern void StandbyWalSendTimeoutHandler(void);
+
#endif /* XLOGRECOVERY_H */
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 82ac8646a8d..370ad79d5e5 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -118,6 +118,7 @@ extern void assign_recovery_target_timeline(const char *newval, void *extra);
extern bool check_recovery_target_xid(char **newval, void **extra,
GucSource source);
extern void assign_recovery_target_xid(const char *newval, void *extra);
+extern void assign_cascade_replication_batch_values(int new_value, void *extra);
extern bool check_role(char **newval, void **extra, GucSource source);
extern void assign_role(const char *newval, void *extra);
extern const char *show_role(void);
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 7b19beafdc9..0062cb562b9 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -36,6 +36,7 @@ typedef enum TimeoutId
IDLE_STATS_UPDATE_TIMEOUT,
CLIENT_CONNECTION_CHECK_TIMEOUT,
STARTUP_PROGRESS_TIMEOUT,
+ STANDBY_CASCADE_WAL_SEND_TIMEOUT,
/* First user-definable timeout reason */
USER_TIMEOUT,
/* Maximum number of timeout reasons */
--
2.51.0