Hi Xuneng,

On Fri, Mar 20, 2026 at 1:52 AM Xuneng Zhou <[email protected]> wrote:
>
> The one-segment bound holds for the case where both nodes have
> replayed exactly the same WAL -- the gap comes from
> RequestXLogStreaming truncating recptr to the segment boundary, so
> startpoint is always at the start of the next segment while
> GetStandbyFlushRecPtr returns replayPtr within the current one. I
> think that part of the analysis is correct.
Right, I verified it: RequestXLogStreaming (walreceiverfuncs.c) explicitly
truncates recptr to the segment start to avoid creating broken partial
segments. So the cascade always asks to stream from the beginning of a
segment, while GetStandbyFlushRecPtr on the upstream returns replayPtr
somewhere inside that same segment. The gap is the distance from the
segment start to replayPtr, always less than wal_segment_size.

> But the gap can legitimately be multiple segments. Consider: the
> upstream standby goes down (or is restarted for maintenance) while the
> primary keeps generating and archiving WAL.

Agreed, that scenario produces a multi-segment gap. But I think handling
it is out of scope for this patch. The bug we're fixing is that a cascade
can never start streaming from an upstream that is fully caught up,
because of the RequestXLogStreaming truncation. Making the walreceiver
wait for an upstream that is genuinely many segments behind would be a
feature, not a bug fix, and it would need its own discussion about the
right behavior. The wal_segment_size threshold keeps the fix narrowly
targeted at this specific bug: absorb the sub-segment gap that arises
from the truncation, let everything else fail as before.

> If there's a consensus for this and the fix of one-segment gap, the
> current tap test would become non-deterministic.

Good catch. I'll tighten the test to make sure the gap stays within one
segment.

> I think the difference is that -- during normal streaming,
> wal_receiver_timeout will eventually fire and kill the connection,
> whereas the catch-up polling loop has no such timeout.

Fair point. I'll add a wal_receiver_timeout check to the polling loop so
the walreceiver exits if it has been waiting too long, same as it would
during normal streaming.

Updated patches attached.

Best regards,
Marco
From 488411b6f85a469d360a11c9a4f9557fe0fb8b4c Mon Sep 17 00:00:00 2001
From: Marco Nenciarini <[email protected]>
Date: Wed, 18 Mar 2026 10:23:13 +0100
Subject: [PATCH v5] Fix cascading standby reconnect failure after archive
 fallback

A cascading standby could fail to reconnect to its upstream standby
with "requested starting point ... is ahead of the WAL flush position"
after falling back to archive recovery. This happened because
RequestXLogStreaming() truncates the requested start position to the
WAL segment boundary, which can place it ahead of the upstream's flush
position reported by GetStandbyFlushRecPtr().

Fix by having the walreceiver check the upstream's current WAL flush
position via IDENTIFY_SYSTEM before issuing START_REPLICATION.
IDENTIFY_SYSTEM already returns this position (as xlogpos), but
walrcv_identify_system() previously discarded it. If the requested
start point exceeds the upstream's flush position on the same
timeline, the walreceiver waits for wal_retrieve_retry_interval and
retries.

The wait is limited to gaps of at most one WAL segment, which is the
expected case from the segment-boundary truncation. Larger gaps
indicate the upstream is genuinely behind, so START_REPLICATION is
allowed to proceed (and fail) normally, letting the startup process
fall back to other WAL sources.

The first wait is logged at LOG level; subsequent waits are demoted to
DEBUG1 to avoid log noise. The walreceiver honors wal_receiver_timeout
during the wait, so it will exit if the upstream doesn't catch up in
time.

The bug was introduced in PG 9.3 by commit abfd192b1b5, which added a
flush-position check in StartReplication() that rejects requests ahead
of the server's WAL flush position.
---
 .../libpqwalreceiver/libpqwalreceiver.c       |  21 ++-
 src/backend/replication/logical/worker.c      |   2 +-
 src/backend/replication/walreceiver.c         |  62 ++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/replication/walreceiver.h         |   9 +-
 src/test/recovery/t/053_cascade_reconnect.pl  | 157 ++++++++++++++++++
 6 files changed, 245 insertions(+), 7 deletions(-)
 create mode 100644 src/test/recovery/t/053_cascade_reconnect.pl

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9f04c9ed25d..0e1c95180b2 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -63,7 +63,8 @@ static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
 static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 									char **sender_host, int *sender_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
-									  TimeLineID *primary_tli);
+									  TimeLineID *primary_tli,
+									  XLogRecPtr *server_lsn);
 static char *libpqrcv_get_dbname_from_conninfo(const char *connInfo);
 static char *libpqrcv_get_option_from_conninfo(const char *connInfo,
 											   const char *keyword);
@@ -421,7 +422,8 @@ libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
  * timeline ID of the primary.
  */
 static char *
-libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
+libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli,
+						 XLogRecPtr *server_lsn)
 {
 	PGresult   *res;
 	char	   *primary_sysid;
@@ -452,6 +454,21 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 						   PQntuples(res), PQnfields(res), 1, 3)));
 	primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
 	*primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
+
+	/* Column 2 is the server's current WAL flush position */
+	if (server_lsn)
+	{
+		uint32		hi,
+					lo;
+
+		if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("could not parse WAL location \"%s\"",
+							PQgetvalue(res, 0, 2))));
+		*server_lsn = ((uint64) hi) << 32 | lo;
+	}
+
 	PQclear(res);
 
 	return primary_sysid;

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2d7708805a6..f207a640a74 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5717,7 +5717,7 @@ run_apply_worker(void)
 	 * We don't really use the output identify_system for anything but it does
 	 * some initializations on the upstream so let's still call it.
 	 */
-	(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+	(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI, NULL);
 
 	set_apply_error_context_origin(originname);
 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index fabe3c73034..1bb984060ac 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -54,6 +54,7 @@
 #include "access/htup_details.h"
 #include "access/timeline.h"
 #include "access/transam.h"
+#include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xlogrecovery.h"
@@ -161,6 +162,8 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	TimeLineID	startpointTLI;
 	TimeLineID	primaryTLI;
 	bool		first_stream;
+	bool		upstream_catchup_logged = false;
+	TimestampTz upstream_catchup_deadline = 0;
 	WalRcvData *walrcv;
 	TimestampTz now;
 	char	   *err;
@@ -309,13 +312,15 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	{
 		char	   *primary_sysid;
 		char		standby_sysid[32];
+		XLogRecPtr	primaryFlushPtr;
 		WalRcvStreamOptions options;
 
 		/*
 		 * Check that we're connected to a valid server using the
 		 * IDENTIFY_SYSTEM replication command.
 		 */
-		primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
+		primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
+											   &primaryFlushPtr);
 
 		snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
 				 GetSystemIdentifier());
@@ -338,6 +343,61 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 					errmsg("highest timeline %u of the primary is behind recovery timeline %u",
 						   primaryTLI, startpointTLI)));
 
+		/*
+		 * If our requested startpoint is ahead of the primary server's
+		 * current WAL flush position, we cannot start streaming yet. This
+		 * can happen when a cascading standby has advanced past the upstream
+		 * via archive recovery.
+		 * In this case, wait for the upstream to catch up before attempting
+		 * START_REPLICATION, which would otherwise fail with "requested
+		 * starting point is ahead of the WAL flush position".
+		 *
+		 * We only perform this check when we're on the same timeline as the
+		 * primary; when timelines differ, let START_REPLICATION handle the
+		 * timeline negotiation.
+		 *
+		 * We also only wait if the gap is within one WAL segment. This is
+		 * the expected case because RequestXLogStreaming() truncates the
+		 * start position to the segment boundary, producing at most a
+		 * sub-segment gap. A larger gap means the upstream is genuinely
+		 * behind, so we let START_REPLICATION fail normally and allow the
+		 * startup process to fall back to other WAL sources.
+		 *
+		 * Honor wal_receiver_timeout so the walreceiver doesn't wait
+		 * indefinitely: if the upstream hasn't caught up within the
+		 * timeout, exit and let the startup process retry.
+		 */
+		if (startpointTLI == primaryTLI && startpoint > primaryFlushPtr &&
+			startpoint - primaryFlushPtr <= wal_segment_size)
+		{
+			/* Set deadline on first iteration */
+			if (!upstream_catchup_logged && wal_receiver_timeout > 0)
+				upstream_catchup_deadline =
+					TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+												wal_receiver_timeout);
+
+			ereport(upstream_catchup_logged ? DEBUG1 : LOG,
+					errmsg("walreceiver requested start point %X/%08X on timeline %u is ahead of the primary server's flush position %X/%08X, waiting",
+						   LSN_FORMAT_ARGS(startpoint), startpointTLI,
+						   LSN_FORMAT_ARGS(primaryFlushPtr)));
+			upstream_catchup_logged = true;
+
+			(void) WaitLatch(MyLatch,
+							 WL_EXIT_ON_PM_DEATH | WL_TIMEOUT | WL_LATCH_SET,
+							 wal_retrieve_retry_interval,
+							 WAIT_EVENT_WAL_RECEIVER_UPSTREAM_CATCHUP);
+			ResetLatch(MyLatch);
+
+			if (upstream_catchup_deadline > 0 &&
+				GetCurrentTimestamp() >= upstream_catchup_deadline)
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating walreceiver due to timeout while waiting for upstream to catch up")));
+
+			CHECK_FOR_INTERRUPTS();
+			continue;
+		}
+
 		/*
 		 * Get any missing history files. We do this always, even when we're
 		 * not interested in that timeline, so that if we're promoted to
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4aa864fe3c3..1f586ffe853 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -160,6 +160,7 @@ RESTORE_COMMAND	"Waiting for <xref linkend="guc-restore-command"/> to complete."
 SAFE_SNAPSHOT	"Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
 SYNC_REP	"Waiting for confirmation from a remote server during synchronous replication."
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
+WAL_RECEIVER_UPSTREAM_CATCHUP	"Waiting for upstream server WAL flush position to catch up to requested start point."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY	"Waiting for a new WAL summary to be generated."
 XACT_GROUP_UPDATE	"Waiting for the group leader to update transaction status at transaction end."
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 9b9bd916314..9f8b5b8c72a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -280,9 +280,12 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
  * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
  * identity of the cluster. Returns the system ID of the cluster
  * connected to. 'primary_tli' is the timeline ID of the sender.
+ * If 'server_lsn' is not NULL, it is set to the current WAL flush
+ * position of the sender.
  */
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
-											TimeLineID *primary_tli);
+											TimeLineID *primary_tli,
+											XLogRecPtr *server_lsn);
 
 /*
  * walrcv_get_dbname_from_conninfo_fn
@@ -441,8 +444,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_get_conninfo(conn)
 #define walrcv_get_senderinfo(conn, sender_host, sender_port) \
 	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
-#define walrcv_identify_system(conn, primary_tli) \
-	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_identify_system(conn, primary_tli, server_lsn) \
+	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_lsn)
 #define walrcv_get_dbname_from_conninfo(conninfo) \
 	WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)
 #define walrcv_server_version(conn) \
diff --git a/src/test/recovery/t/053_cascade_reconnect.pl b/src/test/recovery/t/053_cascade_reconnect.pl
new file mode 100644
index 00000000000..232ea2208d8
--- /dev/null
+++ b/src/test/recovery/t/053_cascade_reconnect.pl
@@ -0,0 +1,157 @@
+
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+# Test that a cascading standby can reconnect to its upstream standby after
+# advancing past the upstream's WAL flush position via archive recovery.
+#
+# Setup: primary -> standby_a -> standby_b
+# standby_b has both streaming (from standby_a) and restore_command
+# (from primary's archive).
+#
+# When standby_a's walreceiver is stopped and standby_b falls back to
+# archive recovery, standby_b may advance its recovery position past
+# standby_a's replay position. Previously, standby_b's walreceiver
+# would fail with "requested starting point is ahead of the WAL flush
+# position" when reconnecting to standby_a.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary with archiving
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf(
+	'postgresql.conf', qq(
+wal_keep_size = 128MB
+checkpoint_timeout = 1h
+));
+$node_primary->start;
+
+# Take backup and create standby_a (streaming from primary, no archive)
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+my $node_standby_a = PostgreSQL::Test::Cluster->new('standby_a');
+$node_standby_a->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby_a->start;
+
+# Wait for standby_a to start streaming
+$node_primary->wait_for_catchup($node_standby_a);
+
+# Take backup from standby_a and create standby_b
+# standby_b streams from standby_a AND restores from primary's archive
+$node_standby_a->backup($backup_name);
+
+my $node_standby_b = PostgreSQL::Test::Cluster->new('standby_b');
+$node_standby_b->init_from_backup($node_standby_a, $backup_name,
+	has_streaming => 1);
+$node_standby_b->enable_restoring($node_primary);
+$node_standby_b->start;
+
+# Generate initial data and wait for full cascade replication
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE test_tab AS SELECT generate_series(1, 1000) AS id");
+$node_primary->wait_for_replay_catchup($node_standby_a);
+$node_standby_a->wait_for_replay_catchup($node_standby_b, $node_primary);
+
+my $result =
+  $node_standby_b->safe_psql('postgres', "SELECT count(*) FROM test_tab");
+is($result, '1000', 'initial data replicated to cascading standby');
+
+# Disconnect standby_a from primary by clearing primary_conninfo.
+# This stops standby_a's walreceiver, so standby_a can no longer receive
+# new WAL. Its GetStandbyFlushRecPtr() will return only replayPtr.
+$node_standby_a->append_conf('postgresql.conf', "primary_conninfo = ''");
+$node_standby_a->reload;
+
+# Wait for standby_a's walreceiver to stop
+$node_standby_a->poll_query_until('postgres',
+	"SELECT NOT EXISTS (SELECT 1 FROM pg_stat_wal_receiver)")
+  or die "Timed out waiting for standby_a walreceiver to stop";
+
+# Stop standby_b cleanly. We'll restart it after generating new WAL
+# so it enters the recovery state machine fresh and tries archive first.
+$node_standby_b->stop;
+
+# Force a checkpoint now so that no background checkpoint can generate
+# extra WAL during the INSERT below and push it across a segment boundary.
+# Combined with checkpoint_timeout = 1h this ensures the new WAL fits
+# within a single segment, keeping the gap within wal_segment_size.
+$node_primary->safe_psql('postgres', "CHECKPOINT");
+
+# Generate more WAL on primary
+$node_primary->safe_psql('postgres',
+	"INSERT INTO test_tab SELECT generate_series(1001, 2000)");
+
+# Force WAL switch and wait for archiving to complete, so that
+# standby_b can find the new WAL in the archive when it starts.
+my $walfile = $node_primary->safe_psql('postgres',
+	"SELECT pg_walfile_name(pg_current_wal_lsn())");
+$node_primary->safe_psql('postgres', "SELECT pg_switch_wal()");
+$node_primary->poll_query_until('postgres',
+	"SELECT '$walfile' <= last_archived_wal FROM pg_stat_archiver")
+  or die "Timed out waiting for WAL archiving";
+
+# Rotate standby_b's log so we can check just the new log output
+$node_standby_b->rotate_logfile;
+my $standby_b_log_offset = -s $node_standby_b->logfile;
+
+# Start standby_b. It will:
+# 1. Read new WAL from primary's archive (XLOG_FROM_ARCHIVE)
+# 2. Advance RecPtr past standby_a's replay position
+# 3. Try streaming from standby_a (XLOG_FROM_STREAM)
+# 4. With the fix: walreceiver detects upstream is behind via
+#    IDENTIFY_SYSTEM and waits instead of failing
+$node_standby_b->start;
+
+# Wait for standby_b to replay the new data from archive
+$node_standby_b->poll_query_until('postgres',
+	"SELECT count(*) >= 2000 FROM test_tab")
+  or die "Timed out waiting for standby_b to replay archived WAL";
+
+$result =
+  $node_standby_b->safe_psql('postgres', "SELECT count(*) FROM test_tab");
+is($result, '2000', 'cascading standby replayed new data from archive');
+
+# Wait for walreceiver to hit the upstream-catchup wait event, proving we
+# exercised the START_REPLICATION-ahead-of-upstream path.
+$node_standby_b->wait_for_event('walreceiver', 'WalReceiverUpstreamCatchup');
+
+# Verify no "requested starting point is ahead" errors occurred.
+# Before the fix, standby_b's walreceiver would fail with this error
+# when trying to reconnect to standby_a.
+my $standby_b_log_contents =
+  PostgreSQL::Test::Utils::slurp_file($node_standby_b->logfile,
+	$standby_b_log_offset);
+ok( $standby_b_log_contents !~
+	  m/requested starting point .* is ahead of the WAL flush position/,
+	'no "ahead of flush position" errors in standby_b log');
+
+# Now restore standby_a's streaming from primary so it can catch up
+$node_standby_a->enable_streaming($node_primary);
+$node_standby_a->reload;
+
+# Wait for standby_a to catch up with primary
+$node_primary->wait_for_replay_catchup($node_standby_a);
+
+# standby_b's walreceiver should eventually connect to standby_a and
+# resume streaming (once standby_a has caught up past standby_b's position)
+$node_standby_a->poll_query_until('postgres',
+	"SELECT EXISTS (SELECT 1 FROM pg_stat_replication)")
+  or die "Timed out waiting for standby_b to reconnect to standby_a";
+
+# Verify end-to-end cascade streaming works with new data
+$node_primary->safe_psql('postgres',
+	"INSERT INTO test_tab SELECT generate_series(2001, 3000)");
+$node_primary->wait_for_replay_catchup($node_standby_a);
+$node_standby_a->wait_for_replay_catchup($node_standby_b, $node_primary);
+
+$result =
+  $node_standby_b->safe_psql('postgres', "SELECT count(*) FROM test_tab");
+is($result, '3000',
+	'cascade streaming resumes normally after upstream catches up');
+
+done_testing();
-- 
2.47.3
From 55507cadae44e4f01d9bed469a77516a2f714607 Mon Sep 17 00:00:00 2001
From: Marco Nenciarini <[email protected]>
Date: Wed, 18 Mar 2026 10:30:00 +0100
Subject: [PATCH v5] Fix cascading standby reconnect failure after archive
 fallback

A cascading standby could fail to reconnect to its upstream standby
with "requested starting point ... is ahead of the WAL flush position"
after falling back to archive recovery. This happened because
RequestXLogStreaming() truncates the requested start position to the
WAL segment boundary, which can place it ahead of the upstream's flush
position reported by GetStandbyFlushRecPtr().

Fix by having the walreceiver check the upstream's current WAL flush
position via IDENTIFY_SYSTEM before issuing START_REPLICATION.
IDENTIFY_SYSTEM already returns this position (as xlogpos), but it was
previously discarded. If the requested start point exceeds the
upstream's flush position on the same timeline, the walreceiver waits
for wal_retrieve_retry_interval and retries.

The wait is limited to gaps of at most one WAL segment, which is the
expected case from the segment-boundary truncation. Larger gaps
indicate the upstream is genuinely behind, so START_REPLICATION is
allowed to proceed (and fail) normally, letting the startup process
fall back to other WAL sources.

The first wait is logged at LOG level; subsequent waits are demoted to
DEBUG1 to avoid log noise. The walreceiver honors wal_receiver_timeout
during the wait, so it will exit if the upstream doesn't catch up in
time.

To preserve ABI compatibility on back branches, the flush position
from IDENTIFY_SYSTEM is communicated via a new global variable
(WalRcvIdentifySystemLsn) rather than changing the signature of
walrcv_identify_system().

The bug was introduced in PG 9.3 by commit abfd192b1b5, which added a
flush-position check in StartReplication() that rejects requests ahead
of the server's WAL flush position.
---
 .../libpqwalreceiver/libpqwalreceiver.c       |  14 ++
 src/backend/replication/walreceiver.c         |  70 +++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/replication/walreceiver.h         |   7 +
 src/test/recovery/t/053_cascade_reconnect.pl  | 157 ++++++++++++++++++
 5 files changed, 248 insertions(+), 1 deletion(-)
 create mode 100644 src/test/recovery/t/053_cascade_reconnect.pl

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9f04c9ed25d..a6024420394 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -452,6 +452,20 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 						   PQntuples(res), PQnfields(res), 1, 3)));
 	primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
 	*primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
+
+	/* Column 2 is the server's current WAL flush position */
+	{
+		uint32		hi,
+					lo;
+
+		if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("could not parse WAL location \"%s\"",
+							PQgetvalue(res, 0, 2))));
+		WalRcvIdentifySystemLsn = ((uint64) hi) << 32 | lo;
+	}
+
 	PQclear(res);
 
 	return primary_sysid;

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index fabe3c73034..2cd0c8fac30 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -54,6 +54,7 @@
 #include "access/htup_details.h"
 #include "access/timeline.h"
 #include "access/transam.h"
+#include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xlogrecovery.h"
@@ -95,6 +96,12 @@ bool		hot_standby_feedback;
 static WalReceiverConn *wrconn = NULL;
 WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 
+/*
+ * Server's WAL flush position from the last IDENTIFY_SYSTEM call.
+ * Written by libpqwalreceiver, read by walreceiver main loop.
+ */
+XLogRecPtr	WalRcvIdentifySystemLsn = InvalidXLogRecPtr;
+
 /*
  * These variables are used similarly to openLogFile/SegNo,
  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -161,6 +168,8 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	TimeLineID	startpointTLI;
 	TimeLineID	primaryTLI;
 	bool		first_stream;
+	bool		upstream_catchup_logged = false;
+	TimestampTz upstream_catchup_deadline = 0;
 	WalRcvData *walrcv;
 	TimestampTz now;
 	char	   *err;
@@ -313,8 +322,10 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 
 		/*
 		 * Check that we're connected to a valid server using the
-		 * IDENTIFY_SYSTEM replication command.
+		 * IDENTIFY_SYSTEM replication command. Reset the global LSN
+		 * first so we don't act on a stale value if the call fails.
 		 */
+		WalRcvIdentifySystemLsn = InvalidXLogRecPtr;
 		primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
 
 		snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
@@ -338,6 +349,63 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 					errmsg("highest timeline %u of the primary is behind recovery timeline %u",
 						   primaryTLI, startpointTLI)));
 
+		/*
+		 * If our requested startpoint is ahead of the primary server's
+		 * current WAL flush position, we cannot start streaming yet. This
+		 * can happen when a cascading standby has advanced past the upstream
+		 * via archive recovery. In this case, wait for the upstream to
+		 * catch up before attempting START_REPLICATION, which would
+		 * otherwise fail with "requested starting point is ahead of the WAL
+		 * flush position".
+		 *
+		 * We only perform this check when we're on the same timeline as the
+		 * primary; when timelines differ, let START_REPLICATION handle the
+		 * timeline negotiation.
+		 *
+		 * We also only wait if the gap is within one WAL segment.
+		 * This is the expected case because RequestXLogStreaming() truncates
+		 * the start position to the segment boundary, producing at most a
+		 * sub-segment gap. A larger gap means the upstream is genuinely
+		 * behind, so we let START_REPLICATION fail normally and allow the
+		 * startup process to fall back to other WAL sources.
+		 *
+		 * Honor wal_receiver_timeout so the walreceiver doesn't wait
+		 * indefinitely: if the upstream hasn't caught up within the
+		 * timeout, exit and let the startup process retry.
+		 */
+		if (startpointTLI == primaryTLI &&
+			XLogRecPtrIsValid(WalRcvIdentifySystemLsn) &&
+			startpoint > WalRcvIdentifySystemLsn &&
+			startpoint - WalRcvIdentifySystemLsn <= wal_segment_size)
+		{
+			/* Set deadline on first iteration */
+			if (!upstream_catchup_logged && wal_receiver_timeout > 0)
+				upstream_catchup_deadline =
+					TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+												wal_receiver_timeout);
+
+			ereport(upstream_catchup_logged ? DEBUG1 : LOG,
+					errmsg("walreceiver requested start point %X/%08X on timeline %u is ahead of the primary server's flush position %X/%08X, waiting",
+						   LSN_FORMAT_ARGS(startpoint), startpointTLI,
+						   LSN_FORMAT_ARGS(WalRcvIdentifySystemLsn)));
+			upstream_catchup_logged = true;
+
+			(void) WaitLatch(MyLatch,
+							 WL_EXIT_ON_PM_DEATH | WL_TIMEOUT | WL_LATCH_SET,
+							 wal_retrieve_retry_interval,
+							 WAIT_EVENT_WAL_RECEIVER_UPSTREAM_CATCHUP);
+			ResetLatch(MyLatch);
+
+			if (upstream_catchup_deadline > 0 &&
+				GetCurrentTimestamp() >= upstream_catchup_deadline)
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating walreceiver due to timeout while waiting for upstream to catch up")));
+
+			CHECK_FOR_INTERRUPTS();
+			continue;
+		}
+
 		/*
 		 * Get any missing history files. We do this always, even when we're
 		 * not interested in that timeline, so that if we're promoted to
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4aa864fe3c3..1f586ffe853 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -160,6 +160,7 @@ RESTORE_COMMAND	"Waiting for <xref linkend="guc-restore-command"/> to complete."
 SAFE_SNAPSHOT	"Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
 SYNC_REP	"Waiting for confirmation from a remote server during synchronous replication."
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
+WAL_RECEIVER_UPSTREAM_CATCHUP	"Waiting for upstream server WAL flush position to catch up to requested start point."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY	"Waiting for a new WAL summary to be generated."
 XACT_GROUP_UPDATE	"Waiting for the group leader to update transaction status at transaction end."
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 9b9bd916314..daee2944973 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -165,6 +165,13 @@ typedef struct
 
 extern PGDLLIMPORT WalRcvData *WalRcv;
 
+/*
+ * Server's WAL flush position as reported by the last IDENTIFY_SYSTEM call.
+ * Set by walrcv_identify_system(), used by the walreceiver to avoid
+ * requesting streaming from a point ahead of the upstream's flush position.
+ */
+extern PGDLLIMPORT XLogRecPtr WalRcvIdentifySystemLsn;
+
 typedef struct
 {
 	bool		logical;		/* True if this is logical replication stream,
diff --git a/src/test/recovery/t/053_cascade_reconnect.pl b/src/test/recovery/t/053_cascade_reconnect.pl
new file mode 100644
index 00000000000..232ea2208d8
--- /dev/null
+++ b/src/test/recovery/t/053_cascade_reconnect.pl
@@ -0,0 +1,157 @@
+
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+# Test that a cascading standby can reconnect to its upstream standby after
+# advancing past the upstream's WAL flush position via archive recovery.
+#
+# Setup: primary -> standby_a -> standby_b
+# standby_b has both streaming (from standby_a) and restore_command
+# (from primary's archive).
+#
+# When standby_a's walreceiver is stopped and standby_b falls back to
+# archive recovery, standby_b may advance its recovery position past
+# standby_a's replay position. Previously, standby_b's walreceiver
+# would fail with "requested starting point is ahead of the WAL flush
+# position" when reconnecting to standby_a.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary with archiving
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf(
+	'postgresql.conf', qq(
+wal_keep_size = 128MB
+checkpoint_timeout = 1h
+));
+$node_primary->start;
+
+# Take backup and create standby_a (streaming from primary, no archive)
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+my $node_standby_a = PostgreSQL::Test::Cluster->new('standby_a');
+$node_standby_a->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby_a->start;
+
+# Wait for standby_a to start streaming
+$node_primary->wait_for_catchup($node_standby_a);
+
+# Take backup from standby_a and create standby_b
+# standby_b streams from standby_a AND restores from primary's archive
+$node_standby_a->backup($backup_name);
+
+my $node_standby_b = PostgreSQL::Test::Cluster->new('standby_b');
+$node_standby_b->init_from_backup($node_standby_a, $backup_name,
+	has_streaming => 1);
+$node_standby_b->enable_restoring($node_primary);
+$node_standby_b->start;
+
+# Generate initial data and wait for full cascade replication
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE test_tab AS SELECT generate_series(1, 1000) AS id");
+$node_primary->wait_for_replay_catchup($node_standby_a);
+$node_standby_a->wait_for_replay_catchup($node_standby_b, $node_primary);
+
+my $result =
+  $node_standby_b->safe_psql('postgres', "SELECT count(*) FROM test_tab");
+is($result, '1000', 'initial data replicated to cascading standby');
+
+# Disconnect standby_a from primary by clearing primary_conninfo.
+# This stops standby_a's walreceiver, so standby_a can no longer receive
+# new WAL. Its GetStandbyFlushRecPtr() will return only replayPtr.
+$node_standby_a->append_conf('postgresql.conf', "primary_conninfo = ''"); +$node_standby_a->reload; + +# Wait for standby_a's walreceiver to stop +$node_standby_a->poll_query_until('postgres', + "SELECT NOT EXISTS (SELECT 1 FROM pg_stat_wal_receiver)") + or die "Timed out waiting for standby_a walreceiver to stop"; + +# Stop standby_b cleanly. We'll restart it after generating new WAL +# so it enters the recovery state machine fresh and tries archive first. +$node_standby_b->stop; + +# Force a checkpoint now so that no background checkpoint can generate +# extra WAL during the INSERT below and push it across a segment boundary. +# Combined with checkpoint_timeout = 1h this ensures the new WAL fits +# within a single segment, keeping the gap within wal_segment_size. +$node_primary->safe_psql('postgres', "CHECKPOINT"); + +# Generate more WAL on primary +$node_primary->safe_psql('postgres', + "INSERT INTO test_tab SELECT generate_series(1001, 2000)"); + +# Force WAL switch and wait for archiving to complete, so that +# standby_b can find the new WAL in the archive when it starts. +my $walfile = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); +$node_primary->safe_psql('postgres', "SELECT pg_switch_wal()"); +$node_primary->poll_query_until('postgres', + "SELECT '$walfile' <= last_archived_wal FROM pg_stat_archiver") + or die "Timed out waiting for WAL archiving"; + +# Rotate standby_b's log so we can check just the new log output +$node_standby_b->rotate_logfile; +my $standby_b_log_offset = -s $node_standby_b->logfile; + +# Start standby_b. It will: +# 1. Read new WAL from primary's archive (XLOG_FROM_ARCHIVE) +# 2. Advance RecPtr past standby_a's replay position +# 3. Try streaming from standby_a (XLOG_FROM_STREAM) +# 4. 
With the fix: walreceiver detects upstream is behind via +# IDENTIFY_SYSTEM and waits instead of failing +$node_standby_b->start; + +# Wait for standby_b to replay the new data from archive +$node_standby_b->poll_query_until('postgres', + "SELECT count(*) >= 2000 FROM test_tab") + or die "Timed out waiting for standby_b to replay archived WAL"; + +$result = + $node_standby_b->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, '2000', 'cascading standby replayed new data from archive'); + +# Wait for walreceiver to hit the upstream-catchup wait event, proving we +# exercised the START_REPLICATION-ahead-of-upstream path. +$node_standby_b->wait_for_event('walreceiver', 'WalReceiverUpstreamCatchup'); + +# Verify no "requested starting point is ahead" errors occurred. +# Before the fix, standby_b's walreceiver would fail with this error +# when trying to reconnect to standby_a. +my $standby_b_log_contents = + PostgreSQL::Test::Utils::slurp_file($node_standby_b->logfile, + $standby_b_log_offset); +ok( $standby_b_log_contents !~ + m/requested starting point .* is ahead of the WAL flush position/, + 'no "ahead of flush position" errors in standby_b log'); + +# Now restore standby_a's streaming from primary so it can catch up +$node_standby_a->enable_streaming($node_primary); +$node_standby_a->reload; + +# Wait for standby_a to catch up with primary +$node_primary->wait_for_replay_catchup($node_standby_a); + +# standby_b's walreceiver should eventually connect to standby_a and +# resume streaming (once standby_a has caught up past standby_b's position) +$node_standby_a->poll_query_until('postgres', + "SELECT EXISTS (SELECT 1 FROM pg_stat_replication)") + or die "Timed out waiting for standby_b to reconnect to standby_a"; + +# Verify end-to-end cascade streaming works with new data +$node_primary->safe_psql('postgres', + "INSERT INTO test_tab SELECT generate_series(2001, 3000)"); +$node_primary->wait_for_replay_catchup($node_standby_a); 
+$node_standby_a->wait_for_replay_catchup($node_standby_b, $node_primary); + +$result = + $node_standby_b->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, '3000', + 'cascade streaming resumes normally after upstream catches up'); + +done_testing(); -- 2.47.3
