Attached is a v2 patch that implements the "handshake clamp" approach Xuneng suggested. Rather than tracking lastStreamedFlush in process-local state (which doesn't survive a cascade restart, as Fujii-san demonstrated), it uses the WAL flush position already returned by IDENTIFY_SYSTEM (the xlogpos field), which walrcv_identify_system() previously discarded.
The walreceiver now checks the upstream's flush position before issuing START_REPLICATION. If the requested startpoint is ahead (on the same timeline), it waits for wal_retrieve_retry_interval and then loops back to re-check the upstream's position with a fresh IDENTIFY_SYSTEM. This works across restarts since it queries the upstream's live position on every connection attempt, and requires no new state variables. When timelines differ, we let START_REPLICATION handle the timeline negotiation as before. The patch includes a TAP test (053_cascade_reconnect.pl) that reproduces the scenario and verifies the fix.
From d4ee8752267e91a4ca10ef01c2a0f4e458edb2be Mon Sep 17 00:00:00 2001 From: Marco Nenciarini <[email protected]> Date: Mon, 16 Mar 2026 18:37:31 +0100 Subject: [PATCH] Fix cascading standby reconnect failure after archive fallback A cascading standby could fail to reconnect to its upstream standby with "requested starting point ... is ahead of the WAL flush position" after falling back to archive recovery. This happened because the walreceiver requests streaming from RecPtr, which can advance past the upstream's flush position when WAL is restored from archive. Fix by having the walreceiver check the upstream's current WAL flush position via IDENTIFY_SYSTEM before issuing START_REPLICATION. IDENTIFY_SYSTEM already returns this position (as xlogpos), but walrcv_identify_system() previously discarded it. If the requested start point exceeds the upstream's flush position on the same timeline, the walreceiver waits for wal_retrieve_retry_interval and retries. The bug was introduced in PG 9.3 by commit abfd192b1b5, which added a flush-position check in StartReplication() that rejects requests ahead of the server's WAL flush position. 
--- .../libpqwalreceiver/libpqwalreceiver.c | 21 ++- src/backend/replication/logical/worker.c | 2 +- src/backend/replication/walreceiver.c | 34 ++++- .../utils/activity/wait_event_names.txt | 1 + src/include/replication/walreceiver.h | 9 +- src/test/recovery/t/053_cascade_reconnect.pl | 143 ++++++++++++++++++ 6 files changed, 203 insertions(+), 7 deletions(-) create mode 100644 src/test/recovery/t/053_cascade_reconnect.pl diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9f04c9ed25d..0e1c95180b2 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -63,7 +63,8 @@ static char *libpqrcv_get_conninfo(WalReceiverConn *conn); static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, - TimeLineID *primary_tli); + TimeLineID *primary_tli, + XLogRecPtr *server_lsn); static char *libpqrcv_get_dbname_from_conninfo(const char *connInfo); static char *libpqrcv_get_option_from_conninfo(const char *connInfo, const char *keyword); @@ -421,7 +422,8 @@ libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, * timeline ID of the primary. 
*/ static char * -libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) +libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli, + XLogRecPtr *server_lsn) { PGresult *res; char *primary_sysid; @@ -452,6 +454,21 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) PQntuples(res), PQnfields(res), 1, 3))); primary_sysid = pstrdup(PQgetvalue(res, 0, 0)); *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1)); + + /* Column 2 is the server's current WAL flush position */ + if (server_lsn) + { + uint32 hi, + lo; + + if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not parse WAL location \"%s\"", + PQgetvalue(res, 0, 2)))); + *server_lsn = ((uint64) hi) << 32 | lo; + } + PQclear(res); return primary_sysid; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2d7708805a6..f207a640a74 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -5717,7 +5717,7 @@ run_apply_worker(void) * We don't really use the output identify_system for anything but it does * some initializations on the upstream so let's still call it. 
*/ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI, NULL); set_apply_error_context_origin(originname); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index fabe3c73034..057759498a0 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -54,6 +54,7 @@ #include "access/htup_details.h" #include "access/timeline.h" #include "access/transam.h" +#include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xlogrecovery.h" @@ -309,13 +310,15 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) { char *primary_sysid; char standby_sysid[32]; + XLogRecPtr primaryFlushPtr; WalRcvStreamOptions options; /* * Check that we're connected to a valid server using the * IDENTIFY_SYSTEM replication command. */ - primary_sysid = walrcv_identify_system(wrconn, &primaryTLI); + primary_sysid = walrcv_identify_system(wrconn, &primaryTLI, + &primaryFlushPtr); snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, GetSystemIdentifier()); @@ -338,6 +341,35 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) errmsg("highest timeline %u of the primary is behind recovery timeline %u", primaryTLI, startpointTLI))); + /* + * If our requested startpoint is ahead of the primary server's + * current WAL flush position, we cannot start streaming yet. This + * can happen when a cascading standby has advanced past the upstream + * via archive recovery. In this case, wait for the upstream to + * catch up before attempting START_REPLICATION, which would + * otherwise fail with "requested starting point is ahead of the WAL + * flush position". + * + * We only perform this check when we're on the same timeline as the + * primary; when timelines differ, let START_REPLICATION handle the + * timeline negotiation. 
+ */ + if (startpointTLI == primaryTLI && startpoint > primaryFlushPtr) + { + ereport(LOG, + errmsg("walreceiver requested start point %X/%08X on timeline %u is ahead of the primary server's flush position %X/%08X, waiting", + LSN_FORMAT_ARGS(startpoint), startpointTLI, + LSN_FORMAT_ARGS(primaryFlushPtr))); + + (void) WaitLatch(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_TIMEOUT | WL_LATCH_SET, + wal_retrieve_retry_interval, + WAIT_EVENT_WAL_RECEIVER_UPSTREAM_CATCHUP); + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + continue; + } + /* * Get any missing history files. We do this always, even when we're * not interested in that timeline, so that if we're promoted to diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 4aa864fe3c3..5d7f97015db 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -161,6 +161,7 @@ SAFE_SNAPSHOT "Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFER SYNC_REP "Waiting for confirmation from a remote server during synchronous replication." WAL_RECEIVER_EXIT "Waiting for the WAL receiver to exit." WAL_RECEIVER_WAIT_START "Waiting for startup process to send initial data for streaming replication." +WAL_RECEIVER_UPSTREAM_CATCHUP "Waiting for upstream server WAL flush position to catch up to requested start point." WAL_SUMMARY_READY "Waiting for a new WAL summary to be generated." XACT_GROUP_UPDATE "Waiting for the group leader to update transaction status at transaction end." diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 9b9bd916314..9f8b5b8c72a 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -280,9 +280,12 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, * Run IDENTIFY_SYSTEM on the cluster connected to and validate the * identity of the cluster. 
Returns the system ID of the cluster * connected to. 'primary_tli' is the timeline ID of the sender. + * If 'server_lsn' is not NULL, it is set to the current WAL flush + * position of the sender. */ typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, - TimeLineID *primary_tli); + TimeLineID *primary_tli, + XLogRecPtr *server_lsn); /* * walrcv_get_dbname_from_conninfo_fn @@ -441,8 +444,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_conninfo(conn) #define walrcv_get_senderinfo(conn, sender_host, sender_port) \ WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) -#define walrcv_identify_system(conn, primary_tli) \ - WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_identify_system(conn, primary_tli, server_lsn) \ + WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_lsn) #define walrcv_get_dbname_from_conninfo(conninfo) \ WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ diff --git a/src/test/recovery/t/053_cascade_reconnect.pl b/src/test/recovery/t/053_cascade_reconnect.pl new file mode 100644 index 00000000000..54db26b709c --- /dev/null +++ b/src/test/recovery/t/053_cascade_reconnect.pl @@ -0,0 +1,143 @@ + +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Test that a cascading standby can reconnect to its upstream standby after +# advancing past the upstream's WAL flush position via archive recovery. +# +# Setup: primary -> standby_a -> standby_b +# standby_b has both streaming (from standby_a) and restore_command +# (from primary's archive). +# +# When standby_a's walreceiver is stopped and standby_b falls back to +# archive recovery, standby_b may advance its recovery position past +# standby_a's replay position. 
Previously, standby_b's walreceiver +# would fail with "requested starting point is ahead of the WAL flush +# position" when reconnecting to standby_a. +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize primary with archiving +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +$node_primary->init(allows_streaming => 1, has_archiving => 1); +$node_primary->append_conf( + 'postgresql.conf', qq( +wal_keep_size = 128MB +)); +$node_primary->start; + +# Take backup and create standby_a (streaming from primary, no archive) +my $backup_name = 'my_backup'; +$node_primary->backup($backup_name); + +my $node_standby_a = PostgreSQL::Test::Cluster->new('standby_a'); +$node_standby_a->init_from_backup($node_primary, $backup_name, + has_streaming => 1); +$node_standby_a->start; + +# Wait for standby_a to start streaming +$node_primary->wait_for_catchup($node_standby_a); + +# Take backup from standby_a and create standby_b +# standby_b streams from standby_a AND restores from primary's archive +$node_standby_a->backup($backup_name); + +my $node_standby_b = PostgreSQL::Test::Cluster->new('standby_b'); +$node_standby_b->init_from_backup($node_standby_a, $backup_name, + has_streaming => 1); +$node_standby_b->enable_restoring($node_primary); +$node_standby_b->start; + +# Generate initial data and wait for full cascade replication +$node_primary->safe_psql('postgres', + "CREATE TABLE test_tab AS SELECT generate_series(1, 1000) AS id"); +$node_primary->wait_for_replay_catchup($node_standby_a); +$node_standby_a->wait_for_replay_catchup($node_standby_b, $node_primary); + +my $result = $node_standby_b->safe_psql('postgres', + "SELECT count(*) FROM test_tab"); +is($result, '1000', 'initial data replicated to cascading standby'); + +# Disconnect standby_a from primary by clearing primary_conninfo. +# This stops standby_a's walreceiver, so standby_a can no longer receive +# new WAL. 
Its GetStandbyFlushRecPtr() will return only replayPtr. +$node_standby_a->append_conf('postgresql.conf', "primary_conninfo = ''"); +$node_standby_a->reload; + +# Wait for standby_a's walreceiver to stop +$node_standby_a->poll_query_until('postgres', + "SELECT NOT EXISTS (SELECT 1 FROM pg_stat_wal_receiver)") + or die "Timed out waiting for standby_a walreceiver to stop"; + +# Stop standby_b cleanly. We'll restart it after generating new WAL +# so it enters the recovery state machine fresh and tries archive first. +$node_standby_b->stop; + +# Generate more WAL on primary +$node_primary->safe_psql('postgres', + "INSERT INTO test_tab SELECT generate_series(1001, 2000)"); + +# Force WAL switch and wait for archiving to complete, so that +# standby_b can find the new WAL in the archive when it starts. +my $walfile = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); +$node_primary->safe_psql('postgres', "SELECT pg_switch_wal()"); +$node_primary->poll_query_until('postgres', + "SELECT '$walfile' <= last_archived_wal FROM pg_stat_archiver") + or die "Timed out waiting for WAL archiving"; + +# Rotate standby_b's log so we can check just the new log output +$node_standby_b->rotate_logfile; + +# Start standby_b. It will: +# 1. Read new WAL from primary's archive (XLOG_FROM_ARCHIVE) +# 2. Advance RecPtr past standby_a's replay position +# 3. Try streaming from standby_a (XLOG_FROM_STREAM) +# 4. 
With the fix: walreceiver detects upstream is behind via +# IDENTIFY_SYSTEM and waits instead of failing +$node_standby_b->start; + +# Wait for standby_b to replay the new data from archive +$node_standby_b->poll_query_until('postgres', + "SELECT count(*) >= 2000 FROM test_tab") + or die "Timed out waiting for standby_b to replay archived WAL"; + +$result = $node_standby_b->safe_psql('postgres', + "SELECT count(*) FROM test_tab"); +is($result, '2000', + 'cascading standby replayed new data from archive'); + +# Verify no "requested starting point is ahead" errors occurred. +# Before the fix, standby_b's walreceiver would fail with this error +# when trying to reconnect to standby_a. +ok( !$node_standby_b->log_contains( + "requested starting point .* is ahead of the WAL flush position"), + 'no "ahead of flush position" errors in standby_b log'); + +# Now restore standby_a's streaming from primary so it can catch up +$node_standby_a->enable_streaming($node_primary); +$node_standby_a->reload; + +# Wait for standby_a to catch up with primary +$node_primary->wait_for_replay_catchup($node_standby_a); + +# standby_b's walreceiver should eventually connect to standby_a and +# resume streaming (once standby_a has caught up past standby_b's position) +$node_standby_a->poll_query_until('postgres', + "SELECT EXISTS (SELECT 1 FROM pg_stat_replication)") + or die "Timed out waiting for standby_b to reconnect to standby_a"; + +# Verify end-to-end cascade streaming works with new data +$node_primary->safe_psql('postgres', + "INSERT INTO test_tab SELECT generate_series(2001, 3000)"); +$node_primary->wait_for_replay_catchup($node_standby_a); +$node_standby_a->wait_for_replay_catchup($node_standby_b, $node_primary); + +$result = $node_standby_b->safe_psql('postgres', + "SELECT count(*) FROM test_tab"); +is($result, '3000', + 'cascade streaming resumes normally after upstream catches up'); + +done_testing(); -- 2.47.3
