On Wed, Jan 04, 2023 at 10:12:19AM -0800, Nathan Bossart wrote:
> From the discussion thus far, it sounds like the alternatives are to 1) add
> a global flag that causes wal_retrieve_retry_interval to be bypassed for
> all workers or to 2) add a hash map in the launcher and a
> restart_immediately flag in each worker slot.  I'll go ahead and create a
> patch for 2 since it seems like the most complete solution, and we can
> evaluate whether the complexity seems appropriate.

Here is a first attempt at adding a hash table to the launcher and a
restart_immediately flag in each worker slot.  This provides a similar
speedup to lowering wal_retrieve_retry_interval to 1ms.  I've noted a
couple of possible race conditions in comments, but none of them seemed
particularly egregious.  Ideally, we'd put the hash table in shared memory
so that other backends could adjust it directly, but IIUC that requires it
to be a fixed size, and the number of subscriptions is virtually unbounded.
There might still be problems with the patch, but I'm hoping it at least
helps further the discussion about which approach to take.
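
To illustrate the shared-memory point, here is a throwaway sketch (not part of
the patch): the backend-local dynahash the patch uses treats its size argument
as a hint and grows on demand, while a shared-memory table created with
ShmemInitHash is bounded by the max_size given at startup, which we can't
choose sensibly when the number of subscriptions is unbounded.

#include "postgres.h"
#include "storage/shmem.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"

/* same shape as the struct declared in ApplyLauncherMain() in 0002 */
struct launcher_start_time_mapping
{
	Oid			subid;
	TimestampTz last_start_time;
};

static void
hash_table_sketch(void)
{
	HASHCTL		ctl;

	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(struct launcher_start_time_mapping);

	/* backend-local (what the patch does): 256 is just a size hint */
	(void) hash_create("last start times", 256, &ctl,
					   HASH_ELEM | HASH_BLOBS);

	/* shared-memory variant: the third argument is a hard cap, and the
	 * space must be reserved at postmaster startup */
	(void) ShmemInitHash("last start times", 256, 256, &ctl,
						 HASH_ELEM | HASH_BLOBS);
}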

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 2c4c3c552ca83eeb8a4a7cca724ce455ba98ac0f Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v11 1/2] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c           |  3 ++
 src/backend/commands/alter.c                |  7 ++++
 src/backend/commands/subscriptioncmds.c     |  4 ++
 src/backend/replication/logical/tablesync.c | 10 +++++
 src/backend/replication/logical/worker.c    | 46 +++++++++++++++++++++
 src/include/replication/logicalworker.h     |  3 ++
 6 files changed, 73 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7..54145bf805 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a..4e8102b59f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796f..b9bbb2cf4e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2fdfeb5b4c..fcadc1e98e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -105,6 +105,7 @@
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
@@ -619,6 +620,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * If we are ready to enable two_phase mode, wake up the logical
+		 * replication workers to handle this change quickly.
+		 */
+		CommandCounterIncrement();
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
+			LogicalRepWorkersWakeupAtCommit(MyLogicalRepWorker->subid);
+
 		CommitTransactionCommand();
 		pgstat_report_stat(true);
 	}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 446f84fa97..3e2ea32e1e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4097,3 +4099,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wake up the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell   *subid;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		foreach(subid, on_commit_wakeup_workers_subids)
+		{
+			List	   *workers;
+			ListCell   *worker;
+
+			workers = logicalrep_workers_find(lfirst_oid(subid), true);
+			foreach(worker, workers)
+				logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+		}
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subids = NIL;
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction.
+ *
+ * This is used to ensure that the workers process assorted changes as soon as
+ * possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	MemoryContext oldcxt;
+
+	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+	on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids,
+															 subid);
+	MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index f1e7e8a348..3b2084f81f 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 #endif							/* LOGICALWORKER_H */
-- 
2.25.1

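To make 0001 easier to review at a glance, here is its control flow condensed
into one fragment.  This is just the patch's own logic rearranged, with the
locking details elided; it is not new behavior:

/* during the transaction (LogicalRepWorkersWakeupAtCommit): the list
 * lives in TopTransactionContext, so it goes away with the xact */
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
on_commit_wakeup_workers_subids =
	list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
MemoryContextSwitchTo(oldcxt);

/* at end of transaction (AtEOXact_LogicalRepWorkers): wake the
 * registered subscriptions' workers only on commit; on abort the
 * requests are simply forgotten */
if (isCommit)
{
	/* under LogicalRepWorkerLock: for each registered subid, find its
	 * workers with logicalrep_workers_find() and set their latches
	 * with logicalrep_worker_wakeup_ptr() */
}
on_commit_wakeup_workers_subids = NIL;
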
From 126e41be1c0af2260fa84f6a3bf6b738442df58f Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Wed, 4 Jan 2023 11:38:20 -0800
Subject: [PATCH v11 2/2] bypass wal_retrieve_retry_interval for logical
 workers as appropriate

---
 src/backend/commands/subscriptioncmds.c     |   9 +-
 src/backend/replication/logical/launcher.c  | 155 +++++++++++++++-----
 src/backend/replication/logical/tablesync.c |   2 +-
 src/backend/replication/logical/worker.c    |   2 +-
 src/include/replication/worker_internal.h   |   3 +
 5 files changed, 128 insertions(+), 43 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9bbb2cf4e..3293ec2043 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1137,8 +1137,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					BoolGetDatum(opts.enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
-				if (opts.enabled)
-					ApplyLauncherWakeupAtCommit();
+				/*
+				 * Even if the subscription is disabled, we wake up the
+				 * launcher so that it clears its last start time.  This
+				 * ensures the workers will be able to start up right away when
+				 * the subscription is enabled again.
+				 */
+				ApplyLauncherWakeupAtCommit();
 
 				update_tuple = true;
 				break;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c05..544e73dd62 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -297,7 +297,7 @@ retry:
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (!w->in_use)
+		if (!w->in_use && !w->restart_immediately)
 		{
 			worker = w;
 			slot = i;
@@ -620,8 +620,14 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 	worker->proc = NULL;
 	worker->dbid = InvalidOid;
 	worker->userid = InvalidOid;
-	worker->subid = InvalidOid;
 	worker->relid = InvalidOid;
+
+	/*
+	 * If restart_immediately was set, retain the subscription ID so that the
+	 * launcher knows which worker it should restart right away.
+	 */
+	if (!worker->restart_immediately)
+		worker->subid = InvalidOid;
 }
 
 /*
@@ -801,7 +807,13 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
+	struct launcher_start_time_mapping
+	{
+		Oid			subid;
+		TimestampTz last_start_time;
+	};
+	HTAB	   *last_start_times = NULL;
+	HASHCTL		ctl;
 
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
@@ -822,6 +834,18 @@ ApplyLauncherMain(Datum main_arg)
 	 */
 	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+	/*
+	 * Prepare a hash table for tracking last start times of workers, to avoid
+	 * immediate restarts.  Ideally, this hash table would be created in shared
+	 * memory so that other backends could adjust it directly to avoid race
+	 * conditions, but it would need to be a fixed size, and the number of
+	 * subscriptions is virtually unbounded.
+	 */
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(struct launcher_start_time_mapping);
+	last_start_times = hash_create("Logical replication apply worker start times",
+								   256, &ctl, HASH_ELEM | HASH_BLOBS);
+
 	/* Enter main loop */
 	for (;;)
 	{
@@ -832,63 +856,116 @@ ApplyLauncherMain(Datum main_arg)
 		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+		HASH_SEQ_STATUS status;
+		struct launcher_start_time_mapping *hentry;
 
 		CHECK_FOR_INTERRUPTS();
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
+		/* Use temporary context for the database list and worker info. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+									   "Logical Replication Launcher sublist",
+									   ALLOCSET_DEFAULT_SIZES);
+		oldctx = MemoryContextSwitchTo(subctx);
+
+		sublist = get_subscription_list();
+		foreach(lc, sublist)
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
+			Subscription *sub = (Subscription *) lfirst(lc);
+			bool		bypass_retry_interval = false;
+			LogicalRepWorker *w;
 
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
+			/*
+			 * Bypass wal_retrieve_retry_interval if the worker set
+			 * restart_immediately.  We do this before checking if the
+			 * subscription is enabled so that the slot can be freed
+			 * regardless.
+			 */
+			for (int i = 0; i < max_logical_replication_workers; i++)
 			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+				w = &LogicalRepCtx->workers[i];
 
-				if (!sub->enabled)
-					continue;
+				if (!w->in_use && !w->proc && w->subid == sub->oid &&
+					w->relid == InvalidOid && w->restart_immediately)
+				{
+					w->restart_immediately = false;
+					logicalrep_worker_cleanup(w);
+					bypass_retry_interval = true;
+					break;
+				}
+			}
 
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
+			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+			LWLockRelease(LogicalRepWorkerLock);
 
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
+			/*
+			 * If the subscription isn't enabled, clear its entry in
+			 * last_start_times so that its apply worker is restarted right
+			 * away when it is enabled again.  The subscription could be
+			 * re-enabled before we've had a chance to clear its entry,
+			 * in which case we'll wait a little bit before starting the
+			 * worker.
+			 */
+			if (!sub->enabled)
+			{
+				hash_search(last_start_times, &sub->oid, HASH_REMOVE, NULL);
+				continue;
+			}
+
+			/*
+			 * If it's okay to start the worker now, do so.  Otherwise, adjust
+			 * wait_time so that we wake up when we can start it.
+			 */
+			if (w == NULL)
+			{
+				bool		found;
 
+				hentry = hash_search(last_start_times, &sub->oid,
+									 HASH_ENTER, &found);
+
+				if (bypass_retry_interval || !found ||
+					TimestampDifferenceExceeds(hentry->last_start_time, now,
+											   wal_retrieve_retry_interval))
+				{
+					hentry->last_start_time = now;
 					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
 											 sub->owner, InvalidOid);
 				}
-			}
+				else
+				{
+					long		elapsed;
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+					elapsed = TimestampDifferenceMilliseconds(hentry->last_start_time, now);
+					wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+				}
+			}
 		}
-		else
+
+		/*
+		 * Do garbage collection on the last_start_times hash table.  In
+		 * theory, a subscription OID could be reused before its entry is
+		 * removed, but the risk of that seems low, and at worst the launcher
+		 * will wait a bit longer before starting the new subscription's apply
+		 * worker.  This risk could be reduced by removing entries for
+		 * subscriptions that aren't in sublist, but it doesn't seem worth the
+		 * trouble.
+		 */
+		hash_seq_init(&status, last_start_times);
+		while ((hentry = (struct launcher_start_time_mapping *) hash_seq_search(&status)) != NULL)
 		{
-			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
-			 */
-			wait_time = wal_retrieve_retry_interval;
+			if (TimestampDifferenceExceeds(hentry->last_start_time, now,
+										   wal_retrieve_retry_interval))
+				hash_search(last_start_times, &hentry->subid, HASH_REMOVE, NULL);
 		}
 
+		/* Switch back to original memory context. */
+		MemoryContextSwitchTo(oldctx);
+		/* Clean the temporary memory. */
+		MemoryContextDelete(subctx);
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index fcadc1e98e..3fa6472c88 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -465,7 +465,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
 						MySubscription->name)));
-
+		MyLogicalRepWorker->restart_immediately = true;
 		proc_exit(0);
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3e2ea32e1e..0521459898 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3130,7 +3130,7 @@ maybe_reread_subscription(void)
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
 						MySubscription->name)));
-
+		MyLogicalRepWorker->restart_immediately = true;
 		proc_exit(0);
 	}
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 2a3ec5c2d8..68a56c175b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -66,6 +66,9 @@ typedef struct LogicalRepWorker
 	TimestampTz last_recv_time;
 	XLogRecPtr	reply_lsn;
 	TimestampTz reply_time;
+
+	/* Should the launcher restart the worker immediately? */
+	bool		restart_immediately;
 } LogicalRepWorker;
 
 /* Main memory context for apply worker. Permanent during worker lifetime. */
-- 
2.25.1

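And the 0002 handshake, condensed the same way (again just the hunks above
rearranged, not new code): an exiting worker marks its own slot, and the
launcher consumes the mark on its next cycle, skipping the
wal_retrieve_retry_interval backoff for that one subscription.

/* worker side (worker.c, tablesync.c), just before exiting */
MyLogicalRepWorker->restart_immediately = true;
proc_exit(0);

/* launcher side (launcher.c), per subscription, while holding
 * LogicalRepWorkerLock exclusively */
if (!w->in_use && !w->proc && w->subid == sub->oid &&
	w->relid == InvalidOid && w->restart_immediately)
{
	w->restart_immediately = false;
	logicalrep_worker_cleanup(w);	/* clears subid now that the flag is reset */
	bypass_retry_interval = true;	/* start the new worker right away */
}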