From 88ce1eede98cc6fa14de5cfb674b77a4961847f1 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 1 Dec 2025 16:28:38 +0900
Subject: [PATCH v4 2/4] Introduce a shared hash table to store parallelized
 transactions

This hash table is used for ensuring that parallel workers wait until dependent
transactions are committed.

The shared hash table contains the remote transaction IDs of the transactions
that the leader has assigned to parallel apply workers. An entry is inserted,
keyed by the remote XID, when the leader dispatches a remote transaction to a
parallel apply worker, and is deleted when that worker commits the transaction.

When a parallel worker needs to wait for other transactions, it looks up their
remote XIDs in the hash table. It can proceed only once the corresponding
entries have been removed from the hash table.

Author: Hou Zhijie <houzj.fnst@fujitsu.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
---
 .../replication/logical/applyparallelworker.c | 100 +++++++++++++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/replication/worker_internal.h     |   4 +
 src/include/storage/lwlocklist.h              |   1 +
 4 files changed, 105 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 735a3e9acad..bc8a0480778 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -218,12 +218,35 @@ typedef struct ParallelApplyWorkerEntry
 	ParallelApplyWorkerInfo *winfo;
 } ParallelApplyWorkerEntry;
 
+/* An entry in the parallelized_txns shared hash table; holds only the key. */
+typedef struct ParallelizedTxnEntry
+{
+	TransactionId xid;			/* Hash key: remote XID of the transaction */
+} ParallelizedTxnEntry;
+
 /*
  * A hash table used to cache the state of streaming transactions being applied
  * by the parallel apply workers.
  */
 static HTAB *ParallelApplyTxnHash = NULL;
 
+/*
+ * The DSA area and the shared hash table created in it, used to track the
+ * parallelized transactions that could be depended on by other transactions.
+ */
+static dsa_area *parallel_apply_dsa_area = NULL;
+static dshash_table *parallelized_txns = NULL;
+
+/* Parameters for the parallelized_txns shared hash table (positional init). */
+static const dshash_parameters dsh_params = {
+	sizeof(TransactionId),		/* key size: the key is the XID */
+	sizeof(ParallelizedTxnEntry),	/* size of one entry */
+	dshash_memcmp,				/* key comparison */
+	dshash_memhash,				/* key hashing */
+	dshash_memcpy,				/* key copy */
+	LWTRANCHE_PARALLEL_APPLY_DSA	/* LWLock tranche used by the table */
+};
+
 /*
 * A list (pool) of active parallel apply workers. The information for
 * the new worker is added to the list after successfully launching it. The
@@ -257,6 +280,8 @@ static List *subxactlist = NIL;
 static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
 static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
 static PartialFileSetState pa_get_fileset_state(void);
+static void pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle,
+											dshash_table_handle *pa_dshash_handle);
 
 /*
  * Returns true if it is OK to start a parallel apply worker, false otherwise.
@@ -334,6 +359,15 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	shm_mq	   *mq;
 	Size		queue_size = DSM_QUEUE_SIZE;
 	Size		error_queue_size = DSM_ERROR_QUEUE_SIZE;
+	dsa_handle	parallel_apply_dsa_handle;
+	dshash_table_handle parallelized_txns_handle;
+
+	pa_attach_parallelized_txn_hash(&parallel_apply_dsa_handle,
+									&parallelized_txns_handle);
+
+	if (parallel_apply_dsa_handle == DSA_HANDLE_INVALID ||
+		parallelized_txns_handle == DSHASH_HANDLE_INVALID)
+		return false;
 
 	/*
 	 * Estimate how much shared memory we need.
@@ -369,6 +403,8 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	pg_atomic_init_u32(&(shared->pending_stream_count), 0);
 	shared->last_commit_end = InvalidXLogRecPtr;
 	shared->fileset_state = FS_EMPTY;
+	shared->parallel_apply_dsa_handle = parallel_apply_dsa_handle;
+	shared->parallelized_txns_handle = parallelized_txns_handle;
 
 	shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
 
@@ -864,6 +900,8 @@ ParallelApplyWorkerMain(Datum main_arg)
 	shm_mq	   *mq;
 	shm_mq_handle *mqh;
 	shm_mq_handle *error_mqh;
+	dsa_handle	pa_dsa_handle;
+	dshash_table_handle pa_dshash_handle;
 	RepOriginId originid;
 	int			worker_slot = DatumGetInt32(main_arg);
 	char		originname[NAMEDATALEN];
@@ -951,6 +989,8 @@ ParallelApplyWorkerMain(Datum main_arg)
 
 	InitializingApplyWorker = false;
 
+	pa_attach_parallelized_txn_hash(&pa_dsa_handle, &pa_dshash_handle);
+
 	/* Setup replication origin tracking. */
 	StartTransactionCommand();
 	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
@@ -1646,6 +1686,51 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 	pa_free_worker(winfo);
 }
 
+/*
+ * Create (leader) or attach to (parallel worker) the parallelized-transactions
+ * hash table; the out parameters get its handles, or invalid handles if none.
+ */
+static void
+pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle,
+								dshash_table_handle *pa_dshash_handle)
+{
+	*pa_dsa_handle = DSA_HANDLE_INVALID;
+	*pa_dshash_handle = DSHASH_HANDLE_INVALID;
+
+	if (parallelized_txns == NULL)
+	{
+		/* Be sure any local memory allocated by DSA routines is persistent. */
+		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+		if (am_leader_apply_worker())
+		{
+			/* Create the DSA area and the shared hash table in it. */
+			parallel_apply_dsa_area = dsa_create(LWTRANCHE_PARALLEL_APPLY_DSA);
+			dsa_pin(parallel_apply_dsa_area);	/* NOTE(review): no dsa_unpin in view */
+			dsa_pin_mapping(parallel_apply_dsa_area);
+			parallelized_txns = dshash_create(parallel_apply_dsa_area, &dsh_params, NULL);
+		}
+		else if (am_parallel_apply_worker())
+		{
+			/* Attach using the handles published in MyParallelShared. */
+			parallel_apply_dsa_area = dsa_attach(MyParallelShared->parallel_apply_dsa_handle);
+			dsa_pin_mapping(parallel_apply_dsa_area);
+			parallelized_txns = dshash_attach(parallel_apply_dsa_area, &dsh_params,
+											  MyParallelShared->parallelized_txns_handle,
+											  NULL);
+		}
+		MemoryContextSwitchTo(oldctx);
+	}
+
+	/* Report the handles whenever the table is available in this process. */
+	if (parallelized_txns != NULL)
+	{
+		Assert(parallel_apply_dsa_area);
+		*pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area);
+		*pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns);
+	}
+}
+
 /*
  * Wait for the given transaction to finish.
  */
@@ -1656,7 +1741,20 @@ pa_wait_for_depended_transaction(TransactionId xid)
 
 	for (;;)
 	{
-		/* XXX wait until given transaction is finished */
+		ParallelizedTxnEntry *txn_entry;
+
+		txn_entry = dshash_find(parallelized_txns, &xid, false);
+
+		/* The entry is removed only if the transaction is committed */
+		if (txn_entry == NULL)
+			break;
+
+		dshash_release_lock(parallelized_txns, txn_entry);
+
+		pa_lock_transaction(xid, AccessShareLock);
+		pa_unlock_transaction(xid, AccessShareLock);
+
+		CHECK_FOR_INTERRUPTS();
 	}
 
 	elog(DEBUG1, "finish waiting for depended xid %u", xid);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index c1ac71ff7f2..a561f8ff459 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -404,6 +404,7 @@ SubtransSLRU	"Waiting to access the sub-transaction SLRU cache."
 XactSLRU	"Waiting to access the transaction status SLRU cache."
 ParallelVacuumDSA	"Waiting for parallel vacuum dynamic shared memory allocation."
 AioUringCompletion	"Waiting for another process to complete IO via io_uring."
+ParallelApplyDSA	"Waiting for parallel apply dynamic shared memory allocation."
 
 # No "ABI_compatibility" region here as WaitEventLWLock has its own C code.
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index a3526eae578..ddcdcc05053 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -15,6 +15,7 @@
 #include "access/xlogdefs.h"
 #include "catalog/pg_subscription.h"
 #include "datatype/timestamp.h"
+#include "lib/dshash.h"
 #include "miscadmin.h"
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
@@ -197,6 +198,9 @@ typedef struct ParallelApplyWorkerShared
 	 */
 	PartialFileSetState fileset_state;
 	FileSet		fileset;
+
+	dsa_handle	parallel_apply_dsa_handle;
+	dshash_table_handle parallelized_txns_handle;
 } ParallelApplyWorkerShared;
 
 /*
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 5b0ce383408..d68940b02bc 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -136,3 +136,4 @@ PG_LWLOCKTRANCHE(SUBTRANS_SLRU, SubtransSLRU)
 PG_LWLOCKTRANCHE(XACT_SLRU, XactSLRU)
 PG_LWLOCKTRANCHE(PARALLEL_VACUUM_DSA, ParallelVacuumDSA)
 PG_LWLOCKTRANCHE(AIO_URING_COMPLETION, AioUringCompletion)
+PG_LWLOCKTRANCHE(PARALLEL_APPLY_DSA, ParallelApplyDSA)
-- 
2.47.3

