Hi Hackers,

Bug #19093 [1] reported that pg_stat_wal_receiver.status = 'streaming' does not accurately reflect streaming health. In that discussion, Noah noted that even before the reported regression, status = 'streaming' was unreliable because walreceiver sets it during early startup, before attempting a connection. He suggested:
"Long-term, in master only, perhaps we should introduce another status like 'connecting'. Perhaps enact the connecting->streaming status transition just before tendering the first byte of streamed WAL to the startup process. Alternatively, enact that transition when the startup process accepts the first streamed byte."

Michael and I also thought this could be a useful addition. This patch implements that suggestion by adding a new WALRCV_CONNECTING state.

== Background ==

Currently, walreceiver transitions directly from STARTING to STREAMING early in WalReceiverMain(), before any WAL data has been received. This means status = 'streaming' can be observed even when:

- The connection to the primary has not been established
- No WAL data has actually been received or flushed

This makes it difficult for monitoring tools to distinguish between a healthy streaming replica and one that is merely attempting to stream.

== Proposal ==

Introduce WALRCV_CONNECTING as an intermediate state between STARTING and STREAMING:

- When walreceiver starts, it enters CONNECTING instead of going directly to STREAMING. Likewise, when a waiting walreceiver is asked to restart streaming, it moves from RESTARTING back to CONNECTING.
- The transition to STREAMING occurs in XLogWalRcvFlush(), inside the existing spinlock-protected block that updates flushedUpto, i.e. in the same step that publishes the first newly flushed WAL position to the startup process.

Feedback is welcome.

[1] https://www.postgresql.org/message-id/flat/19093-c4fff49a608f82a0%40postgresql.org

--
Best,
Xuneng
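The intended transitions can be modelled in a few lines of C. This is a simplified, single-threaded sketch under stated assumptions: the Sketch* types and sketch_* functions are hypothetical stand-ins for WalRcvData, WalReceiverMain(), WalRcvWaitForStartPosition(), XLogWalRcvFlush() and WalRcvStreaming(); the real code holds walrcv->mutex (a spinlock) around these updates, which is omitted here, and uses XLogRecPtr where the sketch uses uint64_t. It shows that the state stays CONNECTING until the flush position actually advances.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical, simplified model of the walreceiver states touched by the
 * patch.  The real enum is WalRcvState in replication/walreceiver.h.
 */
typedef enum
{
	SKETCH_STOPPED,
	SKETCH_STARTING,
	SKETCH_CONNECTING,			/* new: running, but no WAL flushed yet */
	SKETCH_STREAMING,
	SKETCH_WAITING,
	SKETCH_RESTARTING,
	SKETCH_STOPPING
} SketchWalRcvState;

/*
 * Hypothetical stand-in for the WalRcvData fields involved.  The real struct
 * lives in shared memory and is protected by a spinlock; this sketch omits
 * locking and uses uint64_t in place of XLogRecPtr.
 */
typedef struct
{
	SketchWalRcvState state;
	uint64_t	flushedUpto;
} SketchWalRcv;

/* Startup (cf. WalReceiverMain): STARTING -> CONNECTING, not STREAMING. */
static void
sketch_start(SketchWalRcv *w)
{
	/* The real code exits or PANICs for other states; we only assert. */
	assert(w->state == SKETCH_STARTING);
	w->state = SKETCH_CONNECTING;
}

/* Restart (cf. WalRcvWaitForStartPosition): RESTARTING -> CONNECTING. */
static void
sketch_restart(SketchWalRcv *w)
{
	assert(w->state == SKETCH_RESTARTING);
	w->state = SKETCH_CONNECTING;
}

/*
 * Flush (cf. XLogWalRcvFlush): when the flush position advances, publish it
 * and, in the same step, promote CONNECTING to STREAMING.
 */
static void
sketch_flush(SketchWalRcv *w, uint64_t flush)
{
	if (w->flushedUpto < flush)
	{
		w->flushedUpto = flush;
		if (w->state == SKETCH_CONNECTING)
			w->state = SKETCH_STREAMING;
	}
}

/*
 * State test from WalRcvStreaming(): CONNECTING still counts as running, so
 * the startup process does not treat it as a dead walreceiver.  (The real
 * function also times out a walreceiver stuck in STARTING; omitted here.)
 */
static bool
sketch_is_active(SketchWalRcvState s)
{
	return s == SKETCH_STREAMING || s == SKETCH_STARTING ||
		s == SKETCH_CONNECTING || s == SKETCH_RESTARTING;
}
```

In this model a walreceiver that connects but never receives WAL stays in CONNECTING indefinitely, while sketch_is_active() keeps reporting it as running. That mirrors the patch's choice to change only the reported status, not the startup process's view of whether a walreceiver is present.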
From a332797c81a9b84b482802b2bac003c2cfe08f87 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Fri, 12 Dec 2025 12:21:14 +0800
Subject: [PATCH] Add WALRCV_CONNECTING state to walreceiver

Previously, walreceiver set status='streaming' early in startup before
receiving any WAL, making it unreliable for health monitoring.

Introduce WALRCV_CONNECTING as an intermediate state. Walreceiver
enters CONNECTING on startup and transitions to STREAMING only when
the first WAL chunk is flushed to disk in XLogWalRcvFlush().

This allows monitoring tools to distinguish "attempting to connect"
from "actively receiving WAL".
---
 src/backend/replication/walreceiver.c      | 13 ++++++++++---
 src/backend/replication/walreceiverfuncs.c |  2 ++
 src/include/replication/walreceiver.h      |  1 +
 3 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ac802ae85b4..11e35e83e8c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -205,6 +205,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 			break;
 
 		case WALRCV_WAITING:
+		case WALRCV_CONNECTING:
 		case WALRCV_STREAMING:
 		case WALRCV_RESTARTING:
 		default:
@@ -214,7 +215,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	}
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
-	walrcv->walRcvState = WALRCV_STREAMING;
+	walrcv->walRcvState = WALRCV_CONNECTING;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -649,7 +650,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
 	SpinLockAcquire(&walrcv->mutex);
 	state = walrcv->walRcvState;
-	if (state != WALRCV_STREAMING)
+	if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING)
 	{
 		SpinLockRelease(&walrcv->mutex);
 		if (state == WALRCV_STOPPING)
@@ -688,7 +689,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 			 */
 			*startpoint = walrcv->receiveStart;
 			*startpointTLI = walrcv->receiveStartTLI;
-			walrcv->walRcvState = WALRCV_STREAMING;
+			walrcv->walRcvState = WALRCV_CONNECTING;
 			SpinLockRelease(&walrcv->mutex);
 			break;
 		}
@@ -791,6 +792,7 @@ WalRcvDie(int code, Datum arg)
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+		   walrcv->walRcvState == WALRCV_CONNECTING ||
 		   walrcv->walRcvState == WALRCV_RESTARTING ||
 		   walrcv->walRcvState == WALRCV_STARTING ||
 		   walrcv->walRcvState == WALRCV_WAITING ||
@@ -1001,6 +1003,9 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 			walrcv->latestChunkStart = walrcv->flushedUpto;
 			walrcv->flushedUpto = LogstreamResult.Flush;
 			walrcv->receivedTLI = tli;
+
+			if (walrcv->walRcvState == WALRCV_CONNECTING)
+				walrcv->walRcvState = WALRCV_STREAMING;
 		}
 		SpinLockRelease(&walrcv->mutex);
 
@@ -1373,6 +1378,8 @@ WalRcvGetStateString(WalRcvState state)
 			return "stopped";
 		case WALRCV_STARTING:
 			return "starting";
+		case WALRCV_CONNECTING:
+			return "connecting";
 		case WALRCV_STREAMING:
 			return "streaming";
 		case WALRCV_WAITING:
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 822645748a7..0f98d07109d 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -179,6 +179,7 @@ WalRcvStreaming(void)
 	}
 
 	if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
+		state == WALRCV_CONNECTING ||
 		state == WALRCV_RESTARTING)
 		return true;
 	else
@@ -211,6 +212,7 @@ ShutdownWalRcv(void)
 			stopped = true;
 			break;
 
+		case WALRCV_CONNECTING:
 		case WALRCV_STREAMING:
 		case WALRCV_WAITING:
 		case WALRCV_RESTARTING:
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e5557d21fa8..0921752cce2 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -47,6 +47,7 @@ typedef enum
 	WALRCV_STOPPED,				/* stopped and mustn't start up again */
 	WALRCV_STARTING,			/* launched, but the process hasn't
 								 * initialized yet */
+	WALRCV_CONNECTING,			/* connection starting, but no WAL data yet */
 	WALRCV_STREAMING,			/* walreceiver is streaming */
 	WALRCV_WAITING,				/* stopped streaming, waiting for orders */
 	WALRCV_RESTARTING,			/* asked to restart streaming */
-- 
2.51.0
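On the monitoring side, the new state lets a check separate the two cases described under Background. A hypothetical sketch in C, assuming the tool has already fetched the status column of pg_stat_wal_receiver and maps "no row returned" to NULL; the WrCategory type and classify_walreceiver_status() are illustrative names, not part of PostgreSQL or libpq.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical categories a monitoring check might derive from the
 * status column of pg_stat_wal_receiver. */
typedef enum
{
	WR_NO_RECEIVER,				/* no row, i.e. no active walreceiver */
	WR_CONNECTING,				/* "connecting": no WAL flushed yet */
	WR_STREAMING,				/* "streaming": WAL has been flushed */
	WR_OTHER					/* any other state, e.g. "waiting" */
} WrCategory;

/* Map a status string (or NULL for "no row") to a category. */
static WrCategory
classify_walreceiver_status(const char *status)
{
	if (status == NULL)
		return WR_NO_RECEIVER;
	if (strcmp(status, "connecting") == 0)
		return WR_CONNECTING;
	if (strcmp(status, "streaming") == 0)
		return WR_STREAMING;
	return WR_OTHER;
}
```

How long a replica may stay in WR_CONNECTING before alerting is a policy decision for the tool. With the patch applied, WR_STREAMING additionally implies that at least one chunk of WAL has been flushed since the walreceiver last entered CONNECTING.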
