On 2025/04/26 3:03, Masahiko Sawada wrote:
> I agree with these changes.
> 
> I think that while the changes for (2) should be for v19, the changes
> for (1) might be treated as a bug fix?

Agreed. I've split the patch into two parts:

0001 is for (1) and is a bug fix that should be back-patched to v16,
where parallel apply workers were introduced. Since it didn't apply
cleanly to v16, I also created a separate patch specifically for v16.

0002 is for (2) and is intended for v19.
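
For illustration only, here is a tiny standalone mock (plain C, compiles
on its own; MockWorker, mock_worker_count, and the sample values are
invented for this sketch and are not the actual PostgreSQL code) of the
single-pass counting that 0001's new logicalrep_worker_count() performs,
where either out-parameter may be NULL:

#include <stdbool.h>
#include <stdio.h>

typedef unsigned int Oid;

/* Mock worker slot: only the fields the counting logic inspects. */
typedef struct MockWorker
{
	bool	in_use;
	Oid		subid;
	bool	is_sync;			/* stands in for isTablesyncWorker() */
	bool	is_parallel_apply;	/* stands in for isParallelApplyWorker() */
} MockWorker;

/*
 * Count sync and parallel apply workers for one subscription in a
 * single pass; either out-parameter may be NULL, mirroring the patched
 * logicalrep_worker_count().
 */
static void
mock_worker_count(const MockWorker *workers, int nworkers, Oid subid,
				  int *nsync, int *nparallelapply)
{
	if (nsync != NULL)
		*nsync = 0;
	if (nparallelapply != NULL)
		*nparallelapply = 0;

	for (int i = 0; i < nworkers; i++)
	{
		const MockWorker *w = &workers[i];

		if (!w->in_use || w->subid != subid)
			continue;
		if (nsync != NULL && w->is_sync)
			(*nsync)++;
		if (nparallelapply != NULL && w->is_parallel_apply)
			(*nparallelapply)++;
	}
}

int
main(void)
{
	MockWorker	workers[] = {
		{true, 16384, true, false},		/* sync worker for sub 16384 */
		{true, 16384, false, true},		/* parallel apply worker, same sub */
		{true, 16385, false, true},		/* parallel apply worker, other sub */
		{false, 16384, true, false},	/* free slot, ignored here */
	};
	int			nsync;
	int			npa;

	mock_worker_count(workers, 4, 16384, &nsync, &npa);
	printf("sync=%d parallel_apply=%d\n", nsync, npa);	/* sync=1 parallel_apply=1 */
	return 0;
}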

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
From 711df86e4b90d2578f15606abeaa3db558434b92 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fu...@postgresql.org>
Date: Mon, 28 Apr 2025 14:29:26 +0900
Subject: [PATCH v2 1/2] Fix bug that could block the startup of parallel apply
 workers.

If a logical replication worker fails to start and its parent crashes
while waiting, its worker slot can remain marked as "in use".
This can prevent new workers from starting, as the launcher may not
find a free slot or may incorrectly think the sync or parallel apply
worker limits have been reached.

To handle this, the launcher already performs garbage collection
when no free slot is found or when the sync worker limit is hit,
and then retries launching workers. However, it previously did not
trigger garbage collection when the parallel apply worker limit
was reached. As a result, stale slots could block new parallel apply
workers from starting, even though they could have been launched
after cleanup.

This commit fixes the issue by triggering garbage collection
when the parallel apply worker limit is reached as well. If stale slots
are cleared and the number of parallel apply workers drops below
the limit, a new parallel apply worker can then be started successfully.

Back-patch to v16, where parallel apply workers were introduced.
---
 src/backend/replication/logical/launcher.c | 65 +++++++++++-----------
 1 file changed, 31 insertions(+), 34 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 10677da56b2..ac95afe4bae 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -96,7 +96,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
-static int     logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
@@ -350,16 +350,21 @@ retry:
                }
        }
 
-       nsyncworkers = logicalrep_sync_worker_count(subid);
+       logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 
        now = GetCurrentTimestamp();
 
        /*
-        * If we didn't find a free slot, try to do garbage collection.  The
-        * reason we do this is because if some worker failed to start up and its
-        * parent has crashed while waiting, the in_use state was never cleared.
+        * If we can't start a new logical replication background worker because
+        * no free slot is available, or because the number of sync workers or
+        * parallel apply workers has reached the limit per subscription, try
+        * running garbage collection. The reason we do this is because if some
+        * workers failed to start up and their parent has crashed while waiting,
+        * the in_use state was never cleared. By freeing up these stale worker
+        * slots, we may be able to start a new worker.
         */
-       if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+       if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription ||
+               nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
        {
                bool            did_cleanup = false;
 
@@ -399,8 +404,6 @@ retry:
                return false;
        }
 
-       nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
        /*
         * Return false if the number of parallel apply workers reached the limit
         * per subscription.
@@ -844,48 +847,42 @@ logicalrep_worker_onexit(int code, Datum arg)
 int
 logicalrep_sync_worker_count(Oid subid)
 {
-       int                     i;
        int                     res = 0;
 
-       Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-       /* Search for attached worker for a given subscription id. */
-       for (i = 0; i < max_logical_replication_workers; i++)
-       {
-               LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-               if (isTablesyncWorker(w) && w->subid == subid)
-                       res++;
-       }
-
+       logicalrep_worker_count(subid, &res, NULL);
        return res;
 }
 
 /*
- * Count the number of registered (but not necessarily running) parallel apply
- * workers for a subscription.
+ * Count the number of registered (but not necessarily running) sync workers
+ * and parallel apply workers for a subscription.
  */
-static int
-logicalrep_pa_worker_count(Oid subid)
+static void
+logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply)
 {
-       int                     i;
-       int                     res = 0;
-
        Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
+       if (nsync != NULL)
+               *nsync = 0;
+       if (nparallelapply != NULL)
+               *nparallelapply = 0;
+
        /*
-        * Scan all attached parallel apply workers, only counting those which
-        * have the given subscription id.
+        * Scan all attached sync and parallel apply workers, only counting those
+        * which have the given subscription id.
         */
-       for (i = 0; i < max_logical_replication_workers; i++)
+       for (int i = 0; i < max_logical_replication_workers; i++)
        {
                LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-               if (isParallelApplyWorker(w) && w->subid == subid)
-                       res++;
+               if (w->subid == subid)
+               {
+                       if (nsync != NULL && isTablesyncWorker(w))
+                               (*nsync)++;
+                       if (nparallelapply != NULL && isParallelApplyWorker(w))
+                               (*nparallelapply)++;
+               }
        }
-
-       return res;
 }
 
 /*
-- 
2.49.0

From e5bdcc64258e4b68f42d868b13612b771e153268 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fu...@postgresql.org>
Date: Mon, 28 Apr 2025 14:29:26 +0900
Subject: [PATCH v2] Fix bug that could block the startup of parallel apply
 workers.

If a logical replication worker fails to start and its parent crashes
while waiting, its worker slot can remain marked as "in use".
This can prevent new workers from starting, as the launcher may not
find a free slot or may incorrectly think the sync or parallel apply
worker limits have been reached.

To handle this, the launcher already performs garbage collection
when no free slot is found or when the sync worker limit is hit,
and then retries launching workers. However, it previously did not
trigger garbage collection when the parallel apply worker limit
was reached. As a result, stale slots could block new parallel apply
workers from starting, even though they could have been launched
after cleanup.

This commit fixes the issue by triggering garbage collection
when the parallel apply worker limit is reached as well. If stale slots
are cleared and the number of parallel apply workers drops below
the limit, a new parallel apply worker can then be started successfully.

Back-patch to v16, where parallel apply workers were introduced.
---
 src/backend/replication/logical/launcher.c | 65 +++++++++++-----------
 1 file changed, 31 insertions(+), 34 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 8395ae7b23c..ebf3220da01 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -102,7 +102,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
-static int     logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
@@ -350,16 +350,21 @@ retry:
                }
        }
 
-       nsyncworkers = logicalrep_sync_worker_count(subid);
+       logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 
        now = GetCurrentTimestamp();
 
        /*
-        * If we didn't find a free slot, try to do garbage collection.  The
-        * reason we do this is because if some worker failed to start up and its
-        * parent has crashed while waiting, the in_use state was never cleared.
+        * If we can't start a new logical replication background worker because
+        * no free slot is available, or because the number of sync workers or
+        * parallel apply workers has reached the limit per subscription, try
+        * running garbage collection. The reason we do this is because if some
+        * workers failed to start up and their parent has crashed while waiting,
+        * the in_use state was never cleared. By freeing up these stale worker
+        * slots, we may be able to start a new worker.
         */
-       if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+       if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription ||
+               nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
        {
                bool            did_cleanup = false;
 
@@ -399,8 +404,6 @@ retry:
                return false;
        }
 
-       nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
        /*
         * Return false if the number of parallel apply workers reached the limit
         * per subscription.
@@ -831,48 +834,42 @@ logicalrep_worker_onexit(int code, Datum arg)
 int
 logicalrep_sync_worker_count(Oid subid)
 {
-       int                     i;
        int                     res = 0;
 
-       Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-       /* Search for attached worker for a given subscription id. */
-       for (i = 0; i < max_logical_replication_workers; i++)
-       {
-               LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-               if (w->subid == subid && OidIsValid(w->relid))
-                       res++;
-       }
-
+       logicalrep_worker_count(subid, &res, NULL);
        return res;
 }
 
 /*
- * Count the number of registered (but not necessarily running) parallel apply
- * workers for a subscription.
+ * Count the number of registered (but not necessarily running) sync workers
+ * and parallel apply workers for a subscription.
  */
-static int
-logicalrep_pa_worker_count(Oid subid)
+static void
+logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply)
 {
-       int                     i;
-       int                     res = 0;
-
        Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
+       if (nsync != NULL)
+               *nsync = 0;
+       if (nparallelapply != NULL)
+               *nparallelapply = 0;
+
        /*
-        * Scan all attached parallel apply workers, only counting those which
-        * have the given subscription id.
+        * Scan all attached sync and parallel apply workers, only counting those
+        * which have the given subscription id.
         */
-       for (i = 0; i < max_logical_replication_workers; i++)
+       for (int i = 0; i < max_logical_replication_workers; i++)
        {
                LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-               if (w->subid == subid && isParallelApplyWorker(w))
-                       res++;
+               if (w->subid == subid)
+               {
+                       if (nsync != NULL && OidIsValid(w->relid))
+                               (*nsync)++;
+                       if (nparallelapply != NULL && isParallelApplyWorker(w))
+                               (*nparallelapply)++;
+               }
        }
-
-       return res;
 }
 
 /*
-- 
2.49.0

From e3e8335fede955bda99fc896f1f44a8249113e39 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fu...@postgresql.org>
Date: Mon, 28 Apr 2025 17:12:47 +0900
Subject: [PATCH v2 2/2] Optimize slot reuse after garbage collection in
 logicalrep_worker_launch().

Previously, when logicalrep_worker_launch() ran garbage collection and
cleaned up at least one worker slot, it would rescan all worker slots to
find a free one. However, since it is guaranteed that at least one slot was
freed in this case, this additional scan was unnecessary.

This commit removes the redundant scan and makes logicalrep_worker_launch()
immediately reuse the freed slot.
---
 src/backend/replication/logical/launcher.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index ac95afe4bae..400f06de5af 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -336,7 +336,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
         */
        LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-retry:
        /* Find unused worker slot. */
        for (i = 0; i < max_logical_replication_workers; i++)
        {
@@ -386,11 +385,21 @@ retry:
 
                                logicalrep_worker_cleanup(w);
                                did_cleanup = true;
+
+                               if (worker == NULL)
+                               {
+                                       worker = w;
+                                       slot = i;
+                               }
                        }
                }
 
+               /*
+                * Count the current number of sync and parallel apply workers again,
+                * since garbage collection may have changed it.
+                */
                if (did_cleanup)
-                       goto retry;
+                       logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
        }
 
        /*
-- 
2.49.0
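
For illustration of the 0002 change as well, a standalone sketch (mock
types and invented names, not the actual launcher code) of claiming the
first slot freed during garbage collection instead of rescanning the
whole array:

#include <stdbool.h>
#include <stdio.h>

/* Mock slot: only what the reuse logic needs. */
typedef struct MockSlot
{
	bool	in_use;
	bool	stale;		/* registered, but its starter crashed */
} MockSlot;

/*
 * Garbage-collect stale slots and return the first one freed (or NULL),
 * so the caller can reuse it directly instead of rescanning the array.
 */
static MockSlot *
cleanup_and_claim(MockSlot *slots, int nslots)
{
	MockSlot   *claimed = NULL;

	for (int i = 0; i < nslots; i++)
	{
		MockSlot   *s = &slots[i];

		if (s->in_use && s->stale)
		{
			s->in_use = false;	/* the cleanup itself */
			s->stale = false;
			if (claimed == NULL)
				claimed = s;	/* remember it; no second scan needed */
		}
	}
	return claimed;
}

int
main(void)
{
	MockSlot	slots[] = {{true, false}, {true, true}, {true, true}};
	MockSlot   *w = cleanup_and_claim(slots, 3);

	if (w != NULL)
		printf("claimed slot index: %d\n", (int) (w - slots));	/* prints 1 */
	else
		printf("no stale slot found\n");
	return 0;
}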
