Dear pgsql-hackers,
I am also interested in solving this problem, so I am proposing a patch based
on the work Hayato shared earlier.
The problem we are solving is that logical walsender processes currently do
not allow postgres to shut down until the receiver side confirms the flush of
all data. For logical replication this wait is not necessary, because it exists
to support clean switchover, which cannot be supported for logical replication.
The wait can lead to an undesirable shutdown delay if, for example, the apply
worker is waiting for a lock to be released.
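The wait described above can be sketched as follows. This is a simplified model loosely based on the exit check in WalSndDone() in walsender.c, not code from the patch. The struct and function names are hypothetical, LSNs are plain 64-bit integers, and pending output is reduced to a boolean.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified view of one walsender's state at shutdown. */
typedef struct SketchWalSndState
{
    uint64_t sent;          /* LSN up to which WAL has been sent */
    uint64_t flush;         /* LSN the receiver has confirmed as flushed */
    bool     caught_up;     /* nothing more to send right now */
    bool     send_pending;  /* output still buffered for the client */
} SketchWalSndState;

/*
 * Returns true if a walsender with the current (wait_flush) behavior may
 * exit: everything has been sent, nothing is buffered, and the receiver
 * has confirmed flushing up to the last sent LSN.
 */
static bool
sketch_can_exit_wait_flush(const SketchWalSndState *st)
{
    /* In this model the receiver cannot confirm more than was sent. */
    assert(st->flush <= st->sent);

    return st->caught_up && !st->send_pending && st->flush == st->sent;
}
```

In this model, an apply worker blocked on a lock sends no new flush confirmation, so `flush` stays behind `sent` and the check keeps returning false. The attached test sets `wal_sender_timeout = 0`, so the sender timeout does not end the wait either.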
I agree with the opinion that the default behavior of the system should not
be changed, as some clients may rely on it. However, instead of a
START_REPLICATION parameter, I propose a GUC parameter on the sender that
controls the shutdown mode of all logical walsenders, for two reasons. First,
a START_REPLICATION parameter places responsibility for choosing the sender's
shutdown semantics on the receiver side. Second, per-subscriber settings do
not solve the problematic operational case where many walsenders exist: if
even one of N walsender processes is still configured with a non-immediate
mode, the publisher's shutdown can still be blocked. In other words, setting
immediate for most subscribers but missing one does not remove the global
inability to shut down.
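The argument about N walsenders can be illustrated with a small model. It assumes, as described above, that the publisher finishes shutting down only after every walsender has exited. All names are hypothetical, and each walsender's state is reduced to a mode and a "flush confirmed" flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Mirrors the two modes proposed in the patch (names are hypothetical). */
typedef enum
{
    SKETCH_MODE_WAIT_FLUSH = 0,
    SKETCH_MODE_IMMEDIATE
} SketchShutdownMode;

/* Hypothetical per-walsender state. */
typedef struct
{
    SketchShutdownMode mode;
    bool flush_confirmed;   /* receiver confirmed flush of all sent data */
} SketchWalSnd;

/* A walsender may exit if it is in immediate mode or its flush is confirmed. */
static bool
sketch_walsnd_can_exit(const SketchWalSnd *w)
{
    return w->mode == SKETCH_MODE_IMMEDIATE || w->flush_confirmed;
}

/* Shutdown completes only when every walsender can exit. */
static bool
sketch_publisher_can_stop(const SketchWalSnd *walsnds, size_t n)
{
    assert(walsnds != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (!sketch_walsnd_can_exit(&walsnds[i]))
            return false;   /* one blocked walsender blocks the shutdown */
    }
    return true;
}

/* With a sender-side GUC, one setting applies to every logical walsender. */
static void
sketch_apply_global_mode(SketchWalSnd *walsnds, size_t n,
                         SketchShutdownMode mode)
{
    for (size_t i = 0; i < n; i++)
        walsnds[i].mode = mode;
}
```

For example, with modes {immediate, immediate, wait_flush} and no confirmed flushes, `sketch_publisher_can_stop()` returns false. After `sketch_apply_global_mode(..., SKETCH_MODE_IMMEDIATE)` it returns true.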
I also attach a TAP test that reproduces the apply worker waiting for a lock
to be released and checks that the publisher shuts down successfully in
immediate walsender shutdown mode.
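For reviewers, the exit condition that the patch adds to WalSndLoop() can be restated as a pure predicate. This is a hypothetical restatement, not code from the patch. The three booleans stand in for got_STOPPING, got_SIGUSR2, and the `send_data == XLogSendLogical` comparison. The enum mirrors LogicalWalSndShutdownMode.

```c
#include <assert.h>
#include <stdbool.h>

/* Mirrors LogicalWalSndShutdownMode from the patch (names are hypothetical). */
typedef enum
{
    SKETCH_WAIT_FLUSH = 0,
    SKETCH_IMMEDIATE
} SketchLogicalShutdownMode;

/*
 * Returns true when the walsender should skip waiting for the remote flush:
 * a shutdown request has arrived (got_STOPPING or got_SIGUSR2), this
 * walsender streams logical changes, and the mode is immediate.
 */
static bool
sketch_exit_immediately(bool got_stopping, bool got_sigusr2,
                        bool is_logical, SketchLogicalShutdownMode mode)
{
    bool shutdown_requested = got_stopping || got_sigusr2;

    return shutdown_requested && is_logical && mode == SKETCH_IMMEDIATE;
}

/* Physical walsenders keep the existing behavior regardless of the mode. */
static void
sketch_check_physical_unaffected(void)
{
    assert(!sketch_exit_immediately(true, true, false, SKETCH_IMMEDIATE));
}
```

Keeping the logical check in the predicate means the GUC cannot change how physical standbys are shut down, which preserves the clean-switchover guarantee where it still applies.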
Best Regards,
Andrey
From c82757f6aa01758b39814f4168a207174ce95957 Mon Sep 17 00:00:00 2001
From: "a.silitskiy" <[email protected]>
Date: Mon, 20 Oct 2025 13:38:52 +0700
Subject: [PATCH] Introduce a new GUC 'logical_wal_sender_shutdown_mode'.
Previously, at shutdown, walsender processes waited to send all pending data
and to ensure that all data was flushed on the remote node. This mechanism was
added to support clean switchover, but that use case cannot be supported for
logical replication.
The new GUC allows changing the shutdown mode of logical walsenders without
changing the default behavior.
The shutdown modes are:
1) 'wait_flush' (the default). In this mode, the walsender waits for all WAL
to be flushed on the subscriber side before exiting.
2) 'immediate'. In this mode, the walsender exits without confirming the
remote flush. This may break consistency between publisher and subscriber.
This mode might be useful for a system that has a high-latency network (to
reduce shutdown time), or to allow the publisher to shut down even when the
apply worker on the subscriber is stuck.
Author: Andrey Silitskiy
Co-authored-by: Hayato Kuroda
Discussion: https://postgr.es/m/TYAPR01MB586668E50FC2447AD7F92491F5E89%40TYAPR01MB5866.jpnprd01.prod.outlook.com
---
doc/src/sgml/config.sgml | 27 ++++++++
src/backend/replication/walsender.c | 46 +++++++++++++
src/backend/utils/misc/guc_parameters.dat | 7 ++
src/backend/utils/misc/guc_tables.c | 6 ++
src/backend/utils/misc/postgresql.conf.sample | 1 +
src/include/replication/walsender.h | 7 ++
.../t/050_walsnd_immediate_shutdown.pl | 66 +++++++++++++++++++
7 files changed, 160 insertions(+)
create mode 100644 src/test/recovery/t/050_walsnd_immediate_shutdown.pl
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index df1c3eaaa58..c058796b0cf 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4702,6 +4702,33 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
</listitem>
</varlistentry>
+ <varlistentry id="guc-logical_wal_sender_shutdown_mode" xreflabel="logical_wal_sender_shutdown_mode">
+ <term><varname>logical_wal_sender_shutdown_mode</varname> (<type>enum</type>)
+ <indexterm>
+ <primary><varname>logical_wal_sender_shutdown_mode</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Specifies how a logical walsender process terminates after
+ receiving a shutdown request. Valid values are
+ <literal>wait_flush</literal> and <literal>immediate</literal>.
+ Default value is <literal>wait_flush</literal>.
+ </para>
+ <para>
+ In <literal>wait_flush</literal> mode, the walsender waits for all
+ WAL to be flushed on the subscriber side before exiting the process.
+ </para>
+ <para>
+ In <literal>immediate</literal> mode, the walsender exits without waiting
+ for data to be replicated to the subscriber. This may break consistency between
+ publisher and subscriber. This mode might be useful for a system that has a
+ high-latency network (to reduce shutdown time), or to allow the shutdown
+ of the publisher even when the apply worker on the subscriber is stuck.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-synchronized-standby-slots" xreflabel="synchronized_standby_slots">
<term><varname>synchronized_standby_slots</varname> (<type>string</type>)
<indexterm>
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc8f8559073..5440c1672d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -130,6 +130,9 @@ int max_wal_senders = 10; /* the maximum number of concurrent
* walsenders */
int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
* data message */
+
+int logical_wal_sender_shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
+
bool log_replication_commands = false;
/*
@@ -262,6 +265,7 @@ static void WalSndKill(int code, Datum arg);
pg_noreturn static void WalSndShutdown(void);
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
+pg_noreturn static void LogicalWalSndDoneImmediate(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static void IdentifySystem(void);
static void UploadManifest(void);
@@ -1650,6 +1654,11 @@ ProcessPendingWrites(void)
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
WalSndShutdown();
+
+ /* If a shutdown request arrived and immediate mode is set, exit now */
+ if ((got_STOPPING || got_SIGUSR2) &&
+ logical_wal_sender_shutdown_mode == WALSND_SHUTDOWN_MODE_IMMEDIATE)
+ LogicalWalSndDoneImmediate();
}
/* reactivate latch so WalSndLoop knows to continue */
@@ -2927,6 +2936,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
if (pq_flush_if_writable() != 0)
WalSndShutdown();
+ /*
+ * When immediate shutdown of a logical walsender is requested, do not
+ * wait until all data has been sent successfully.
+ */
+ if ((got_STOPPING || got_SIGUSR2) &&
+ send_data == XLogSendLogical &&
+ logical_wal_sender_shutdown_mode == WALSND_SHUTDOWN_MODE_IMMEDIATE)
+ LogicalWalSndDoneImmediate();
+
/* If nothing remains to be sent right now ... */
if (WalSndCaughtUp && !pq_is_send_pending())
{
@@ -3575,6 +3593,34 @@ XLogSendLogical(void)
}
}
+/*
+ * Shut down a logical walsender in immediate mode.
+ *
+ * NB: This should only be called when immediate shutdown of a logical walsender
+ * was requested and a shutdown signal has been received from the postmaster.
+ */
+static void
+LogicalWalSndDoneImmediate(void)
+{
+ QueryCompletion qc;
+
+ /* Try to inform the subscriber that XLOG streaming is done */
+ SetQueryCompletion(&qc, CMDTAG_COPY, 0);
+ EndCommand(&qc, DestRemote, false);
+
+ /*
+ * Note that the output buffer may be full during immediate shutdown of a
+ * logical replication walsender. If pq_flush() were called at that point,
+ * the walsender process could get stuck, so call pq_flush_if_writable()
+ * instead. In immediate shutdown mode, the subscriber is not guaranteed
+ * to receive the done message.
+ */
+ pq_flush_if_writable();
+
+ proc_exit(0);
+ abort();
+}
+
/*
* Shutdown if the sender is caught up.
*
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 1128167c025..27e8c19a78d 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1833,6 +1833,13 @@
max => 'MAX_KILOBYTES',
},
+{ name => 'logical_wal_sender_shutdown_mode', type => 'enum', context => 'PGC_SIGHUP', group => 'REPLICATION_SENDING',
+ short_desc => 'Sets how logical walsenders terminate after a shutdown request.',
+ variable => 'logical_wal_sender_shutdown_mode',
+ boot_val => 'WALSND_SHUTDOWN_MODE_WAIT_FLUSH',
+ options => 'logical_wal_sender_shutdown_mode_options',
+},
+
{ name => 'maintenance_io_concurrency', type => 'int', context => 'PGC_USERSET', group => 'RESOURCES_IO',
short_desc => 'A variant of "effective_io_concurrency" that is used for maintenance work.',
long_desc => '0 disables simultaneous requests.',
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 0209b2067a2..81f17850aa2 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -335,6 +335,12 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
{NULL, 0, false}
};
+static const struct config_enum_entry logical_wal_sender_shutdown_mode_options[] = {
+ {"wait_flush", WALSND_SHUTDOWN_MODE_WAIT_FLUSH, false},
+ {"immediate", WALSND_SHUTDOWN_MODE_IMMEDIATE, false},
+ {NULL, 0, false}
+};
+
/*
* Although only "on", "off", "remote_apply", "remote_write", and "local" are
* documented, we accept all the likely variants of "on" and "off".
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 6268c175298..66285974f32 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -346,6 +346,7 @@
#wal_sender_timeout = 60s # in milliseconds; 0 disables
#track_commit_timestamp = off # collect timestamp of transaction commit
# (change requires restart)
+#logical_wal_sender_shutdown_mode = wait_flush  # wait_flush or immediate
# - Primary Server -
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index c3e8e191339..01956ebce33 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -24,6 +24,12 @@ typedef enum
CRS_USE_SNAPSHOT,
} CRSSnapshotAction;
+typedef enum
+{
+ WALSND_SHUTDOWN_MODE_WAIT_FLUSH = 0,
+ WALSND_SHUTDOWN_MODE_IMMEDIATE
+} LogicalWalSndShutdownMode;
+
/* global state */
extern PGDLLIMPORT bool am_walsender;
extern PGDLLIMPORT bool am_cascading_walsender;
@@ -33,6 +39,7 @@ extern PGDLLIMPORT bool wake_wal_senders;
/* user-settable parameters */
extern PGDLLIMPORT int max_wal_senders;
extern PGDLLIMPORT int wal_sender_timeout;
+extern PGDLLIMPORT int logical_wal_sender_shutdown_mode;
extern PGDLLIMPORT bool log_replication_commands;
extern void InitWalSender(void);
diff --git a/src/test/recovery/t/050_walsnd_immediate_shutdown.pl b/src/test/recovery/t/050_walsnd_immediate_shutdown.pl
new file mode 100644
index 00000000000..31415d3bed1
--- /dev/null
+++ b/src/test/recovery/t/050_walsnd_immediate_shutdown.pl
@@ -0,0 +1,66 @@
+# Check that the publisher is able to shut down without waiting
+# for all pending data to be sent to the subscriber when
+# logical_wal_sender_shutdown_mode = immediate.
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use Test::More;
+
+my $out;
+
+# create publisher
+my $publisher = PostgreSQL::Test::Cluster->new('publisher');
+$publisher->init(allows_streaming => 'logical');
+# set logical_wal_sender_shutdown_mode GUC parameter to immediate
+$publisher->append_conf('postgresql.conf',
+ 'wal_sender_timeout = 0
+ logical_wal_sender_shutdown_mode = immediate');
+$publisher->start();
+
+# create subscriber
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init();
+$subscriber->start();
+
+# create publication for test table
+$publisher->safe_psql('postgres', q{
+ CREATE TABLE pub_test (id int PRIMARY KEY);
+ CREATE PUBLICATION pub_all FOR TABLE pub_test;
+});
+
+# create matching table on subscriber
+$subscriber->safe_psql('postgres', q{
+ CREATE TABLE pub_test (id int PRIMARY KEY);
+});
+
+# form connection string to publisher
+my $pub_connstr = $publisher->connstr;
+
+# create the subscription on subscriber
+$subscriber->safe_psql('postgres', qq{
+ CREATE SUBSCRIPTION sub_all
+ CONNECTION '$pub_connstr'
+ PUBLICATION pub_all;
+});
+
+# Wait for initial sync to finish
+$subscriber->wait_for_subscription_sync($publisher, 'sub_all');
+
+# create background psql session
+my $bpgsql = $subscriber->background_psql('postgres', on_error_stop => 0);
+
+# start transaction on subscriber to hold locks
+$bpgsql->query_safe(
+ "BEGIN; INSERT INTO pub_test VALUES (1), (2), (3);"
+);
+
+# run concurrent transaction on publisher and commit
+$out = $publisher->safe_psql('postgres', 'BEGIN; INSERT INTO pub_test VALUES (1), (2), (3); COMMIT;');
+ok($out eq "", "Concurrent transaction was committed on publisher");
+
+# shutdown publisher
+$publisher->stop('fast');
+
+done_testing();
--
2.34.1