I found some additional places that should remove the last-start time from
the hash table.  I've added those in v14.

On Fri, Jan 06, 2023 at 10:30:18AM +0530, Amit Kapila wrote:
> On Thu, Jan 5, 2023 at 10:49 PM Nathan Bossart <nathandboss...@gmail.com> 
> wrote:
>> On Thu, Jan 05, 2023 at 11:34:37AM +0530, Amit Kapila wrote:
>> > On Thu, Jan 5, 2023 at 10:16 AM Nathan Bossart <nathandboss...@gmail.com> 
>> > wrote:
>> >> In v12, I moved the restart for two_phase mode to the end of
>> >> process_syncing_tables_for_apply() so that we don't need to rely on 
>> >> another
>> >> iteration of the loop.
>> >
>> > This should work but it is better to add a comment before calling
>> > CommandCounterIncrement() to indicate that this is for making changes
>> > to the relation state visible.
>>
>> Will do.
> 
> Isn't it better to move this part into a separate patch as this is
> useful even without the main patch to improve wakeups?

I moved it to a separate patch in v14.

>> > Thinking along similar lines, won't apply worker need to be notified
>> > of SUBREL_STATE_SYNCWAIT state change by the tablesync worker?
>>
>> wait_for_worker_state_change() should notify the apply worker in this case.
> 
> I think this is yet to be included in the patch, right?

This is already present on HEAD.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From 955111b35d9d416b8d895649525b25c03a15e653 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Thu, 5 Jan 2023 21:17:42 -0800
Subject: [PATCH v14 1/3] move restart for two_phase mode to end of
 process_syncing_tables_for_apply()

---
 src/backend/replication/logical/tablesync.c | 48 +++++++++++----------
 1 file changed, 26 insertions(+), 22 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2fdfeb5b4c..280fed8cdb 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 	bool		started_tx = false;
+	bool		should_exit = false;
 
 	Assert(!IsTransactionState());
 
@@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		last_start_times = NULL;
 	}
 
-	/*
-	 * Even when the two_phase mode is requested by the user, it remains as
-	 * 'pending' until all tablesyncs have reached READY state.
-	 *
-	 * When this happens, we restart the apply worker and (if the conditions
-	 * are still ok) then the two_phase tri-state will become 'enabled' at
-	 * that time.
-	 *
-	 * Note: If the subscription has no tables then leave the state as
-	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-	 * work.
-	 */
-	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-		AllTablesyncsReady())
-	{
-		ereport(LOG,
-				(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
-						MySubscription->name)));
-
-		proc_exit(0);
-	}
-
 	/*
 	 * Process all tables that are being synchronized.
 	 */
@@ -619,9 +598,34 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * Even when the two_phase mode is requested by the user, it remains as
+		 * 'pending' until all tablesyncs have reached READY state.
+		 *
+		 * When this happens, we restart the apply worker and (if the conditions
+		 * are still ok) then the two_phase tri-state will become 'enabled' at
+		 * that time.
+		 *
+		 * Note: If the subscription has no tables then leave the state as
+		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+		 * work.
+		 */
+		CommandCounterIncrement();	/* make updates visible */
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
+		{
+			ereport(LOG,
+					(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+							MySubscription->name)));
+			should_exit = true;
+		}
+
 		CommitTransactionCommand();
 		pgstat_report_stat(true);
 	}
+
+	if (should_exit)
+		proc_exit(0);
 }
 
 /*
-- 
2.25.1

>From 3018a4029b573c69f6674b86735d0d94c1350eb4 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v14 2/3] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c        |  3 ++
 src/backend/commands/alter.c             |  7 ++++
 src/backend/commands/subscriptioncmds.c  |  7 ++++
 src/backend/replication/logical/worker.c | 46 ++++++++++++++++++++++++
 src/include/replication/logicalworker.h  |  3 ++
 5 files changed, 66 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7..54145bf805 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a..4e8102b59f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796f..c0fd82cdf2 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
@@ -1733,6 +1737,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 							  form->oid, 0);
 
 	ApplyLauncherWakeupAtCommit();
+
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(form->oid);
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f8e8cf71eb..cf38ed821e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wakeup the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell   *subid;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		foreach(subid, on_commit_wakeup_workers_subids)
+		{
+			List	   *workers;
+			ListCell   *worker;
+
+			workers = logicalrep_workers_find(lfirst_oid(subid), true);
+			foreach(worker, workers)
+				logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+		}
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subids = NIL;
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction.
+ *
+ * This is used to ensure that the workers process assorted changes as soon as
+ * possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	MemoryContext oldcxt;
+
+	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+	on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids,
+															 subid);
+	MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index f1e7e8a348..3b2084f81f 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 #endif							/* LOGICALWORKER_H */
-- 
2.25.1

>From a1354698bb06ed645595a1c233e506a16ae3108c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Thu, 5 Jan 2023 14:22:53 -0800
Subject: [PATCH v14 3/3] bypass wal_retrieve_retry_interval for logical
 workers as appropriate

---
 doc/src/sgml/monitoring.sgml                |  10 +
 src/backend/commands/subscriptioncmds.c     |   8 +
 src/backend/replication/logical/launcher.c  | 201 +++++++++++++++-----
 src/backend/replication/logical/tablesync.c |   7 +
 src/backend/replication/logical/worker.c    |  18 ++
 src/backend/storage/lmgr/lwlock.c           |   4 +
 src/include/replication/logicallauncher.h   |   2 +
 src/include/storage/lwlock.h                |   2 +
 8 files changed, 207 insertions(+), 45 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5bcba0fdec..8f06e234c8 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1991,6 +1991,16 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting to read or update information
        about <quote>heavyweight</quote> locks.</entry>
      </row>
+     <row>
+      <entry><literal>LogicalRepLauncherDSA</literal></entry>
+      <entry>Waiting for logical replication launcher dynamic shared memory
+      allocator access</entry>
+     </row>
+     <row>
+      <entry><literal>LogicalRepLauncherHash</literal></entry>
+      <entry>Waiting for logical replication launcher shared memory hash table
+      access</entry>
+     </row>
      <row>
       <entry><literal>LogicalRepWorker</literal></entry>
       <entry>Waiting to read or update the state of logical replication
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c0fd82cdf2..6c2c10712e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1504,6 +1504,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Clear the last-start time for the apply worker to free up space.  If
+	 * this transaction rolls back, the launcher might restart the apply worker
+	 * before wal_retrieve_retry_interval milliseconds have elapsed, but that's
+	 * probably okay.
+	 */
+	logicalrep_launcher_delete_last_start_time(subid);
+
 	/*
 	 * Cleanup of tablesync replication origins.
 	 *
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c05..dfe49db64f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -56,6 +57,25 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 
+/* an entry in the last-start times hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+	Oid		subid;
+	TimestampTz last_start_time;
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start times hash table */
+static const dshash_parameters dsh_params = {
+	sizeof(Oid),
+	sizeof(LauncherLastStartTimesEntry),
+	dshash_memcmp,
+	dshash_memhash,
+	LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
 typedef struct LogicalRepCtxStruct
@@ -63,6 +83,10 @@ typedef struct LogicalRepCtxStruct
 	/* Supervisor process. */
 	pid_t		launcher_pid;
 
+	/* hash table for last-start times */
+	dsa_handle	last_start_dsa;
+	dshash_table_handle last_start_dsh;
+
 	/* Background workers. */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
@@ -74,6 +98,9 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static void logicalrep_launcher_attach_dshmem(void);
+static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time);
+static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid);
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -756,6 +783,9 @@ ApplyLauncherShmemInit(void)
 			memset(worker, 0, sizeof(LogicalRepWorker));
 			SpinLockInit(&worker->relmutex);
 		}
+
+		LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
 	}
 }
 
@@ -801,8 +831,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
-
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
 
@@ -837,58 +865,55 @@ ApplyLauncherMain(Datum main_arg)
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
-		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
-			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+		/* Use temporary context for the database list and worker info. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+									   "Logical Replication Launcher sublist",
+									   ALLOCSET_DEFAULT_SIZES);
+		oldctx = MemoryContextSwitchTo(subctx);
 
-				if (!sub->enabled)
-					continue;
+		sublist = get_subscription_list();
+		foreach(lc, sublist)
+		{
+			Subscription *sub = (Subscription *) lfirst(lc);
+			LogicalRepWorker *w;
+			TimestampTz last_start;
 
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
+			if (!sub->enabled)
+				continue;
 
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
+			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+			LWLockRelease(LogicalRepWorkerLock);
 
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
-				}
-			}
+			if (w != NULL)
+				continue;
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
-		}
-		else
-		{
 			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
+			 * If the worker is eligible to start now, launch it.  Otherwise,
+			 * adjust wait_time so that we wake up when it can be started.
 			 */
-			wait_time = wal_retrieve_retry_interval;
+			last_start = logicalrep_launcher_get_last_start_time(sub->oid);
+			if (TimestampDifferenceExceeds(last_start, now,
+										   wal_retrieve_retry_interval))
+			{
+				logicalrep_launcher_set_last_start_time(sub->oid, now);
+				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+										 sub->owner, InvalidOid);
+			}
+			else
+			{
+				long		elapsed;
+
+				elapsed = TimestampDifferenceMilliseconds(last_start, now);
+				wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+			}
 		}
 
+		/* Switch back to original memory context. */
+		MemoryContextSwitchTo(oldctx);
+		/* Clean the temporary memory. */
+		MemoryContextDelete(subctx);
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -996,3 +1021,89 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 
 	return (Datum) 0;
 }
+
+/*
+ * Initialize or attach to the dynamic shared hash table that stores the
+ * last-start times, if not already done.  This must be called before using the
+ * table.
+ */
+static void
+logicalrep_launcher_attach_dshmem(void)
+{
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+	if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for last-start times. */
+		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
+		dsa_pin(last_start_times_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
+
+		/* Store handles in shared memory for other backends to use. */
+		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
+		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
+	}
+	else if (!last_start_times)
+	{
+		/* Attach to existing dynamic shared hash table. */
+		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
+										 LogicalRepCtx->last_start_dsh, 0);
+	}
+
+	LWLockRelease(LogicalRepWorkerLock);
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Set the last-start time for the subscription.
+ */
+static void
+logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time)
+{
+	LauncherLastStartTimesEntry *entry;
+	bool		found;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find_or_insert(last_start_times, &subid, &found);
+	entry->last_start_time = start_time;
+	dshash_release_lock(last_start_times, entry);
+}
+
+/*
+ * Return the last-start time for the subscription, or 0 if there isn't one.
+ */
+static TimestampTz
+logicalrep_launcher_get_last_start_time(Oid subid)
+{
+	LauncherLastStartTimesEntry *entry;
+	TimestampTz ret;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find(last_start_times, &subid, false);
+	if (entry == NULL)
+		return 0;
+
+	ret = entry->last_start_time;
+	dshash_release_lock(last_start_times, entry);
+
+	return ret;
+}
+
+/*
+ * Remove the last-start time for the subscription, if one exists.
+ */
+void
+logicalrep_launcher_delete_last_start_time(Oid subid)
+{
+	logicalrep_launcher_attach_dshmem();
+
+	(void) dshash_delete_key(last_start_times, &subid);
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 280fed8cdb..adc1043211 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -617,6 +617,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			ereport(LOG,
 					(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
 							MySubscription->name)));
+
+			/*
+			 * Clear the last-start time for the worker so that the launcher
+			 * restarts it immediately, bypassing wal_retrieve_retry_interval.
+			 */
+			logicalrep_launcher_delete_last_start_time(MySubscription->oid);
+
 			should_exit = true;
 		}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cf38ed821e..ac125daad5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -164,6 +164,7 @@
 #include "postmaster/walwriter.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/logicalproto.h"
 #include "replication/logicalrelation.h"
 #include "replication/logicalworker.h"
@@ -3089,6 +3090,8 @@ maybe_reread_subscription(void)
 						"stop because the subscription was removed",
 						MySubscription->name)));
 
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -3100,6 +3103,8 @@ maybe_reread_subscription(void)
 						"stop because the subscription was disabled",
 						MySubscription->name)));
 
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -3126,6 +3131,13 @@ maybe_reread_subscription(void)
 				(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
 						MySubscription->name)));
 
+		/*
+		 * Clear the last-start time for the apply worker so that the launcher
+		 * will restart it immediately, bypassing wal_retrieve_retry_interval.
+		 */
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MySubscription->oid);
+
 		proc_exit(0);
 	}
 
@@ -3671,6 +3683,8 @@ ApplyWorkerMain(Datum main_arg)
 				(errmsg("logical replication apply worker for subscription %u will not "
 						"start because the subscription was removed during startup",
 						MyLogicalRepWorker->subid)));
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -3684,6 +3698,8 @@ ApplyWorkerMain(Datum main_arg)
 						"start because the subscription was disabled during startup",
 						MySubscription->name)));
 
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -3884,6 +3900,8 @@ DisableSubscriptionAndExit(void)
 			errmsg("subscription \"%s\" has been disabled because of an error",
 				   MySubscription->name));
 
+	if (!am_tablesync_worker())
+		logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 	proc_exit(0);
 }
 
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 196bece0a3..d2ec396045 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
 	"PgStatsHash",
 	/* LWTRANCHE_PGSTATS_DATA: */
 	"PgStatsData",
+	/* LWTRANCHE_LAUNCHER_DSA: */
+	"LogicalRepLauncherDSA",
+	/* LWTRANCHE_LAUNCHER_HASH: */
+	"LogicalRepLauncherHash",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index e1661b6c91..00f6f89d72 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -26,4 +26,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
 
+extern void logicalrep_launcher_delete_last_start_time(Oid subid);
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e4162db613..d2c7afb8f4 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DSA,
 	LWTRANCHE_PGSTATS_HASH,
 	LWTRANCHE_PGSTATS_DATA,
+	LWTRANCHE_LAUNCHER_DSA,
+	LWTRANCHE_LAUNCHER_HASH,
 	LWTRANCHE_FIRST_USER_DEFINED
 }			BuiltinTrancheIds;
 
-- 
2.25.1

Reply via email to