Hi, Alexander! Thank you again for the attention to this patch!
> Could you, please, also comment change from check for AllowCascadeReplication() to StandbyWithCascadeReplication()? Do you think this is beneficial and saves us from sending the notifications when they are useless?
The original intention of this check was to avoid enabling all the timer-based machinery in cases where we definitely don't need to send notifications. So, we try to avoid any potential impact of the patch on such cases even if the new options are enabled. For example, this may happen when we restore a server from a backup and apply a WAL archive on it (PerformWalRecovery is invoked in this case as well). Both 'hot_standby' and 'max_wal_senders' are enabled by default, so even a primary server may pass the 'AllowCascadeReplication' condition in such a case. So, we want to be sure that 'StandbyMode' is actually set and that we are running in the Startup process.
However, the change may also be reasonable by itself, as it avoids calling WalSndWakeup at all in such cases (and thus avoids acquiring and releasing the CV mutex), although I do not think that it will provide a measurable improvement.
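To make the difference between the two checks concrete, here is a minimal standalone sketch (not part of the patch). The struct and the function names are hypothetical stand-ins that I use only for illustration: 'hot_standby' models EnableHotStandby, 'standby_mode' models StandbyMode and 'am_startup_process' models AmStartupProcess(); the real macros read server globals instead.

```c
#include <stdbool.h>

/* Hypothetical model of the relevant server state (illustration only). */
struct server_state
{
	bool		hot_standby;		/* models EnableHotStandby */
	int			max_wal_senders;	/* models max_wal_senders */
	bool		standby_mode;		/* models StandbyMode */
	bool		am_startup_process; /* models AmStartupProcess() */
};

/* Models AllowCascadeReplication(): looks only at configuration values. */
bool
allow_cascade_replication(const struct server_state *s)
{
	return s->hot_standby && s->max_wal_senders > 0;
}

/*
 * Models StandbyWithCascadeReplication() from the patch: additionally
 * requires that we are the startup process and really in standby mode.
 */
bool
standby_with_cascade_replication(const struct server_state *s)
{
	return s->am_startup_process && s->standby_mode &&
		allow_cascade_replication(s);
}
```

With default settings, a server that only replays a WAL archive after a restore satisfies the first predicate but not the second one, which is exactly the case the new check is meant to exclude.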
> Also, could you comment this condition.
> if (cascadeReplicationMaxBatchSize <= 1 && appliedRecords == 0)
> Does this mean that if batching was disabled in config then enforced by SIGHUP, we will still wait for the current batch to be completed? Would it be better to stop batching immediately?
Sure. If either 'cascade_replication_batch_size' or 'cascade_replication_batch_delay' is changed, we need to flush the current batch (send the notification) and only then decide whether to batch the next records. This is why 'assign_cascade_replication_batch_values' sets the 'replicationNotificationPending' flag (and disables the timer if it is armed), so that the flush is performed later in 'ProcessStartupProcInterrupts'. So, technically, we should never find ourselves in a situation where 'cascadeReplicationMaxBatchSize' is set to 1 and 'appliedRecords' has a non-zero value. The check of 'appliedRecords' seems to be a remnant of an intermediate version of my patch, where these values were already modifiable at runtime but not yet handled by 'assign_cascade_replication_batch_values'. I think it can be safely removed to avoid confusion. Thank you for noticing this!
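The flush-on-reload behavior described above can be sketched as a small standalone model (again, not code from the patch). All names here are hypothetical; the timer handling and the real WalSndWakeup call are left out, and 'wakeups' simply counts the simulated notifications of logical walsenders.

```c
#include <stdbool.h>

/* Hypothetical model of the batching state (illustration only). */
typedef struct BatchState
{
	int			max_batch_size;		/* models cascade_replication_batch_size */
	int			applied_records;	/* applied but not yet signaled records */
	bool		notify_pending;		/* models replicationNotificationPending */
	int			wakeups;			/* simulated walsender notifications */
} BatchState;

/* Models the per-record decision; returns true if a notification was sent. */
bool
batch_apply_record(BatchState *st)
{
	if (st->max_batch_size <= 1)
	{
		/* Batching disabled: notify after every record. */
		st->wakeups++;
		return true;
	}

	if (++st->applied_records >= st->max_batch_size || st->notify_pending)
	{
		st->applied_records = 0;
		st->notify_pending = false;
		st->wakeups++;
		return true;
	}
	return false;
}

/* Models the assign hook: store the new value and request a flush. */
void
batch_config_changed(BatchState *st, int new_batch_size)
{
	st->max_batch_size = new_batch_size;
	st->notify_pending = true;
}

/* Models the interrupt-processing step that flushes a pending batch. */
void
batch_process_interrupts(BatchState *st)
{
	if (!st->notify_pending)
		return;
	if (st->applied_records > 0)
	{
		st->applied_records = 0;
		st->wakeups++;
	}
	st->notify_pending = false;
}
```

In this model a reload is always followed by batch_process_interrupts() before the next record is applied, so batch_apply_record() never sees 'max_batch_size' equal to 1 together with a non-zero 'applied_records', which is why the extra check is redundant.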
> Also, this patch lacks documentation. I would especially like to see combinations of GUCs described (cascade_replication_batch_size is enabled, but cascade_replication_batch_delay disabled, and vise versa).
I've added documentation for these two parameters in the new version of the patch (config.sgml). The new version also changes the minimum value of 'cascade_replication_batch_size': the minimum is now 1, which indicates that batching is disabled. In the previous version both '0' and '1' were valid ways to disable batching, but this looked ambiguous in the documentation. So, I've decided to keep only '1' as the valid value to make it simpler to describe. I've also rebased the patch on top of current master to fix the failures in the build checks.
Thanks,
Alexey
From a9176ccfe4a446c39b93f30595387757273de003 Mon Sep 17 00:00:00 2001
From: Alexey Makhmutov <[email protected]>
Date: Fri, 7 Nov 2025 02:03:58 +0300
Subject: [PATCH] Implement batching for WAL records notification during
 cascade replication

Currently standby server notifies walsenders after applying of each WAL
record during cascade replication. This creates a bottleneck in case of large
number of sender processes during WalSndWakeup invocation. This change
introduces batching for such notifications, which are now sent either after
certain number of applied records or specified time interval (whichever comes
first).

Co-authored-by: Rustam Khamidullin <[email protected]>
---
 doc/src/sgml/config.sgml                      |  60 +++++++++
 src/backend/access/transam/xlogrecovery.c     | 127 +++++++++++++++++-
 src/backend/postmaster/startup.c              |   5 +
 src/backend/utils/misc/guc_parameters.dat     |  21 +++
 src/backend/utils/misc/postgresql.conf.sample |   7 +
 src/include/access/xlogrecovery.h             |   8 ++
 src/include/utils/guc_hooks.h                 |   1 +
 src/include/utils/timeout.h                   |   1 +
 8 files changed, 228 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 06d1e4403b5..748ecd51cde 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5287,6 +5287,66 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry id="guc-cascade-replication-batch-size" xreflabel="cascade_replication_batch_size">
+      <term><varname>cascade_replication_batch_size</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>cascade_replication_batch_size</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        The number of applied WAL records after which the standby server will
+        notify connected logical replication clients.
+       </para>
+       <para>
+        By default, the standby server notifies each connected logical
+        replication client after applying each WAL record. In there is a large
+        number of such clients (100+), this can slow down the WAL application
+        process. In this case, a batching mode can be used, so server will
+        notify clients only after applying the specified number of records. This
+        improves efficiency of notification process, but may cause delays for
+        delivery of changes to downstream servers if new WAL records arrives on
+        current server with large intervals (e.g. during low load periods).
+        Parameter <varname>cascade_replication_batch_delay</varname> could be
+        used to control the maximum notification delay for such cases.
+       </para>
+       <para>
+        Default value is 1, which disables batching mode for such notifications.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-cascade-replication-batch-delay" xreflabel="cascade_replication_batch_delay">
+      <term><varname>cascade_replication_batch_delay</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>cascade_replication_batch_delay</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the maximum notification delay for logical replication clients
+        on the standby server if the
+        <varname>cascade_replication_batch_size</varname> parameter is set to a
+        value greater than 1. The time is calculated from the completion of
+        applying the first WAL record in the batch. Connected clients will
+        be notified either after applying the configured number of records or
+        after the specified time interval, whichever comes first.
+       </para>
+       <para>
+        The default value is 500ms. Setting value to 0 disables time-based
+        notifications for batch mode. Therefore, logical replication clients
+        will notice the presence of a new available WAL record after configured
+        number of records is applied or after the sender wakes up for other
+        reasons.
+       </para>
+       <para>
+        If batching mode is not enabled for logical replication notifications
+        (i.e. value of <varname>cascade_replication_batch_size</varname> is set
+        to 1), then this parameter is ignored.
+       </para>
+      </listitem>
+     </varlistentry>
 
      </variablelist>
     </sect2>
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 550de6e4a59..2a126ef2793 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -66,6 +66,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/pg_rusage.h"
+#include "utils/timeout.h"
 
 /* Unsupported old recovery command file names (relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE "recovery.conf"
@@ -149,6 +150,13 @@ bool InArchiveRecovery = false;
 static bool StandbyModeRequested = false;
 bool StandbyMode = false;
 
+/*
+ * Whether we are currently in process of processing recovery records while
+ * allowing downstream replication instances
+ */
+#define StandbyWithCascadeReplication() \
+	(AmStartupProcess() && StandbyMode && AllowCascadeReplication())
+
 /* was a signal file present at startup? */
 static bool standby_signal_file_found = false;
 static bool recovery_signal_file_found = false;
@@ -305,6 +313,28 @@ bool reachedConsistency = false;
 static char *replay_image_masked = NULL;
 static char *primary_image_masked = NULL;
 
+/*
+ * Maximum number of applied records in batch before notifying walsender during
+ * cascade replication
+ */
+int cascadeReplicationMaxBatchSize;
+
+/*
+ * Maximum batching delay before notifying walsender during cascade replication
+ */
+int cascadeReplicationMaxBatchDelay;
+
+/* Current cascade replication batching delay used while enabling timer */
+static int cascadeDelayCurrent = 0;
+
+/* Counter for applied records which are not yet signaled to walsenders */
+static int appliedRecords = 0;
+
+/*
+ * True if downstream walsenders need to be notified about pending WAL records,
+ * set by timeout handler.
+ */
+volatile sig_atomic_t replicationNotificationPending = false;
 
 /*
  * Shared-memory state for WAL recovery.
@@ -1857,6 +1887,15 @@ PerformWalRecovery(void)
 		 * end of main redo apply loop
 		 */
 
+		/* Send notification for batched messages once loop is ended */
+		if (StandbyWithCascadeReplication() && appliedRecords > 0)
+		{
+			if (cascadeDelayCurrent > 0)
+				disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+			appliedRecords = 0;
+			WalSndWakeup(false, true);
+		}
+
 		if (reachedRecoveryTarget)
 		{
 			if (!reachedConsistency)
@@ -2055,8 +2094,45 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 * be created otherwise)
 	 * ------
 	 */
-	if (AllowCascadeReplication())
-		WalSndWakeup(switchedTLI, true);
+
+	if (StandbyWithCascadeReplication())
+	{
+		if (cascadeReplicationMaxBatchSize <= 1)
+			WalSndWakeup(switchedTLI, true);
+		else
+		{
+			/*
+			 * If time line has switched, then we will imediately notify both
+			 * physical and logical downstream walsenders here, as we do not
+			 * want to introduce additional delay in such case. Otherwise we
+			 * will wait until we apply specified number of records before
+			 * notifying downstream logical walsenders.
+			 */
+			bool batchFlushRequired =
+				++appliedRecords >= cascadeReplicationMaxBatchSize ||
+				replicationNotificationPending ||
+				switchedTLI;
+
+			if (batchFlushRequired)
+			{
+				if (cascadeDelayCurrent > 0)
+					disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+				appliedRecords = 0;
+				replicationNotificationPending = false;
+			}
+
+			WalSndWakeup(switchedTLI, batchFlushRequired);
+
+			/* Setup timeout to limit maximum delay for notifications */
+			if (appliedRecords == 1)
+			{
+				cascadeDelayCurrent = cascadeReplicationMaxBatchDelay;
+				if (cascadeDelayCurrent > 0)
+					enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
+										 cascadeDelayCurrent);
+			}
+		}
+	}
 
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
@@ -5129,3 +5205,50 @@ assign_recovery_target_xid(const char *newval, void *extra)
 	else
 		recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * GUC assign_hook for cascade_replication_batch_size and
+ * cascade_replication_batch_delay
+ */
+void
+assign_cascade_replication_batch_values(int new_value, void *extra)
+{
+	/*
+	 * If either cascade_replication_batch_size or
+	 * cascade_replication_batch_delay is changed, then we want to disable
+	 * current timer (if any) and immediately flush current batch. New values
+	 * will be picked once next WAL record is applied.
+	 */
+	if (cascadeDelayCurrent > 0)
+	{
+		cascadeDelayCurrent = 0;
+		disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+	}
+	/* Will be processed by ProcessStartupProcInterrupts */
+	replicationNotificationPending = true;
+}
+
+/*
+ * Send notifications to downstream walsenders if there are batched records
+ */
+void
+StandbyWalCheckSendNotify(void)
+{
+	if (appliedRecords > 0)
+	{
+		WalSndWakeup(false, true);
+		appliedRecords = 0;
+	}
+	replicationNotificationPending = false;
+}
+
+/*
+ * Timer handler for batch notifications in cascade replication
+ */
+void
+StandbyWalSendTimeoutHandler(void)
+{
+	replicationNotificationPending = true;
+	/* Most likely process is waiting for arrival of WAL records */
+	WakeupRecovery();
+}
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index 27e86cf393f..555090c82a0 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -189,6 +189,10 @@ ProcessStartupProcInterrupts(void)
 	if (ProcSignalBarrierPending)
 		ProcessProcSignalBarrier();
 
+	/* Send a notification to downstream walsenders if required */
+	if (replicationNotificationPending)
+		StandbyWalCheckSendNotify();
+
 	/* Perform logging of memory contexts of this process */
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
@@ -246,6 +250,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len)
 	RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler);
 	RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler);
 	RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler);
+	RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler);
 
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 1128167c025..fefd3eeedc7 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -354,6 +354,27 @@
   options => 'bytea_output_options',
 },
 
+{ name => 'cascade_replication_batch_delay', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
+  short_desc => 'Sets the maximum time before cascade walsenders are notified on standby about applied records.',
+  long_desc => '0 disables timed notifications. This option works only if cascade_replication_batch_size is greater than 0.',
+  flags => 'GUC_UNIT_MS',
+  variable => 'cascadeReplicationMaxBatchDelay',
+  boot_val => '500',
+  min => '0',
+  max => 'INT_MAX',
+  assign_hook => 'assign_cascade_replication_batch_values',
+},
+
+{ name => 'cascade_replication_batch_size', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
+  short_desc => 'Sets the maximum number of applied WAL records before cascade walsenders are notified on standby.',
+  long_desc => '1 disables records batching in cascade replication.',
+  variable => 'cascadeReplicationMaxBatchSize',
+  boot_val => '1',
+  min => '1',
+  max => 'INT_MAX',
+  assign_hook => 'assign_cascade_replication_batch_values',
+},
+
 { name => 'check_function_bodies', type => 'bool', context => 'PGC_USERSET', group => 'CLIENT_CONN_STATEMENT',
   short_desc => 'Check routine bodies during CREATE FUNCTION and CREATE PROCEDURE.',
   variable => 'check_function_bodies',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index f62b61967ef..2f41c751880 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -376,6 +376,13 @@
 					# is not set
 #wal_receiver_status_interval = 10s	# send replies at least this often
 					# 0 disables
+#cascade_replication_batch_size = 1	# maximum number of applied WAL records
+					# before cascade logical walsenders are notified on standby
+					# 1 disables records batching in cascade replication
+#cascade_replication_batch_delay = 500ms	# maximum time before cascade walsenders
+					# are notified about applied records if batching is enabled
+					# (i.e. if cascade_replication_batch_size is greater than 1)
+					# 0 disables timed notifications during batching
 #hot_standby_feedback = off		# send info from standby to prevent
 					# query conflicts
 #wal_receiver_timeout = 60s		# time that receiver waits for
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 8e475e266d1..75a09d4b9e3 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,8 @@
 #ifndef XLOGRECOVERY_H
 #define XLOGRECOVERY_H
 
+#include <signal.h>
+
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
@@ -67,6 +69,8 @@ extern PGDLLIMPORT char *PrimarySlotName;
 extern PGDLLIMPORT char *recoveryRestoreCommand;
 extern PGDLLIMPORT char *recoveryEndCommand;
 extern PGDLLIMPORT char *archiveCleanupCommand;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchSize;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay;
 
 /* indirectly set via GUC system */
 extern PGDLLIMPORT TransactionId recoveryTargetXid;
@@ -165,4 +169,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern PGDLLIMPORT volatile sig_atomic_t replicationNotificationPending;
+extern void StandbyWalCheckSendNotify(void);
+extern void StandbyWalSendTimeoutHandler(void);
+
 #endif /* XLOGRECOVERY_H */
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 82ac8646a8d..370ad79d5e5 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -118,6 +118,7 @@ extern void assign_recovery_target_timeline(const char *newval, void *extra);
 extern bool check_recovery_target_xid(char **newval, void **extra,
 									  GucSource source);
 extern void assign_recovery_target_xid(const char *newval, void *extra);
+extern void assign_cascade_replication_batch_values(int new_value, void *extra);
 extern bool check_role(char **newval, void **extra, GucSource source);
 extern void assign_role(const char *newval, void *extra);
 extern const char *show_role(void);
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 7b19beafdc9..0062cb562b9 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -36,6 +36,7 @@ typedef enum TimeoutId
 	IDLE_STATS_UPDATE_TIMEOUT,
 	CLIENT_CONNECTION_CHECK_TIMEOUT,
 	STARTUP_PROGRESS_TIMEOUT,
+	STANDBY_CASCADE_WAL_SEND_TIMEOUT,
 	/* First user-definable timeout reason */
 	USER_TIMEOUT,
 	/* Maximum number of timeout reasons */
-- 
2.51.2
