From 3a81649cf5d8421d47e6b901e4b3bfb7159aa20b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 15 Jul 2025 00:07:26 -0700
Subject: [PATCH v3 2/3] Add pg_[de]activate_logical_decoding() SQL functions.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 src/backend/access/transam/xlogrecovery.c     |   2 +-
 src/backend/commands/subscriptioncmds.c       |   2 +-
 src/backend/replication/logical/logicalctl.c  | 125 ++++++++++++++++++
 .../replication/logical/reorderbuffer.c       |   2 +-
 src/backend/replication/logical/slotsync.c    |   2 +-
 src/backend/replication/slot.c                |  52 +++++++-
 src/backend/replication/slotfuncs.c           |   8 +-
 src/backend/replication/walsender.c           |   4 +-
 src/include/catalog/pg_proc.dat               |  10 ++
 src/include/replication/slot.h                |  15 ++-
 10 files changed, 208 insertions(+), 14 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 23878b2dd91..e8f3ba00caa 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -4760,7 +4760,7 @@ bool
 check_primary_slot_name(char **newval, void **extra, GucSource source)
 {
 	if (*newval && strcmp(*newval, "") != 0 &&
-		!ReplicationSlotValidateName(*newval, WARNING))
+		!ReplicationSlotValidateName(*newval, false, WARNING))
 		return false;
 
 	return true;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index e23b0de7242..bb28f6c95e2 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -210,7 +210,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			if (strcmp(opts->slot_name, "none") == 0)
 				opts->slot_name = NULL;
 			else
-				ReplicationSlotValidateName(opts->slot_name, ERROR);
+				ReplicationSlotValidateName(opts->slot_name, false, ERROR);
 		}
 		else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
 				 strcmp(defel->defname, "copy_data") == 0)
diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c
index 320a43b0574..f4b7dc072ea 100644
--- a/src/backend/replication/logical/logicalctl.c
+++ b/src/backend/replication/logical/logicalctl.c
@@ -38,6 +38,9 @@ LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
  */
 bool		XLogLogicalInfo = false;
 
+PG_FUNCTION_INFO_V1(pg_activate_logical_decoding);
+PG_FUNCTION_INFO_V1(pg_deactivate_logical_decoding);
+
 Size
 LogicalDecodingCtlShmemSize(void)
 {
@@ -403,3 +406,125 @@ UpdateLogicalDecodingStatusEndOfRecovery(void)
 	WaitForProcSignalBarrier(
 							 EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
 }
+
+/*
+ * SQL function to activate the logical decoding when wal_level = 'replica'.
+ * This function essentially creates an empty logical slot with the name
+ * LOGICAL_DECODING_ACTIVATION_SLOT to keep the logical decoding active
+ * until users disable it by calling pg_deactivate_logical_decoding() SQL
+ * function.
+ */
+Datum
+pg_activate_logical_decoding(PG_FUNCTION_ARGS)
+{
+	CheckSlotPermissions();
+	CheckSlotRequirements();
+
+	if (RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot activate logical decoding on standbys")));
+
+	if (wal_level >= WAL_LEVEL_LOGICAL)
+	{
+		ereport(NOTICE,
+				(errmsg("skipping logical decoding activation"),
+				 errdetail("logical decoding is already activated because \"wal_level\" >= \"logical\".")));
+
+		PG_RETURN_VOID();
+	}
+
+	if (SearchNamedReplicationSlot(LOGICAL_DECODING_ACTIVATION_SLOT, true))
+	{
+		ereport(NOTICE,
+				(errmsg("skipping logical decoding activation"),
+				 errdetail("logical decoding is already activated using pg_activate_logical_decoding().")));
+
+		PG_RETURN_VOID();
+	}
+
+	if (IsLogicalDecodingEnabled())
+	{
+		uint32		nslots = pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots);
+
+		/*
+		 * The logical decoding is already activated since we have at least
+		 * one logical slot, but we don't have
+		 * LOGICAL_DECODING_ACTIVATION_SLOT. We let users know it's already
+		 * enabled but proceed with creating the slot.
+		 */
+		ereport(NOTICE,
+				(errmsg_plural("logical decoding is already activated because there is %u in-use logical slot",
+							   "logical decoding is already activated because there are %u in-use logical slots",
+							   nslots, nslots)));
+	}
+
+	/*
+	 * Create an empty logical slot with the reserved name, and enable the
+	 * logical decoding.
+	 */
+	ReplicationSlotCreate(LOGICAL_DECODING_ACTIVATION_SLOT, true,
+						  RS_PERSISTENT, false, false, false, true);
+	EnsureLogicalDecodingEnabled();
+	Assert(IsLogicalDecodingEnabled());
+
+	ReplicationSlotRelease();
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Deactivate the logical decoding if it has been enabled using
+ * pg_activate_logical_decoding() SQL function.
+ */
+Datum
+pg_deactivate_logical_decoding(PG_FUNCTION_ARGS)
+{
+	if (RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot deactivate logical decoding on standbys")));
+
+	if (wal_level >= WAL_LEVEL_LOGICAL)
+	{
+		ereport(NOTICE,
+				(errmsg("skipping logical decoding deactivation"),
+				 errdetail("logical decoding is activated because  \"wal_level\" >= \"logical\".")));
+
+		PG_RETURN_VOID();
+	}
+
+	if (!IsLogicalDecodingEnabled())
+	{
+		ereport(NOTICE,
+				(errmsg("skipping logical decoding deactivation"),
+				 errdetail("logical decoding is already deactivated.")));
+
+		PG_RETURN_VOID();
+	}
+
+	if (!SearchNamedReplicationSlot(LOGICAL_DECODING_ACTIVATION_SLOT, true))
+	{
+		/*
+		 * The logical decoding is enabled but there is no
+		 * LOGICAL_DECODING_ACTIVATION_SLOT slot, meaning that there is at
+		 * least one logical slot. The only way to disable it in this case is
+		 * to drop all logical slot.
+		 */
+		ereport(NOTICE,
+				(errmsg("skipping logical decoding deactivation"),
+				 errdetail("pg_activate_logical_decoding() has not called."),
+				 errhint("Drop all logical replication slots to disable logical decoding")));
+
+		PG_RETURN_VOID();
+	}
+
+	DropLogicalDecodingActivationSlot();
+	DisableLogicalDecodingIfNecessary();
+
+	if (IsLogicalDecodingEnabled())
+		ereport(NOTICE,
+				(errmsg("logical replication is still active until all logical slots are dropped")));
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7b4e8629553..1e1af094bab 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -4917,7 +4917,7 @@ StartupReorderBuffer(void)
 			continue;
 
 		/* if it cannot be a slot, skip the directory */
-		if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
+		if (!ReplicationSlotValidateName(logical_de->d_name, false, DEBUG2))
 			continue;
 
 		/*
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f3ed58b9001..7841a03d67f 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -762,7 +762,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
 							  remote_slot->two_phase,
 							  remote_slot->failover,
-							  true);
+							  true, false);
 
 		EnsureLogicalDecodingEnabled();
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 671b7fa2083..5ec8e3b4aab 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -264,10 +264,14 @@ ReplicationSlotShmemExit(int code, Datum arg)
  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
  * the name to be used as a directory name on every supported OS.
  *
+ * An error will be reported for a reserved replication slot name if
+ * allow_reserved_name is set to false.
+ *
  * Returns whether the directory name is valid or not if elevel < ERROR.
  */
 bool
-ReplicationSlotValidateName(const char *name, int elevel)
+ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
+							int elevel)
 {
 	const char *cp;
 
@@ -303,6 +307,20 @@ ReplicationSlotValidateName(const char *name, int elevel)
 			return false;
 		}
 	}
+
+	if (!allow_reserved_name &&
+		(strcmp(name, LOGICAL_DECODING_ACTIVATION_SLOT) == 0))
+	{
+		ereport(elevel,
+				errcode(ERRCODE_RESERVED_NAME),
+				errmsg("replication slot name \"%s\" is reserved",
+					   name),
+				errdetail("The name \"%s\" is reserved for the conflict detection slot.",
+						  LOGICAL_DECODING_ACTIVATION_SLOT));
+
+		return false;
+	}
+
 	return true;
 }
 
@@ -326,14 +344,16 @@ ReplicationSlotValidateName(const char *name, int elevel)
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
 					  ReplicationSlotPersistency persistency,
-					  bool two_phase, bool failover, bool synced)
+					  bool two_phase, bool failover, bool synced,
+					  bool skip_name_validation)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
 
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotValidateName(name, ERROR);
+	if (!skip_name_validation)
+		ReplicationSlotValidateName(name, true, ERROR);
 
 	if (failover)
 	{
@@ -570,6 +590,12 @@ ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 
 	Assert(name != NULL);
 
+	/* Any operations are not permitted on the special-purpose slot */
+	if (strcmp(name, LOGICAL_DECODING_ACTIVATION_SLOT) == 0)
+		ereport(ERROR,
+				(errmsg("cannot acquire \"%s\" slot",
+						LOGICAL_DECODING_ACTIVATION_SLOT)));
+
 retry:
 	Assert(MyReplicationSlot == NULL);
 
@@ -748,8 +774,11 @@ ReplicationSlotRelease(void)
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
-	else
+	else if (strcmp(NameStr(slot->data.name), LOGICAL_DECODING_ACTIVATION_SLOT) != 0)
+	{
+		/* Set inactive_since value unless the slot is not reserved one */
 		ReplicationSlotSetInactiveSince(slot, now, true);
+	}
 
 	MyReplicationSlot = NULL;
 
@@ -839,6 +868,21 @@ ReplicationSlotDrop(const char *name, bool nowait)
 	ReplicationSlotDropAcquired();
 }
 
+/*
+ * Drop the (reserved) LOGICAL_DECODING_ACTIVATION_SLOT slot.
+ */
+void
+DropLogicalDecodingActivationSlot(void)
+{
+	ReplicationSlot *slot;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(LOGICAL_DECODING_ACTIVATION_SLOT, false);
+	LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */
+
+	ReplicationSlotDropPtr(slot);
+}
+
 /*
  * Change the definition of the slot identified by the specified name.
  */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6744a7979d7..99c36b9a525 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -42,7 +42,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
 						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
-						  false, false);
+						  false, false, false);
 
 	if (immediately_reserve)
 	{
@@ -135,7 +135,11 @@ create_logical_replication_slot(char *name, char *plugin,
 	 */
 	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
-						  failover, false);
+						  failover, false, false);
+
+	EnsureLogicalDecodingEnabled();
+	CheckLogicalDecodingRequirements();
+
 
 	EnsureLogicalDecodingEnabled();
 	CheckLogicalDecodingRequirements();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7158c638836..dea9445adce 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1199,7 +1199,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false, false, false);
+							  false, false, false, false);
 
 		if (reserve_wal)
 		{
@@ -1231,7 +1231,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase, failover, false);
+							  two_phase, failover, false, false);
 
 		/*
 		 * Do options check early so that we can bail before calling the
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1fc19146f46..2d3686ae867 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12572,4 +12572,14 @@
   proargnames => '{pid,io_id,io_generation,state,operation,off,length,target,handle_data_len,raw_result,result,target_desc,f_sync,f_localmem,f_buffered}',
   prosrc => 'pg_get_aios' },
 
+# Logical decoding activation/deactivation functions
+{ oid => '8562', descr => 'activate logical decoding',
+  proname => 'pg_activate_logical_decoding', prorettype => 'void',
+  provolatile => 'v', proparallel => 'u', proargtypes => '',
+  prosrc => 'pg_activate_logical_decoding' },
+{ oid => '8563', descr => 'deactivate logical decoding',
+  proname => 'pg_deactivate_logical_decoding', prorettype => 'void',
+  provolatile => 'v', proparallel => 'u', proargtypes => '',
+  prosrc => 'pg_deactivate_logical_decoding' },
+
 ]
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a983e025d63..9ea45266f19 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -21,6 +21,14 @@
 /* directory to store replication slot data in */
 #define PG_REPLSLOT_DIR     "pg_replslot"
 
+/*
+ * The reserved name for a replication slot used to activate the logical
+ * decoding even when wal_level = 'replica'. This slot is created and
+ * dropped only via dedicated functions such as
+ * pg_activate_logical_decoding().
+ */
+#define LOGICAL_DECODING_ACTIVATION_SLOT "pg_logical_decoding_activation"
+
 /*
  * Behaviour of replication slots, upon release or crash.
  *
@@ -279,7 +287,7 @@ extern void ReplicationSlotsShmemInit(void);
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
 								  ReplicationSlotPersistency persistency,
 								  bool two_phase, bool failover,
-								  bool synced);
+								  bool synced, bool skip_name_validation);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
@@ -295,7 +303,8 @@ extern void ReplicationSlotMarkDirty(void);
 
 /* misc stuff */
 extern void ReplicationSlotInitialize(void);
-extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
+										int elevel);
 extern void ReplicationSlotReserveWal(void);
 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
@@ -325,4 +334,6 @@ extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
 extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
 extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
 
+extern void DropLogicalDecodingActivationSlot(void);
+
 #endif							/* SLOT_H */
-- 
2.43.5

