From d371369ab8d510450b12f857241e650bbc1fc5d7 Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Sun, 14 Dec 2025 11:05:08 +0800
Subject: [PATCH v2] Add WALRCV_CONNECTING state to walreceiver

Previously, walreceiver set status='streaming' early in startup before
receiving any WAL, making it unreliable for health monitoring.

Introduce WALRCV_CONNECTING as an intermediate state. Walreceiver enters
CONNECTING on startup and transitions to STREAMING only when the first valid 
WAL record is processed by startup process. This allows monitoring tools to
distinguish "attempting to connect" from "actively streaming WAL".
---
 src/backend/access/transam/xlogrecovery.c  | 17 ++++++++++++++
 src/backend/replication/walreceiver.c      |  8 +++++--
 src/backend/replication/walreceiverfuncs.c | 27 +++++++++++++++++++++-
 src/include/replication/walreceiver.h      |  3 +++
 4 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index ae2398d6975..89af2909063 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -250,6 +250,9 @@ static XLogSource currentSource = XLOG_FROM_ANY;
 static bool lastSourceFailed = false;
 static bool pendingWalRcvRestart = false;
 
+/* Guard to update walreceiver state only once per streaming session. */
+static bool walrcv_streaming_set = false;
+
 /*
  * These variables track when we last obtained some WAL data to process,
  * and where we got it from.  (XLogReceiptSource is initially the same as
@@ -1827,6 +1830,17 @@ PerformWalRecovery(void)
 					recoveryPausesHere(false);
 			}
 
+			/*
+			 * If we are reading from the stream, this is the first valid
+			 * record we have successfully parsed. Now we can verify the
+			 * connection is truly streaming valid WAL.
+			 */
+			if (!walrcv_streaming_set && readSource == XLOG_FROM_STREAM)
+			{
+				if (WalRcvSetStreaming())
+					walrcv_streaming_set = true;
+			}
+
 			/*
 			 * Apply the record
 			 */
@@ -3692,6 +3706,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 * one can hope...
 					 */
 
+					/* Reset our "streaming active" guard flag */
+					walrcv_streaming_set = false;
+
 					/*
 					 * We should be able to move to XLOG_FROM_STREAM only in
 					 * standby mode.
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ac802ae85b4..98cae5a25af 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -205,6 +205,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 			break;
 
 		case WALRCV_WAITING:
+		case WALRCV_CONNECTING:
 		case WALRCV_STREAMING:
 		case WALRCV_RESTARTING:
 		default:
@@ -214,7 +215,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	}
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
-	walrcv->walRcvState = WALRCV_STREAMING;
+	walrcv->walRcvState = WALRCV_CONNECTING;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -688,7 +689,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 			 */
 			*startpoint = walrcv->receiveStart;
 			*startpointTLI = walrcv->receiveStartTLI;
-			walrcv->walRcvState = WALRCV_STREAMING;
+			walrcv->walRcvState = WALRCV_CONNECTING;
 			SpinLockRelease(&walrcv->mutex);
 			break;
 		}
@@ -791,6 +792,7 @@ WalRcvDie(int code, Datum arg)
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+		   walrcv->walRcvState == WALRCV_CONNECTING ||
 		   walrcv->walRcvState == WALRCV_RESTARTING ||
 		   walrcv->walRcvState == WALRCV_STARTING ||
 		   walrcv->walRcvState == WALRCV_WAITING ||
@@ -1373,6 +1375,8 @@ WalRcvGetStateString(WalRcvState state)
 			return "stopped";
 		case WALRCV_STARTING:
 			return "starting";
+		case WALRCV_CONNECTING:
+			return "connecting";
 		case WALRCV_STREAMING:
 			return "streaming";
 		case WALRCV_WAITING:
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 822645748a7..a3bf57291ae 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -179,12 +179,36 @@ WalRcvStreaming(void)
 	}
 
 	if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
-		state == WALRCV_RESTARTING)
+		state == WALRCV_RESTARTING || state == WALRCV_CONNECTING)
 		return true;
 	else
 		return false;
 }
 
+/*
+ * Transition from CONNECTING to STREAMING state.
+ *
+ * This is called by the startup process when the first WAL record from
+ * the walreceiver is processed, indicating that the connection is fully
+ * established and data is flowing.
+ */
+bool
+WalRcvSetStreaming(void)
+{
+	WalRcvData *walrcv = WalRcv;
+	bool		set = false;
+
+	SpinLockAcquire(&walrcv->mutex);
+	if (walrcv->walRcvState == WALRCV_CONNECTING)
+	{
+		walrcv->walRcvState = WALRCV_STREAMING;
+		set = true;
+	}
+	SpinLockRelease(&walrcv->mutex);
+
+	return set;
+}
+
 /*
  * Stop walreceiver (if running) and wait for it to die.
  * Executed by the Startup process.
@@ -211,6 +235,7 @@ ShutdownWalRcv(void)
 			stopped = true;
 			break;
 
+		case WALRCV_CONNECTING:
 		case WALRCV_STREAMING:
 		case WALRCV_WAITING:
 		case WALRCV_RESTARTING:
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e5557d21fa8..5217ffc7636 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -47,6 +47,8 @@ typedef enum
 	WALRCV_STOPPED,				/* stopped and mustn't start up again */
 	WALRCV_STARTING,			/* launched, but the process hasn't
 								 * initialized yet */
+	WALRCV_CONNECTING,			/* connection starting, but no WAL streaming
+								 * yet */
 	WALRCV_STREAMING,			/* walreceiver is streaming */
 	WALRCV_WAITING,				/* stopped streaming, waiting for orders */
 	WALRCV_RESTARTING,			/* asked to restart streaming */
@@ -499,6 +501,7 @@ extern WalRcvState WalRcvGetState(void);
 extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 								 const char *conninfo, const char *slotname,
 								 bool create_temp_slot);
+extern bool WalRcvSetStreaming(void);
 extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern XLogRecPtr GetWalRcvWriteRecPtr(void);
 extern int	GetReplicationApplyDelay(void);
-- 
2.51.0

