From 8e926809636f1be474f26cec3d8b3dce80a17a6d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 22 Dec 2022 02:49:48 +0000
Subject: [PATCH v6 1/2] Exit walsender before confirming remote flush in
 logical replication

Currently, at shutdown, walsender processes wait to send all pending data and
ensure the all data is flushed in remote node. This mechanism was added by
985bd7 for supporting clean switch over, but such use-case cannot be supported
for logical replication. This commit remove the blocking in the case.

Author: Hayato Kuroda
---
 doc/src/sgml/logical-replication.sgml | 10 ++++++
 src/backend/replication/walsender.c   | 50 ++++++++++++++++++---------
 2 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 6bd5f61e2b..ccddb8a35a 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1701,6 +1701,16 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    being synchronized. Moreover, if the streaming transaction is applied in
    parallel, there may be additional parallel apply workers.
   </para>
+
+  <caution>
+   <para>
+    Unlike physical replication, data synchronization by logical replication is
+    more likely to be suspended. It is because workers sometimes wait for
+    acquiring locks and they do not consume messages from the publisher. It
+    will be resolved automatically when workers acquire locks and start
+    consuming arrivals.
+   </para>
+  </caution>
  </sect1>
 
  <sect1 id="logical-replication-security">
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4ed3747e3f..25a052adfc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1450,6 +1450,10 @@ ProcessPendingWrites(void)
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
+
+		/* If we got shut down requested, try to exit the process */
+		if (got_STOPPING)
+			WalSndDone(XLogSendLogical);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
@@ -2513,18 +2517,14 @@ WalSndLoop(WalSndSendDataCallback send_data)
 										 application_name)));
 				WalSndSetState(WALSNDSTATE_STREAMING);
 			}
-
-			/*
-			 * When SIGUSR2 arrives, we send any outstanding logs up to the
-			 * shutdown checkpoint record (i.e., the latest record), wait for
-			 * them to be replicated to the standby, and exit. This may be a
-			 * normal termination at shutdown, or a promotion, the walsender
-			 * is not sure which.
-			 */
-			if (got_SIGUSR2)
-				WalSndDone(send_data);
 		}
 
+		/*
+		 * When SIGUSR2 arrives, try to exit the process.
+		 */
+		if (got_SIGUSR2)
+			WalSndDone(send_data);
+
 		/* Check for replication timeout. */
 		WalSndCheckTimeOut();
 
@@ -3094,13 +3094,14 @@ XLogSendLogical(void)
 }
 
 /*
- * Shutdown if the sender is caught up.
+ * Shutdown if the sender is we are in a convenient time.
  *
  * NB: This should only be called when the shutdown signal has been received
  * from postmaster.
  *
- * Note that if we determine that there's still more data to send, this
- * function will return control to the caller.
+ * Note that if we determine that there's still more data to send or we are in
+ * physical replication mode and all WALs are not yet replicated, this function
+ * will return control to the caller.
  */
 static void
 WalSndDone(WalSndSendDataCallback send_data)
@@ -3118,15 +3119,32 @@ WalSndDone(WalSndSendDataCallback send_data)
 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
 		MyWalSnd->write : MyWalSnd->flush;
 
-	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
-		!pq_is_send_pending())
+	/*
+	 * Exit if we are in the convenient time.
+	 *
+	 * When we are logical replication mode, we don't have to wait that all
+	 * sent data to be flushed on the subscriber because we cannot support
+	 * clean switchover for it.
+	 */
+	if (WalSndCaughtUp &&
+		(send_data == XLogSendLogical ||
+		 (sentPtr == replicatedPtr && !pq_is_send_pending())))
 	{
 		QueryCompletion qc;
 
 		/* Inform the standby that XLOG streaming is done */
 		SetQueryCompletion(&qc, CMDTAG_COPY, 0);
 		EndCommand(&qc, DestRemote, false);
-		pq_flush();
+
+		/*
+		 * Flush pending data if writable.
+		 *
+		 * Note that the output buffer may be full in case of logical
+		 * replication. If pq_flush() is called at that time, the walsender
+		 * process will be stuck. Therefore, call pq_flush_if_writable()
+		 * instead.
+		 */
+		pq_flush_if_writable();
 
 		proc_exit(0);
 	}
-- 
2.27.0

