From 15d24048224f75cfa083a4874e1666da509d7f01 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Fri, 8 Aug 2025 11:35:59 +0800
Subject: [PATCH v1] Parallel apply non-streaming transactions

--
Basic design
--

The leader worker assigns each non-streaming transaction to a parallel apply
worker. Before dispatching changes to a parallel worker, the leader verifies if
the current modification affects the same row (identified by replica identity
key) as another ongoing transaction. If so, the leader sends a list of dependent
transaction IDs to the parallel worker, indicating that the parallel apply
worker must wait for these transactions to commit before proceeding. Parallel
apply workers do not maintain commit order; transactions can be committed at any
time provided there are no dependencies.

Each parallel apply worker records the local end LSN of the transaction it
applies in shared memory. Subsequently, the leader gathers these local end LSNs
and logs them in the local 'lsn_mapping' for verifying whether they have been
flushed to disk (following the logic in get_flush_position()).

If no parallel apply worker is available, the leader will apply the transaction
independently.

For further details, please refer to the following:

--
Dependency tracking
--

The leader maintains a local hash table, using the remote change's replica
identity column values and relid as keys, with remote transaction IDs as values.
Before sending changes to the parallel apply worker, the leader computes a hash
using RI key values and the relid of the current change to search the hash
table. If an existing entry is found, the leader tells the parallel worker
to wait for the remote xid in the hash entry, after which the leader updates the
hash entry with the current xid.

If the remote relation lacks a replica identity (RI), it indicates that only
INSERT can be replicated for this table. In such cases, the leader skips
dependency checks, allowing the parallel apply worker to proceed with applying
changes without delay. This is because the only potential conflicts involve
local unique keys or foreign keys, and tracking those is yet to be implemented
(see TODO - dependency on local unique key, foreign key.).

In cases of TRUNCATE or remote schema changes affecting the entire table, the
leader retrieves all remote xids touching the same table (via sequential scans
of the hash table) and tells the parallel worker to wait for those transactions
to commit.

Hash entries are cleaned up once the transaction corresponding to the remote xid
in the entry has been committed. Clean-up typically occurs when collecting the
flush position of each transaction, but is forced if the hash table exceeds a
set threshold.

--
Dependency waiting
--

If a transaction is relied upon by others, the leader adds its xid to a shared
hash table. The shared hash table entry is cleared by the parallel apply worker
upon completing the transaction. Workers needing to wait for a transaction check
the shared hash table entry; if present, they lock the transaction ID (using
pa_lock_transaction). If absent, it indicates the transaction has been
committed, negating the need to wait.

--
TODO - error handling
--

If preceding transactions fail, and independent later transactions are already
applied, a mechanism is needed to skip already applied transactions upon
restart. One solution is to PREPARE transactions whose preceding ones remain
uncommitted, then COMMIT PREPARE once all preceding transactions finish. This
allows the worker to skip applied transactions by scanning prepared ones.

--
TODO - dependency on local unique key, foreign key.
--

A transaction could conflict with another if modifying the same unique key.
While current patches don't address conflicts involving unique or foreign keys,
tracking these dependencies might be needed.

--
TODO - user defined trigger and constraints.
--

It would be challenging to check dependencies if the table has user-defined
triggers or constraints. The most viable solution might be to disallow parallel
apply for relations whose triggers and constraints are not marked as
parallel-safe or immutable.

--
TODO - potential improvement to use shared hash table for tracking dependencies.
--

Instead of a local hash table, a shared hash table could track replica identity
key dependencies, allowing parallel apply workers to clean up entries. However,
this might increase contention, so more research is needed on whether it pays.
---
 .../replication/logical/applyparallelworker.c | 554 ++++++++++-
 src/backend/replication/logical/proto.c       |  42 +
 src/backend/replication/logical/relation.c    |  55 ++
 src/backend/replication/logical/worker.c      | 869 +++++++++++++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/replication/logicalproto.h        |   4 +
 src/include/replication/logicalrelation.h     |   5 +
 src/include/replication/worker_internal.h     |  26 +-
 src/include/storage/lwlocklist.h              |   1 +
 src/test/subscription/t/010_truncate.pl       |   2 +-
 src/test/subscription/t/015_stream.pl         |   8 +-
 src/test/subscription/t/026_stats.pl          |   1 +
 src/test/subscription/t/027_nosuperuser.pl    |   1 +
 src/tools/pgindent/typedefs.list              |   4 +
 14 files changed, 1497 insertions(+), 76 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index cd0e19176fd..f30d9b9bd8e 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -14,6 +14,9 @@
  * ParallelApplyWorkerInfo which is required so the leader worker and parallel
  * apply workers can communicate with each other.
  *
+ * Streaming transactions
+ * ======================
+ *
  * The parallel apply workers are assigned (if available) as soon as xact's
  * first stream is received for subscriptions that have set their 'streaming'
  * option as parallel. The leader apply worker will send changes to this new
@@ -146,6 +149,23 @@
  * which will detect deadlock if any. See pa_send_data() and
  * enum TransApplyAction.
  *
+ *
+ * Non-streaming transactions
+ * ==========================
+ * The handling is similar to streaming transactions, with a few
+ * differences:
+ *
+ * Transaction dependency
+ * ----------------------
+ * Before dispatching changes to a parallel worker, the leader verifies if the
+ * current modification affects the same row (identified by replica identity
+ * key) as another ongoing transaction (see handle_dependency_on_change for
+ * details). If so, the leader sends a list of dependent transaction IDs to the
+ * parallel worker, indicating that the parallel apply worker must wait for
+ * these transactions to commit before proceeding. Parallel apply workers do not
+ * maintain commit order; transactions can be committed at any time provided
+ * there are no dependencies.
+ *
  * Lock types
  * ----------
  * Both the stream lock and the transaction lock mentioned above are
@@ -216,14 +236,38 @@ typedef struct ParallelApplyWorkerEntry
 {
 	TransactionId xid;			/* Hash key -- must be first */
 	ParallelApplyWorkerInfo *winfo;
+	XLogRecPtr	local_end;
 } ParallelApplyWorkerEntry;
 
+/* An entry in the parallelized_txns shared hash table, keyed by xid */
+typedef struct ParallelizedTxnEntry
+{
+	TransactionId xid;			/* Hash key */
+} ParallelizedTxnEntry;
+
 /*
  * A hash table used to cache the state of streaming transactions being applied
  * by the parallel apply workers.
  */
 static HTAB *ParallelApplyTxnHash = NULL;
 
+/*
+ * A hash table used to track the parallelized transactions that could be
+ * depended on by other transactions.
+ */
+static dsa_area *parallel_apply_dsa_area = NULL;
+static dshash_table *parallelized_txns = NULL;
+
+/* parameters for the parallelized_txns shared hash table */
+static const dshash_parameters dsh_params = {
+	sizeof(TransactionId),
+	sizeof(ParallelizedTxnEntry),
+	dshash_memcmp,
+	dshash_memhash,
+	dshash_memcpy,
+	LWTRANCHE_PARALLEL_APPLY_DSA
+};
+
 /*
 * A list (pool) of active parallel apply workers. The information for
 * the new worker is added to the list after successfully launching it. The
@@ -257,6 +301,9 @@ static List *subxactlist = NIL;
 static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
 static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
 static PartialFileSetState pa_get_fileset_state(void);
+static void pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle,
+											dshash_table_handle *pa_dshash_handle);
+static void write_internal_relation(StringInfo s, LogicalRepRelation *rel);
 
 /*
  * Returns true if it is OK to start a parallel apply worker, false otherwise.
@@ -334,6 +381,15 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	shm_mq	   *mq;
 	Size		queue_size = DSM_QUEUE_SIZE;
 	Size		error_queue_size = DSM_ERROR_QUEUE_SIZE;
+	dsa_handle	parallel_apply_dsa_handle;
+	dshash_table_handle parallelized_txns_handle;
+
+	pa_attach_parallelized_txn_hash(&parallel_apply_dsa_handle,
+									&parallelized_txns_handle);
+
+	if (parallel_apply_dsa_handle == DSA_HANDLE_INVALID ||
+		parallelized_txns_handle == DSHASH_HANDLE_INVALID)
+		return false;
 
 	/*
 	 * Estimate how much shared memory we need.
@@ -364,11 +420,14 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	/* Set up the header region. */
 	shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
 	SpinLockInit(&shared->mutex);
-
+	shared->xid = InvalidTransactionId;
 	shared->xact_state = PARALLEL_TRANS_UNKNOWN;
 	pg_atomic_init_u32(&(shared->pending_stream_count), 0);
 	shared->last_commit_end = InvalidXLogRecPtr;
 	shared->fileset_state = FS_EMPTY;
+	shared->parallel_apply_dsa_handle = parallel_apply_dsa_handle;
+	shared->parallelized_txns_handle = parallelized_txns_handle;
+	shared->has_dependent_txn = false;
 
 	shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
 
@@ -406,6 +465,8 @@ pa_launch_parallel_worker(void)
 	MemoryContext oldcontext;
 	bool		launched;
 	ParallelApplyWorkerInfo *winfo;
+	dsa_handle	pa_dsa_handle;
+	dshash_table_handle pa_dshash_handle;
 	ListCell   *lc;
 
 	/* Try to get an available parallel apply worker from the worker pool. */
@@ -413,10 +474,33 @@ pa_launch_parallel_worker(void)
 	{
 		winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
 
-		if (!winfo->in_use)
+		if (!winfo->stream_txn &&
+			pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED)
+		{
+			/*
+			 * Save the local commit LSN of the last transaction applied by this
+			 * worker before reusing it for another transaction. This WAL
+			 * position is crucial for determining the flush position in
+			 * responses to the publisher (see get_flush_position()).
+			 */
+			(void) pa_get_last_commit_end(winfo->shared->xid, false, NULL);
+			return winfo;
+		}
+
+		if (winfo->stream_txn && !winfo->in_use)
 			return winfo;
 	}
 
+	pa_attach_parallelized_txn_hash(&pa_dsa_handle, &pa_dshash_handle);
+
+	/*
+	 * Return if the number of parallel apply workers has reached the maximum
+	 * limit.
+	 */
+	if (list_length(ParallelApplyWorkerPool) ==
+		max_parallel_apply_workers_per_subscription)
+		return NULL;
+
 	/*
 	 * Start a new parallel apply worker.
 	 *
@@ -444,18 +528,31 @@ pa_launch_parallel_worker(void)
 										dsm_segment_handle(winfo->dsm_seg),
 										false);
 
-	if (launched)
-	{
-		ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
-	}
-	else
+	if (!launched)
 	{
+		MemoryContextSwitchTo(oldcontext);
 		pa_free_worker_info(winfo);
-		winfo = NULL;
+		return NULL;
 	}
 
+	ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
+
 	MemoryContextSwitchTo(oldcontext);
 
+	/*
+	 * Send all existing remote relation information to the parallel apply
+	 * worker. This allows the parallel worker to initialize the
+	 * LogicalRepRelMapEntry locally before applying remote changes.
+	 */
+	if (logicalrep_get_num_rels())
+	{
+		StringInfoData out;
+		initStringInfo(&out);
+
+		write_internal_relation(&out, NULL);
+		pa_send_data(winfo, out.len, out.data);
+	}
+
 	return winfo;
 }
 
@@ -468,7 +565,7 @@ pa_launch_parallel_worker(void)
  * streaming changes.
  */
 void
-pa_allocate_worker(TransactionId xid)
+pa_allocate_worker(TransactionId xid, bool stream_txn)
 {
 	bool		found;
 	ParallelApplyWorkerInfo *winfo = NULL;
@@ -505,11 +602,14 @@ pa_allocate_worker(TransactionId xid)
 	SpinLockAcquire(&winfo->shared->mutex);
 	winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
 	winfo->shared->xid = xid;
+	winfo->shared->has_dependent_txn = false;
 	SpinLockRelease(&winfo->shared->mutex);
 
 	winfo->in_use = true;
 	winfo->serialize_changes = false;
+	winfo->stream_txn = stream_txn;
 	entry->winfo = winfo;
+	entry->local_end = InvalidXLogRecPtr;
 }
 
 /*
@@ -558,7 +658,8 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
 {
 	Assert(!am_parallel_apply_worker());
 	Assert(winfo->in_use);
-	Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
+	Assert(!winfo->stream_txn ||
+		   pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
 
 	if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
 		elog(ERROR, "hash table corrupted");
@@ -574,9 +675,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
 	 * been serialized and then letting the parallel apply worker deal with
 	 * the spurious message, we stop the worker.
 	 */
-	if (winfo->serialize_changes ||
-		list_length(ParallelApplyWorkerPool) >
-		(max_parallel_apply_workers_per_subscription / 2))
+	if (winfo->serialize_changes)
 	{
 		logicalrep_pa_worker_stop(winfo);
 		pa_free_worker_info(winfo);
@@ -706,6 +805,105 @@ pa_process_spooled_messages_if_required(void)
 	return true;
 }
 
+/*
+ * Get the local end LSN for a transaction applied by a parallel apply worker.
+ *
+ * Set delete_entry to true if you intend to remove the transaction from the
+ * ParallelApplyTxnHash after collecting its LSN.
+ *
+ * If the parallel apply worker did not write any changes during the transaction
+ * application due to situations like update/delete_missing or a before trigger,
+ * the *skipped_write will be set to true.
+ *
+ * Returns InvalidXLogRecPtr if the transaction is unknown or still running.
+ */
+XLogRecPtr
+pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write)
+{
+	bool		found;
+	ParallelApplyWorkerEntry *entry;
+	ParallelApplyWorkerInfo *winfo;
+	XLogRecPtr	local_end;
+
+	Assert(TransactionIdIsValid(xid));
+
+	if (skipped_write)
+		*skipped_write = false;
+
+	/* Find an entry for the requested transaction. */
+	entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
+
+	if (!found)
+		return InvalidXLogRecPtr;
+
+	/*
+	 * If worker info is NULL, the worker has been reused for other
+	 * transactions and the local end LSN was already saved in
+	 * entry->local_end; otherwise collect it from the worker now.
+	 */
+	winfo = entry->winfo;
+	if (winfo != NULL)
+	{
+		/* Return InvalidXLogRecPtr if the transaction is still in progress */
+		if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
+			return InvalidXLogRecPtr;
+
+		/* Collect the local end LSN from the worker's shared memory area */
+		entry->local_end = winfo->shared->last_commit_end;
+		entry->winfo = NULL;
+
+		elog(DEBUG1, "store local commit %X/%X end to txn entry: %u",
+			 LSN_FORMAT_ARGS(entry->local_end), xid);
+	}
+
+	/*
+	 * Copy the LSN out before removing the entry: the entry must not be
+	 * dereferenced once it has been handed back to the hash table.
+	 */
+	local_end = entry->local_end;
+
+	if (delete_entry &&
+		!hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL))
+		elog(ERROR, "hash table corrupted");
+
+	if (skipped_write)
+		*skipped_write = XLogRecPtrIsInvalid(local_end);
+
+	return local_end;
+}
+
+/*
+ * Leader only: block until pa_transaction_committed() reports the given remote
+ * xid as done. Does nothing in other processes or for an invalid xid.
+ */
+static void
+pa_wait_for_transaction(TransactionId wait_for_xid)
+{
+	if (!am_leader_apply_worker())
+		return;
+
+	if (!TransactionIdIsValid(wait_for_xid))
+		return;
+
+	elog(DEBUG1, "plan to wait for remote_xid %u to finish",
+		 wait_for_xid);
+
+	for (;;)
+	{
+		if (pa_transaction_committed(wait_for_xid))
+			break;
+
+		pa_lock_transaction(wait_for_xid, AccessShareLock);
+		pa_unlock_transaction(wait_for_xid, AccessShareLock);
+
+		/* An interrupt may have occurred while we were waiting. */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	elog(DEBUG1, "finished wait for remote_xid %u to finish",
+		 wait_for_xid);
+}
+
 /*
  * Interrupt handler for main loop of parallel apply worker.
  */
@@ -781,21 +979,35 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 			 * parallel apply workers can only be PqReplMsg_WALData.
 			 */
 			c = pq_getmsgbyte(&s);
-			if (c != PqReplMsg_WALData)
-				elog(ERROR, "unexpected message \"%c\"", c);
 
-			/*
-			 * Ignore statistics fields that have been updated by the leader
-			 * apply worker.
-			 *
-			 * XXX We can avoid sending the statistics fields from the leader
-			 * apply worker but for that, it needs to rebuild the entire
-			 * message by removing these fields which could be more work than
-			 * simply ignoring these fields in the parallel apply worker.
-			 */
-			s.cursor += SIZE_STATS_MESSAGE;
+			if (c == PqReplMsg_WALData)
+			{
+				/*
+				 * Ignore statistics fields that have been updated by the
+				 * leader apply worker.
+				 *
+				 * XXX We can avoid sending the statistics fields from the
+				 * leader apply worker but for that, it needs to rebuild the
+				 * entire message by removing these fields which could be more
+				 * work than simply ignoring these fields in the parallel
+				 * apply worker.
+				 */
+				s.cursor += SIZE_STATS_MESSAGE;
 
-			apply_dispatch(&s);
+				apply_dispatch(&s);
+			}
+			else if (c == PARALLEL_APPLY_INTERNAL_MESSAGE)
+			{
+				apply_dispatch(&s);
+			}
+			else
+			{
+				/*
+				 * The first byte of messages sent from leader apply worker to
+				 * parallel apply workers can only be 'w' or 'i'.
+				 */
+				elog(ERROR, "unexpected message \"%c\"", c);
+			}
 		}
 		else if (shmq_res == SHM_MQ_WOULD_BLOCK)
 		{
@@ -812,6 +1024,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 
 				if (rc & WL_LATCH_SET)
 					ResetLatch(MyLatch);
+
+				if (!IsTransactionState())
+					pgstat_report_stat(true);
 			}
 		}
 		else
@@ -849,6 +1064,9 @@ pa_shutdown(int code, Datum arg)
 				   INVALID_PROC_NUMBER);
 
 	dsm_detach((dsm_segment *) DatumGetPointer(arg));
+
+	if (parallel_apply_dsa_area)
+		dsa_detach(parallel_apply_dsa_area);
 }
 
 /*
@@ -864,6 +1082,8 @@ ParallelApplyWorkerMain(Datum main_arg)
 	shm_mq	   *mq;
 	shm_mq_handle *mqh;
 	shm_mq_handle *error_mqh;
+	dsa_handle	pa_dsa_handle;
+	dshash_table_handle pa_dshash_handle;
 	RepOriginId originid;
 	int			worker_slot = DatumGetInt32(main_arg);
 	char		originname[NAMEDATALEN];
@@ -944,6 +1164,8 @@ ParallelApplyWorkerMain(Datum main_arg)
 
 	InitializingApplyWorker = false;
 
+	pa_attach_parallelized_txn_hash(&pa_dsa_handle, &pa_dshash_handle);
+
 	/* Setup replication origin tracking. */
 	StartTransactionCommand();
 	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
@@ -1150,7 +1372,6 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	shm_mq_result result;
 	TimestampTz startTime = 0;
 
-	Assert(!IsTransactionState());
 	Assert(!winfo->serialize_changes);
 
 	/*
@@ -1202,6 +1423,67 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	}
 }
 
+/*
+ * Send rel's description to each pooled parallel apply worker, except the
+ * current one and serializing ones; a worker the send fails for is stopped.
+ */
+void
+pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel)
+{
+	List	   *workers_stopped = NIL;
+	StringInfoData out;
+
+	if (!ParallelApplyWorkerPool)
+		return;
+
+	initStringInfo(&out);
+
+	write_internal_relation(&out, rel);
+
+	foreach_ptr(ParallelApplyWorkerInfo, winfo, ParallelApplyWorkerPool)
+	{
+		/*
+		 * Skip the worker responsible for the current transaction, as the
+		 * relation information has already been sent to it.
+		 */
+		if (winfo == stream_apply_worker)
+			continue;
+
+		/*
+		 * Skip workers that are in serialize mode, as they will soon stop
+		 * once they finish applying their transaction.
+		 */
+		if (winfo->serialize_changes)
+			continue;
+
+		elog(DEBUG1, "distributing schema changes to pa workers");
+
+		if (pa_send_data(winfo, out.len, out.data))
+			continue;
+
+		elog(DEBUG1, "failed to distribute, will stop that worker instead");
+
+		/*
+		 * Distribution to this worker failed due to a sending timeout. Wait for
+		 * the worker to complete its transaction and then stop it. This is
+		 * consistent with the handling of workers in serialize mode (see
+		 * pa_free_worker() for details).
+		 */
+		pa_wait_for_transaction(winfo->shared->xid);
+
+		pa_get_last_commit_end(winfo->shared->xid, false, NULL);
+
+		logicalrep_pa_worker_stop(winfo);
+
+		workers_stopped = lappend(workers_stopped, winfo);
+	}
+
+	pfree(out.data);
+
+	foreach_ptr(ParallelApplyWorkerInfo, winfo, workers_stopped)
+		pa_free_worker_info(winfo);
+}
+
 /*
  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
  * that the current data and any subsequent data for this transaction will be
@@ -1284,8 +1566,8 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
 
 	/*
 	 * Wait for the transaction lock to be released. This is required to
-	 * detect deadlock among leader and parallel apply workers. Refer to the
-	 * comments atop this file.
+	 * detect deadlock among leader and parallel apply workers. Refer to the
+	 * comments atop this file.
 	 */
 	pa_lock_transaction(winfo->shared->xid, AccessShareLock);
 	pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
@@ -1299,6 +1581,7 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("lost connection to the logical replication parallel apply worker")));
+
 }
 
 /*
@@ -1362,6 +1645,9 @@ pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
 void
 pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
 {
+	if (!TransactionIdIsValid(top_xid))
+		return;
+
 	if (current_xid != top_xid &&
 		!list_member_xid(subxactlist, current_xid))
 	{
@@ -1618,23 +1904,215 @@ pa_decr_and_wait_stream_block(void)
 void
 pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 {
+	XLogRecPtr	local_lsn = InvalidXLogRecPtr;
+	TransactionId pa_remote_xid = winfo->shared->xid;
+
 	Assert(am_leader_apply_worker());
 
 	/*
-	 * Unlock the shared object lock so that parallel apply worker can
-	 * continue to receive and apply changes.
+	 * Unlock the shared object lock taken for streaming transactions so that
+	 * parallel apply worker can continue to receive and apply changes.
 	 */
-	pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
+	if (winfo->stream_txn)
+		pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
 
 	/*
-	 * Wait for that worker to finish. This is necessary to maintain commit
-	 * order which avoids failures due to transaction dependencies and
-	 * deadlocks.
+	 * For a streaming transaction, wait for the worker to finish. This is
+	 * necessary to maintain commit order which avoids failures due to
+	 * transaction dependencies and deadlocks.
+	 *
+	 * For a non-streaming transaction in partial serialize mode, wait for the
+	 * worker to finish as well, since it cannot be reused anyway (see
+	 * pa_free_worker() for details).
 	 */
-	pa_wait_for_xact_finish(winfo);
+	if (winfo->serialize_changes || winfo->stream_txn)
+	{
+		pa_wait_for_xact_finish(winfo);
+
+		local_lsn = winfo->shared->last_commit_end;
+		pa_remote_xid = InvalidTransactionId;
+
+		pa_free_worker(winfo);
+	}
 
 	if (!XLogRecPtrIsInvalid(remote_lsn))
-		store_flush_position(remote_lsn, winfo->shared->last_commit_end);
+		store_flush_position(remote_lsn, local_lsn, pa_remote_xid);
 
-	pa_free_worker(winfo);
+	pa_set_stream_apply_worker(NULL);
+}
+
+/*
+ * Returns true if no parallel apply worker is still applying the given xid:
+ * it has no hash entry, no worker attached, or its state is already finished.
+ */
+bool
+pa_transaction_committed(TransactionId xid)
+{
+	ParallelApplyWorkerEntry *entry;
+
+	Assert(TransactionIdIsValid(xid));
+
+	entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, NULL);
+
+	/* Unknown xid, or its worker has already been detached from the entry */
+	if (entry == NULL || entry->winfo == NULL)
+		return true;
+
+	return pa_get_xact_state(entry->winfo->shared) == PARALLEL_TRANS_FINISHED;
+}
+
+/*
+ * Create (leader) or attach to (parallel apply worker) the shared hash table
+ * of parallelized transactions; output handles stay invalid if nothing is set up.
+ */
+static void
+pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle,
+								dshash_table_handle *pa_dshash_handle)
+{
+	MemoryContext oldctx;
+
+	*pa_dsa_handle = DSA_HANDLE_INVALID;
+	*pa_dshash_handle = DSHASH_HANDLE_INVALID;
+
+	/* Be sure any local memory allocated by DSA routines is persistent. */
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+
+	if (parallelized_txns == NULL && am_leader_apply_worker())
+	{
+		/* Create the shared hash table for parallelized transactions. */
+		parallel_apply_dsa_area = dsa_create(LWTRANCHE_PARALLEL_APPLY_DSA);
+		/* NOTE(review): no dsa_unpin() is visible; confirm this cannot leak. */
+		dsa_pin(parallel_apply_dsa_area);
+		dsa_pin_mapping(parallel_apply_dsa_area);
+		parallelized_txns = dshash_create(parallel_apply_dsa_area, &dsh_params, NULL);
+	}
+	else if (parallelized_txns == NULL && am_parallel_apply_worker())
+	{
+		/* Attach to the shared hash table created by the leader. */
+		parallel_apply_dsa_area = dsa_attach(MyParallelShared->parallel_apply_dsa_handle);
+		dsa_pin_mapping(parallel_apply_dsa_area);
+		parallelized_txns = dshash_attach(parallel_apply_dsa_area, &dsh_params,
+										  MyParallelShared->parallelized_txns_handle,
+										  NULL);
+	}
+
+	MemoryContextSwitchTo(oldctx);
+
+	if (parallelized_txns)
+	{
+		Assert(parallel_apply_dsa_area);
+		*pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area);
+		*pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns);
+	}
+}
+
+/*
+ * Record in-progress transactions from the given list that are being depended
+ * on into the shared hash table. Transactions that no longer have a worker
+ * attached (i.e. already finished) are skipped.
+ */
+void
+pa_record_dependency_on_transactions(List *depends_on_xids)
+{
+	foreach_xid(xid, depends_on_xids)
+	{
+		bool		found;
+		ParallelApplyWorkerEntry *winfo_entry;
+		ParallelApplyWorkerInfo *winfo;
+		ParallelizedTxnEntry *txn_entry;
+
+		winfo_entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
+
+		/* No entry or no worker attached: finished, nothing to wait on. */
+		if (!found || winfo_entry->winfo == NULL)
+			continue;
+
+		winfo = winfo_entry->winfo;
+		if (winfo->shared->has_dependent_txn)
+			continue;
+
+		txn_entry = dshash_find_or_insert(parallelized_txns, &xid, &found);
+		if (found)
+			elog(ERROR, "hash table corrupted");
+
+		winfo->shared->has_dependent_txn = true;
+
+		/* If still running, the worker removes the entry once it commits. */
+		if (pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED)
+			dshash_delete_entry(parallelized_txns, txn_entry);
+		else
+			dshash_release_lock(parallelized_txns, txn_entry);
+	}
+}
+
+/*
+ * Flag the current transaction as finished and, when another transaction was
+ * recorded as depending on it, drop its entry from the shared hash table.
+ */
+void
+pa_commit_transaction(void)
+{
+	TransactionId finished_xid = MyParallelShared->xid;
+	bool		needs_cleanup;
+
+	SpinLockAcquire(&MyParallelShared->mutex);
+	MyParallelShared->xact_state = PARALLEL_TRANS_FINISHED;
+	needs_cleanup = MyParallelShared->has_dependent_txn;
+	SpinLockRelease(&MyParallelShared->mutex);
+
+	if (needs_cleanup)
+	{
+		dshash_delete_key(parallelized_txns, &finished_xid);
+		elog(DEBUG1, "depended xid %u committed", finished_xid);
+	}
+}
+
+/*
+ * Wait until the given xid no longer has an entry in parallelized_txns.
+ */
+void
+pa_wait_for_depended_transaction(TransactionId xid)
+{
+	elog(DEBUG1, "wait for depended xid %u", xid);
+
+	for (;;)
+	{
+		ParallelizedTxnEntry *txn_entry;
+
+		txn_entry = dshash_find(parallelized_txns, &xid, false);
+
+		/* The entry is removed only once the transaction has committed */
+		if (txn_entry == NULL)
+			break;
+
+		dshash_release_lock(parallelized_txns, txn_entry);
+
+		pa_lock_transaction(xid, AccessShareLock);
+		pa_unlock_transaction(xid, AccessShareLock);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	elog(DEBUG1, "finish waiting for depended xid %u", xid);
+}
+
+/*
+ * Write an internal relation message for rel, or for all relations if NULL.
+ */
+static void
+write_internal_relation(StringInfo s, LogicalRepRelation *rel)
+{
+	pq_sendbyte(s, PARALLEL_APPLY_INTERNAL_MESSAGE);
+	pq_sendbyte(s, LOGICAL_REP_MSG_INTERNAL_RELATION);
+
+	if (rel)
+	{
+		pq_sendint32(s, 1);
+		logicalrep_write_internal_rel(s, rel);
+	}
+	else
+	{
+		pq_sendint32(s, logicalrep_get_num_rels());
+		logicalrep_write_all_rels(s);
+	}
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1b3d9eb49dd..de6c1e930e4 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -691,6 +691,44 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
 	logicalrep_write_attrs(out, rel, columns, include_gencols_type);
 }
 
+/*
+ * Write rel's id, names, replica identity and attribute list to the stream.
+ */
+void
+logicalrep_write_internal_rel(StringInfo out, LogicalRepRelation *rel)
+{
+	pq_sendint32(out, rel->remoteid);
+
+	/* Write relation name */
+	pq_sendstring(out, rel->nspname);
+	pq_sendstring(out, rel->relname);
+
+	/* Write the replica identity. */
+	pq_sendbyte(out, rel->replident);
+
+	/* Write attribute description */
+	pq_sendint16(out, rel->natts);
+
+	for (int i = 0; i < rel->natts; i++)
+	{
+		uint8		flags = 0;
+
+		if (bms_is_member(i, rel->attkeys))
+			flags |= LOGICALREP_IS_REPLICA_IDENTITY;
+
+		pq_sendbyte(out, flags);
+
+		/* attribute name */
+		pq_sendstring(out, rel->attnames[i]);
+
+		/* attribute type id */
+		pq_sendint32(out, rel->atttyps[i]);
+
+		/* attribute mode is not tracked here, so a zero is always sent */
+		pq_sendint32(out, 0);
+	}
+}
+
 /*
  * Read the relation info from stream and return as LogicalRepRelation.
  */
@@ -1250,6 +1288,10 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "STREAM ABORT";
 		case LOGICAL_REP_MSG_STREAM_PREPARE:
 			return "STREAM PREPARE";
+		case LOGICAL_REP_MSG_INTERNAL_DEPENDENCY:
+			return "INTERNAL DEPENDENCY";
+		case LOGICAL_REP_MSG_INTERNAL_RELATION:
+			return "INTERNAL RELATION";
 	}
 
 	/*
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index f59046ad620..2e15f8e69b0 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -946,3 +946,58 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
 
 	return InvalidOid;
 }
+
+/*
+ * Get the number of entries in LogicalRepRelMap, or 0 if it does not exist.
+ */
+int
+logicalrep_get_num_rels(void)
+{
+	if (LogicalRepRelMap == NULL)
+		return 0;
+
+	return hash_get_num_entries(LogicalRepRelMap);
+}
+
+/*
+ * Write the remote relation information of every LogicalRepRelMap entry to the
+ * output stream; writes nothing if the map has not been created.
+ */
+void
+logicalrep_write_all_rels(StringInfo out)
+{
+	LogicalRepRelMapEntry *entry;
+	HASH_SEQ_STATUS status;
+
+	if (LogicalRepRelMap == NULL)
+		return;
+
+	hash_seq_init(&status, LogicalRepRelMap);
+
+	while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		logicalrep_write_internal_rel(out, &entry->remoterel);
+}
+
+/*
+ * Look up the LogicalRepRelMapEntry for the given remote relid without opening
+ * the local relation. Returns NULL if no entry exists for that relid.
+ */
+LogicalRepRelMapEntry *
+logicalrep_get_relentry(LogicalRepRelId remoteid)
+{
+	LogicalRepRelMapEntry *entry;
+	bool		found;
+
+	if (LogicalRepRelMap == NULL)
+		logicalrep_relmap_init();
+
+	/* Search for existing entry. */
+	entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
+						HASH_FIND, &found);
+
+	if (!found)
+		elog(DEBUG1, "no relation map entry for remote relation ID %u",
+			 remoteid);
+
+	return entry;
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0fdc5de57ba..11726b691fa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -287,6 +287,7 @@ typedef struct FlushPosition
 	dlist_node	node;
 	XLogRecPtr	local_end;
 	XLogRecPtr	remote_end;
+	TransactionId pa_remote_xid;
 } FlushPosition;
 
 static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
@@ -462,6 +463,7 @@ static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+static TransactionId remote_xid = InvalidTransactionId;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -523,6 +525,49 @@ typedef struct ApplySubXactData
 
 static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
 
+typedef struct ReplicaIdentityKey
+{
+	Oid			relid;			/* remote relation ID of the change */
+	LogicalRepTupleData *data;	/* replica identity column values only */
+} ReplicaIdentityKey;
+
+typedef struct ReplicaIdentityEntry
+{
+	ReplicaIdentityKey *keydata;	/* hash key; freed when entry is removed */
+	TransactionId remote_xid;	/* last remote xact that changed this key */
+
+	/* needed for simplehash */
+	uint32		hash;
+	char		status;
+} ReplicaIdentityEntry;
+
+#include "common/hashfn.h"
+
+static uint32 hash_replica_identity(ReplicaIdentityKey *key);
+static bool hash_replica_identity_compare(ReplicaIdentityKey *a,
+										  ReplicaIdentityKey *b);
+
+/* Define parameters for replica identity hash table code generation. */
+#define SH_PREFIX		replica_identity
+#define SH_ELEMENT_TYPE	ReplicaIdentityEntry
+#define SH_KEY_TYPE		ReplicaIdentityKey *
+#define	SH_KEY			keydata
+#define SH_HASH_KEY(tb, key)	hash_replica_identity(key)
+#define SH_EQUAL(tb, a, b)		hash_replica_identity_compare(a, b)
+#define SH_STORE_HASH
+#define SH_GET_HASH(tb, a) (a)->hash
+#define	SH_SCOPE		static inline
+#define SH_DECLARE
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
+#define REPLICA_IDENTITY_INITIAL_SIZE 128
+#define REPLICA_IDENTITY_CLEANUP_THRESHOLD 1024
+
+static replica_identity_hash *replica_identity_table = NULL;
+
+static void write_internal_dependencies(StringInfo s, List *depends_on_xids);
+
 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
 
@@ -537,11 +582,7 @@ static inline void cleanup_subxact_info(void);
 /*
  * Serialize and deserialize changes for a toplevel transaction.
  */
-static void stream_open_file(Oid subid, TransactionId xid,
-							 bool first_segment);
 static void stream_write_change(char action, StringInfo s);
-static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
-static void stream_close_file(void);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
@@ -601,6 +642,595 @@ static TransApplyAction get_transaction_apply_action(TransactionId xid,
 
 static void replorigin_reset(int code, Datum arg);
 
+/*
+ * Hash a replica_identity_table key: the relation ID is combined with the
+ * hash of each non-NULL column value.
+ */
+static uint32
+hash_replica_identity(ReplicaIdentityKey *key)
+{
+	LogicalRepTupleData *tup = key->data;
+	uint32		result = hash_combine(0, hash_uint32(key->relid));
+
+	for (int col = 0; col < tup->ncols; col++)
+	{
+		StringInfo	val = &tup->colvalues[col];
+
+		/* NULL columns are left out of the hash. */
+		if (tup->colstatus[col] == LOGICALREP_COLUMN_NULL)
+			continue;
+
+		result = hash_combine(result,
+							  hash_any((const unsigned char *) val->data,
+									   val->len));
+	}
+
+	return result;
+}
+
+/*
+ * Equality function for replica_identity_table keys. Values are compared as
+ * raw bytes so that binary-format columns with embedded NULs work too.
+ */
+static bool
+hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b)
+{
+	if (a->relid != b->relid || a->data->ncols != b->data->ncols)
+		return false;
+
+	for (int i = 0; i < a->data->ncols; i++)
+	{
+		StringInfo	aval = &a->data->colvalues[i];
+		StringInfo	bval = &b->data->colvalues[i];
+
+		if (a->data->colstatus[i] != b->data->colstatus[i])
+			return false;
+
+		/* Lengths are checked first, so memcmp covers both values fully. */
+		if (aval->len != bval->len ||
+			(aval->len > 0 && memcmp(aval->data, bval->data, aval->len) != 0))
+			return false;
+	}
+
+	return true;
+}
+
+/* Free a replica identity key together with its per-column value buffers. */
+static void
+free_replica_identity_key(ReplicaIdentityKey *key)
+{
+	Assert(key);
+	for (int i = 0; i < key->data->ncols; i++)
+		if (key->data->colvalues[i].data)
+			pfree(key->data->colvalues[i].data);
+	pfree(key->data->colvalues);
+	pfree(key->data->colstatus);
+	pfree(key->data);
+	pfree(key);
+}
+
+/*
+ * Clean up hash table entries associated with the given transaction IDs.
+ */
+static void
+cleanup_replica_identity_table(List *committed_xid)
+{
+	replica_identity_iterator i;
+	ReplicaIdentityEntry *rientry;
+
+	/* Nothing to do without xids, or if the table was never created. */
+	if (!committed_xid || !replica_identity_table)
+		return;
+
+	replica_identity_start_iterate(replica_identity_table, &i);
+	while ((rientry = replica_identity_iterate(replica_identity_table, &i)) != NULL)
+	{
+		if (!list_member_xid(committed_xid, rientry->remote_xid))
+			continue;
+		/* Clean up the hash entry for committed transaction */
+		free_replica_identity_key(rientry->keydata);
+		replica_identity_delete_item(replica_identity_table, rientry);
+	}
+}
+
+/*
+ * Look for parallel-applied transactions that have finished since the last
+ * check and drop the hash table entries recorded for them.
+ */
+static void
+cleanup_committed_replica_identity_entries(void)
+{
+	List	   *finished_xids = NIL;
+	dlist_mutable_iter miter;
+
+	dlist_foreach_modify(miter, &lsn_mapping)
+	{
+		FlushPosition *fpos =
+			dlist_container(FlushPosition, node, miter.cur);
+		bool		skipped_write;
+		bool		is_pa_xact = TransactionIdIsValid(fpos->pa_remote_xid);
+
+		/* Only parallel-applied xacts whose local end is still unknown. */
+		if (!is_pa_xact || !XLogRecPtrIsInvalid(fpos->local_end))
+			continue;
+
+		fpos->local_end = pa_get_last_commit_end(fpos->pa_remote_xid, true,
+												 &skipped_write);
+
+		elog(DEBUG1,
+			 "got commit end from parallel apply worker, "
+			 "txn: %u, remote_end %X/%X, local_end %X/%X",
+			 fpos->pa_remote_xid, LSN_FORMAT_ARGS(fpos->remote_end),
+			 LSN_FORMAT_ARGS(fpos->local_end));
+
+		/* Remember the xid unless the worker has not finished applying it. */
+		if (skipped_write || !XLogRecPtrIsInvalid(fpos->local_end))
+			finished_xids = lappend_xid(finished_xids, fpos->pa_remote_xid);
+	}
+
+	/* cleanup the entries for committed transactions */
+	cleanup_replica_identity_table(finished_xids);
+}
+
+/*
+ * Add *depends_on_xid to the dependency list unless it is invalid, is the
+ * current transaction itself, is already listed, or has already committed.
+ */
+static List *
+check_and_append_xid_dependency(List *depends_on_xids,
+								TransactionId *depends_on_xid,
+								TransactionId current_xid)
+{
+	TransactionId candidate;
+
+	Assert(depends_on_xid);
+	candidate = *depends_on_xid;
+
+	/* Nothing to record for unset, self-referencing or duplicate xids. */
+	if (!TransactionIdIsValid(candidate) ||
+		TransactionIdEquals(candidate, current_xid) ||
+		list_member_xid(depends_on_xids, candidate))
+		return depends_on_xids;
+
+	/*
+	 * A transaction that is still in progress becomes a dependency; for a
+	 * committed one, reset the caller's xid and leave the list unchanged.
+	 */
+	if (!pa_transaction_committed(candidate))
+		return lappend_xid(depends_on_xids, candidate);
+
+	*depends_on_xid = InvalidTransactionId;
+
+	return depends_on_xids;
+}
+
+/*
+ * Check for dependencies on preceding transactions that modify the same key.
+ * Returns the dependent transactions in 'depends_on_xids' and records the
+ * current change.
+ */
+static void
+check_dependency_on_replica_identity(Oid relid,
+									 LogicalRepTupleData *original_data,
+									 TransactionId new_depended_xid,
+									 List **depends_on_xids)
+{
+	LogicalRepRelMapEntry *relentry;
+	LogicalRepTupleData *ridata;
+	ReplicaIdentityKey *rikey;
+	ReplicaIdentityEntry *rientry;
+	MemoryContext oldctx;
+	int			n_ri;
+	bool		found = false;
+
+	Assert(depends_on_xids);
+
+	/* Search for existing entry */
+	relentry = logicalrep_get_relentry(relid);
+
+	Assert(relentry);
+
+	/*
+	 * First search whether any previous transaction has affected the whole
+	 * table e.g., truncate or schema change from publisher.
+	 */
+	*depends_on_xids = check_and_append_xid_dependency(*depends_on_xids,
+													   &relentry->last_depended_xid,
+													   new_depended_xid);
+
+	n_ri = bms_num_members(relentry->remoterel.attkeys);
+
+	/*
+	 * Return if the remote relation has no replica identity columns, i.e. it
+	 * has neither a replica identity key nor is replica identity full.
+	 */
+	if (!n_ri)
+		return;
+
+	/* Check if the RI key value of the tuple is invalid */
+	for (int i = 0; i < original_data->ncols; i++)
+	{
+		if (!bms_is_member(i, relentry->remoterel.attkeys))
+			continue;
+
+		/*
+		 * Return if RI key is NULL or is explicitly marked unchanged. The key
+		 * value could be NULL in the new tuple of an update operation which
+		 * means the RI key is not updated.
+		 */
+		if (original_data->colstatus[i] == LOGICALREP_COLUMN_NULL ||
+			original_data->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED)
+			return;
+	}
+
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+
+	/* Allocate space for replica identity values */
+	ridata = palloc0_object(LogicalRepTupleData);
+	ridata->colvalues = palloc0_array(StringInfoData, n_ri);
+	ridata->colstatus = palloc0_array(char, n_ri);
+	ridata->ncols = n_ri;
+
+	for (int i_original = 0, i_ri = 0; i_original < original_data->ncols; i_original++)
+	{
+		StringInfo	original_colvalue = &original_data->colvalues[i_original];
+
+		if (!bms_is_member(i_original, relentry->remoterel.attkeys))
+			continue;
+
+		/* Copy by length: binary-format values may contain NUL bytes. */
+		initStringInfoExt(&ridata->colvalues[i_ri], original_colvalue->len + 1);
+		appendBinaryStringInfo(&ridata->colvalues[i_ri],
+							   original_colvalue->data, original_colvalue->len);
+		ridata->colstatus[i_ri] = original_data->colstatus[i_original];
+		i_ri++;
+	}
+
+	rikey = palloc0_object(ReplicaIdentityKey);
+	rikey->relid = relid;
+	rikey->data = ridata;
+
+	if (TransactionIdIsValid(new_depended_xid))
+	{
+		rientry = replica_identity_insert(replica_identity_table, rikey,
+										  &found);
+
+		/*
+		 * Release the key built to search the entry, if the entry already
+		 * exists. Otherwise, initialize the remote_xid.
+		 */
+		if (found)
+		{
+			elog(DEBUG1, "found conflicting replica identity change from %u",
+				 rientry->remote_xid);
+
+			free_replica_identity_key(rikey);
+		}
+		else
+			rientry->remote_xid = InvalidTransactionId;
+	}
+	else
+	{
+		rientry = replica_identity_lookup(replica_identity_table, rikey);
+		free_replica_identity_key(rikey);
+	}
+
+	MemoryContextSwitchTo(oldctx);
+
+	/* Return if no entry found */
+	if (!rientry)
+		return;
+
+	Assert(!found || TransactionIdIsValid(rientry->remote_xid));
+
+	*depends_on_xids = check_and_append_xid_dependency(*depends_on_xids,
+													   &rientry->remote_xid,
+													   new_depended_xid);
+
+	/*
+	 * Record the new xid in the entry if valid. It is invalid when the leader
+	 * applies the transaction itself: such changes are committed before the
+	 * next transaction is processed, so nothing needs to depend on them.
+	 */
+	if (TransactionIdIsValid(new_depended_xid))
+		rientry->remote_xid = new_depended_xid;
+
+	/*
+	 * Remove the entry if the transaction has been committed and no new
+	 * dependency needs to be added.
+	 */
+	else if (!TransactionIdIsValid(rientry->remote_xid))
+	{
+		free_replica_identity_key(rientry->keydata);
+		replica_identity_delete_item(replica_identity_table, rientry);
+	}
+}
+
+/*
+ * Collect, in 'depends_on_xids', the preceding transactions that inserted,
+ * updated or deleted rows of the given table and have not committed yet.
+ */
+static void
+find_all_dependencies_on_rel(LogicalRepRelId relid, TransactionId new_depended_xid,
+							 List **depends_on_xids)
+{
+	ReplicaIdentityEntry *entry;
+	replica_identity_iterator iter;
+
+	Assert(depends_on_xids);
+
+	replica_identity_start_iterate(replica_identity_table, &iter);
+	while ((entry = replica_identity_iterate(replica_identity_table, &iter)) != NULL)
+	{
+		Assert(TransactionIdIsValid(entry->remote_xid));
+
+		/* Entries of other tables are of no interest here. */
+		if (entry->keydata->relid != relid)
+			continue;
+
+		if (pa_transaction_committed(entry->remote_xid))
+		{
+			/* Already committed: drop the hash entry while we are here. */
+			free_replica_identity_key(entry->keydata);
+			replica_identity_delete_item(replica_identity_table, entry);
+		}
+		else
+			*depends_on_xids =
+				check_and_append_xid_dependency(*depends_on_xids,
+												&entry->remote_xid,
+												new_depended_xid);
+	}
+}
+
+/*
+ * Gather in 'depends_on_xids' every preceding transaction that affects the
+ * given table and, if valid, record new_depended_xid on the relation entry.
+ */
+static void
+check_dependency_on_rel(LogicalRepRelId relid, TransactionId new_depended_xid,
+						List **depends_on_xids)
+{
+	LogicalRepRelMapEntry *rel;
+
+	Assert(depends_on_xids);
+
+	/* Row-level changes tracked in the replica identity hash table. */
+	find_all_dependencies_on_rel(relid, new_depended_xid, depends_on_xids);
+
+	/*
+	 * Table-wide changes are tracked in the relation map entry. A missing
+	 * entry indicates that no change has been applied yet.
+	 */
+	rel = logicalrep_get_relentry(relid);
+	if (rel != NULL)
+	{
+		*depends_on_xids =
+			check_and_append_xid_dependency(*depends_on_xids,
+											&rel->last_depended_xid,
+											new_depended_xid);
+
+		if (TransactionIdIsValid(new_depended_xid))
+			rel->last_depended_xid = new_depended_xid;
+	}
+}
+
+/*
+ * Check dependencies related to the current change by determining if the
+ * modification impacts the same row or table as another ongoing transaction. If
+ * needed, instruct parallel apply workers to wait for these preceding
+ * transactions to complete.
+ *
+ * If 'new_depended_xid' is valid, it is also recorded for the change so that
+ * later transactions touching the same row or table depend on it.
+ */
+static void
+handle_dependency_on_change(LogicalRepMsgType action, StringInfo s,
+							TransactionId new_depended_xid,
+							ParallelApplyWorkerInfo *winfo)
+{
+	LogicalRepRelId relid;
+	LogicalRepTupleData oldtup;
+	LogicalRepTupleData newtup;
+	LogicalRepRelation *rel;
+	List	   *depends_on_xids = NIL;
+	List	   *remote_relids;
+	bool		has_oldtup = false;
+	bool		cascade = false;
+	bool		restart_seqs = false;
+	StringInfoData dependencies;
+
+	/*
+	 * Parse the change through a local copy of the StringInfo struct, so that
+	 * the read cursor of the caller's message is left untouched; the caller
+	 * may also read the data from the remote message.
+	 */
+	StringInfoData change = *s;
+
+	/* Compute dependency only for non-streaming transaction */
+	if (in_streamed_transaction || (winfo && winfo->stream_txn))
+		return;
+
+	/* Only the leader checks dependencies and schedules the parallel apply */
+	if (!am_leader_apply_worker())
+		return;
+
+	if (!replica_identity_table)
+		replica_identity_table = replica_identity_create(ApplyContext,
+														 REPLICA_IDENTITY_INITIAL_SIZE,
+														 NULL);
+
+	if (replica_identity_table->members >= REPLICA_IDENTITY_CLEANUP_THRESHOLD)
+		cleanup_committed_replica_identity_entries();
+
+	switch (action)
+	{
+		case LOGICAL_REP_MSG_INSERT:
+			relid = logicalrep_read_insert(&change, &newtup);
+			check_dependency_on_replica_identity(relid, &newtup,
+												 new_depended_xid,
+												 &depends_on_xids);
+			break;
+
+		case LOGICAL_REP_MSG_UPDATE:
+			relid = logicalrep_read_update(&change, &has_oldtup, &oldtup,
+										   &newtup);
+
+			if (has_oldtup)
+				check_dependency_on_replica_identity(relid, &oldtup,
+													 new_depended_xid,
+													 &depends_on_xids);
+
+			check_dependency_on_replica_identity(relid, &newtup,
+												 new_depended_xid,
+												 &depends_on_xids);
+			break;
+
+		case LOGICAL_REP_MSG_DELETE:
+			relid = logicalrep_read_delete(&change, &oldtup);
+			check_dependency_on_replica_identity(relid, &oldtup,
+												 new_depended_xid,
+												 &depends_on_xids);
+			break;
+
+		case LOGICAL_REP_MSG_TRUNCATE:
+			remote_relids = logicalrep_read_truncate(&change, &cascade,
+													 &restart_seqs);
+
+			/*
+			 * Truncate affects all rows in a table, so the current
+			 * transaction should wait for all preceding transactions that
+			 * modified the same table.
+			 */
+			foreach_int(truncated_relid, remote_relids)
+				check_dependency_on_rel(truncated_relid, new_depended_xid,
+										&depends_on_xids);
+
+			break;
+
+		case LOGICAL_REP_MSG_RELATION:
+			rel = logicalrep_read_rel(&change);
+
+			/*
+			 * The replica identity key could be changed, making existing
+			 * entries in the replica identity invalid. In this case, parallel
+			 * apply is not allowed on this specific table until all running
+			 * transactions that modified it have finished.
+			 */
+			check_dependency_on_rel(rel->remoteid, new_depended_xid,
+									&depends_on_xids);
+			break;
+
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_MESSAGE:
+
+			/*
+			 * Type updates accompany relation updates, so dependencies have
+			 * already been checked during relation updates. Logical messages
+			 * do not conflict with any changes, so they can be ignored.
+			 */
+			break;
+
+		default:
+			Assert(false);
+			break;
+	}
+
+	if (!depends_on_xids)
+		return;
+
+	/*
+	 * Notify the listed transactions that the current transaction depends on
+	 * them.
+	 */
+	pa_record_dependency_on_transactions(depends_on_xids);
+
+	/*
+	 * If the leader applies the transaction itself, wait right away for the
+	 * transactions that the current transaction depends on.
+	 */
+	if (winfo == NULL)
+	{
+		foreach_xid(xid, depends_on_xids)
+			pa_wait_for_depended_transaction(xid);
+
+		return;
+	}
+
+	initStringInfo(&dependencies);
+
+	/* Build the dependency message used to send to parallel apply worker */
+	write_internal_dependencies(&dependencies, depends_on_xids);
+
+	if (!winfo->serialize_changes)
+	{
+		if (pa_send_data(winfo, dependencies.len, dependencies.data))
+			return;
+
+		pa_switch_to_partial_serialize(winfo, true);
+	}
+
+	/* Skip the internal message flag; NOTE(review): skip action byte too? */
+	dependencies.cursor++;
+	stream_write_change(LOGICAL_REP_MSG_INTERNAL_DEPENDENCY,
+						&dependencies);
+}
+
+/*
+ * Write the internal dependency message (flag, type, count, xids) into 's'.
+ */
+static void
+write_internal_dependencies(StringInfo s, List *depends_on_xids)
+{
+	ListCell   *lc;
+
+	pq_sendbyte(s, PARALLEL_APPLY_INTERNAL_MESSAGE);
+	pq_sendbyte(s, LOGICAL_REP_MSG_INTERNAL_DEPENDENCY);
+	pq_sendint32(s, list_length(depends_on_xids));
+	foreach(lc, depends_on_xids)
+		pq_sendint32(s, lfirst_xid(lc));
+}
+
+/*
+ * Handle an internal dependency message by waiting for every transaction
+ * listed in it to commit.
+ */
+static void
+apply_handle_internal_dependency(StringInfo s)
+{
+	int			remaining = pq_getmsgint(s, 4);
+
+	/* The xid count is followed by that many xids; wait for each in turn. */
+	while (remaining-- > 0)
+	{
+		TransactionId dep_xid = pq_getmsgint(s, 4);
+
+		pa_wait_for_depended_transaction(dep_xid);
+	}
+}
+
+/*
+ * Handle internal relation information.
+ *
+ * Read every relation definition in the message into the relation map cache.
+ */
+static void
+apply_handle_internal_relation(StringInfo s)
+{
+	int			num_rels;
+
+	num_rels = pq_getmsgint(s, 4);
+
+	for (int i = 0; i < num_rels; i++)
+	{
+		LogicalRepRelation *rel = logicalrep_read_rel(s);
+
+		logicalrep_relmap_update(rel);
+
+		elog(DEBUG1, "parallel apply worker init relmap for %s",
+			 rel->relname);
+	}
+}
+
 /*
  * Form the origin name for the subscription.
  *
@@ -748,13 +1378,18 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 	TransApplyAction apply_action;
 	StringInfoData original_msg;
 
-	apply_action = get_transaction_apply_action(stream_xid, &winfo);
+	Assert(!in_streamed_transaction || TransactionIdIsValid(stream_xid));
+
+	apply_action = get_transaction_apply_action(in_streamed_transaction
+												? stream_xid : remote_xid,
+												&winfo);
 
 	/* not in streaming mode */
 	if (apply_action == TRANS_LEADER_APPLY)
+	{
+		handle_dependency_on_change(action, s, InvalidTransactionId, winfo);
 		return false;
-
-	Assert(TransactionIdIsValid(stream_xid));
+	}
 
 	/*
 	 * The parallel apply worker needs the xid in this message to decide
@@ -766,15 +1401,28 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 
 	/*
 	 * We should have received XID of the subxact as the first part of the
-	 * message, so extract it.
+	 * message in streaming transactions, so extract it.
 	 */
-	current_xid = pq_getmsgint(s, 4);
+	if (in_streamed_transaction)
+		current_xid = pq_getmsgint(s, 4);
+	else
+		current_xid = remote_xid;
 
 	if (!TransactionIdIsValid(current_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
+	handle_dependency_on_change(action, s, current_xid, winfo);
+
+	/*
+	 * Re-fetch the latest apply action as it might have been changed during
+	 * dependency check.
+	 */
+	apply_action = get_transaction_apply_action(in_streamed_transaction
+												? stream_xid : remote_xid,
+												&winfo);
+
 	switch (apply_action)
 	{
 		case TRANS_LEADER_SERIALIZE:
@@ -1177,17 +1825,49 @@ static void
 apply_handle_begin(StringInfo s)
 {
 	LogicalRepBeginData begin_data;
+	ParallelApplyWorkerInfo *winfo;
+	TransApplyAction apply_action;
 
 	/* There must not be an active streaming transaction. */
 	Assert(!TransactionIdIsValid(stream_xid));
 
 	logicalrep_read_begin(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
+
+	remote_xid = begin_data.xid;
+
+	set_apply_error_context_xact(remote_xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
+	pa_allocate_worker(remote_xid, false);
+
+	apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+	elog(DEBUG1, "new remote_xid %u", remote_xid);
+	switch (apply_action)
+	{
+		case TRANS_LEADER_APPLY:
+			break;
+
+		case TRANS_LEADER_SEND_TO_PARALLEL:
+			Assert(winfo);
+			pa_send_data(winfo, s->len, s->data);
+			pa_set_stream_apply_worker(winfo);
+			break;
+
+		case TRANS_PARALLEL_APPLY:
+			/* Hold the lock until the end of the transaction. */
+			pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
+			break;
+
+		default:
+			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+			break;
+	}
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -1202,6 +1882,11 @@ static void
 apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
+	ParallelApplyWorkerInfo *winfo;
+	TransApplyAction apply_action;
+
+	/* Save the message before it is consumed. */
+	StringInfoData original_msg = *s;
 
 	logicalrep_read_commit(s, &commit_data);
 
@@ -1212,7 +1897,70 @@ apply_handle_commit(StringInfo s)
 								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
 								 LSN_FORMAT_ARGS(remote_final_lsn))));
 
-	apply_handle_commit_internal(&commit_data);
+	apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+	switch (apply_action)
+	{
+		case TRANS_LEADER_APPLY:
+			apply_handle_commit_internal(&commit_data);
+			break;
+
+		case TRANS_LEADER_SEND_TO_PARALLEL:
+			Assert(winfo);
+
+			if (pa_send_data(winfo, s->len, s->data))
+			{
+				/* Finish processing the transaction. */
+				pa_xact_finish(winfo, commit_data.end_lsn);
+				break;
+			}
+
+			/*
+			 * Switch to serialize mode when we are not able to send the
+			 * change to parallel apply worker.
+			 */
+			pa_switch_to_partial_serialize(winfo, true);
+
+			/* fall through */
+		case TRANS_LEADER_PARTIAL_SERIALIZE:
+			Assert(winfo);
+
+			stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT,
+										 &original_msg);
+
+			pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+			/* Finish processing the transaction. */
+			pa_xact_finish(winfo, commit_data.end_lsn);
+			break;
+
+		case TRANS_PARALLEL_APPLY:
+
+			/*
+			 * If the parallel apply worker is applying spooled messages then
+			 * close the file before committing.
+			 */
+			if (stream_fd)
+				stream_close_file();
+
+			apply_handle_commit_internal(&commit_data);
+
+			MyParallelShared->last_commit_end = XactLastCommitEnd;
+
+			pa_commit_transaction();
+
+			pa_unlock_transaction(remote_xid, AccessExclusiveLock);
+			break;
+
+		default:
+			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+			break;
+	}
+
+	remote_xid = InvalidTransactionId;
+	in_remote_transaction = false;
+
+	elog(DEBUG1, "reset remote_xid %u", remote_xid);
 
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
@@ -1332,7 +2080,8 @@ apply_handle_prepare(StringInfo s)
 	 * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
 	 * it.
 	 */
-	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
+	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+						 InvalidTransactionId);
 
 	in_remote_transaction = false;
 
@@ -1377,6 +2126,8 @@ apply_handle_commit_prepared(StringInfo s)
 	/* There is no transaction when COMMIT PREPARED is called */
 	begin_replication_step();
 
+	/* TODO wait for xid to finish */
+
 	/*
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
@@ -1389,7 +2140,8 @@ apply_handle_commit_prepared(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd,
+						 InvalidTransactionId);
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1455,7 +2207,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	 * transaction because we always flush the WAL record for it. See
 	 * apply_handle_prepare.
 	 */
-	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
+	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr,
+						 InvalidTransactionId);
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1514,7 +2267,8 @@ apply_handle_stream_prepare(StringInfo s)
 			 * It is okay not to set the local_end LSN for the prepare because
 			 * we always flush the prepare record. See apply_handle_prepare.
 			 */
-			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
+			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+								 InvalidTransactionId);
 
 			in_remote_transaction = false;
 
@@ -1705,7 +2459,7 @@ apply_handle_stream_start(StringInfo s)
 
 	/* Try to allocate a worker for the streaming transaction. */
 	if (first_segment)
-		pa_allocate_worker(stream_xid);
+		pa_allocate_worker(stream_xid, true);
 
 	apply_action = get_transaction_apply_action(stream_xid, &winfo);
 
@@ -1763,6 +2517,11 @@ apply_handle_stream_start(StringInfo s)
 		case TRANS_LEADER_PARTIAL_SERIALIZE:
 			Assert(winfo);
 
+			/*
+			 * TODO, the pa worker could start to wait too soon when
+			 * processing some old stream start
+			 */
+
 			/*
 			 * Open the spool file unless it was already opened when switching
 			 * to serialize mode. The transaction started in
@@ -2486,7 +3245,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 
 		pgstat_report_stat(false);
 
-		store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
+		store_flush_position(commit_data->end_lsn, XactLastCommitEnd,
+							 InvalidTransactionId);
 	}
 	else
 	{
@@ -2519,6 +3279,9 @@ apply_handle_relation(StringInfo s)
 
 	/* Also reset all entries in the partition map that refer to remoterel. */
 	logicalrep_partmap_reset_relmap(rel);
+
+	if (am_leader_apply_worker())
+		pa_distribute_schema_changes_to_workers(rel);
 }
 
 /*
@@ -3270,6 +4033,8 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
 
 /*
  * This handles insert, update, delete on a partitioned table.
+ *
+ * TODO, support parallel apply.
  */
 static void
 apply_handle_tuple_routing(ApplyExecutionData *edata,
@@ -3579,6 +4344,8 @@ apply_handle_truncate(StringInfo s)
 	ListCell   *lc;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
+	elog(LOG, "truncate");
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -3790,6 +4557,14 @@ apply_dispatch(StringInfo s)
 			apply_handle_stream_prepare(s);
 			break;
 
+		case LOGICAL_REP_MSG_INTERNAL_RELATION:
+			apply_handle_internal_relation(s);
+			break;
+
+		case LOGICAL_REP_MSG_INTERNAL_DEPENDENCY:
+			apply_handle_internal_dependency(s);
+			break;
+
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -3810,6 +4585,10 @@ apply_dispatch(StringInfo s)
  * check which entries on it are already locally flushed. Those we can report
  * as having been flushed.
  *
+ * For non-streaming transactions managed by a parallel apply worker, we will
+ * get the local commit end from the shared parallel apply worker info once the
+ * transaction has been committed by the worker.
+ *
  * The have_pending_txes is true if there are outstanding transactions that
  * need to be flushed.
  */
@@ -3819,6 +4598,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 {
 	dlist_mutable_iter iter;
 	XLogRecPtr	local_flush = GetFlushRecPtr(NULL);
+	List	   *committed_pa_xid = NIL;
 
 	*write = InvalidXLogRecPtr;
 	*flush = InvalidXLogRecPtr;
@@ -3828,6 +4608,36 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 		FlushPosition *pos =
 			dlist_container(FlushPosition, node, iter.cur);
 
+		if (TransactionIdIsValid(pos->pa_remote_xid) &&
+			XLogRecPtrIsInvalid(pos->local_end))
+		{
+			bool		skipped_write;
+
+			pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true,
+													&skipped_write);
+
+			elog(DEBUG1,
+				 "got commit end from parallel apply worker, "
+				 "txn: %u, remote_end %X/%X, local_end %X/%X",
+				 pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end),
+				 LSN_FORMAT_ARGS(pos->local_end));
+
+			/*
+			 * Break the loop if the worker has not finished applying the
+			 * transaction. There's no need to check subsequent transactions,
+			 * as they must commit after the current transaction being
+			 * examined and thus won't have their commit end available yet.
+			 */
+			if (!skipped_write && XLogRecPtrIsInvalid(pos->local_end))
+				break;
+
+			committed_pa_xid = lappend_xid(committed_pa_xid, pos->pa_remote_xid);
+		}
+
+		/*
+		 * Worker has finished applying or the transaction was applied in the
+		 * leader apply worker
+		 */
 		*write = pos->remote_end;
 
 		if (pos->local_end <= local_flush)
@@ -3836,29 +4646,19 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 			dlist_delete(iter.cur);
 			pfree(pos);
 		}
-		else
-		{
-			/*
-			 * Don't want to uselessly iterate over the rest of the list which
-			 * could potentially be long. Instead get the last element and
-			 * grab the write position from there.
-			 */
-			pos = dlist_tail_element(FlushPosition, node,
-									 &lsn_mapping);
-			*write = pos->remote_end;
-			*have_pending_txes = true;
-			return;
-		}
 	}
 
 	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
+
+	cleanup_replica_identity_table(committed_pa_xid);
 }
 
 /*
  * Store current remote/local lsn pair in the tracking list.
  */
 void
-store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
+store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn,
+					 TransactionId remote_xid)
 {
 	FlushPosition *flushpos;
 
@@ -3876,6 +4676,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
 	flushpos->local_end = local_lsn;
 	flushpos->remote_end = remote_lsn;
+	flushpos->pa_remote_xid = remote_xid;
 
 	dlist_push_tail(&lsn_mapping, &flushpos->node);
 	MemoryContextSwitchTo(ApplyMessageContext);
@@ -5057,7 +5858,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
  * changes for this transaction, create the buffile, otherwise open the
  * previously created file.
  */
-static void
+void
 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 {
 	char		path[MAXPGPATH];
@@ -5102,7 +5903,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
  * stream_close_file
  *	  Close the currently open file with streamed changes.
  */
-static void
+void
 stream_close_file(void)
 {
 	Assert(stream_fd != NULL);
@@ -5150,7 +5951,7 @@ stream_write_change(char action, StringInfo s)
  * target file if not already before writing the message and close the file at
  * the end.
  */
-static void
+void
 stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
 {
 	Assert(!in_streamed_transaction);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 0be307d2ca0..fd66b2c0a41 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -406,6 +406,7 @@ SubtransSLRU	"Waiting to access the sub-transaction SLRU cache."
 XactSLRU	"Waiting to access the transaction status SLRU cache."
 ParallelVacuumDSA	"Waiting for parallel vacuum dynamic shared memory allocation."
 AioUringCompletion	"Waiting for another process to complete IO via io_uring."
+ParallelApplyDSA	"Waiting for parallel apply dynamic shared memory allocation."
 
 # No "ABI_compatibility" region here as WaitEventLWLock has its own C code.
 
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..7d2aaf2d389 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -75,6 +75,8 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
 	LOGICAL_REP_MSG_STREAM_ABORT = 'A',
 	LOGICAL_REP_MSG_STREAM_PREPARE = 'p',
+	LOGICAL_REP_MSG_INTERNAL_DEPENDENCY = 'd',
+	LOGICAL_REP_MSG_INTERNAL_RELATION = 'i',
 } LogicalRepMsgType;
 
 /*
@@ -251,6 +253,8 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel, Bitmapset *columns,
 								 PublishGencolsType include_gencols_type);
+extern void logicalrep_write_internal_rel(StringInfo out,
+										  LogicalRepRelation *rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 7a561a8e8d8..34a7069e9e5 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,8 @@ typedef struct LogicalRepRelMapEntry
 	/* Sync state. */
 	char		state;
 	XLogRecPtr	statelsn;
+
+	TransactionId last_depended_xid;
 } LogicalRepRelMapEntry;
 
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
@@ -50,5 +52,8 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
 extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap);
 extern Oid	GetRelationIdentityOrPK(Relation rel);
+extern int	logicalrep_get_num_rels(void);
+extern void logicalrep_write_all_rels(StringInfo out);
+extern LogicalRepRelMapEntry *logicalrep_get_relentry(LogicalRepRelId remoteid);
 
 #endif							/* LOGICALRELATION_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7c0204dd6f4..bf9a7f00f87 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -15,6 +15,7 @@
 #include "access/xlogdefs.h"
 #include "catalog/pg_subscription.h"
 #include "datatype/timestamp.h"
+#include "lib/dshash.h"
 #include "miscadmin.h"
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
@@ -191,6 +192,11 @@ typedef struct ParallelApplyWorkerShared
 	 */
 	PartialFileSetState fileset_state;
 	FileSet		fileset;
+
+	dsa_handle	parallel_apply_dsa_handle;
+	dshash_table_handle parallelized_txns_handle;
+
+	bool		has_dependent_txn;
 } ParallelApplyWorkerShared;
 
 /*
@@ -225,6 +231,8 @@ typedef struct ParallelApplyWorkerInfo
 	 */
 	bool		in_use;
 
+	bool		stream_txn;
+
 	ParallelApplyWorkerShared *shared;
 } ParallelApplyWorkerInfo;
 
@@ -287,6 +295,10 @@ extern void apply_dispatch(StringInfo s);
 extern void maybe_reread_subscription(void);
 
 extern void stream_cleanup_files(Oid subid, TransactionId xid);
+extern void stream_open_file(Oid subid, TransactionId xid, bool first_segment);
+extern void stream_close_file(void);
+extern void stream_open_and_write_change(TransactionId xid, char action,
+										 StringInfo s);
 
 extern void set_stream_options(WalRcvStreamOptions *options,
 							   char *slotname,
@@ -300,19 +312,23 @@ extern void SetupApplyOrSyncWorker(int worker_slot);
 
 extern void DisableSubscriptionAndExit(void);
 
-extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
+extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn,
+								 TransactionId remote_xid);
 
 /* Function for apply error callback */
 extern void apply_error_callback(void *arg);
 extern void set_apply_error_context_origin(char *originname);
 
 /* Parallel apply worker setup and interactions */
-extern void pa_allocate_worker(TransactionId xid);
+extern void pa_allocate_worker(TransactionId xid, bool stream_txn);
 extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
+extern XLogRecPtr pa_get_last_commit_end(TransactionId xid, bool delete_entry,
+										 bool *skipped_write);
 extern void pa_detach_all_error_mq(void);
 
 extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
 						 const void *data);
+extern void pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel);
 extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
 										   bool stream_locked);
 
@@ -337,12 +353,18 @@ extern void pa_decr_and_wait_stream_block(void);
 
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
+extern bool pa_transaction_committed(TransactionId xid);
+extern void pa_record_dependency_on_transactions(List *depends_on_xids);
+extern void pa_commit_transaction(void);
+extern void pa_wait_for_depended_transaction(TransactionId xid);
 
 #define isParallelApplyWorker(worker) ((worker)->in_use && \
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
 #define isTablesyncWorker(worker) ((worker)->in_use && \
 								   (worker)->type == WORKERTYPE_TABLESYNC)
 
+#define PARALLEL_APPLY_INTERNAL_MESSAGE	'i'
+
 static inline bool
 am_tablesync_worker(void)
 {
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 208d2e3a8ed..cc995f3a252 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -135,3 +135,4 @@ PG_LWLOCKTRANCHE(SUBTRANS_SLRU, SubtransSLRU)
 PG_LWLOCKTRANCHE(XACT_SLRU, XactSLRU)
 PG_LWLOCKTRANCHE(PARALLEL_VACUUM_DSA, ParallelVacuumDSA)
 PG_LWLOCKTRANCHE(AIO_URING_COMPLETION, AioUringCompletion)
+PG_LWLOCKTRANCHE(PARALLEL_APPLY_DSA, ParallelApplyDSA)
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
index 3d16c2a800d..c2fba0b9a9c 100644
--- a/src/test/subscription/t/010_truncate.pl
+++ b/src/test/subscription/t/010_truncate.pl
@@ -17,7 +17,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_logical_replication_workers = 6));
+	qq(max_logical_replication_workers = 7));
 $node_subscriber->start;
 
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 03135b1cd6e..e79ddd9a41c 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -232,6 +232,12 @@ $node_subscriber->wait_for_log(
 
 $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)");
 
+# FIXME: Currently, non-streaming transactions are applied in parallel by
+# default. So, the first transaction is handled by a parallel apply worker. To
+# trigger the deadlock, initiate one more transaction to be applied by the
+# leader.
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)");
+
 $h->query_safe('COMMIT');
 $h->quit;
 
@@ -247,7 +253,7 @@ $node_publisher->wait_for_catchup($appname);
 
 $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
-is($result, qq(5001), 'data replicated to subscriber after dropping index');
+is($result, qq(5002), 'data replicated to subscriber after dropping index');
 
 # Clean up test data from the environment.
 $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index 00a1c2fcd48..6842476c8b0 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,7 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10");
 $node_subscriber->start;
 
 
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index 36af1c16e7f..aec039d565b 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -87,6 +87,7 @@ $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_publisher->init(allows_streaming => 'logical');
 $node_subscriber->init;
 $node_publisher->start;
+$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10");
 $node_subscriber->start;
 $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 my %remainder_a = (
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e6f2e93b2d6..fa4bfdcbd75 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2075,6 +2075,7 @@ ParallelTransState
 ParallelVacuumState
 ParallelWorkerContext
 ParallelWorkerInfo
+ParallelizedTxnEntry
 Param
 ParamCompileHook
 ParamExecData
@@ -2540,6 +2541,8 @@ ReparameterizeForeignPathByChild_function
 ReplaceVarsFromTargetList_context
 ReplaceVarsNoMatchOption
 ReplaceWrapOption
+ReplicaIdentityEntry
+ReplicaIdentityKey
 ReplicaIdentityStmt
 ReplicationKind
 ReplicationSlot
@@ -4027,6 +4030,7 @@ remoteDep
 remove_nulling_relids_context
 rendezvousHashEntry
 rep
+replica_identity_hash
 replace_rte_variables_callback
 replace_rte_variables_context
 report_error_fn
-- 
2.50.1.windows.1

