On 17/09/2025 8:18 AM, Zhijie Hou (Fujitsu) wrote:
On Wednesday, September 17, 2025 2:40 AM Konstantin Knizhnik <[email protected]> wrote:
I think debug_logical_replication_streaming=immediate differs from real parallel
apply. It wasn't designed to simulate genuine parallel apply because it
restricts parallelism by requiring the leader to wait for each transaction to
complete on commit. To achieve in-order parallel apply, each parallel apply
worker should wait for the preceding transaction to finish, similar to the
dependency wait in the current POC patch. We plan to extend the patch to support
in-order parallel apply and will test its performance.


You were right.
I tried preserving the commit order with your patch (using my random-update test) and was surprised that the performance penalty is quite small:

I ran pgbench performing random updates with 10 clients for 100 seconds and then measured how long it takes the subscriber to catch up (in seconds):

master: 488
parallel-apply no order: 74
parallel-apply preserve order: 88

So it looks like serializing commits adds little overhead, which makes it possible to enable it by default and avoid all the effects that may be caused by changing the commit order at the subscriber.

The attached patch (based on yours) adds a preserve_commit_order GUC.
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 31a92d1a24a..13e5fc218d8 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -254,6 +298,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
 /* A list to maintain subtransactions, if any. */
 static List *subxactlist = NIL;
 
+/* GUC: commit parallel-applied transactions in the publisher's commit order */
+bool		preserve_commit_order = true;
+
 static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
 static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
 static PartialFileSetState pa_get_fileset_state(void);
@@ -2117,2 +2120,153 @@ write_internal_relation(StringInfo s, LogicalRepRelation *rel)
        }
 }
+
+/* XXX: belongs in the #include block at the top of the file */
+#include "postmaster/bgworker_internals.h"
+
+/*
+ * Shared-memory queue used to make parallel apply workers commit their
+ * transactions in the order in which the leader apply worker received the
+ * corresponding COMMIT messages, i.e. in the publisher's commit order.
+ *
+ * The leader enqueues the xid of each transaction at "head" (pa_commit()).  A
+ * parallel apply worker may commit only once its xid is the entry at "tail"
+ * (pa_before_apply_commit()), and dequeues it after committing
+ * (pa_after_apply_commit()).  head and tail are running counters; slots are
+ * addressed modulo PA_COMMIT_QUEUE_SIZE.
+ *
+ * XXX: the queue is shared by all subscriptions of the instance, so commits
+ * of unrelated subscriptions are serialized against each other and equal
+ * remote xids from different publishers could be confused.  Also, with
+ * preserve_commit_order off, workers dequeue in whatever order they commit,
+ * so tail can temporarily run ahead of head.
+ */
+#define PA_COMMIT_QUEUE_SIZE	MAX_PARALLEL_WORKER_LIMIT
+
+typedef struct ParallelApplyShmem
+{
+	ConditionVariable cv;		/* broadcast whenever head or tail moves */
+	slock_t		mutex;			/* protects head, tail and ring */
+	uint64		head;			/* number of xids enqueued so far */
+	uint64		tail;			/* number of xids dequeued so far */
+	TransactionId ring[PA_COMMIT_QUEUE_SIZE];
+} ParallelApplyShmem;
+
+static ParallelApplyShmem *pa_shmem;
+
+/*
+ * Enqueue xid at the head of the commit-order queue.
+ *
+ * Called by the leader apply worker once the COMMIT of the transaction has
+ * been handed to its parallel apply worker.  If the queue is full, wait for
+ * a parallel apply worker to commit and free a slot, instead of overwriting
+ * an entry whose worker is still waiting for its turn (that worker would
+ * then wait forever).
+ */
+void
+pa_commit(TransactionId xid)
+{
+	for (;;)
+	{
+		bool		enqueued = false;
+
+		SpinLockAcquire(&pa_shmem->mutex);
+		/* compare as a sum so that tail > head cannot underflow */
+		if (pa_shmem->head < pa_shmem->tail + PA_COMMIT_QUEUE_SIZE)
+		{
+			pa_shmem->ring[pa_shmem->head % PA_COMMIT_QUEUE_SIZE] = xid;
+			pa_shmem->head++;
+			enqueued = true;
+		}
+		SpinLockRelease(&pa_shmem->mutex);
+
+		if (enqueued)
+			break;
+
+		ConditionVariableSleep(&pa_shmem->cv,
+							   WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
+	}
+	ConditionVariableCancelSleep();
+
+	/* wake up workers waiting for their xid to reach the tail */
+	ConditionVariableBroadcast(&pa_shmem->cv);
+}
+
+/*
+ * Wait until the transaction being applied by this parallel apply worker is
+ * the oldest entry (the one at tail) in the commit-order queue.
+ *
+ * Returns immediately when preserve_commit_order is off.
+ */
+void
+pa_before_apply_commit(void)
+{
+	TransactionId xid = MyParallelShared->xid;
+
+	if (!preserve_commit_order)
+		return;
+
+	for (;;)
+	{
+		bool		my_turn;
+
+		SpinLockAcquire(&pa_shmem->mutex);
+		my_turn = (pa_shmem->head > pa_shmem->tail &&
+				   pa_shmem->ring[pa_shmem->tail % PA_COMMIT_QUEUE_SIZE] == xid);
+		SpinLockRelease(&pa_shmem->mutex);
+
+		if (my_turn)
+			break;
+
+		/* XXX: a dedicated wait event would make this wait identifiable */
+		ConditionVariableSleep(&pa_shmem->cv,
+							   WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
+	}
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * Dequeue the entry at tail once this parallel apply worker has committed,
+ * and wake up the other workers (and a leader waiting for a free slot).
+ *
+ * XXX: if the worker errors out between pa_before_apply_commit() and this
+ * call, tail is never advanced, and nothing in this patch resets the shared
+ * queue when the apply workers restart, so later commits would wait forever.
+ */
+void
+pa_after_apply_commit(void)
+{
+	SpinLockAcquire(&pa_shmem->mutex);
+	pa_shmem->tail++;
+	SpinLockRelease(&pa_shmem->mutex);
+	ConditionVariableBroadcast(&pa_shmem->cv);
+}
+
+/*
+ * Amount of shared memory needed for the commit-order queue.
+ */
+Size
+ParallelApplyShmemSize(void)
+{
+	return sizeof(ParallelApplyShmem);
+}
+
+/*
+ * Allocate the commit-order queue in shared memory, initializing it if we are
+ * the first to attach.
+ */
+void
+ParallelApplyShmemInit(void)
+{
+	bool		found;
+
+	pa_shmem = (ParallelApplyShmem *)
+		ShmemInitStruct("Parallel worker shmem",
+						ParallelApplyShmemSize(), &found);
+	if (!found)
+	{
+		pa_shmem->head = 0;
+		pa_shmem->tail = 0;
+		ConditionVariableInit(&pa_shmem->cv);
+		SpinLockInit(&pa_shmem->mutex);
+	}
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 22ad9051db3..bf8bfcbdd3b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1911,40 +1911,52 @@ apply_handle_commit(StringInfo s)
                        if (pa_send_data(winfo, s->len, s->data))
                        {
+                               /* Enqueue the xid in commit order. */
+                               pa_commit(winfo->shared->xid);
                                /* Finish processing the transaction. */
                                pa_xact_finish(winfo, commit_data.end_lsn);
                                break;
                        }
 
                        /*
                         * Switch to serialize mode when we are not able to send the
                         * change to parallel apply worker.
                         */
                        pa_switch_to_partial_serialize(winfo, true);
 
                        /* fall through */
                case TRANS_LEADER_PARTIAL_SERIALIZE:
                        Assert(winfo);
 
                        stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT,
                                                     &original_msg);
 
                        pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+                        /*
+                         * The parallel apply worker waits for its turn in
+                         * pa_before_apply_commit() also when it applies the
+                         * transaction from the spool file, so enqueue it here
+                         * as well; otherwise that worker would wait forever.
+                         */
+                        pa_commit(winfo->shared->xid);
 
                        /* Finish processing the transaction. */
                        pa_xact_finish(winfo, commit_data.end_lsn);
                        break;
 
                case TRANS_PARALLEL_APPLY:
 
                        /*
                         * If the parallel apply worker is applying spooled messages then
                         * close the file before committing.
                         */
                        if (stream_fd)
                                stream_close_file();
 
+                       pa_before_apply_commit();
                        apply_handle_commit_internal(&commit_data);
+                       pa_after_apply_commit();
 
                        MyParallelShared->last_commit_end = XactLastCommitEnd;
 
                        pa_commit_transaction();
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2fa045e6b0f..71b8abc4337 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -150,6 +150,7 @@ CalculateShmemSize(int *num_semaphores)
        size = add_size(size, InjectionPointShmemSize());
        size = add_size(size, SlotSyncShmemSize());
        size = add_size(size, AioShmemSize());
+       size = add_size(size, ParallelApplyShmemSize());
 
        /* include additional requested shmem from preload libraries */
        size = add_size(size, total_addin_request);
@@ -332,6 +333,7 @@ CreateOrAttachShmemStructs(void)
        PgArchShmemInit();
        ApplyLauncherShmemInit();
        SlotSyncShmemInit();
+       ParallelApplyShmemInit();
 
        /*
         * Set up other modules that need some shared memory space
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index f137129209f..5b60a4c6655 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -73,6 +73,7 @@
 #include "postmaster/walsummarizer.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
@@ -976,6 +977,17 @@ struct config_bool ConfigureNamesBool[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"preserve_commit_order", PGC_SIGHUP, REPLICATION_SUBSCRIBERS,
+                       gettext_noop("Commits logically replicated transactions on the subscriber in publisher commit order."),
+                       NULL,
+                       0
+               },
+               &preserve_commit_order,
+               true,
+               NULL, NULL, NULL
+       },
+
        {
                {"enable_parallel_hash", PGC_USERSET, QUERY_TUNING_METHOD,
                        gettext_noop("Enables the planner's use of parallel hash plans."),
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index b29453e8e4f..2efeff720f2 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -34,4 +34,7 @@ extern bool IsLogicalLauncher(void);
 
 extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
 
+extern Size ParallelApplyShmemSize(void);
+extern void ParallelApplyShmemInit(void);
+
 #endif                                                 /* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 88912606e4d..cb030eea402 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,6 +14,8 @@
 
 #include <signal.h>
 
+extern PGDLLIMPORT bool preserve_commit_order;
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7c0204dd6f4..e48a2219131 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -363,4 +385,9 @@ am_parallel_apply_worker(void)
        return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
+/* Commit-order serialization of parallel apply (see applyparallelworker.c) */
+extern void pa_commit(TransactionId xid);
+extern void pa_before_apply_commit(void);
+extern void pa_after_apply_commit(void);
+
 #endif                                                 /* WORKER_INTERNAL_H */

Reply via email to