From 1a7007ee91df1db8912cde36b44a6f48d5b37b4b Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Mon, 7 Feb 2022 23:06:00 +0800
Subject: [PATCH] Fix the timeout of subscriber in long transactions.

We don't send keep-alive messages for a long time while processing large
transactions during logical replication where we don't send any data of such
transactions (say because the table modified in the transaction is not
published) and then subscriber will timeout. So in this case, send keep-alive
message to the subscriber.
---
 src/backend/replication/logical/logical.c   |  4 +-
 src/backend/replication/pgoutput/pgoutput.c | 63 +++++++++++++++++++--
 src/backend/replication/walsender.c         | 31 ++++++++--
 src/include/replication/logical.h           |  3 +-
 src/include/replication/output_plugin.h     |  2 +-
 5 files changed, 88 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 9bc3a2d8de..89e745809b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -672,12 +672,12 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
  * Update progress tracking (if supported).
  */
 void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive)
 {
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, send_keep_alive);
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index af8d51aee9..a75720fe96 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -419,7 +419,7 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -450,7 +450,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -464,7 +464,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -480,7 +480,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -634,6 +634,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	RelationSyncEntry *relentry;
 	TransactionId xid = InvalidTransactionId;
 	Relation	ancestor = NULL;
+	static int skipped_changes_count = 0;
+	/*
+	 * Conservatively, at least 150,000 changes can be skipped in 1s.
+	 *
+	 * Because we use half of wal_sender_timeout as the threshold, and the unit
+	 * of wal_sender_timeout in process is ms, the final threshold is
+	 * wal_sender_timeout * 75.
+	 */
+	int skipped_changes_threshold = wal_sender_timeout * 75;
 
 	if (!is_publishable_relation(relation))
 		return;
@@ -654,20 +663,62 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				if (++skipped_changes_count >= skipped_changes_threshold)
+				{
+					OutputPluginUpdateProgress(ctx, true);
+
+					/*
+					 * After sending keepalive message, reset
+					 * skipped_changes_count.
+					 */
+					skipped_changes_count = 0;
+				}
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
+			{
+				if (++skipped_changes_count >= skipped_changes_threshold)
+				{
+					OutputPluginUpdateProgress(ctx, true);
+
+					/*
+					 * After sending keepalive message, reset
+					 * skipped_changes_count.
+					 */
+					skipped_changes_count = 0;
+				}
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
+			{
+				if (++skipped_changes_count >= skipped_changes_threshold)
+				{
+					OutputPluginUpdateProgress(ctx, true);
+
+					/*
+					 * After sending keepalive message, reset
+					 * skipped_changes_count.
+					 */
+					skipped_changes_count = 0;
+				}
 				return;
+			}
 			break;
 		default:
 			Assert(false);
 	}
 
+	/*
+	 * skipped_changes_count is reset when processing changes that do not need to
+	 * be skipped.
+	 */
+	skipped_changes_count = 0;
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -1011,7 +1062,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1032,7 +1083,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 655760fee3..69fd172a7c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,7 +248,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1446,24 +1446,45 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * LogicalDecodingContext 'update_progress' callback.
  *
  * Write the current position to the lag tracker (see XLogSendPhysical).
+ * If send_keep_alive is true, send keepalive message to standby.
  */
 static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive)
 {
-	static TimestampTz sendTime = 0;
+	static TimestampTz trackTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
 
+	if (send_keep_alive)
+	{
+		/*
+		 * If half of wal_sender_timeout has lapsed without send message standby,
+		 * send a keep-alive message to the standby.
+		 */
+		static TimestampTz sendTime = 0;
+		TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime,
+											wal_sender_timeout / 2);
+		if (now >= ping_time)
+		{
+			WalSndKeepalive(false);
+
+			/* Try to flush pending output to the client */
+			if (pq_flush_if_writable() != 0)
+				WalSndShutdown();
+			sendTime = now;
+		}
+	}
+
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (!TimestampDifferenceExceeds(sendTime, now,
+	if (!TimestampDifferenceExceeds(trackTime, now,
 									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
 		return;
 
 	LagTrackerWrite(lsn, now);
-	sendTime = now;
+	trackTime = now;
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..ae16068f52 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
 														 XLogRecPtr Ptr,
-														 TransactionId xid
+														 TransactionId xid,
+														 bool send_keep_alive
 );
 
 typedef struct LogicalDecodingContext
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 41157fda7c..c29c4b13d3 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -243,6 +243,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive);
 
 #endif							/* OUTPUT_PLUGIN_H */
-- 
2.18.4

