On Sun, Dec 18, 2022 at 03:36:07PM -0800, Nathan Bossart wrote:
> This seems to have somehow broken the archiving tests on Windows, so
> obviously I owe some better analysis here.  I didn't see anything obvious
> in the logs, but I will continue to dig.

On Windows, WaitForWALToBecomeAvailable() seems to depend on the call to
WaitLatch() for wal_retrieve_retry_interval to ensure that signals are
dispatched (i.e., pgwin32_dispatch_queued_signals()).  My first instinct is
to just always call WaitLatch() in this code path, even if
wal_retrieve_retry_interval milliseconds have already elapsed.  The attached
0003 does this.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From d1025a5d7ad9a966f7ec8bee4bb8127e8a0e1d8b Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v10 1/4] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c           |  3 ++
 src/backend/commands/alter.c                |  7 ++++
 src/backend/commands/subscriptioncmds.c     |  4 ++
 src/backend/replication/logical/tablesync.c | 10 +++++
 src/backend/replication/logical/worker.c    | 46 +++++++++++++++++++++
 src/include/replication/logicalworker.h     |  3 ++
 6 files changed, 73 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b7c7fd9f00..70ad51c591 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..d095cd3ced 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..d6993c26e5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..509fe2eb19 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -105,6 +105,7 @@
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
@@ -619,6 +620,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * If we are ready to enable two_phase mode, wake up the logical
+		 * replication workers to handle this change quickly.
+		 */
+		CommandCounterIncrement();
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
+			LogicalRepWorkersWakeupAtCommit(MyLogicalRepWorker->subid);
+
 		CommitTransactionCommand();
 		pgstat_report_stat(true);
 	}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 96772e4d73..722f796c7a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4097,3 +4099,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wakeup the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell   *subid;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		foreach(subid, on_commit_wakeup_workers_subids)
+		{
+			List	   *workers;
+			ListCell   *worker;
+
+			workers = logicalrep_workers_find(lfirst_oid(subid), true);
+			foreach(worker, workers)
+				logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+		}
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subids = NIL;
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction.
+ *
+ * This is used to ensure that the workers process assorted changes as soon as
+ * possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	MemoryContext oldcxt;
+
+	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+	on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids,
+															 subid);
+	MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index cd1b6e8afc..2c2340d758 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 #endif							/* LOGICALWORKER_H */
-- 
2.25.1

>From b16d2c4b8b32b609f5dc5c5ff3527b8f986ab0d8 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Thu, 15 Dec 2022 14:11:43 -0800
Subject: [PATCH v10 2/4] handle race condition when restarting wal receivers

---
 src/backend/postmaster/postmaster.c | 24 ++++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f459dab360..4107a85b51 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1601,9 +1601,9 @@ checkControlFile(void)
  *
  * In normal conditions we wait at most one minute, to ensure that the other
  * background tasks handled by ServerLoop get done even when no requests are
- * arriving.  However, if there are background workers waiting to be started,
- * we don't actually sleep so that they are quickly serviced.  Other exception
- * cases are as shown in the code.
+ * arriving.  However, if there are background workers or a WAL receiver
+ * waiting to be started, we make sure they are quickly serviced.  Other
+ * exception cases are as shown in the code.
  */
 static void
 DetermineSleepTime(struct timeval *timeout)
@@ -1611,11 +1611,12 @@ DetermineSleepTime(struct timeval *timeout)
 	TimestampTz next_wakeup = 0;
 
 	/*
-	 * Normal case: either there are no background workers at all, or we're in
-	 * a shutdown sequence (during which we ignore bgworkers altogether).
+	 * Normal case: either there are no background workers and no WAL receiver
+	 * at all, or we're in a shutdown sequence (during which we ignore
+	 * bgworkers altogether).
 	 */
 	if (Shutdown > NoShutdown ||
-		(!StartWorkerNeeded && !HaveCrashedWorker))
+		(!StartWorkerNeeded && !HaveCrashedWorker && !WalReceiverRequested))
 	{
 		if (AbortStartTime != 0)
 		{
@@ -1640,6 +1641,17 @@ DetermineSleepTime(struct timeval *timeout)
 		return;
 	}
 
+	/*
+	 * We're probably waiting for SIGCHLD before starting the WAL receiver.  We
+	 * don't expect that to take long.
+	 */
+	if (WalReceiverRequested)
+	{
+		timeout->tv_sec = 0;
+		timeout->tv_usec = 100000;	/* 100ms */
+		return;
+	}
+
 	if (HaveCrashedWorker)
 	{
 		slist_mutable_iter siter;
-- 
2.25.1

>From 34a892e3d7f234f84914aaa5c46579eb3fabb291 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Sat, 31 Dec 2022 15:16:54 -0800
Subject: [PATCH v10 3/4] ensure signals are dispatched in startup process on
 Windows

---
 src/backend/access/transam/xlogrecovery.c | 36 ++++++++++++++---------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index d5a81f9d83..6c4c5a18a1 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -3466,6 +3466,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 		 */
 		if (lastSourceFailed)
 		{
+			long		wait_time;
+
 			/*
 			 * Don't allow any retry loops to occur during nonblocking
 			 * readahead.  Let the caller process everything that has been
@@ -3556,33 +3558,39 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 * and retry from the archive, but if it hasn't been long
 					 * since last attempt, sleep wal_retrieve_retry_interval
 					 * milliseconds to avoid busy-waiting.
+					 *
+					 * NB: Even if it's already been wal_retrieve_retry_interval
+					 * milliseconds since the last attempt, we still call
+					 * WaitLatch() with the timeout set to 0 to make sure any
+					 * queued signals are dispatched on Windows builds.
 					 */
 					now = GetCurrentTimestamp();
 					if (!TimestampDifferenceExceeds(last_fail_time, now,
 													wal_retrieve_retry_interval))
 					{
-						long		wait_time;
-
 						wait_time = wal_retrieve_retry_interval -
 							TimestampDifferenceMilliseconds(last_fail_time, now);
 
 						elog(LOG, "waiting for WAL to become available at %X/%X",
 							 LSN_FORMAT_ARGS(RecPtr));
+					}
+					else
+						wait_time = 0;
 
-						/* Do background tasks that might benefit us later. */
-						KnownAssignedTransactionIdsIdleMaintenance();
+					/* Do background tasks that might benefit us later. */
+					KnownAssignedTransactionIdsIdleMaintenance();
 
-						(void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
-										 WL_LATCH_SET | WL_TIMEOUT |
-										 WL_EXIT_ON_PM_DEATH,
-										 wait_time,
-										 WAIT_EVENT_RECOVERY_RETRIEVE_RETRY_INTERVAL);
-						ResetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
-						now = GetCurrentTimestamp();
+					(void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
+									 WL_LATCH_SET | WL_TIMEOUT |
+									 WL_EXIT_ON_PM_DEATH,
+									 wait_time,
+									 WAIT_EVENT_RECOVERY_RETRIEVE_RETRY_INTERVAL);
+					ResetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+					now = GetCurrentTimestamp();
+
+					/* Handle interrupt signals of startup process */
+					HandleStartupProcInterrupts();
 
-						/* Handle interrupt signals of startup process */
-						HandleStartupProcInterrupts();
-					}
 					last_fail_time = now;
 					currentSource = XLOG_FROM_ARCHIVE;
 					break;
-- 
2.25.1

>From ea9ff25e584348cabc30d46778ebb7a538cef991 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Thu, 15 Dec 2022 09:16:22 -0800
Subject: [PATCH v10 4/4] set wal_retrieve_retry_interval to 1ms in tests

---
 src/test/perl/PostgreSQL/Test/Cluster.pm | 2 +-
 src/test/recovery/t/002_archiving.pl     | 2 --
 src/test/subscription/t/004_sync.pl      | 2 --
 3 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 7411188dc8..c6aeee07fc 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -532,7 +532,7 @@ sub init
 	print $conf "log_line_prefix = '%m [%p] %q%a '\n";
 	print $conf "log_statement = all\n";
 	print $conf "log_replication_commands = on\n";
-	print $conf "wal_retrieve_retry_interval = '500ms'\n";
+	print $conf "wal_retrieve_retry_interval = '1ms'\n";
 
 	# If a setting tends to affect whether tests pass or fail, print it after
 	# TEMP_CONFIG.  Otherwise, print it before TEMP_CONFIG, thereby permitting
diff --git a/src/test/recovery/t/002_archiving.pl b/src/test/recovery/t/002_archiving.pl
index d69da4e5ef..8e269671b2 100644
--- a/src/test/recovery/t/002_archiving.pl
+++ b/src/test/recovery/t/002_archiving.pl
@@ -28,8 +28,6 @@ my $node_standby = PostgreSQL::Test::Cluster->new('standby');
 # of the primary.
 $node_standby->init_from_backup($node_primary, $backup_name,
 	has_restoring => 1);
-$node_standby->append_conf('postgresql.conf',
-	"wal_retrieve_retry_interval = '100ms'");
 
 # Set archive_cleanup_command and recovery_end_command, checking their
 # execution by the backend with dummy commands.
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index fd4bf7bacd..edb6fbf3a6 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -16,8 +16,6 @@ $node_publisher->start;
 # Create subscriber node
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
-$node_subscriber->append_conf('postgresql.conf',
-	"wal_retrieve_retry_interval = 1ms");
 $node_subscriber->start;
 
 # Create some preexisting content on publisher
-- 
2.25.1

Reply via email to