From 684b35a9a4c62f60a405ff32f1b14fb90b4f025e Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Fri, 5 Sep 2025 10:42:24 +0800
Subject: [PATCH v1] Allow logical replication in the same cluster

Thare is a known issue discussed by [1] in 2017 where creating a
logical replication in the same cluster will fail.

This patch provides a solution by adding a new option "local=true"
to "CREATE SUBSCRIPTION".

[1] https://www.postgresql.org/message-id/20170426165954.GK14000%40momjian.us

Author: Chao Li <lic@highgo.com>
---
 src/backend/commands/subscriptioncmds.c     | 51 ++++++++++++++++++-
 src/backend/replication/logical/snapbuild.c | 54 ++++++++++++++++++++-
 src/include/replication/logical.h           |  5 ++
 3 files changed, 106 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 82cf65fae73..28fa1b068a3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -37,6 +37,7 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
+#include "replication/logical.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/origin.h"
@@ -75,6 +76,7 @@
 #define SUBOPT_MAX_RETENTION_DURATION	0x00008000
 #define SUBOPT_LSN					0x00010000
 #define SUBOPT_ORIGIN				0x00020000
+#define SUBOPT_LOCAL				0x00040000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -101,6 +103,7 @@ typedef struct SubOpts
 	bool		runasowner;
 	bool		failover;
 	bool		retaindeadtuples;
+	bool		local;
 	int32		maxretention;
 	char	   *origin;
 	XLogRecPtr	lsn;
@@ -118,7 +121,7 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 static void CheckAlterSubOption(Subscription *sub, const char *option,
 								bool slot_needs_update, bool isTopLevel);
-
+static void mark_local_subscription_creation(bool set);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -170,6 +173,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->failover = false;
 	if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
 		opts->retaindeadtuples = false;
+	if (IsSet(supported_opts, SUBOPT_LOCAL))
+		opts->local = false;
 	if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
 		opts->maxretention = 0;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
@@ -385,6 +390,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_LOCAL) &&
+				 strcmp(defel->defname, "local") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_LOCAL))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_LOCAL;
+			opts->local = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -593,7 +607,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 					  SUBOPT_RETAIN_DEAD_TUPLES |
-					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
+					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN |
+					  SUBOPT_LOCAL);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -828,9 +843,15 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				if (opts.twophase && !opts.copy_data && tables != NIL)
 					twophase_enabled = true;
 
+				if (opts.local)
+					mark_local_subscription_creation(true);
+
 				walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
 								   opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
 
+				if (opts.local)
+					mark_local_subscription_creation(false);
+
 				if (twophase_enabled)
 					UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
 
@@ -2893,3 +2914,29 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+static void
+mark_local_subscription_creation(bool set)
+{
+	bool	found;
+	LogicalLocalCreateSubscriptioinXactInfo *plx;
+	plx = ShmemInitStruct("pg_command_subscription_local",
+						   sizeof(LogicalLocalCreateSubscriptioinXactInfo),
+						   &found);
+
+	/* when unset, the memory must be found */
+	Assert(!set && !found);
+
+	if (set)
+	{
+		plx->backend_proc = MyProcNumber;
+		plx->xid = GetCurrentTransactionId();
+		ereport(LOG,
+				(errmsg("marked local subscription creation xid %d", plx->xid)));
+	}
+	else
+	{
+		plx->backend_proc = INVALID_PROC_NUMBER;
+		plx->xid = InvalidTransactionId;
+	}
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 98ddee20929..e2afca2beb3 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -175,6 +175,9 @@ static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
 static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
 
+static bool is_xact_local_subscription_creation(TransactionId xid);
+static bool is_xact_local_subscription_creation_only_running(TransactionId oldestRunningXid, TransactionId nextXid);
+
 /*
  * Allocate a new snapshot builder.
  *
@@ -1291,7 +1294,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 * NB: We might have already started to incrementally assemble a snapshot,
 	 * so we need to be careful to deal with that.
 	 */
-	if (running->oldestRunningXid == running->nextXid)
+	if (running->oldestRunningXid == running->nextXid ||
+	    is_xact_local_subscription_creation_only_running(running->oldestRunningXid, running->nextXid))
 	{
 		if (builder->start_decoding_at == InvalidXLogRecPtr ||
 			builder->start_decoding_at <= lsn)
@@ -1299,7 +1303,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 			builder->start_decoding_at = lsn + 1;
 
 		/* As no transactions were running xmin/xmax can be trivially set. */
-		builder->xmin = running->nextXid;	/* < are finished */
+		builder->xmin = running->oldestRunningXid;	/* < are finished */
 		builder->xmax = running->nextXid;	/* >= are running */
 
 		/* so we can safely use the faster comparisons */
@@ -1448,6 +1452,9 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
 		if (TransactionIdIsCurrentTransactionId(xid))
 			elog(ERROR, "waiting for ourselves");
 
+		if (is_xact_local_subscription_creation(xid))
+			continue;
+
 		if (TransactionIdFollows(xid, cutoff))
 			continue;
 
@@ -2074,3 +2081,46 @@ SnapBuildSnapshotExists(XLogRecPtr lsn)
 
 	return ret == 0;
 }
+
+/*
+ * Check if the transaction with id 'xid' is a local subscription creation
+ * transaction.
+ */
+static bool
+is_xact_local_subscription_creation(TransactionId xid)
+{
+	bool			found;
+	PgBackendStatus 	*backendState;
+	LogicalLocalCreateSubscriptioinXactInfo *plx;
+
+	plx = ShmemInitStruct("pg_command_subscription_local",
+						   sizeof(LogicalLocalCreateSubscriptioinXactInfo),
+						   &found);
+	if (!found) {
+		return false;
+	}
+
+	if (plx->xid != xid) {
+		return true;
+	}
+
+	backendState = pgstat_get_beentry_by_proc_number(plx->backend_proc);
+	if (backendState != NULL && backendState->st_state == STATE_RUNNING) {
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Check if the only running transaction is a local subscription creation
+ * transaction.
+ */
+static bool
+is_xact_local_subscription_creation_only_running(TransactionId oldestRunningXid, TransactionId nextXid)
+{
+	if (oldestRunningXid + 1 != nextXid) {
+		return false;
+	}
+	return is_xact_local_subscription_creation(oldestRunningXid);
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 2e562bee5a9..980c236bc27 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -114,6 +114,11 @@ typedef struct LogicalDecodingContext
 	bool		processing_required;
 } LogicalDecodingContext;
 
+typedef struct LogicalLocalCreateSubscriptioinXactInfo
+{
+	TransactionId xid;
+	ProcNumber	backend_proc;
+} LogicalLocalCreateSubscriptioinXactInfo;
 
 extern void CheckLogicalDecodingRequirements(void);
 
-- 
2.39.5 (Apple Git-154)

