On Tue, Jan 10, 2023 at 10:59:14AM +0530, Amit Kapila wrote:
> I haven't looked in detail but isn't it better to explain somewhere in
> the comments that it achieves to rate limit the restart of workers in
> case of error and allows them to restart immediately in case of
> subscription parameter change?

I expanded one of the existing comments to make this clear.

> Another minor point: Don't we need to set the launcher's latch after
> removing the entry from the hash table to avoid the launcher waiting
> on the latch for a bit longer?

The launcher's latch should be set when the apply worker exits.  The apply
worker's notify_pid is set to the launcher's PID, so the postmaster sends
the launcher SIGUSR1 when the worker exits, and the launcher's SIGUSR1
handler sets its latch.
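
For reference, that wiring looks roughly like the following (paraphrased
from logicalrep_worker_launch() in launcher.c; the function name here is
made up, and most of the BackgroundWorker setup is elided):

#include "postgres.h"

#include "miscadmin.h"
#include "postmaster/bgworker.h"

/* Hypothetical condensation of logicalrep_worker_launch(). */
static void
launch_apply_worker_sketch(void)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *bgw_handle;

	memset(&bgw, 0, sizeof(bgw));
	/* ... bgw_name, bgw_function_name, bgw_main_arg, etc. ... */

	/*
	 * Ask the postmaster to send this process (the launcher) SIGUSR1
	 * whenever the worker starts or stops.  The launcher's SIGUSR1
	 * handler sets its process latch, waking it from WaitLatch().
	 */
	bgw.bgw_notify_pid = MyProcPid;

	(void) RegisterDynamicBackgroundWorker(&bgw, &bgw_handle);
}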

Of course, if the launcher restarts, then the notify_pid will no longer be
accurate.  However, I see that workers also register a before_shmem_exit
callback that will send SIGUSR1 to the launcher_pid currently stored in
shared memory.  (I wonder if there is a memory ordering bug here.)

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 358d2ff90f..3cf8dfa11e 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2006,6 +2006,16 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting to read or update information
        about <quote>heavyweight</quote> locks.</entry>
      </row>
+     <row>
+      <entry><literal>LogicalRepLauncherDSA</literal></entry>
+      <entry>Waiting to access the logical replication launcher's dynamic
+      shared memory allocator.</entry>
+     </row>
+     <row>
+      <entry><literal>LogicalRepLauncherHash</literal></entry>
+      <entry>Waiting to access the logical replication launcher's shared
+      memory hash table.</entry>
+     </row>
      <row>
       <entry><literal>LogicalRepWorker</literal></entry>
       <entry>Waiting to read or update the state of logical replication
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index baff00dd74..468e5e0bf1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1504,6 +1504,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Clear the last-start time for the apply worker to free up space.  If
+	 * this transaction rolls back, the launcher might restart the apply worker
+	 * before wal_retrieve_retry_interval milliseconds have elapsed, but that's
+	 * probably okay.
+	 */
+	logicalrep_launcher_delete_last_start_time(subid);
+
 	/*
 	 * Cleanup of tablesync replication origins.
 	 *
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index afb7acddaa..ecf239e4c2 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -57,6 +58,25 @@ int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 int			max_parallel_apply_workers_per_subscription = 2;
 
+/* an entry in the last-start times hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+	Oid		subid;
+	TimestampTz last_start_time;
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start times hash table */
+static const dshash_parameters dsh_params = {
+	sizeof(Oid),
+	sizeof(LauncherLastStartTimesEntry),
+	dshash_memcmp,
+	dshash_memhash,
+	LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
 typedef struct LogicalRepCtxStruct
@@ -64,6 +84,10 @@ typedef struct LogicalRepCtxStruct
 	/* Supervisor process. */
 	pid_t		launcher_pid;
 
+	/* hash table for last-start times */
+	dsa_handle	last_start_dsa;
+	dshash_table_handle last_start_dsh;
+
 	/* Background workers. */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
@@ -76,6 +100,9 @@ static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 static int	logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_launcher_attach_dshmem(void);
+static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time);
+static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid);
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -902,6 +929,9 @@ ApplyLauncherShmemInit(void)
 			memset(worker, 0, sizeof(LogicalRepWorker));
 			SpinLockInit(&worker->relmutex);
 		}
+
+		LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
 	}
 }
 
@@ -947,8 +977,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
-
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
 
@@ -983,58 +1011,63 @@ ApplyLauncherMain(Datum main_arg)
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
-		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
-			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+		/* Use temporary context for the database list and worker info. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+									   "Logical Replication Launcher sublist",
+									   ALLOCSET_DEFAULT_SIZES);
+		oldctx = MemoryContextSwitchTo(subctx);
 
-				if (!sub->enabled)
-					continue;
+		sublist = get_subscription_list();
+		foreach(lc, sublist)
+		{
+			Subscription *sub = (Subscription *) lfirst(lc);
+			LogicalRepWorker *w;
+			TimestampTz last_start;
 
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
+			if (!sub->enabled)
+				continue;
 
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+			LWLockRelease(LogicalRepWorkerLock);
 
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid, DSM_HANDLE_INVALID);
-				}
-			}
+			if (w != NULL)
+				continue;
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
-		}
-		else
-		{
 			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
+			 * If the worker is eligible to start now, launch it.  Otherwise,
+			 * adjust wait_time so that we wake up when it can be started.
+			 * Each subscription's apply worker can only be restarted once per
+			 * wal_retrieve_retry_interval so that errors do not cause us to
+			 * repeatedly restart the worker as fast as possible.  In cases
+			 * where a restart is expected (e.g., subscription parameter
+			 * changes), another process should remove the last-start entry for
+			 * the subscription so that wal_retrieve_retry_interval is bypassed
+			 * and the worker is restarted immediately.
 			 */
-			wait_time = wal_retrieve_retry_interval;
+			last_start = logicalrep_launcher_get_last_start_time(sub->oid);
+			if (TimestampDifferenceExceeds(last_start, now,
+										   wal_retrieve_retry_interval))
+			{
+				logicalrep_launcher_set_last_start_time(sub->oid, now);
+				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+										 sub->owner, InvalidOid,
+										 DSM_HANDLE_INVALID);
+			}
+			else
+			{
+				long		elapsed;
+
+				elapsed = TimestampDifferenceMilliseconds(last_start, now);
+				wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+			}
 		}
 
+		/* Switch back to original memory context. */
+		MemoryContextSwitchTo(oldctx);
+		/* Clean the temporary memory. */
+		MemoryContextDelete(subctx);
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -1146,3 +1179,89 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 
 	return (Datum) 0;
 }
+
+/*
+ * Initialize or attach to the dynamic shared hash table that stores the
+ * last-start times, if not already done.  This must be called before using the
+ * table.
+ */
+static void
+logicalrep_launcher_attach_dshmem(void)
+{
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+	if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for last-start times. */
+		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
+		dsa_pin(last_start_times_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
+
+		/* Store handles in shared memory for other backends to use. */
+		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
+		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
+	}
+	else if (!last_start_times)
+	{
+		/* Attach to existing dynamic shared hash table. */
+		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
+										 LogicalRepCtx->last_start_dsh, 0);
+	}
+
+	LWLockRelease(LogicalRepWorkerLock);
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Set the last-start time for the subscription.
+ */
+static void
+logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time)
+{
+	LauncherLastStartTimesEntry *entry;
+	bool		found;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find_or_insert(last_start_times, &subid, &found);
+	entry->last_start_time = start_time;
+	dshash_release_lock(last_start_times, entry);
+}
+
+/*
+ * Return the last-start time for the subscription, or 0 if there isn't one.
+ */
+static TimestampTz
+logicalrep_launcher_get_last_start_time(Oid subid)
+{
+	LauncherLastStartTimesEntry *entry;
+	TimestampTz ret;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find(last_start_times, &subid, false);
+	if (entry == NULL)
+		return 0;
+
+	ret = entry->last_start_time;
+	dshash_release_lock(last_start_times, entry);
+
+	return ret;
+}
+
+/*
+ * Remove the last-start time for the subscription, if one exists.
+ */
+void
+logicalrep_launcher_delete_last_start_time(Oid subid)
+{
+	logicalrep_launcher_attach_dshmem();
+
+	(void) dshash_delete_key(last_start_times, &subid);
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 38dfce7129..aa5ee089f7 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -628,7 +628,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	}
 
 	if (should_exit)
+	{
+		/*
+		 * Clear the last-start time for this worker so that the launcher will
+		 * restart it immediately.
+		 */
+		logicalrep_launcher_delete_last_start_time(MySubscription->oid);
+
 		proc_exit(0);
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 79cda39445..0c41d5fdd5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -174,6 +174,7 @@
 #include "postmaster/walwriter.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/logicalproto.h"
 #include "replication/logicalrelation.h"
 #include "replication/logicalworker.h"
@@ -3809,6 +3810,13 @@ apply_worker_exit(void)
 		return;
 	}
 
+	/*
+	 * Clear the last-start time for the apply worker so that the launcher
+	 * will restart it immediately, bypassing wal_retrieve_retry_interval.
+	 */
+	if (!am_tablesync_worker())
+		logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
+
 	proc_exit(0);
 }
 
@@ -3849,6 +3857,8 @@ maybe_reread_subscription(void)
 				(errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
 						get_worker_name(), MySubscription->name)));
 
+		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -4429,6 +4439,8 @@ InitializeApplyWorker(void)
 				(errmsg("%s for subscription %u will not start because the subscription was removed during startup",
 						get_worker_name(), MyLogicalRepWorker->subid)));
 
+		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+			logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -4691,6 +4703,8 @@ DisableSubscriptionAndExit(void)
 			errmsg("subscription \"%s\" has been disabled because of an error",
 				   MySubscription->name));
 
+	if (!am_tablesync_worker() && !am_parallel_apply_worker())
+		logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
 	proc_exit(0);
 }
 
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 196bece0a3..d2ec396045 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
 	"PgStatsHash",
 	/* LWTRANCHE_PGSTATS_DATA: */
 	"PgStatsData",
+	/* LWTRANCHE_LAUNCHER_DSA: */
+	"LogicalRepLauncherDSA",
+	/* LWTRANCHE_LAUNCHER_HASH: */
+	"LogicalRepLauncherHash",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index c85593ad11..923f96256a 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -27,4 +27,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
 
+extern void logicalrep_launcher_delete_last_start_time(Oid subid);
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e4162db613..d2c7afb8f4 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DSA,
 	LWTRANCHE_PGSTATS_HASH,
 	LWTRANCHE_PGSTATS_DATA,
+	LWTRANCHE_LAUNCHER_DSA,
+	LWTRANCHE_LAUNCHER_HASH,
 	LWTRANCHE_FIRST_USER_DEFINED
 }			BuiltinTrancheIds;
 
