On Tue, Nov 29, 2022 at 09:04:41PM -0800, Nathan Bossart wrote:
> I don't mind fixing it!  There are a couple more I'd like to track down
> before posting another revision.

Okay, here is a new version of the patch.  This seems to clear up
everything that I could find via the tests.

Thanks to this effort, I discovered that we need to include
wal_retrieve_retry_interval in our wait time calculations after failed
tablesyncs (for the suppress-unnecessary-wakeups patch).  I'll make that
change and post that patch in a new thread.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 79db8837e5b3054fb6007326d230aef50f3cda87 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v4 1/1] wake up logical workers after ALTER SUBSCRIPTION and
 when two_phase mode can be enabled

---
 src/backend/access/transam/xact.c        |  3 ++
 src/backend/catalog/pg_subscription.c    |  7 ++++
 src/backend/commands/alter.c             |  7 ++++
 src/backend/commands/subscriptioncmds.c  |  4 +++
 src/backend/replication/logical/worker.c | 46 ++++++++++++++++++++++++
 src/include/replication/logicalworker.h  |  3 ++
 6 files changed, 70 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..dc00e66cfb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a506fc3ec8..d9f21b7dcf 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "replication/logicalworker.h"
 #include "storage/lmgr.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -320,6 +321,12 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	/* Update the catalog. */
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
 
+	/*
+	 * We might be ready to enable two_phase mode, which requires the workers
+	 * to restart.  Wake them up at commit time to check the status.
+	 */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	/* Cleanup. */
 	table_close(rel, NoLock);
 }
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..da14e91552 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -410,6 +411,12 @@ ExecRenameStmt(RenameStmt *stmt)
 										   stmt->newname);
 				table_close(catalog, RowExclusiveLock);
 
+				/*
+				 * Wake up the logical replication workers to handle this
+				 * change quickly.
+				 */
+				LogicalRepWorkersWakeupAtCommit(address.objectId);
+
 				return address;
 			}
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..e29cdcbff9 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1358,6 +1359,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e48a3f589a..c39ca5a3cb 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -253,6 +253,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wake up the stored subscriptions' workers on commit; always reset the list.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell   *subid;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);	/* held for the whole scan */
+		foreach(subid, on_commit_wakeup_workers_subids)
+		{
+			List	   *workers;
+			ListCell   *worker;
+
+			workers = logicalrep_workers_find(lfirst_oid(subid), true);
+			foreach(worker, workers)
+				logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+		}
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subids = NIL;	/* forget requests on commit and abort alike */
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction, so that they process assorted changes as soon as possible.
+ * The ID is only queued here, in a list kept in TopTransactionContext (each
+ * subscription presumably appears once, via list_append_unique_oid); the
+ * actual wakeup is performed by AtEOXact_LogicalRepWorkers().
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	MemoryContext oldcxt;
+
+	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+	on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids,
+															 subid);
+	MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index cd1b6e8afc..2c2340d758 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 #endif							/* LOGICALWORKER_H */
-- 
2.25.1

Reply via email to