On Wed, Jan 04, 2023 at 08:12:37PM -0800, Nathan Bossart wrote:
> On Thu, Jan 05, 2023 at 09:09:12AM +0530, Amit Kapila wrote:
>> But there doesn't appear to be any guarantee that the result for
>> AllTablesyncsReady() will change between the time it is invoked
>> earlier in the function and at the place you have it in the patch.
>> This is because the value of 'table_states_valid' may not have
>> changed. So, how is this supposed to work?
> 
> The call to CommandCounterIncrement() should set table_states_valid to
> false if needed.

In v12, I moved the restart for two_phase mode to the end of
process_syncing_tables_for_apply() so that we don't need to rely on another
iteration of the loop.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From 57bfee9615e25d62dc975325695fdf445c002a65 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v12 1/2] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c           |  3 ++
 src/backend/commands/alter.c                |  7 +++
 src/backend/commands/subscriptioncmds.c     |  4 ++
 src/backend/replication/logical/tablesync.c | 49 ++++++++++++---------
 src/backend/replication/logical/worker.c    | 46 +++++++++++++++++++
 src/include/replication/logicalworker.h     |  3 ++
 6 files changed, 90 insertions(+), 22 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7..54145bf805 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a..4e8102b59f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796f..b9bbb2cf4e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2fdfeb5b4c..85e2a2861f 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 	bool		started_tx = false;
+	bool		should_exit = false;
 
 	Assert(!IsTransactionState());
 
@@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		last_start_times = NULL;
 	}
 
-	/*
-	 * Even when the two_phase mode is requested by the user, it remains as
-	 * 'pending' until all tablesyncs have reached READY state.
-	 *
-	 * When this happens, we restart the apply worker and (if the conditions
-	 * are still ok) then the two_phase tri-state will become 'enabled' at
-	 * that time.
-	 *
-	 * Note: If the subscription has no tables then leave the state as
-	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-	 * work.
-	 */
-	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-		AllTablesyncsReady())
-	{
-		ereport(LOG,
-				(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
-						MySubscription->name)));
-
-		proc_exit(0);
-	}
-
 	/*
 	 * Process all tables that are being synchronized.
 	 */
@@ -619,9 +598,35 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * Even when the two_phase mode is requested by the user, it remains as
+		 * 'pending' until all tablesyncs have reached READY state.
+		 *
+		 * When this happens, we restart the apply worker and (if the conditions
+		 * are still ok) then the two_phase tri-state will become 'enabled' at
+		 * that time.
+		 *
+		 * Note: If the subscription has no tables then leave the state as
+		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+		 * work.
+		 */
+		CommandCounterIncrement();
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
+		{
+			ereport(LOG,
+					(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+							MySubscription->name)));
+
+			should_exit = true;
+		}
+
 		CommitTransactionCommand();
 		pgstat_report_stat(true);
 	}
+
+	if (should_exit)
+		proc_exit(0);
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 446f84fa97..3e2ea32e1e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4097,3 +4099,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * End-of-xact hook: on commit, wake the workers of every queued subscription.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell   *subid;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);	/* held across all lookups and wakeups */
+		foreach(subid, on_commit_wakeup_workers_subids)
+		{
+			List	   *workers;
+			ListCell   *worker;
+
+			workers = logicalrep_workers_find(lfirst_oid(subid), true);
+			foreach(worker, workers)
+				logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+		}
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subids = NIL;	/* forget the queue on abort as well */
+}
+
+/*
+ * Queue a wakeup of the given subscription's workers for when the current
+ * transaction commits, so that they process the change promptly instead of
+ * waiting for their next periodic wakeup.
+ *
+ * The subid list is built in TopTransactionContext, not the caller's current
+ * memory context; a subid that is already queued is not added again.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	MemoryContext oldcxt;
+
+	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+	on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids,
+															 subid);
+	MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index f1e7e8a348..3b2084f81f 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 #endif							/* LOGICALWORKER_H */
-- 
2.25.1

>From f2df8156500f55a4db433dc385da5b4020fe8fa9 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Wed, 4 Jan 2023 11:38:20 -0800
Subject: [PATCH v12 2/2] bypass wal_retrieve_retry_interval for logical
 workers as appropriate

---
 src/backend/commands/subscriptioncmds.c     |   9 +-
 src/backend/replication/logical/launcher.c  | 155 +++++++++++++++-----
 src/backend/replication/logical/tablesync.c |   1 +
 src/backend/replication/logical/worker.c    |   2 +-
 src/include/replication/worker_internal.h   |   3 +
 5 files changed, 128 insertions(+), 42 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9bbb2cf4e..3293ec2043 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1137,8 +1137,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					BoolGetDatum(opts.enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
-				if (opts.enabled)
-					ApplyLauncherWakeupAtCommit();
+				/*
+				 * Even if the subscription is disabled, we wake up the
+				 * launcher so that it clears its last start time.  This
+				 * ensures the workers will be able to start up right away when
+				 * the subscription is enabled again.
+				 */
+				ApplyLauncherWakeupAtCommit();
 
 				update_tuple = true;
 				break;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c05..544e73dd62 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -297,7 +297,7 @@ retry:
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (!w->in_use)
+		if (!w->in_use && !w->restart_immediately)
 		{
 			worker = w;
 			slot = i;
@@ -620,8 +620,14 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 	worker->proc = NULL;
 	worker->dbid = InvalidOid;
 	worker->userid = InvalidOid;
-	worker->subid = InvalidOid;
 	worker->relid = InvalidOid;
+
+	/*
+	 * If restart_immediately was set, retain the subscription ID so that the
+	 * launcher knows which worker it should restart right away.
+	 */
+	if (!worker->restart_immediately)
+		worker->subid = InvalidOid;
 }
 
 /*
@@ -801,7 +807,13 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
+	struct launcher_start_time_mapping
+	{
+		Oid			subid;
+		TimestampTz last_start_time;
+	};
+	HTAB	   *last_start_times = NULL;
+	HASHCTL		ctl;
 
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
@@ -822,6 +834,18 @@ ApplyLauncherMain(Datum main_arg)
 	 */
 	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+	/*
+	 * Prepare a hash table for tracking last start times of workers, to avoid
+	 * immediate restarts.  Ideally, this hash table would be created in shared
+	 * memory so that other backends could adjust it directly to avoid race
+	 * conditions, but it would need to be a fixed size, and the number of
+	 * subscriptions is virtually unbounded.
+	 */
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(struct launcher_start_time_mapping);
+	last_start_times = hash_create("Logical replication apply worker start times",
+								   256, &ctl, HASH_ELEM | HASH_BLOBS);
+
 	/* Enter main loop */
 	for (;;)
 	{
@@ -832,63 +856,116 @@ ApplyLauncherMain(Datum main_arg)
 		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+		HASH_SEQ_STATUS status;
+		struct launcher_start_time_mapping *hentry;
 
 		CHECK_FOR_INTERRUPTS();
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
+		/* Use temporary context for the database list and worker info. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+									   "Logical Replication Launcher sublist",
+									   ALLOCSET_DEFAULT_SIZES);
+		oldctx = MemoryContextSwitchTo(subctx);
+
+		sublist = get_subscription_list();
+		foreach(lc, sublist)
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
+			Subscription *sub = (Subscription *) lfirst(lc);
+			bool		bypass_retry_interval = false;
+			LogicalRepWorker *w;
 
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
+			/*
+			 * Bypass wal_retrieve_retry_interval if the worker set
+			 * restart_immediately.  We do this before checking if the
+			 * subscription is enabled so that the slot can be freed
+			 * regardless.
+			 */
+			for (int i = 0; i < max_logical_replication_workers; i++)
 			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+				w = &LogicalRepCtx->workers[i];
 
-				if (!sub->enabled)
-					continue;
+				if (!w->in_use && !w->proc && w->subid == sub->oid &&
+					w->relid == InvalidOid && w->restart_immediately)
+				{
+					w->restart_immediately = false;
+					logicalrep_worker_cleanup(w);
+					bypass_retry_interval = true;
+					break;
+				}
+			}
 
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
+			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+			LWLockRelease(LogicalRepWorkerLock);
 
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
+			/*
+			 * If the subscription isn't enabled, clear its entry in
+			 * last_start_times so that its apply worker is restarted right
+			 * away when it is enabled again.  There is a chance that the
+			 * subscription could be enabled again before we've had a chance to
+			 * clear its entry, in which case we'll wait a little bit before
+			 * starting the worker.
+			 */
+			if (!sub->enabled)
+			{
+				hash_search(last_start_times, &sub->oid, HASH_REMOVE, NULL);
+				continue;
+			}
+
+			/*
+			 * If it's okay to start the worker now, do so.  Otherwise, adjust
+			 * wait_time so that we wake up when we can start it.
+			 */
+			if (w == NULL)
+			{
+				bool		found;
 
+				hentry = hash_search(last_start_times, &sub->oid,
+									 HASH_ENTER, &found);
+
+				if (bypass_retry_interval || !found ||
+					TimestampDifferenceExceeds(hentry->last_start_time, now,
+											   wal_retrieve_retry_interval))
+				{
+					hentry->last_start_time = now;
 					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
 											 sub->owner, InvalidOid);
 				}
-			}
+				else
+				{
+					long		elapsed;
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+					elapsed = TimestampDifferenceMilliseconds(hentry->last_start_time, now);
+					wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+				}
+			}
 		}
-		else
+
+		/*
+		 * Do garbage collection on the last_start_times hash table.  In
+		 * theory, a subscription OID could be reused before its entry is
+		 * removed, but the risk of that seems low, and at worst the launcher
+		 * will wait a bit longer before starting the new subscription's apply
+		 * worker.  This risk could be reduced by removing entries for
+		 * subscriptions that aren't in sublist, but it doesn't seem worth the
+		 * trouble.
+		 */
+		hash_seq_init(&status, last_start_times);
+		while ((hentry = (struct launcher_start_time_mapping *) hash_seq_search(&status)) != NULL)
 		{
-			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
-			 */
-			wait_time = wal_retrieve_retry_interval;
+			if (TimestampDifferenceExceeds(hentry->last_start_time, now,
+										   wal_retrieve_retry_interval))
+				hash_search(last_start_times, &hentry->subid, HASH_REMOVE, NULL);
 		}
 
+		/* Switch back to original memory context. */
+		MemoryContextSwitchTo(oldctx);
+		/* Clean the temporary memory. */
+		MemoryContextDelete(subctx);
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 85e2a2861f..d6213c55fc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -618,6 +618,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
 							MySubscription->name)));
 
+			MyLogicalRepWorker->restart_immediately = true;
 			should_exit = true;
 		}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3e2ea32e1e..0521459898 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3130,7 +3130,7 @@ maybe_reread_subscription(void)
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
 						MySubscription->name)));
-
+		MyLogicalRepWorker->restart_immediately = true;
 		proc_exit(0);
 	}
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 2a3ec5c2d8..68a56c175b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -66,6 +66,9 @@ typedef struct LogicalRepWorker
 	TimestampTz last_recv_time;
 	XLogRecPtr	reply_lsn;
 	TimestampTz reply_time;
+
+	/* Should the launcher restart the worker immediately? */
+	bool		restart_immediately;
 } LogicalRepWorker;
 
 /* Main memory context for apply worker. Permanent during worker lifetime. */
-- 
2.25.1

Reply via email to