Hello hackers,

This is a continuation of the thread https://www.postgresql.org/message-id/flat/076eb7bd-52e6-4a51-ba00-c744d027b15c%40postgrespro.ru, with focus only on the patch related to improving performance in case of large number of cascaded walsenders.

We’ve faced an interesting situation on a standby environment with configured cascade replication and large number (~100) of configured walsenders. We’ve noticed a very high CPU consumption on such environment with the most time-consuming operation being signal delivery from startup recovery process to walsenders via WalSndWakeup invocations from ApplyWalRecord in xlogrecovery.c.

The startup standby process notifies walsenders for downstream systems using ConditionVariableBroadcast (CV), so only processes waiting on this CV need to be contacted. However in case of high load we seems to be hitting here a bottleneck anyway. The current implementation tries to send notification after processing of each WAL record (i.e. during each invocation of ApplyWalRecord), so this implies high rate of WalSndWakeup invocations. At the same time, this also provides each walsender with very small chunk of data to process, so almost every process will be present in the CV wait list for the next iteration. As result, waiting list should be always fully packed in such case, which additionally reduces performance of WAL records processing by the standby instance.

To reproduce such behavior we could use a simple environment with three servers: primary instance, attached physical standby and its downstream server with large number of logical replication subscriptions. Attached is the synthetic test case (test_scenario.zip) to reproduce this behavior: script ‘test_prepare.sh’ could be used to create required environment with test data and ‘test_execute.sh’ script executes ‘pgbench’ tool with simple updates against primary instance to trigger replication to other servers. With just about 6 clients I could observe high CPU consumption by the 'startup recovering process' (and it may be sufficient to completely saturate the CPU on a smaller machine). Please check the environment properties at the top of these scripts before running them, as they need to be updated in order to specify location for installed PG build, target location for database instances creation and used ports.

After thinking about possible ways to improve such case, we've decided to implement batching for notification delivery. We try to slightly postpone sending notification until recovery has applied some number of messages.This reduces rate of CV notifications and also gives receivers more data to process, so they may not need to enter the CV wait state so often. Counting applied records is not difficult, but the tricky part here is to ensure that we do not postpone notifications for too long in case of low load. To reduce such delay we use a timer handler, which sets a timeout flag, which is checked in ProcessStartupProcInterrupts. This allow us to send signal on timeout if the startup process is waiting for the arrival of new WAL records (in ReadRecord). The WalSndWakeup will be invoked either after applying certain number of messages or after expiration of timeout since last notification. The notification however may be delayed while record is being applied (during redo handler invocation from ApplyWalRecord). This could increase delay for some corner cases with non-trivial WAL records like ‘drop database’, but this should be a rare case and walsender process have its own limit on the wait time, so the delay won’t be indefinite even in this case.

The patch introduces two GUCs to control the batching behavior. The first one controls size of batched messages ('cascade_replication_batch_size') and is set to 0 by default, so the functionality is effectively disabled. The second one controls timed delay during batching ('cascade_replication_batch_delay'), which is by default set to 500ms. The delay is used only if batching is enabled.

With this patch applied we’ve noticed a significant reduction in CPU consumption while using the synthetic test program mentioned above. It would be great to hear any thoughts on these observations and fixing approaches, as well as possible pitfalls of proposed changes.

Thanks,
Alexey
From c32b3bd82b53e88cd5cbcca7a2e367aa446a68ae Mon Sep 17 00:00:00 2001
From: Rustam Khamidullin <[email protected]>
Date: Fri, 14 Mar 2025 18:18:34 +0700
Subject: [PATCH] Implement batching for WAL records notification during
 cascade replication

Currently standby server notifies walsenders after applying of each WAL
record during cascade replication. This creates a bottleneck in case of large
number of sender processes during WalSndWakeup invocation. This change
introduces batching for such notifications, which are now sent either after
certain number of applied records or specified time interval (whichever comes
first).

Co-authored-by: Alexey Makhmutov <[email protected]>
---
 src/backend/access/transam/xlogrecovery.c | 99 ++++++++++++++++++++++-
 src/backend/postmaster/startup.c          |  5 ++
 src/backend/utils/misc/guc_tables.c       | 22 +++++
 src/include/access/xlogrecovery.h         |  8 ++
 src/include/utils/timeout.h               |  1 +
 5 files changed, 133 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index f23ec8969c2..4edeac46100 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -64,6 +64,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/pg_rusage.h"
+#include "utils/timeout.h"
 
 /* Unsupported old recovery command file names (relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE	"recovery.conf"
@@ -147,6 +148,13 @@ bool		InArchiveRecovery = false;
 static bool StandbyModeRequested = false;
 bool		StandbyMode = false;
 
+/*
+ * Whether we are currently in process of processing recovery records while
+ * allowing downstream replication instances
+ */
+#define StandbyWithCascadeReplication() \
+		(AmStartupProcess() && StandbyMode && AllowCascadeReplication())
+
 /* was a signal file present at startup? */
 static bool standby_signal_file_found = false;
 static bool recovery_signal_file_found = false;
@@ -303,6 +311,25 @@ bool		reachedConsistency = false;
 static char *replay_image_masked = NULL;
 static char *primary_image_masked = NULL;
 
+/*
+ * Maximum number of applied records in batch before notifying walsender during
+ * cascade replication
+ */
+int			cascadeReplicationMaxBatchSize;
+
+/*
+ * Maximum batching delay before notifying walsender during cascade replication
+ */
+int			cascadeReplicationMaxBatchDelay;
+
+/* Counter for applied records which are not yet signaled to walsenders */
+static int	appliedRecords = 0;
+
+/*
+ * True if downstream walsenders need to be notified about pending WAL records,
+ * set by timeout handler.
+ */
+volatile sig_atomic_t replicationNotificationPending = false;
 
 /*
  * Shared-memory state for WAL recovery.
@@ -1845,6 +1872,17 @@ PerformWalRecovery(void)
 		 * end of main redo apply loop
 		 */
 
+		/* Send notification for batched messages once loop is ended */
+		if (StandbyWithCascadeReplication() &&
+			cascadeReplicationMaxBatchSize > 1 &&
+			appliedRecords > 0)
+		{
+			if (cascadeReplicationMaxBatchDelay > 0)
+				disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+			appliedRecords = 0;
+			WalSndWakeup(false, true);
+		}
+
 		if (reachedRecoveryTarget)
 		{
 			if (!reachedConsistency)
@@ -2043,8 +2081,40 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 *    be created otherwise)
 	 * ------
 	 */
-	if (AllowCascadeReplication())
-		WalSndWakeup(switchedTLI, true);
+
+	if (StandbyWithCascadeReplication())
+	{
+		if (cascadeReplicationMaxBatchSize <= 1)
+			WalSndWakeup(switchedTLI, true);
+		else
+		{
+			/*
+			 * If time line has switched, then we do not want to delay the
+			 * notification, otherwise we will wait until we apply specified
+			 * number of records before notifying downstream logical
+			 * walsenders.
+			 */
+			bool		batchFlushRequired =
+				++appliedRecords >= cascadeReplicationMaxBatchSize ||
+				replicationNotificationPending ||
+				switchedTLI;
+
+			if (batchFlushRequired)
+			{
+				if (appliedRecords > 0 && cascadeReplicationMaxBatchDelay > 0)
+					disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+				appliedRecords = 0;
+				replicationNotificationPending = false;
+			}
+
+			WalSndWakeup(switchedTLI, batchFlushRequired);
+
+			/* Setup timeout to limit maximum delay for notifications */
+			if (appliedRecords == 1 && cascadeReplicationMaxBatchDelay > 0)
+				enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
+									 cascadeReplicationMaxBatchDelay);
+		}
+	}
 
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
@@ -5083,3 +5153,28 @@ assign_recovery_target_xid(const char *newval, void *extra)
 	else
 		recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Send notifications to downstream walsenders
+ */
+void
+StandbyWalCheckSendNotify(void)
+{
+	if (appliedRecords > 0)
+	{
+		WalSndWakeup(false, true);
+		appliedRecords = 0;
+	}
+	replicationNotificationPending = false;
+}
+
+/*
+ * Timer handler for batch notifications in cascade replication
+ */
+void
+StandbyWalSendTimeoutHandler(void)
+{
+	replicationNotificationPending = true;
+	/* Most likely process is waiting for arrival of WAL records */
+	SetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index 27e86cf393f..555090c82a0 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -189,6 +189,10 @@ ProcessStartupProcInterrupts(void)
 	if (ProcSignalBarrierPending)
 		ProcessProcSignalBarrier();
 
+	/* Send a notification to downstream walsenders if required */
+	if (replicationNotificationPending)
+		StandbyWalCheckSendNotify();
+
 	/* Perform logging of memory contexts of this process */
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
@@ -246,6 +250,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len)
 	RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler);
 	RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler);
 	RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler);
+	RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler);
 
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index f137129209f..cc547ba283b 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2406,6 +2406,28 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"cascade_replication_batch_size", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Set the maximum number of applied WAL records before cascade walsenders are notified on standby."),
+			gettext_noop("0 disabled records batching in cascade replication"),
+			GUC_NOT_IN_SAMPLE
+		},
+		&cascadeReplicationMaxBatchSize,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"cascade_replication_batch_delay", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Sets the maximum time before cascade walsenders are notified on standby about applied records."),
+			gettext_noop("0 disables timed notifications"),
+			GUC_NOT_IN_SAMPLE | GUC_UNIT_MS
+		},
+		&cascadeReplicationMaxBatchDelay,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the maximum number of concurrent connections."),
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 8e475e266d1..75a09d4b9e3 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,8 @@
 #ifndef XLOGRECOVERY_H
 #define XLOGRECOVERY_H
 
+#include <signal.h>
+
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
@@ -67,6 +69,8 @@ extern PGDLLIMPORT char *PrimarySlotName;
 extern PGDLLIMPORT char *recoveryRestoreCommand;
 extern PGDLLIMPORT char *recoveryEndCommand;
 extern PGDLLIMPORT char *archiveCleanupCommand;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchSize;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay;
 
 /* indirectly set via GUC system */
 extern PGDLLIMPORT TransactionId recoveryTargetXid;
@@ -165,4 +169,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern PGDLLIMPORT volatile sig_atomic_t replicationNotificationPending;
+extern void StandbyWalCheckSendNotify(void);
+extern void StandbyWalSendTimeoutHandler(void);
+
 #endif							/* XLOGRECOVERY_H */
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 7b19beafdc9..0062cb562b9 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -36,6 +36,7 @@ typedef enum TimeoutId
 	IDLE_STATS_UPDATE_TIMEOUT,
 	CLIENT_CONNECTION_CHECK_TIMEOUT,
 	STARTUP_PROGRESS_TIMEOUT,
+	STANDBY_CASCADE_WAL_SEND_TIMEOUT,
 	/* First user-definable timeout reason */
 	USER_TIMEOUT,
 	/* Maximum number of timeout reasons */
-- 
2.51.0

<<attachment: test_scenario.zip>>

Reply via email to