From 926b08dbbfedc199e101d407b27a5a57fd76b9c4 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 1 Dec 2025 10:37:27 +0900
Subject: [PATCH v4 1/4] Introduce new type of logical replication messages to
 track dependencies

This patch introduces two logical replication messages,
LOGICAL_REP_MSG_INTERNAL_DEPENDENCY and LOGICAL_REP_MSG_INTERNAL_RELATION.
Apart from other messages, they are not sent by walsnders; the leader worker
sends to parallel workers based on the needs.

LOGICAL_REP_MSG_INTERNAL_DEPENDENCY ensures that dependent transactions are
committed in the correct order. It has a list of transaction IDs that parallel
workers must wait for. The message type would be generated when the leader
detects a dependency between the current and other transactions, or just before
the COMMIT message. The latter one is used to preserve the commit ordering
between the publisher and the subscriber.

LOGICAL_REP_MSG_INTERNAL_RELATION is used to synchronize the relation
information between the leader and parallel workers. It has a list of relations
that the leader already knows, and parallel workers also update the relmap in
response to the message. This type of message is generated when the leader
allocates a new parallel worker to the transaction, or when the publisher sends
additional RELATION messages.

Author: Hou Zhijie <houzj.fnst@fujitsu.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
---
 .../replication/logical/applyparallelworker.c | 16 ++++++
 src/backend/replication/logical/proto.c       |  4 ++
 src/backend/replication/logical/worker.c      | 49 +++++++++++++++++++
 src/include/replication/logicalproto.h        |  2 +
 src/include/replication/worker_internal.h     |  4 ++
 5 files changed, 75 insertions(+)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index baa68c1ab6c..735a3e9acad 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -1645,3 +1645,19 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 
 	pa_free_worker(winfo);
 }
+
+/*
+ * Wait for the given transaction to finish.
+ */
+void
+pa_wait_for_depended_transaction(TransactionId xid)
+{
+	elog(DEBUG1, "wait for depended xid %u", xid);
+
+	for (;;)
+	{
+		/* XXX wait until given transaction is finished */
+	}
+
+	elog(DEBUG1, "finish waiting for depended xid %u", xid);
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f0a913892b9..72dedee3a43 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -1253,6 +1253,10 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "STREAM ABORT";
 		case LOGICAL_REP_MSG_STREAM_PREPARE:
 			return "STREAM PREPARE";
+		case LOGICAL_REP_MSG_INTERNAL_DEPENDENCY:
+			return "INTERNAL DEPENDENCY";
+		case LOGICAL_REP_MSG_INTERNAL_RELATION:
+			return "INTERNAL RELATION";
 	}
 
 	/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 93970c6af29..ebf8cd62552 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -629,6 +629,47 @@ static TransApplyAction get_transaction_apply_action(TransactionId xid,
 
 static void replorigin_reset(int code, Datum arg);
 
+/*
+ * Handle internal dependency information.
+ *
+ * Wait for all transactions listed in the message to commit.
+ */
+static void
+apply_handle_internal_dependency(StringInfo s)
+{
+	int			nxids = pq_getmsgint(s, 4);
+
+	for (int i = 0; i < nxids; i++)
+	{
+		TransactionId xid = pq_getmsgint(s, 4);
+
+		pa_wait_for_depended_transaction(xid);
+	}
+}
+
+/*
+ * Handle internal relation information.
+ *
+ * Update all relation details in the relation map cache.
+ */
+static void
+apply_handle_internal_relation(StringInfo s)
+{
+	int			num_rels;
+
+	num_rels = pq_getmsgint(s, 4);
+
+	for (int i = 0; i < num_rels; i++)
+	{
+		LogicalRepRelation *rel = logicalrep_read_rel(s);
+
+		logicalrep_relmap_update(rel);
+
+		elog(DEBUG1, "parallel apply worker worker init relmap for %s",
+			 rel->relname);
+	}
+}
+
 /*
  * Form the origin name for the subscription.
  *
@@ -3868,6 +3909,14 @@ apply_dispatch(StringInfo s)
 			apply_handle_stream_prepare(s);
 			break;
 
+		case LOGICAL_REP_MSG_INTERNAL_RELATION:
+			apply_handle_internal_relation(s);
+			break;
+
+		case LOGICAL_REP_MSG_INTERNAL_DEPENDENCY:
+			apply_handle_internal_dependency(s);
+			break;
+
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..5d91e2a4287 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -75,6 +75,8 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
 	LOGICAL_REP_MSG_STREAM_ABORT = 'A',
 	LOGICAL_REP_MSG_STREAM_PREPARE = 'p',
+	LOGICAL_REP_MSG_INTERNAL_DEPENDENCY = 'd',
+	LOGICAL_REP_MSG_INTERNAL_RELATION = 'i',
 } LogicalRepMsgType;
 
 /*
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f081619f151..a3526eae578 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -359,6 +359,8 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
+extern void pa_wait_for_depended_transaction(TransactionId xid);
+
 #define isParallelApplyWorker(worker) ((worker)->in_use && \
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
 #define isTableSyncWorker(worker) ((worker)->in_use && \
@@ -366,6 +368,8 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 #define isSequenceSyncWorker(worker) ((worker)->in_use && \
 									  (worker)->type == WORKERTYPE_SEQUENCESYNC)
 
+#define PARALLEL_APPLY_INTERNAL_MESSAGE	'i'
+
 static inline bool
 am_tablesync_worker(void)
 {
-- 
2.47.3

