From 5339af3055461f7a9229eb8acb28fd2e70944481 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 1 Dec 2025 16:39:02 +0900
Subject: [PATCH v4 3/4] Introduce a local hash table to store replica
 identities

This local hash table on the leader is used for detecting dependencies between
transactions.

The hash contains the Replica Identity (RI) as a key and the remote XID that
modified the corresponding tuple. The hash entries are inserted when the leader
finds an RI from a replication message. Entries are deleted when transactions
committed by parallel workers are gathered, or the number of entries exceeds the
limit.

When the leader sends replication changes to parallel workers, it checks whether
other transactions have already used the RI associated with the change. If
something is found, the leader treats it as a dependent transaction and notifies
parallel workers to wait until it finishes via LOGICAL_REP_MSG_INTERNAL_DEPENDENCY.

Author: Hou Zhijie <houzj.fnst@fujitsu.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
---
 .../replication/logical/applyparallelworker.c | 123 +++-
 src/backend/replication/logical/relation.c    |  24 +
 src/backend/replication/logical/worker.c      | 616 +++++++++++++++++-
 src/include/replication/logicalrelation.h     |   3 +
 src/include/replication/worker_internal.h     |   8 +-
 5 files changed, 771 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index bc8a0480778..40d57daf179 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -216,6 +216,7 @@ typedef struct ParallelApplyWorkerEntry
 {
 	TransactionId xid;			/* Hash key -- must be first */
 	ParallelApplyWorkerInfo *winfo;
+	XLogRecPtr	local_end;
 } ParallelApplyWorkerEntry;
 
 /* an entry in the parallelized_txns shared hash table */
@@ -504,7 +505,7 @@ pa_launch_parallel_worker(void)
  * streaming changes.
  */
 void
-pa_allocate_worker(TransactionId xid)
+pa_allocate_worker(TransactionId xid, bool stream_txn)
 {
 	bool		found;
 	ParallelApplyWorkerInfo *winfo = NULL;
@@ -545,7 +546,9 @@ pa_allocate_worker(TransactionId xid)
 
 	winfo->in_use = true;
 	winfo->serialize_changes = false;
+	winfo->stream_txn = stream_txn;
 	entry->winfo = winfo;
+	entry->local_end = InvalidXLogRecPtr;
 }
 
 /*
@@ -742,6 +745,73 @@ pa_process_spooled_messages_if_required(void)
 	return true;
 }
 
+/*
+ * Get the local end LSN for a transaction applied by a parallel apply worker.
+ *
+ * Set delete_entry to true if you intend to remove the transaction from the
+ * ParallelApplyTxnHash after collecting its LSN.
+ *
+ * If the parallel apply worker did not write any changes during the transaction
+ * application due to situations like update/delete_missing or a before trigger,
+ * the *skipped_write will be set to true.
+ */
+XLogRecPtr
+pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write)
+{
+	bool		found;
+	ParallelApplyWorkerEntry *entry;
+	ParallelApplyWorkerInfo *winfo;
+
+	Assert(TransactionIdIsValid(xid));
+
+	if (skipped_write)
+		*skipped_write = false;
+
+	/* Find an entry for the requested transaction. */
+	entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
+
+	if (!found)
+		return InvalidXLogRecPtr;
+
+	/*
+	 * If worker info is NULL, it indicates that the worker has been reused
+	 * for handling other transactions. Consequently, the local end LSN has
+	 * already been collected and saved in entry->local_end.
+	 */
+	winfo = entry->winfo;
+	if (winfo == NULL)
+	{
+		if (delete_entry &&
+			!hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL))
+			elog(ERROR, "hash table corrupted");
+
+		if (skipped_write)
+			*skipped_write = XLogRecPtrIsInvalid(entry->local_end);
+
+		return entry->local_end;
+	}
+
+	/* Return InvalidXLogRecPtr if the transaction is still in progress */
+	if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
+		return InvalidXLogRecPtr;
+
+	/* Collect the local end LSN from the worker's shared memory area */
+	entry->local_end = winfo->shared->last_commit_end;
+	entry->winfo = NULL;
+
+	if (skipped_write)
+		*skipped_write = XLogRecPtrIsInvalid(entry->local_end);
+
+	elog(DEBUG1, "store local commit %X/%X end to txn entry: %u",
+		 LSN_FORMAT_ARGS(entry->local_end), xid);
+
+	if (delete_entry &&
+		!hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL))
+		elog(ERROR, "hash table corrupted");
+
+	return entry->local_end;
+}
+
 /*
  * Interrupt handler for main loop of parallel apply worker.
  */
@@ -1686,6 +1756,26 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 	pa_free_worker(winfo);
 }
 
+bool
+pa_transaction_committed(TransactionId xid)
+{
+	bool		found;
+	ParallelApplyWorkerEntry *entry;
+
+	Assert(TransactionIdIsValid(xid));
+
+	/* Find an entry for the requested transaction */
+	entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
+
+	if (!found)
+		return true;
+
+	if (!entry->winfo)
+		return true;
+
+	return pa_get_xact_state(entry->winfo->shared) == PARALLEL_TRANS_FINISHED;
+}
+
 /*
  * Attach to the shared hash table for parallelized transactions.
  */
@@ -1731,6 +1821,37 @@ pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle,
 	MemoryContextSwitchTo(oldctx);
 }
 
+/*
+ * Record in-progress transactions from the given list that are being depended
+ * on into the shared hash table.
+ */
+void
+pa_record_dependency_on_transactions(List *depends_on_xids)
+{
+	foreach_xid(xid, depends_on_xids)
+	{
+		bool		found;
+		ParallelApplyWorkerEntry *winfo_entry;
+		ParallelApplyWorkerInfo *winfo;
+		ParallelizedTxnEntry *txn_entry;
+
+		winfo_entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
+		winfo = winfo_entry->winfo;
+
+		txn_entry = dshash_find_or_insert(parallelized_txns, &xid, &found);
+
+		/*
+		 * If the transaction has been committed now, remove the entry,
+		 * otherwise the parallel apply worker will remove the entry once
+		 * committed the transaction.
+		 */
+		if (pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED)
+			dshash_delete_entry(parallelized_txns, txn_entry);
+		else
+			dshash_release_lock(parallelized_txns, txn_entry);
+	}
+}
+
 /*
  * Wait for the given transaction to finish.
  */
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 10b3d0d9b82..66c73ce34a1 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -959,3 +959,27 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
 
 	return InvalidOid;
 }
+
+/*
+ * Get the LogicalRepRelMapEntry corresponding to the given relid without
+ * opening the local relation.
+ */
+LogicalRepRelMapEntry *
+logicalrep_get_relentry(LogicalRepRelId remoteid)
+{
+	LogicalRepRelMapEntry *entry;
+	bool		found;
+
+	if (LogicalRepRelMap == NULL)
+		logicalrep_relmap_init();
+
+	/* Search for existing entry. */
+	entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
+						HASH_FIND, &found);
+
+	if (!found)
+		elog(DEBUG1, "no relation map entry for remote relation ID %u",
+			 remoteid);
+
+	return entry;
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ebf8cd62552..269a3ac5804 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -303,6 +303,7 @@ typedef struct FlushPosition
 	dlist_node	node;
 	XLogRecPtr	local_end;
 	XLogRecPtr	remote_end;
+	TransactionId pa_remote_xid;
 } FlushPosition;
 
 static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
@@ -544,6 +545,49 @@ typedef struct ApplySubXactData
 
 static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
 
+typedef struct ReplicaIdentityKey
+{
+	Oid			relid;
+	LogicalRepTupleData *data;
+} ReplicaIdentityKey;
+
+typedef struct ReplicaIdentityEntry
+{
+	ReplicaIdentityKey *keydata;
+	TransactionId remote_xid;
+
+	/* needed for simplehash */
+	uint32		hash;
+	char		status;
+} ReplicaIdentityEntry;
+
+#include "common/hashfn.h"
+
+static uint32 hash_replica_identity(ReplicaIdentityKey *key);
+static bool hash_replica_identity_compare(ReplicaIdentityKey *a,
+										  ReplicaIdentityKey *b);
+
+/* Define parameters for replica identity hash table code generation. */
+#define SH_PREFIX		replica_identity
+#define SH_ELEMENT_TYPE	ReplicaIdentityEntry
+#define SH_KEY_TYPE		ReplicaIdentityKey *
+#define	SH_KEY			keydata
+#define SH_HASH_KEY(tb, key)	hash_replica_identity(key)
+#define SH_EQUAL(tb, a, b)		hash_replica_identity_compare(a, b)
+#define SH_STORE_HASH
+#define SH_GET_HASH(tb, a) (a)->hash
+#define	SH_SCOPE		static inline
+#define SH_DECLARE
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
+#define REPLICA_IDENTITY_INITIAL_SIZE 128
+#define REPLICA_IDENTITY_CLEANUP_THRESHOLD 1024
+
+static replica_identity_hash *replica_identity_table = NULL;
+
+static void write_internal_dependencies(StringInfo s, List *depends_on_xids);
+
 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
 
@@ -629,6 +673,546 @@ static TransApplyAction get_transaction_apply_action(TransactionId xid,
 
 static void replorigin_reset(int code, Datum arg);
 
+static bool send_internal_dependencies(ParallelApplyWorkerInfo *winfo,
+									   StringInfo s);
+
+/*
+ * Compute the hash value for entries in the replica_identity_table.
+ */
+static uint32
+hash_replica_identity(ReplicaIdentityKey *key)
+{
+	int			i;
+	uint32		hashkey = 0;
+
+	hashkey = hash_combine(hashkey, hash_uint32(key->relid));
+
+	for (i = 0; i < key->data->ncols; i++)
+	{
+		uint32		hkey;
+
+		if (key->data->colstatus[i] == LOGICALREP_COLUMN_NULL)
+			continue;
+
+		hkey = hash_any((const unsigned char *) key->data->colvalues[i].data,
+						key->data->colvalues[i].len);
+		hashkey = hash_combine(hashkey, hkey);
+	}
+
+	return hashkey;
+}
+
+/*
+ * Compare two entries in the replica_identity_table.
+ */
+static bool
+hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b)
+{
+	if (a->relid != b->relid ||
+		a->data->ncols != b->data->ncols)
+		return false;
+
+	for (int i = 0; i < a->data->ncols; i++)
+	{
+		if (a->data->colstatus[i] != b->data->colstatus[i])
+			return false;
+
+		if (a->data->colvalues[i].len != b->data->colvalues[i].len)
+			return false;
+
+		if (strcmp(a->data->colvalues[i].data, b->data->colvalues[i].data))
+			return false;
+
+		elog(DEBUG1, "conflicting key %s", a->data->colvalues[i].data);
+	}
+
+	return true;
+}
+
+/*
+ * Free resources associated with a replica identity key.
+ */
+static void
+free_replica_identity_key(ReplicaIdentityKey *key)
+{
+	Assert(key);
+
+	pfree(key->data->colvalues);
+	pfree(key->data->colstatus);
+	pfree(key->data);
+	pfree(key);
+}
+
+/*
+ * Clean up hash table entries associated with the given transaction IDs.
+ */
+static void
+cleanup_replica_identity_table(List *committed_xid)
+{
+	replica_identity_iterator i;
+	ReplicaIdentityEntry *rientry;
+
+	if (!committed_xid)
+		return;
+
+	replica_identity_start_iterate(replica_identity_table, &i);
+	while ((rientry = replica_identity_iterate(replica_identity_table, &i)) != NULL)
+	{
+		if (!list_member_xid(committed_xid, rientry->remote_xid))
+			continue;
+
+		/* Clean up the hash entry for committed transaction */
+		free_replica_identity_key(rientry->keydata);
+		replica_identity_delete_item(replica_identity_table, rientry);
+	}
+}
+
+/*
+ * Check committed transactions and clean up corresponding entries in the hash
+ * table.
+ */
+static void
+cleanup_committed_replica_identity_entries(void)
+{
+	dlist_mutable_iter iter;
+	List	   *committed_xids = NIL;
+
+	dlist_foreach_modify(iter, &lsn_mapping)
+	{
+		FlushPosition *pos =
+			dlist_container(FlushPosition, node, iter.cur);
+		bool		skipped_write;
+
+		if (!TransactionIdIsValid(pos->pa_remote_xid) ||
+			!XLogRecPtrIsInvalid(pos->local_end))
+			continue;
+
+		pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true,
+												&skipped_write);
+
+		elog(DEBUG1,
+			 "got commit end from parallel apply worker, "
+			 "txn: %u, remote_end %X/%X, local_end %X/%X",
+			 pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end),
+			 LSN_FORMAT_ARGS(pos->local_end));
+
+		if (!skipped_write && XLogRecPtrIsInvalid(pos->local_end))
+			continue;
+
+		committed_xids = lappend_xid(committed_xids, pos->pa_remote_xid);
+	}
+
+	/* cleanup the entries for committed transactions */
+	cleanup_replica_identity_table(committed_xids);
+}
+
+/*
+ * Append a transaction dependency, excluding duplicates and committed
+ * transactions.
+ */
+static List *
+check_and_append_xid_dependency(List *depends_on_xids,
+								TransactionId *depends_on_xid,
+								TransactionId current_xid)
+{
+	Assert(depends_on_xid);
+
+	if (!TransactionIdIsValid(*depends_on_xid))
+		return depends_on_xids;
+
+	if (TransactionIdEquals(*depends_on_xid, current_xid))
+		return depends_on_xids;
+
+	if (list_member_xid(depends_on_xids, *depends_on_xid))
+		return depends_on_xids;
+
+	/*
+	 * Return and reset the xid if the transaction has been committed.
+	 */
+	if (pa_transaction_committed(*depends_on_xid))
+	{
+		*depends_on_xid = InvalidTransactionId;
+		return depends_on_xids;
+	}
+
+	return lappend_xid(depends_on_xids, *depends_on_xid);
+}
+
+/*
+ * Check for dependencies on preceding transactions that modify the same key.
+ * Returns the dependent transactions in 'depends_on_xids' and records the
+ * current change.
+ */
+static void
+check_dependency_on_replica_identity(Oid relid,
+									 LogicalRepTupleData *original_data,
+									 TransactionId new_depended_xid,
+									 List **depends_on_xids)
+{
+	LogicalRepRelMapEntry *relentry;
+	LogicalRepTupleData *ridata;
+	ReplicaIdentityKey *rikey;
+	ReplicaIdentityEntry *rientry;
+	MemoryContext oldctx;
+	int			n_ri;
+	bool		found = false;
+
+	Assert(depends_on_xids);
+
+	/* Search for existing entry */
+	relentry = logicalrep_get_relentry(relid);
+
+	Assert(relentry);
+
+	/*
+	 * First search whether any previous transaction has affected the whole
+	 * table e.g., truncate or schema change from publisher.
+	 */
+	*depends_on_xids = check_and_append_xid_dependency(*depends_on_xids,
+													   &relentry->last_depended_xid,
+													   new_depended_xid);
+
+	n_ri = bms_num_members(relentry->remoterel.attkeys);
+
+	/*
+	 * Return if there are no replica identity columns, indicating that the
+	 * remote relation has neither a replica identity key nor is marked as
+	 * replica identity full.
+	 */
+	if (!n_ri)
+		return;
+
+	/* Check if the RI key value of the tuple is invalid */
+	for (int i = 0; i < original_data->ncols; i++)
+	{
+		if (!bms_is_member(i, relentry->remoterel.attkeys))
+			continue;
+
+		/*
+		 * Return if RI key is NULL or is explicitly marked unchanged. The key
+		 * value could be NULL in the new tuple of a update opertaion which
+		 * means the RI key is not updated.
+		 */
+		if (original_data->colstatus[i] == LOGICALREP_COLUMN_NULL ||
+			original_data->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED)
+			return;
+	}
+
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+
+	/* Allocate space for replica identity values */
+	ridata = palloc0_object(LogicalRepTupleData);
+	ridata->colvalues = palloc0_array(StringInfoData, n_ri);
+	ridata->colstatus = palloc0_array(char, n_ri);
+	ridata->ncols = n_ri;
+
+	for (int i_original = 0, i_ri = 0; i_original < original_data->ncols; i_original++)
+	{
+		StringInfo	original_colvalue = &original_data->colvalues[i_original];
+
+		if (!bms_is_member(i_original, relentry->remoterel.attkeys))
+			continue;
+
+		initStringInfoExt(&ridata->colvalues[i_ri], original_colvalue->len + 1);
+		appendStringInfoString(&ridata->colvalues[i_ri], original_colvalue->data);
+		ridata->colstatus[i_ri] = original_data->colstatus[i_original];
+		i_ri++;
+	}
+
+	rikey = palloc0_object(ReplicaIdentityKey);
+	rikey->relid = relid;
+	rikey->data = ridata;
+
+	if (TransactionIdIsValid(new_depended_xid))
+	{
+		rientry = replica_identity_insert(replica_identity_table, rikey,
+										  &found);
+
+		/*
+		 * Release the key built to search the entry, if the entry already
+		 * exists. Otherwise, initialize the remote_xid.
+		 */
+		if (found)
+		{
+			elog(DEBUG1, "found conflicting replica identity change from %u",
+				 rientry->remote_xid);
+
+			free_replica_identity_key(rikey);
+		}
+		else
+			rientry->remote_xid = InvalidTransactionId;
+	}
+	else
+	{
+		rientry = replica_identity_lookup(replica_identity_table, rikey);
+		free_replica_identity_key(rikey);
+	}
+
+	MemoryContextSwitchTo(oldctx);
+
+	/* Return if no entry found */
+	if (!rientry)
+		return;
+
+	Assert(!found || TransactionIdIsValid(rientry->remote_xid));
+
+	*depends_on_xids = check_and_append_xid_dependency(*depends_on_xids,
+													   &rientry->remote_xid,
+													   new_depended_xid);
+
+	/*
+	 * Update the new depended xid into the entry if valid, the new xid could
+	 * be invalid if the transaction will be applied by the leader itself
+	 * which means all the changes will be committed before processing next
+	 * transaction, so no need to be depended on.
+	 */
+	if (TransactionIdIsValid(new_depended_xid))
+		rientry->remote_xid = new_depended_xid;
+
+	/*
+	 * Remove the entry if the transaction has been committed and no new
+	 * dependency needs to be added.
+	 */
+	else if (!TransactionIdIsValid(rientry->remote_xid))
+	{
+		free_replica_identity_key(rientry->keydata);
+		replica_identity_delete_item(replica_identity_table, rientry);
+	}
+}
+
+/*
+ * Check for preceding transactions that involve insert, delete, or update
+ * operations on the specified table, and return them in 'depends_on_xids'.
+ */
+static void
+find_all_dependencies_on_rel(LogicalRepRelId relid, TransactionId new_depended_xid,
+							 List **depends_on_xids)
+{
+	replica_identity_iterator i;
+	ReplicaIdentityEntry *rientry;
+
+	Assert(depends_on_xids);
+
+	replica_identity_start_iterate(replica_identity_table, &i);
+	while ((rientry = replica_identity_iterate(replica_identity_table, &i)) != NULL)
+	{
+		Assert(TransactionIdIsValid(rientry->remote_xid));
+
+		if (rientry->keydata->relid != relid)
+			continue;
+
+		/* Clean up the hash entry for committed transaction while on it */
+		if (pa_transaction_committed(rientry->remote_xid))
+		{
+			free_replica_identity_key(rientry->keydata);
+			replica_identity_delete_item(replica_identity_table, rientry);
+
+			continue;
+		}
+
+		*depends_on_xids = check_and_append_xid_dependency(*depends_on_xids,
+														   &rientry->remote_xid,
+														   new_depended_xid);
+	}
+}
+
+/*
+ * Check for any preceding transactions that affect the given table and returns
+ * them in 'depends_on_xids'.
+ */
+static void
+check_dependency_on_rel(LogicalRepRelId relid, TransactionId new_depended_xid,
+						List **depends_on_xids)
+{
+	LogicalRepRelMapEntry *relentry;
+
+	Assert(depends_on_xids);
+
+	find_all_dependencies_on_rel(relid, new_depended_xid, depends_on_xids);
+
+	/* Search for existing entry */
+	relentry = logicalrep_get_relentry(relid);
+
+	/*
+	 * The relentry has not been initialized yet, indicating no change has
+	 * been applide yet.
+	 */
+	if (!relentry)
+		return;
+
+	*depends_on_xids = check_and_append_xid_dependency(*depends_on_xids,
+													   &relentry->last_depended_xid,
+													   new_depended_xid);
+
+	if (TransactionIdIsValid(new_depended_xid))
+		relentry->last_depended_xid = new_depended_xid;
+}
+
+/*
+ * Check dependencies related to the current change by determining if the
+ * modification impacts the same row or table as another ongoing transaction. If
+ * needed, instruct parallel apply workers to wait for these preceding
+ * transactions to complete.
+ *
+ * Simultaneously, track the dependency for the current change to ensure that
+ * subsequent transactions address this dependency.
+ */
+static void
+handle_dependency_on_change(LogicalRepMsgType action, StringInfo s,
+							TransactionId new_depended_xid,
+							ParallelApplyWorkerInfo *winfo)
+{
+	LogicalRepRelId relid;
+	LogicalRepTupleData oldtup;
+	LogicalRepTupleData newtup;
+	LogicalRepRelation *rel;
+	List	   *depends_on_xids = NIL;
+	List	   *remote_relids;
+	bool		has_oldtup = false;
+	bool		cascade = false;
+	bool		restart_seqs = false;
+	StringInfoData dependencies;
+
+	/*
+	 * Parse the consume data using a local copy instead of directly consuming
+	 * the given remote change as the caller may also read the data from the
+	 * remote message.
+	 */
+	StringInfoData change = *s;
+
+	/* Compute dependency only for non-streaming transaction */
+	if (in_streamed_transaction || (winfo && winfo->stream_txn))
+		return;
+
+	/* Only the leader checks dependencies and schedules the parallel apply */
+	if (!am_leader_apply_worker())
+		return;
+
+	if (!replica_identity_table)
+		replica_identity_table = replica_identity_create(ApplyContext,
+														 REPLICA_IDENTITY_INITIAL_SIZE,
+														 NULL);
+
+	if (replica_identity_table->members >= REPLICA_IDENTITY_CLEANUP_THRESHOLD)
+		cleanup_committed_replica_identity_entries();
+
+	switch (action)
+	{
+		case LOGICAL_REP_MSG_INSERT:
+			relid = logicalrep_read_insert(&change, &newtup);
+			check_dependency_on_replica_identity(relid, &newtup,
+												 new_depended_xid,
+												 &depends_on_xids);
+			break;
+
+		case LOGICAL_REP_MSG_UPDATE:
+			relid = logicalrep_read_update(&change, &has_oldtup, &oldtup,
+										   &newtup);
+
+			if (has_oldtup)
+				check_dependency_on_replica_identity(relid, &oldtup,
+													 new_depended_xid,
+													 &depends_on_xids);
+
+			check_dependency_on_replica_identity(relid, &newtup,
+												 new_depended_xid,
+												 &depends_on_xids);
+			break;
+
+		case LOGICAL_REP_MSG_DELETE:
+			relid = logicalrep_read_delete(&change, &oldtup);
+			check_dependency_on_replica_identity(relid, &oldtup,
+												 new_depended_xid,
+												 &depends_on_xids);
+			break;
+
+		case LOGICAL_REP_MSG_TRUNCATE:
+			remote_relids = logicalrep_read_truncate(&change, &cascade,
+													 &restart_seqs);
+
+			/*
+			 * Truncate affects all rows in a table, so the current
+			 * transaction should wait for all preceding transactions that
+			 * modified the same table.
+			 */
+			foreach_int(truncated_relid, remote_relids)
+				check_dependency_on_rel(truncated_relid, new_depended_xid,
+										&depends_on_xids);
+
+			break;
+
+		case LOGICAL_REP_MSG_RELATION:
+			rel = logicalrep_read_rel(&change);
+
+			/*
+			 * The replica identity key could be changed, making existing
+			 * entries in the replica identity invalid. In this case, parallel
+			 * apply is not allowed on this specific table until all running
+			 * transactions that modified it have finished.
+			 */
+			check_dependency_on_rel(rel->remoteid, new_depended_xid,
+									&depends_on_xids);
+			break;
+
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_MESSAGE:
+
+			/*
+			 * Type updates accompany relation updates, so dependencies have
+			 * already been checked during relation updates. Logical messages
+			 * do not conflict with any changes, so they can be ignored.
+			 */
+			break;
+
+		default:
+			Assert(false);
+			break;
+	}
+
+	if (!depends_on_xids)
+		return;
+
+	/*
+	 * Notify the transactions that they are dependent on the current
+	 * transaction.
+	 */
+	pa_record_dependency_on_transactions(depends_on_xids);
+
+	/*
+	 * If the leader applies the transaction itself, start waiting for
+	 * transactions that depend on the current transaction immediately.
+	 */
+	if (winfo == NULL)
+	{
+		foreach_xid(xid, depends_on_xids)
+			pa_wait_for_depended_transaction(xid);
+
+		return;
+	}
+
+	initStringInfo(&dependencies);
+
+	/* Build the dependency message used to send to parallel apply worker */
+	write_internal_dependencies(&dependencies, depends_on_xids);
+
+	(void) send_internal_dependencies(winfo, &dependencies);
+}
+
+/*
+ * Write internal dependency information to the output for the parallel apply
+ * worker.
+ */
+static void
+write_internal_dependencies(StringInfo s, List *depends_on_xids)
+{
+	pq_sendbyte(s, PARALLEL_APPLY_INTERNAL_MESSAGE);
+	pq_sendbyte(s, LOGICAL_REP_MSG_INTERNAL_DEPENDENCY);
+	pq_sendint32(s, list_length(depends_on_xids));
+
+	foreach_xid(xid, depends_on_xids)
+		pq_sendint32(s, xid);
+}
+
 /*
  * Handle internal dependency information.
  *
@@ -826,7 +1410,10 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 
 	/* not in streaming mode */
 	if (apply_action == TRANS_LEADER_APPLY)
+	{
+		handle_dependency_on_change(action, s, InvalidTransactionId, winfo);
 		return false;
+	}
 
 	Assert(TransactionIdIsValid(stream_xid));
 
@@ -1268,6 +1855,33 @@ apply_handle_begin(StringInfo s)
 	pgstat_report_activity(STATE_RUNNING, NULL);
 }
 
+/*
+ * Send an INTERNAL_DEPENDENCY message to a parallel apply worker.
+ *
+ * Returns false if we switched to the serialize mode to send the message,
+ * true otherwise.
+ */
+static bool
+send_internal_dependencies(ParallelApplyWorkerInfo *winfo, StringInfo s)
+{
+	Assert(s->data[0] == PARALLEL_APPLY_INTERNAL_MESSAGE);
+	Assert(s->data[1] == LOGICAL_REP_MSG_INTERNAL_DEPENDENCY);
+
+	if (!winfo->serialize_changes)
+	{
+		if (pa_send_data(winfo, s->len, s->data))
+			return true;
+
+		pa_switch_to_partial_serialize(winfo, true);
+	}
+
+	/* Skip writing the first internal message flag */
+	s->cursor++;
+	stream_write_change(LOGICAL_REP_MSG_INTERNAL_DEPENDENCY, s);
+
+	return false;
+}
+
 /*
  * Handle COMMIT message.
  *
@@ -1795,7 +2409,7 @@ apply_handle_stream_start(StringInfo s)
 
 	/* Try to allocate a worker for the streaming transaction. */
 	if (first_segment)
-		pa_allocate_worker(stream_xid);
+		pa_allocate_worker(stream_xid, true);
 
 	apply_action = get_transaction_apply_action(stream_xid, &winfo);
 
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 7a561a8e8d8..4b321bd2ad2 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,8 @@ typedef struct LogicalRepRelMapEntry
 	/* Sync state. */
 	char		state;
 	XLogRecPtr	statelsn;
+
+	TransactionId last_depended_xid;
 } LogicalRepRelMapEntry;
 
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
@@ -50,5 +52,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
 extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap);
 extern Oid	GetRelationIdentityOrPK(Relation rel);
+extern LogicalRepRelMapEntry *logicalrep_get_relentry(LogicalRepRelId remoteid);
 
 #endif							/* LOGICALRELATION_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index ddcdcc05053..78b5667cebe 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -235,6 +235,8 @@ typedef struct ParallelApplyWorkerInfo
 	 */
 	bool		in_use;
 
+	bool		stream_txn;
+
 	ParallelApplyWorkerShared *shared;
 } ParallelApplyWorkerInfo;
 
@@ -332,8 +334,10 @@ extern void apply_error_callback(void *arg);
 extern void set_apply_error_context_origin(char *originname);
 
 /* Parallel apply worker setup and interactions */
-extern void pa_allocate_worker(TransactionId xid);
+extern void pa_allocate_worker(TransactionId xid, bool stream_txn);
 extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
+extern XLogRecPtr pa_get_last_commit_end(TransactionId xid, bool delete_entry,
+										 bool *skipped_write);
 extern void pa_detach_all_error_mq(void);
 
 extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
@@ -362,6 +366,8 @@ extern void pa_decr_and_wait_stream_block(void);
 
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
+extern bool pa_transaction_committed(TransactionId xid);
+extern void pa_record_dependency_on_transactions(List *depends_on_xids);
 
 extern void pa_wait_for_depended_transaction(TransactionId xid);
 
-- 
2.47.3

