From 40340027c4ed5397a670eb3623cbfc9b1235c848 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 22 Dec 2022 02:49:48 +0000
Subject: [PATCH v5] Exit walsender before confirming remote flush in logical
 replication

Currently, at shutdown, walsender processes wait to send all pending data and
ensure that all data is flushed on the remote node. This mechanism was added by
985bd7 to support clean switchover, but such a use case cannot be supported
for logical replication. This commit removes the blocking in that case.

Author: Hayato Kuroda
---
 doc/src/sgml/logical-replication.sgml | 10 ++++++
 src/backend/replication/walsender.c   | 50 ++++++++++++++++++---------
 2 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 6407804547..88b9a63f30 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1701,6 +1701,16 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    table is in progress, there will be additional workers for the tables
    being synchronized.
   </para>
+
+  <caution>
+   <para>
+    Unlike physical replication, data synchronization by logical replication is
+    more likely to be suspended. This is because workers sometimes wait to
+    acquire locks, and while waiting they do not consume messages from the
+    publisher. The stall resolves automatically once the workers acquire the
+    locks and resume consuming incoming messages.
+   </para>
+  </caution>
  </sect1>
 
  <sect1 id="logical-replication-security">
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 015ae2995d..0179eb7142 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1450,6 +1450,10 @@ ProcessPendingWrites(void)
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
+
+		/* If a shutdown has been requested, try to exit the process */
+		if (got_STOPPING)
+			WalSndDone(XLogSendLogical);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
@@ -2513,18 +2517,14 @@ WalSndLoop(WalSndSendDataCallback send_data)
 										 application_name)));
 				WalSndSetState(WALSNDSTATE_STREAMING);
 			}
-
-			/*
-			 * When SIGUSR2 arrives, we send any outstanding logs up to the
-			 * shutdown checkpoint record (i.e., the latest record), wait for
-			 * them to be replicated to the standby, and exit. This may be a
-			 * normal termination at shutdown, or a promotion, the walsender
-			 * is not sure which.
-			 */
-			if (got_SIGUSR2)
-				WalSndDone(send_data);
 		}
 
+		/*
+		 * When SIGUSR2 arrives, try to exit the process.
+		 */
+		if (got_SIGUSR2)
+			WalSndDone(send_data);
+
 		/* Check for replication timeout. */
 		WalSndCheckTimeOut();
 
@@ -3094,13 +3094,14 @@ XLogSendLogical(void)
 }
 
 /*
- * Shutdown if the sender is caught up.
+ * Shut down the walsender if this is a convenient time.
  *
  * NB: This should only be called when the shutdown signal has been received
  * from postmaster.
  *
- * Note that if we determine that there's still more data to send, this
- * function will return control to the caller.
+ * Note that if we determine that there's still more data to send, or if we
+ * are in physical replication mode and not all WAL has been replicated yet,
+ * this function will return control to the caller.
  */
 static void
 WalSndDone(WalSndSendDataCallback send_data)
@@ -3118,15 +3119,32 @@ WalSndDone(WalSndSendDataCallback send_data)
 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
 		MyWalSnd->write : MyWalSnd->flush;
 
-	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
-		!pq_is_send_pending())
+	/*
+	 * Exit if this is a convenient time.
+	 *
+	 * In logical replication mode, we don't have to wait for all sent data
+	 * to be flushed on the subscriber, because clean switchover cannot be
+	 * supported for logical replication anyway.
+	 */
+	if (WalSndCaughtUp &&
+		(send_data == XLogSendLogical ||
+		 (sentPtr == replicatedPtr && !pq_is_send_pending())))
 	{
 		QueryCompletion qc;
 
 		/* Inform the standby that XLOG streaming is done */
 		SetQueryCompletion(&qc, CMDTAG_COPY, 0);
 		EndCommand(&qc, DestRemote, false);
-		pq_flush();
+
+		/*
+		 * Flush pending data if writable.
+		 *
+		 * Note that the output buffer may be full in case of logical
+		 * replication. If pq_flush() is called at that time, the walsender
+		 * process will be stuck. Therefore, call pq_flush_if_writable()
+		 * instead.
+		 */
+		pq_flush_if_writable();
 
 		proc_exit(0);
 	}
-- 
2.27.0

