On Thu, May 28, 2026 at 2:09 AM Chao Li <[email protected]> wrote:
>
> Hi,
>
> While testing “Toggle logical decoding dynamically based on logical slot 
> presence”, I hit an assertion failure with concurrent logical slot creation.
>
> This is a repo:
>
> 1. In session 1, attach the injection point locally and start creating a 
> logical slot. The session blocks at logical-decoding-activation:
> ```
> evantest=# set application_name = 'slot_a';
> SET
> evantest=# select injection_points_set_local();
>  injection_points_set_local
> ----------------------------
>
> (1 row)
> evantest=# select injection_points_attach('logical-decoding-activation', 
> 'wait');
>  injection_points_attach
> -------------------------
>
> (1 row)
> evantest=# select pg_create_logical_replication_slot('slot_a', 'pgoutput');
> ```
>
> 2. In session 2, create another logical slot. This succeeds, and 
> effective_wal_level becomes logical:
> ```
> evantest=# select pg_create_logical_replication_slot('slot_b', 'pgoutput');
>  pg_create_logical_replication_slot
> ------------------------------------
>  (slot_b,0/0902E418)
> (1 row)
>
> evantest=# show effective_wal_level;
>  effective_wal_level
> ---------------------
>  logical
> (1 row)
> ```
>
> 3. In session 2, cancel session 1 instead of waking it up:
> ```
> evantest=# select pg_cancel_backend(pid) from pg_stat_activity where 
> application_name = 'slot_a';
>  pg_cancel_backend
> -------------------
>  t
> (1 row)
> ```
>
> Then the server hits this assertion:

Thank you for the report! I've confirmed the problem.

>
> I might be over thinking, but I just feel the safest fix is to make 
> EnableLogicalDecoding() serialize. I tried serializing with 
> LogicalDecodingControlLock and with a separate lock, but both approaches got 
> deadlock around the barrier wait. I ended up with adding an 
> activation_in_progress flag in shared memory, protected by 
> LogicalDecodingControlLock, with a condition variable to wait for the active 
> activation to finish.
>
> With this fix, rerunning the repro makes session 2 wait while session 1 is 
> blocked at the injection point. After canceling session 1 from session 3, 
> session 2 continues, creates the slot successfully, and effective_wal_level 
> becomes logical.

This serialization idea is very similar to what we tried in older
version patches. While I've confirmed that the patch fixes the
reported issue, I'm somewhat concerned that it could introduce another
race condition as the patch adds a new mechanism.

I think that the complication stems from the fact that
abort_logical_decoding_activation() and DisableLogicalDecoding() clear
the xlog_logical_info flag in different ways. I guess it would be
simpler if we could delegate all the deactivation process including
clearing xlog_logical_info flag to DisableLogicalDecoding(). It would
allow the system to be in the state of xlog_logical_info == true and
logical_decoding_enabled = false, but if we change
DisableLogicalDecoding() to handle this case, the deactivation process
would become simpler.

> I didn’t include a test in this patch, as I wasn’t sure such a test would be 
> desirable. If others think it is worth adding, I can convert the repro into a 
> TAP test.

I think we should have the tests.

I've attached the patch for the above idea including the regression
tests. Please review it.

Regards,


--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c
index 72f68ec58ef..c637e9f6358 100644
--- a/src/backend/replication/logical/logicalctl.c
+++ b/src/backend/replication/logical/logicalctl.c
@@ -256,33 +256,20 @@ write_logical_decoding_status_update_record(bool status)
 }
 
 /*
- * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
- * the shared flags to revert the logical decoding activation process.
+ * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding.
+ *
+ * Rather than directly reverting xlog_logical_info here, we request
+ * that the checkpointer handle it via the normal disable path. This
+ * avoids race conditions when multiple backends attempt concurrent
+ * activation: the checkpointer will only reset xlog_logical_info when
+ * no valid logical slots exist, which naturally protects any other
+ * in-progress activation.
  */
 static void
 abort_logical_decoding_activation(int code, Datum arg)
 {
-	Assert(MyReplicationSlot);
-	Assert(!LogicalDecodingCtl->logical_decoding_enabled);
-
 	elog(DEBUG1, "aborting logical decoding activation process");
-
-	/*
-	 * Abort the change to xlog_logical_info. We don't need to check
-	 * CheckLogicalSlotExists() as we're still holding a logical slot.
-	 */
-	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
-	LogicalDecodingCtl->xlog_logical_info = false;
-	LWLockRelease(LogicalDecodingControlLock);
-
-	/*
-	 * Some processes might have already started logical info WAL logging, so
-	 * tell all running processes to update their caches. We don't need to
-	 * wait for all processes to disable xlog_logical_info locally as it's
-	 * always safe to write logical information to WAL records, even when not
-	 * strictly required.
-	 */
-	EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+	RequestDisableLogicalDecoding();
 }
 
 /*
@@ -489,16 +476,19 @@ void
 DisableLogicalDecoding(void)
 {
 	bool		in_recovery = RecoveryInProgress();
+	bool		was_enabled;
 
 	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
 
 	/*
 	 * Check if we can disable logical decoding.
 	 *
-	 * Skip CheckLogicalSlotExists() check during recovery because the
-	 * existing slots will be invalidated after disabling logical decoding.
+	 * Nothing to do if both flags are already off, or if valid slots
+	 * exist (skip the slot check during recovery because the existing
+	 * slots will be invalidated after disabling logical decoding.)
 	 */
-	if (!LogicalDecodingCtl->logical_decoding_enabled ||
+	if ((!LogicalDecodingCtl->logical_decoding_enabled &&
+		 !LogicalDecodingCtl->xlog_logical_info) ||
 		(!in_recovery && CheckLogicalSlotExists()))
 	{
 		LogicalDecodingCtl->pending_disable = false;
@@ -506,26 +496,41 @@ DisableLogicalDecoding(void)
 		return;
 	}
 
-	START_CRIT_SECTION();
+	was_enabled = LogicalDecodingCtl->logical_decoding_enabled;
 
-	/*
-	 * We need to disable logical decoding first and then disable logical
-	 * information WAL logging in order to ensure that no logical decoding
-	 * processes WAL records with insufficient information.
-	 */
-	LogicalDecodingCtl->logical_decoding_enabled = false;
+	if (was_enabled)
+	{
+		START_CRIT_SECTION();
 
-	/* Write the WAL to disable logical decoding on standbys too */
-	if (!in_recovery)
-		write_logical_decoding_status_update_record(false);
+		/*
+		 * We need to disable logical decoding first and then disable logical
+		 * information WAL logging in order to ensure that no logical decoding
+		 * processes WAL records with insufficient information.
+		 */
+		LogicalDecodingCtl->logical_decoding_enabled = false;
 
-	/* Now disable logical information WAL logging */
-	LogicalDecodingCtl->xlog_logical_info = false;
-	LogicalDecodingCtl->pending_disable = false;
+		/* Write the WAL to disable logical decoding on standbys too */
+		if (!in_recovery)
+			write_logical_decoding_status_update_record(false);
 
-	END_CRIT_SECTION();
+		/* Now disable logical information WAL logging */
+		LogicalDecodingCtl->xlog_logical_info = false;
+		LogicalDecodingCtl->pending_disable = false;
 
-	if (!in_recovery)
+		END_CRIT_SECTION();
+	}
+	else
+	{
+		/*
+		 * Logical decoding is disabled while xlog_logical_info is true.
+		 * This could happen if an activation is interrupted. Reset only
+		 * xlog_logical_info.
+		 */
+		LogicalDecodingCtl->xlog_logical_info = false;
+		LogicalDecodingCtl->logical_decoding_enabled = false;
+	}
+
+	if (!in_recovery && was_enabled)
 		ereport(LOG,
 				errmsg("logical decoding is disabled because there are no valid logical replication slots"));
 
diff --git a/src/test/recovery/t/051_effective_wal_level.pl b/src/test/recovery/t/051_effective_wal_level.pl
index c862073c34e..8e01ef4e45e 100644
--- a/src/test/recovery/t/051_effective_wal_level.pl
+++ b/src/test/recovery/t/051_effective_wal_level.pl
@@ -410,8 +410,39 @@ select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled
 
 	# Verify that the backend aborted the activation process.
 	$primary->wait_for_log("aborting logical decoding activation process");
-	test_wal_level($primary, "replica|replica",
-		"the activation process aborted");
+	wait_for_logical_decoding_disabled($primary);
+	is(1, 1, "the activation process aborted");
+
+	# Test concurrent activation processes run and one is interrupted.
+	$psql_create_slot = $primary->background_psql('postgres');
+
+	# Start a psql session and stops in the middle of the activation
+	# process.
+	$psql_create_slot->query_until(
+		qr/create_slot_canceled/,
+		q(\echo create_slot_canceled
+select injection_points_set_local();
+select injection_points_attach('logical-decoding-activation', 'wait');
+select pg_create_logical_replication_slot('slot_canceled2', 'pgoutput');
+\q
+));
+	$primary->wait_for_event('client backend', 'logical-decoding-activation');
+	note("injection_point 'logical-decoding-activation' is reached");
+
+	# Another backend concurrently enables the logical decoding.
+	$primary->safe_psql('postgres',
+						qq[select pg_create_logical_replication_slot('test_slot2', 'pgoutput')]);
+
+	# Cancel the backend initiated by $psql_create_slot, aborting its activation
+	# process.
+	$primary->safe_psql(
+		'postgres',
+		qq[
+select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled2' and pid <> pg_backend_pid()
+]);
+
+	test_wal_level($primary, "replica|logical",
+				   "the concurrent activation interrupt is handled property");
 }
 
 $primary->stop;

Reply via email to