Hi,

On Thu, Apr 03, 2025 at 03:23:31PM +0530, vignesh C wrote:
> Can we add it to one of the subscription tests, such as 001_rep_changes.pl?

Yeah that sounds like a good place for it. Done in the attached.

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 49d9c73a29f5c05567d01f57424b7b8fd4df4eee Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Tue, 25 Feb 2025 10:18:05 +0000
Subject: [PATCH v6] Flush the IO statistics of active walsenders

The walsender does not flush its IO statistics until it exits.
The issue has been there since pg_stat_io was introduced in a9c70b46dbe.
This commit:

1. ensures the walsender does not wait until it exits to flush its IO statistics
2. flushes its IO statistics periodically so as not to overload the walsender
3. adds a test for a physical walsender and a test for a logical walsender
---
 src/backend/replication/walsender.c        | 63 +++++++++++++++-------
 src/test/recovery/t/001_stream_rep.pl      | 16 ++++++
 src/test/subscription/t/001_rep_changes.pl | 13 +++++
 3 files changed, 74 insertions(+), 18 deletions(-)
  65.8% src/backend/replication/
  20.0% src/test/recovery/t/
  14.0% src/test/subscription/t/

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1028919aecb..9eed37b5de9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -91,10 +91,14 @@
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
 
+/* Minimum interval between walsender IO stats flushes, in milliseconds */
+#define MIN_IOSTATS_FLUSH_INTERVAL         1000
+
 /*
  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
  *
@@ -2742,6 +2746,8 @@ WalSndCheckTimeOut(void)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+	TimestampTz last_flush = 0;
+
 	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
@@ -2836,30 +2842,51 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * WalSndWaitForWal() handle any other blocking; idle receivers need
 		 * its additional actions.  For physical replication, also block if
 		 * caught up; its send_data does not block.
+		 *
+		 * When the WAL sender is caught up or has pending data to send, we
+		 * also periodically report I/O statistics. It's done periodically to
+		 * not overload the WAL sender.
 		 */
-		if ((WalSndCaughtUp && send_data != XLogSendLogical &&
-			 !streamingDoneSending) ||
-			pq_is_send_pending())
+		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
 		{
-			long		sleeptime;
-			int			wakeEvents;
+			TimestampTz now;
 
-			if (!streamingDoneReceiving)
-				wakeEvents = WL_SOCKET_READABLE;
-			else
-				wakeEvents = 0;
+			now = GetCurrentTimestamp();
 
-			/*
-			 * Use fresh timestamp, not last_processing, to reduce the chance
-			 * of reaching wal_sender_timeout before sending a keepalive.
-			 */
-			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+			if (TimestampDifferenceExceeds(last_flush, now, MIN_IOSTATS_FLUSH_INTERVAL))
+			{
+				/*
+				 * Report IO statistics
+				 */
+				pgstat_flush_io(false);
+				(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+				last_flush = now;
+			}
 
-			if (pq_is_send_pending())
-				wakeEvents |= WL_SOCKET_WRITEABLE;
+			if (send_data != XLogSendLogical || pq_is_send_pending())
+			{
+				long		sleeptime;
+				int			wakeEvents;
+
+				if (!streamingDoneReceiving)
+					wakeEvents = WL_SOCKET_READABLE;
+				else
+					wakeEvents = 0;
+
+				/*
+				 * Use fresh timestamp, not last_processing, to reduce the
+				 * chance of reaching wal_sender_timeout before sending a
+				 * keepalive.
+				 */
+				sleeptime = WalSndComputeSleeptime(now);
+
+				if (pq_is_send_pending())
+					wakeEvents |= WL_SOCKET_WRITEABLE;
+
+				/* Sleep until something happens or we time out */
+				WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
+			}
 
-			/* Sleep until something happens or we time out */
-			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
 		}
 	}
 }
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ccd8417d449..2508295eca6 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name,
 	has_streaming => 1);
 $node_standby_2->start;
 
+# To check that an active walsender updates its IO statistics below.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
 # Create some content on primary and check its presence in standby nodes
 $node_primary->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -333,6 +336,19 @@ $node_primary->psql(
 
 note "switching to physical replication slot";
 
+# Wait for the walsender to update its IO statistics.
+# Has to be done before the next restart and far enough from the
+# pg_stat_reset_shared('io') to minimize the risk of polling for too long.
+$node_primary->poll_query_until(
+	'postgres',
+	qq[SELECT sum(reads) > 0
+       FROM pg_catalog.pg_stat_io
+       WHERE backend_type = 'walsender'
+       AND object = 'wal']
+  )
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # Switch to using a physical replication slot. We can do this without a new
 # backup since physical slots can go backwards if needed. Do so on both
 # standbys. Since we're going to be testing things that affect the slot state,
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 8726fe04ad2..fc17bbd48a0 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -184,6 +184,19 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_no_col");
 is($result, qq(2), 'check replicated changes for table having no columns');
 
+# Wait for the walsender to update its IO statistics.
+# Has to be done far enough from the CREATE SUBSCRIPTION to minimize the risk
+# of polling for too long.
+$node_publisher->poll_query_until(
+	'postgres',
+	qq[SELECT sum(reads) > 0
+       FROM pg_catalog.pg_stat_io
+       WHERE backend_type = 'walsender'
+       AND object = 'wal']
+  )
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # insert some duplicate rows
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_full SELECT generate_series(1,10)");
-- 
2.34.1

Reply via email to