On Thu, Jan 05, 2023 at 09:29:24AM -0800, Nathan Bossart wrote:
> On Thu, Jan 05, 2023 at 10:57:58AM +0530, Amit Kapila wrote:
>> True, if we want we can use dshash for this.
> 
> I'll look into this.

Here is an attempt at using dshash.  This is quite a bit cleaner since we
don't need garbage collection or the flag in the worker slots.  There is
some extra work required to set up the table, but it doesn't seem too bad.
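
For anyone skimming, the gist of the setup (condensed from the launcher
changes in the second patch below; the LWLock, memory context, and
dsa_pin details are omitted here) is:

	/* an entry in the last-start times hash table, keyed by subscription OID */
	typedef struct LauncherLastStartTimesEntry
	{
		Oid			subid;
		TimestampTz last_start_time;
	} LauncherLastStartTimesEntry;

	/* parameters for the last-start times hash table */
	static const dshash_parameters dsh_params = {
		sizeof(Oid),
		sizeof(LauncherLastStartTimesEntry),
		dshash_memcmp,
		dshash_memhash,
		LWTRANCHE_LAUNCHER_HASH
	};

	/*
	 * The first process to need the table creates the DSA area and the
	 * dshash table and stores their handles in LogicalRepCtx; any other
	 * process simply attaches to the existing ones.
	 */
	if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
	{
		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
	}
	else if (!last_start_times)
	{
		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
										 LogicalRepCtx->last_start_dsh, 0);
	}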

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 8853500e9b238b0d07871940d4e7db9e43802b8b Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v13 1/2] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c           |  3 ++
 src/backend/commands/alter.c                |  7 +++
 src/backend/commands/subscriptioncmds.c     |  7 +++
 src/backend/replication/logical/tablesync.c | 49 ++++++++++++---------
 src/backend/replication/logical/worker.c    | 46 +++++++++++++++++++
 src/include/replication/logicalworker.h     |  3 ++
 6 files changed, 93 insertions(+), 22 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7..54145bf805 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a..4e8102b59f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796f..c0fd82cdf2 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
@@ -1733,6 +1737,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 							  form->oid, 0);
 
 	ApplyLauncherWakeupAtCommit();
+
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(form->oid);
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2fdfeb5b4c..8c8f27ebcf 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 	bool		started_tx = false;
+	bool		should_exit = false;
 
 	Assert(!IsTransactionState());
 
@@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		last_start_times = NULL;
 	}
 
-	/*
-	 * Even when the two_phase mode is requested by the user, it remains as
-	 * 'pending' until all tablesyncs have reached READY state.
-	 *
-	 * When this happens, we restart the apply worker and (if the conditions
-	 * are still ok) then the two_phase tri-state will become 'enabled' at
-	 * that time.
-	 *
-	 * Note: If the subscription has no tables then leave the state as
-	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-	 * work.
-	 */
-	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-		AllTablesyncsReady())
-	{
-		ereport(LOG,
-				(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
-						MySubscription->name)));
-
-		proc_exit(0);
-	}
-
 	/*
 	 * Process all tables that are being synchronized.
 	 */
@@ -619,9 +598,35 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * Even when the two_phase mode is requested by the user, it remains as
+		 * 'pending' until all tablesyncs have reached READY state.
+		 *
+		 * When this happens, we restart the apply worker and (if the conditions
+		 * are still ok) then the two_phase tri-state will become 'enabled' at
+		 * that time.
+		 *
+		 * Note: If the subscription has no tables then leave the state as
+		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+		 * work.
+		 */
+		CommandCounterIncrement();	/* make updates visible */
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
+		{
+			ereport(LOG,
+					(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+							MySubscription->name)));
+
+			should_exit = true;
+		}
+
 		CommitTransactionCommand();
 		pgstat_report_stat(true);
 	}
+
+	if (should_exit)
+		proc_exit(0);
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f8e8cf71eb..cf38ed821e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wake up the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell   *subid;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		foreach(subid, on_commit_wakeup_workers_subids)
+		{
+			List	   *workers;
+			ListCell   *worker;
+
+			workers = logicalrep_workers_find(lfirst_oid(subid), true);
+			foreach(worker, workers)
+				logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+		}
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subids = NIL;
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction.
+ *
+ * This is used to ensure that the workers process assorted changes as soon as
+ * possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	MemoryContext oldcxt;
+
+	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+	on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids,
+															 subid);
+	MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index f1e7e8a348..3b2084f81f 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 #endif							/* LOGICALWORKER_H */
-- 
2.25.1

From 8aae426e8a10df698e299ed4ff7a13e5b8444efc Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Thu, 5 Jan 2023 14:22:53 -0800
Subject: [PATCH v13 2/2] bypass wal_retrieve_retry_interval for logical
 workers as appropriate

---
 doc/src/sgml/monitoring.sgml                |  10 +
 src/backend/commands/subscriptioncmds.c     |  20 ++
 src/backend/replication/logical/launcher.c  | 201 +++++++++++++++-----
 src/backend/replication/logical/tablesync.c |   6 +
 src/backend/replication/logical/worker.c    |   8 +
 src/backend/storage/lmgr/lwlock.c           |   4 +
 src/include/replication/logicallauncher.h   |   2 +
 src/include/storage/lwlock.h                |   2 +
 8 files changed, 208 insertions(+), 45 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5bcba0fdec..8f06e234c8 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1991,6 +1991,16 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting to read or update information
        about <quote>heavyweight</quote> locks.</entry>
      </row>
+     <row>
+      <entry><literal>LogicalRepLauncherDSA</literal></entry>
+      <entry>Waiting for logical replication launcher dynamic shared memory
+      allocator access</entry>
+     </row>
+     <row>
+      <entry><literal>LogicalRepLauncherHash</literal></entry>
+      <entry>Waiting for logical replication launcher shared memory hash table
+      access</entry>
+     </row>
      <row>
       <entry><literal>LogicalRepWorker</literal></entry>
       <entry>Waiting to read or update the state of logical replication
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c0fd82cdf2..bab576e966 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1139,6 +1139,18 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (opts.enabled)
 					ApplyLauncherWakeupAtCommit();
+				else
+				{
+					/*
+					 * Clear the last-start time for the apply worker so the
+					 * worker is restarted right away when the subscription is
+					 * enabled again.  If this transaction rolls back, the
+					 * launcher might restart the apply worker before
+					 * wal_retrieve_retry_interval milliseconds have elapsed,
+					 * but that's probably okay.
+					 */
+					logicalrep_launcher_delete_last_start_time(sub->oid);
+				}
 
 				update_tuple = true;
 				break;
@@ -1504,6 +1516,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Clear the last-start time for the apply worker to free up space.  If
+	 * this transaction rolls back, the launcher might restart the apply worker
+	 * before wal_retrieve_retry_interval milliseconds have elapsed, but that's
+	 * probably okay.
+	 */
+	logicalrep_launcher_delete_last_start_time(subid);
+
 	/*
 	 * Cleanup of tablesync replication origins.
 	 *
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c05..dfe49db64f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -56,6 +57,25 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 
+/* an entry in the last-start times hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+	Oid		subid;
+	TimestampTz last_start_time;
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start times hash table */
+static const dshash_parameters dsh_params = {
+	sizeof(Oid),
+	sizeof(LauncherLastStartTimesEntry),
+	dshash_memcmp,
+	dshash_memhash,
+	LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
 typedef struct LogicalRepCtxStruct
@@ -63,6 +83,10 @@ typedef struct LogicalRepCtxStruct
 	/* Supervisor process. */
 	pid_t		launcher_pid;
 
+	/* hash table for last-start times */
+	dsa_handle	last_start_dsa;
+	dshash_table_handle last_start_dsh;
+
 	/* Background workers. */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
@@ -74,6 +98,9 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static void logicalrep_launcher_attach_dshmem(void);
+static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time);
+static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid);
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -756,6 +783,9 @@ ApplyLauncherShmemInit(void)
 			memset(worker, 0, sizeof(LogicalRepWorker));
 			SpinLockInit(&worker->relmutex);
 		}
+
+		LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
 	}
 }
 
@@ -801,8 +831,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
-
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
 
@@ -837,58 +865,55 @@ ApplyLauncherMain(Datum main_arg)
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
-		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
-			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+		/* Use temporary context for the database list and worker info. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+									   "Logical Replication Launcher sublist",
+									   ALLOCSET_DEFAULT_SIZES);
+		oldctx = MemoryContextSwitchTo(subctx);
 
-				if (!sub->enabled)
-					continue;
+		sublist = get_subscription_list();
+		foreach(lc, sublist)
+		{
+			Subscription *sub = (Subscription *) lfirst(lc);
+			LogicalRepWorker *w;
+			TimestampTz last_start;
 
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
+			if (!sub->enabled)
+				continue;
 
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+			LWLockRelease(LogicalRepWorkerLock);
 
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
-				}
-			}
+			if (w != NULL)
+				continue;
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
-		}
-		else
-		{
 			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
+			 * If the worker is eligible to start now, launch it.  Otherwise,
+			 * adjust wait_time so that we wake up when it can be started.
 			 */
-			wait_time = wal_retrieve_retry_interval;
+			last_start = logicalrep_launcher_get_last_start_time(sub->oid);
+			if (TimestampDifferenceExceeds(last_start, now,
+										   wal_retrieve_retry_interval))
+			{
+				logicalrep_launcher_set_last_start_time(sub->oid, now);
+				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+										 sub->owner, InvalidOid);
+			}
+			else
+			{
+				long		elapsed;
+
+				elapsed = TimestampDifferenceMilliseconds(last_start, now);
+				wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+			}
 		}
 
+		/* Switch back to original memory context. */
+		MemoryContextSwitchTo(oldctx);
+		/* Clean the temporary memory. */
+		MemoryContextDelete(subctx);
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -996,3 +1021,89 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 
 	return (Datum) 0;
 }
+
+/*
+ * Initialize or attach to the dynamic shared hash table that stores the
+ * last-start times, if not already done.  This must be called before using the
+ * table.
+ */
+static void
+logicalrep_launcher_attach_dshmem(void)
+{
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+	if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for last-start times. */
+		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
+		dsa_pin(last_start_times_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
+
+		/* Store handles in shared memory for other backends to use. */
+		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
+		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
+	}
+	else if (!last_start_times)
+	{
+		/* Attach to existing dynamic shared hash table. */
+		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
+										 LogicalRepCtx->last_start_dsh, 0);
+	}
+
+	LWLockRelease(LogicalRepWorkerLock);
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Set the last-start time for the subscription.
+ */
+static void
+logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time)
+{
+	LauncherLastStartTimesEntry *entry;
+	bool		found;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find_or_insert(last_start_times, &subid, &found);
+	entry->last_start_time = start_time;
+	dshash_release_lock(last_start_times, entry);
+}
+
+/*
+ * Return the last-start time for the subscription, or 0 if there isn't one.
+ */
+static TimestampTz
+logicalrep_launcher_get_last_start_time(Oid subid)
+{
+	LauncherLastStartTimesEntry *entry;
+	TimestampTz ret;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find(last_start_times, &subid, false);
+	if (entry == NULL)
+		return 0;
+
+	ret = entry->last_start_time;
+	dshash_release_lock(last_start_times, entry);
+
+	return ret;
+}
+
+/*
+ * Remove the last-start time for the subscription, if one exists.
+ */
+void
+logicalrep_launcher_delete_last_start_time(Oid subid)
+{
+	logicalrep_launcher_attach_dshmem();
+
+	(void) dshash_delete_key(last_start_times, &subid);
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 8c8f27ebcf..adc1043211 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -618,6 +618,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
 							MySubscription->name)));
 
+			/*
+			 * Clear the last-start time for the worker so that the launcher
+			 * restarts it immediately, bypassing wal_retrieve_retry_interval.
+			 */
+			logicalrep_launcher_delete_last_start_time(MySubscription->oid);
+
 			should_exit = true;
 		}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cf38ed821e..b319990bcc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -164,6 +164,7 @@
 #include "postmaster/walwriter.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/logicalproto.h"
 #include "replication/logicalrelation.h"
 #include "replication/logicalworker.h"
@@ -3126,6 +3127,13 @@ maybe_reread_subscription(void)
 				(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
 						MySubscription->name)));
 
+		/*
+		 * Clear the last-start time for the apply worker so that the launcher
+		 * will restart it immediately, bypassing wal_retrieve_retry_interval.
+		 */
+		if (!am_tablesync_worker())
+			logicalrep_launcher_delete_last_start_time(MySubscription->oid);
+
 		proc_exit(0);
 	}
 
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 196bece0a3..d2ec396045 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
 	"PgStatsHash",
 	/* LWTRANCHE_PGSTATS_DATA: */
 	"PgStatsData",
+	/* LWTRANCHE_LAUNCHER_DSA: */
+	"LogicalRepLauncherDSA",
+	/* LWTRANCHE_LAUNCHER_HASH: */
+	"LogicalRepLauncherHash",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index e1661b6c91..00f6f89d72 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -26,4 +26,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
 
+extern void logicalrep_launcher_delete_last_start_time(Oid subid);
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e4162db613..d2c7afb8f4 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DSA,
 	LWTRANCHE_PGSTATS_HASH,
 	LWTRANCHE_PGSTATS_DATA,
+	LWTRANCHE_LAUNCHER_DSA,
+	LWTRANCHE_LAUNCHER_HASH,
 	LWTRANCHE_FIRST_USER_DEFINED
 }			BuiltinTrancheIds;
 
-- 
2.25.1
