On 2025/04/26 3:03, Masahiko Sawada wrote:
I agree with these changes. I think that while the changes for (2) should be for v19, the changes for (1) might be treated as a bug fix?
Agreed. I've split the patch into two parts: 0001 is for (1) and is a bug fix that should be back-patched to v16, where parallel apply workers were introduced. Since it didn't apply cleanly to v16, I also created a separate patch specifically for v16. 0002 is for (2) and is intended for v19. Regards, -- Fujii Masao Advanced Computing Technology Center Research and Development Headquarters NTT DATA CORPORATION
From 711df86e4b90d2578f15606abeaa3db558434b92 Mon Sep 17 00:00:00 2001 From: Fujii Masao <fu...@postgresql.org> Date: Mon, 28 Apr 2025 14:29:26 +0900 Subject: [PATCH v2 1/2] Fix bug that could block the startup of parallel apply workers. If a logical replication worker fails to start and its parent crashes while waiting, its worker slot can remain marked as "in use". This can prevent new workers from starting, as the launcher may not find a free slot or may incorrectly think the sync or parallel apply worker limits have been reached. To handle this, the launcher already performs garbage collection when no free slot is found or when the sync worker limit is hit, and then retries launching workers. However, it previously did not trigger garbage collection when the parallel apply worker limit was reached. As a result, stale slots could block new parallel apply workers from starting, even though they could have been launched after cleanup. This commit fixes the issue by triggering garbage collection when the parallel apply worker limit is reached as well. If stale slots are cleared and the number of parallel apply workers drops below the limit, new parallel apply worker can then be started successfully. Back-patch to v16, where parallel apply workers were introduced. --- src/backend/replication/logical/launcher.c | 65 +++++++++++----------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 10677da56b2..ac95afe4bae 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -96,7 +96,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); -static int logicalrep_pa_worker_count(Oid subid); +static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); @@ -350,16 +350,21 @@ retry: } } - nsyncworkers = logicalrep_sync_worker_count(subid); + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); now = GetCurrentTimestamp(); /* - * If we didn't find a free slot, try to do garbage collection. The - * reason we do this is because if some worker failed to start up and its - * parent has crashed while waiting, the in_use state was never cleared. + * If we can't start a new logical replication background worker because + * no free slot is available, or because the number of sync workers or + * parallel apply workers has reached the limit per subscriptoin, try + * running garbage collection. The reason we do this is because if some + * workers failed to start up and their parent has crashed while waiting, + * the in_use state was never cleared. By freeing up these stale worker + * slots, we may be able to start a new worker. */ - if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription || + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription) { bool did_cleanup = false; @@ -399,8 +404,6 @@ retry: return false; } - nparallelapplyworkers = logicalrep_pa_worker_count(subid); - /* * Return false if the number of parallel apply workers reached the limit * per subscription. @@ -844,48 +847,42 @@ logicalrep_worker_onexit(int code, Datum arg) int logicalrep_sync_worker_count(Oid subid) { - int i; int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - - /* Search for attached worker for a given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) - { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - - if (isTablesyncWorker(w) && w->subid == subid) - res++; - } - + logicalrep_worker_count(subid, &res, NULL); return res; } /* - * Count the number of registered (but not necessarily running) parallel apply - * workers for a subscription. + * Count the number of registered (but not necessarily running) sync workers + * and parallel apply workers for a subscription. */ -static int -logicalrep_pa_worker_count(Oid subid) +static void +logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply) { - int i; - int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + if (nsync != NULL) + *nsync = 0; + if (nparallelapply != NULL) + *nparallelapply = 0; + /* - * Scan all attached parallel apply workers, only counting those which - * have the given subscription id. + * Scan all attached sync and parallel apply workers, only counting those + * which have the given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) + for (int i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isParallelApplyWorker(w) && w->subid == subid) - res++; + if (w->subid == subid) + { + if (nsync != NULL && isTablesyncWorker(w)) + (*nsync)++; + if (nparallelapply != NULL && isParallelApplyWorker(w)) + (*nparallelapply)++; + } } - - return res; } /* -- 2.49.0
From e5bdcc64258e4b68f42d868b13612b771e153268 Mon Sep 17 00:00:00 2001 From: Fujii Masao <fu...@postgresql.org> Date: Mon, 28 Apr 2025 14:29:26 +0900 Subject: [PATCH v2] Fix bug that could block the startup of parallel apply workers. If a logical replication worker fails to start and its parent crashes while waiting, its worker slot can remain marked as "in use". This can prevent new workers from starting, as the launcher may not find a free slot or may incorrectly think the sync or parallel apply worker limits have been reached. To handle this, the launcher already performs garbage collection when no free slot is found or when the sync worker limit is hit, and then retries launching workers. However, it previously did not trigger garbage collection when the parallel apply worker limit was reached. As a result, stale slots could block new parallel apply workers from starting, even though they could have been launched after cleanup. This commit fixes the issue by triggering garbage collection when the parallel apply worker limit is reached as well. If stale slots are cleared and the number of parallel apply workers drops below the limit, new parallel apply worker can then be started successfully. Back-patch to v16, where parallel apply workers were introduced. --- src/backend/replication/logical/launcher.c | 65 +++++++++++----------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 8395ae7b23c..ebf3220da01 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -102,7 +102,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); -static int logicalrep_pa_worker_count(Oid subid); +static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); @@ -350,16 +350,21 @@ retry: } } - nsyncworkers = logicalrep_sync_worker_count(subid); + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); now = GetCurrentTimestamp(); /* - * If we didn't find a free slot, try to do garbage collection. The - * reason we do this is because if some worker failed to start up and its - * parent has crashed while waiting, the in_use state was never cleared. + * If we can't start a new logical replication background worker because + * no free slot is available, or because the number of sync workers or + * parallel apply workers has reached the limit per subscriptoin, try + * running garbage collection. The reason we do this is because if some + * workers failed to start up and their parent has crashed while waiting, + * the in_use state was never cleared. By freeing up these stale worker + * slots, we may be able to start a new worker. */ - if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription || + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription) { bool did_cleanup = false; @@ -399,8 +404,6 @@ retry: return false; } - nparallelapplyworkers = logicalrep_pa_worker_count(subid); - /* * Return false if the number of parallel apply workers reached the limit * per subscription. @@ -831,48 +834,42 @@ logicalrep_worker_onexit(int code, Datum arg) int logicalrep_sync_worker_count(Oid subid) { - int i; int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - - /* Search for attached worker for a given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) - { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - - if (w->subid == subid && OidIsValid(w->relid)) - res++; - } - + logicalrep_worker_count(subid, &res, NULL); return res; } /* - * Count the number of registered (but not necessarily running) parallel apply - * workers for a subscription. + * Count the number of registered (but not necessarily running) sync workers + * and parallel apply workers for a subscription. */ -static int -logicalrep_pa_worker_count(Oid subid) +static void +logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply) { - int i; - int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + if (nsync != NULL) + *nsync = 0; + if (nparallelapply != NULL) + *nparallelapply = 0; + /* - * Scan all attached parallel apply workers, only counting those which - * have the given subscription id. + * Scan all attached sync and parallel apply workers, only counting those + * which have the given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) + for (int i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && isParallelApplyWorker(w)) - res++; + if (w->subid == subid) + { + if (nsync != NULL && OidIsValid(w->relid)) + (*nsync)++; + if (nparallelapply != NULL && isParallelApplyWorker(w)) + (*nparallelapply)++; + } } - - return res; } /* -- 2.49.0
From e3e8335fede955bda99fc896f1f44a8249113e39 Mon Sep 17 00:00:00 2001 From: Fujii Masao <fu...@postgresql.org> Date: Mon, 28 Apr 2025 17:12:47 +0900 Subject: [PATCH v2 2/2] Optimize slot reuse after garbage collection in logicalrep_worker_launch(). Previously, when logicalrep_worker_launch() ran garbage collection and cleaned up at least one worker slot, it would rescan all worker slots to find a free one. However, since it is guaranteed that at least one slot was freed in this case, this additional scan was unnecessary. This commit removes the redundant scan and makes logicalrep_worker_launch() immediately reuse the freed slot. --- src/backend/replication/logical/launcher.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index ac95afe4bae..400f06de5af 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -336,7 +336,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); -retry: /* Find unused worker slot. */ for (i = 0; i < max_logical_replication_workers; i++) { @@ -386,11 +385,21 @@ retry: logicalrep_worker_cleanup(w); did_cleanup = true; + + if (worker == NULL) + { + worker = w; + slot = i; + } } } + /* + * Count the current number of sync and parallel apply workers again, + * since garbage collection may have changed it. + */ if (did_cleanup) - goto retry; + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); } /* -- 2.49.0