From 3421adeb9a89c54c4733ae1862b7ba18e129b797 Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Thu, 21 Apr 2022 09:27:20 +0800
Subject: [PATCH v18] Fix the logical replication timeout during large
 transactions.

The problem is that we don't send keep-alive messages for a long time
while processing large transactions during logical replication where we
don't send any data of such transactions. This can happen when the table
modified in the transaction is not published or because all the changes
got filtered. We do try to send the keep_alive if necessary at the end of
the transaction (via WalSndWriteData()) but by that time the
subscriber-side can timeout and exit.

To fix this we try to send the keepalive message if required after
processing certain threshold of changes.
---
 src/backend/replication/logical/logical.c   | 93 ++++++++++++++++++++-
 src/backend/replication/pgoutput/pgoutput.c |  6 ++
 src/backend/replication/walsender.c         | 27 +++++-
 src/include/replication/logical.h           |  5 ++
 4 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 788769dd73..3eb7d77582 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -669,17 +669,41 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
- * Update progress tracking (if supported).
+ * Update progress tracking and try to send a keepalive message (if supported).
+ *
+ * For a large transaction, if we don't send any change to the downstream for a
+ * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
+ * This can happen when all or most of the changes are either not published or
+ * got filtered out.
  */
 void
 OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
 						   bool skipped_xact)
 {
+	static int	changes_count = 0;
+
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
-						 skipped_xact);
+	/*
+	 * We don't want to try sending a keepalive message after processing each
+	 * change as that can have overhead. Tests revealed that there is no
+	 * noticeable overhead in doing it after continuously processing 100 or so
+	 * changes.
+	 */
+#define CHANGES_THRESHOLD 100
+
+	/*
+	 * If we are at the end of transaction LSN, update progress tracking.
+	 * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
+	 * try to send a keepalive message if required.
+	 */
+	if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
+	{
+		ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
+							 skipped_xact);
+		changes_count = 0;
+	}
 }
 
 /*
@@ -747,6 +771,9 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 	/* set output state */
 	ctx->accept_writes = false;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.startup_cb(ctx, opt, is_init);
 
@@ -774,6 +801,9 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
 	/* set output state */
 	ctx->accept_writes = false;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.shutdown_cb(ctx);
 
@@ -809,6 +839,9 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->first_lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.begin_cb(ctx, txn);
 
@@ -840,6 +873,9 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
 
+	/* set lsn state */
+	ctx->end_xact = true;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
 
@@ -880,6 +916,9 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->first_lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/*
 	 * If the plugin supports two-phase commits then begin prepare callback is
 	 * mandatory
@@ -924,6 +963,9 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
 
+	/* set lsn state */
+	ctx->end_xact = true;
+
 	/*
 	 * If the plugin supports two-phase commits then prepare callback is
 	 * mandatory
@@ -968,6 +1010,9 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
 
+	/* set lsn state */
+	ctx->end_xact = true;
+
 	/*
 	 * If the plugin support two-phase commits then commit prepared callback
 	 * is mandatory
@@ -1013,6 +1058,9 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
 
+	/* set lsn state */
+	ctx->end_xact = true;
+
 	/*
 	 * If the plugin support two-phase commits then rollback prepared callback
 	 * is mandatory
@@ -1062,6 +1110,9 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	ctx->callbacks.change_cb(ctx, txn, relation, change);
 
 	/* Pop the error context stack */
@@ -1102,6 +1153,9 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
 
 	/* Pop the error context stack */
@@ -1130,6 +1184,9 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
 	/* set output state */
 	ctx->accept_writes = false;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
@@ -1160,6 +1217,9 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	/* set output state */
 	ctx->accept_writes = false;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
 
@@ -1197,6 +1257,9 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
 	ctx->write_location = message_lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
 							  message_size, message);
@@ -1239,6 +1302,9 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = first_lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* in streaming mode, stream_start_cb is required */
 	if (ctx->callbacks.stream_start_cb == NULL)
 		ereport(ERROR,
@@ -1286,6 +1352,9 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = last_lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* in streaming mode, stream_stop_cb is required */
 	if (ctx->callbacks.stream_stop_cb == NULL)
 		ereport(ERROR,
@@ -1326,6 +1395,9 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn->xid;
 	ctx->write_location = abort_lsn;
 
+	/* set lsn state */
+	ctx->end_xact = true;
+
 	/* in streaming mode, stream_abort_cb is required */
 	if (ctx->callbacks.stream_abort_cb == NULL)
 		ereport(ERROR,
@@ -1370,6 +1442,9 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn;
 
+	/* set lsn state */
+	ctx->end_xact = true;
+
 	/* in streaming mode with two-phase commits, stream_prepare_cb is required */
 	if (ctx->callbacks.stream_prepare_cb == NULL)
 		ereport(ERROR,
@@ -1410,6 +1485,9 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn;
 
+	/* set lsn state */
+	ctx->end_xact = true;
+
 	/* in streaming mode, stream_commit_cb is required */
 	if (ctx->callbacks.stream_commit_cb == NULL)
 		ereport(ERROR,
@@ -1457,6 +1535,9 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* in streaming mode, stream_change_cb is required */
 	if (ctx->callbacks.stream_change_cb == NULL)
 		ereport(ERROR,
@@ -1502,6 +1583,9 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
 	ctx->write_location = message_lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
 									 message_size, message);
@@ -1549,6 +1633,9 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
+	/* set lsn state */
+	ctx->end_xact = false;
+
 	ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
 
 	/* Pop the error context stack */
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b197bfd565..4e2531601f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1360,6 +1360,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
+	OutputPluginUpdateProgress(ctx, false);
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1592,6 +1594,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
+	OutputPluginUpdateProgress(ctx, false);
+
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -1655,6 +1659,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
 
+	OutputPluginUpdateProgress(ctx, false);
+
 	if (!data->messages)
 		return;
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 63a818140b..d9068ab591 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1475,6 +1475,9 @@ ProcessPendingWrites(void)
  * Write the current position to the lag tracker (see XLogSendPhysical).
  *
  * When skipping empty transactions, send a keepalive message if necessary.
+ *
+ * In logical replication, if too many changes are processed then try to send a
+ * keepalive message. It might avoid a timeout in the subscriber.
  */
 static void
 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
@@ -1482,14 +1485,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 {
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
+	bool	pending_writes = false,
+			end_xact = ctx->end_xact;
 
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
+	 *
+	 * We don't have a mechanism to get the ack for any LSN other than end xact
+	 * LSN from the downstream. So, we track lag only for end of transaction
+	 * LSN.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (TimestampDifferenceExceeds(sendTime, now,
-								   WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+	if (end_xact && TimestampDifferenceExceeds(sendTime, now,
+												 WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
 	{
 		LagTrackerWrite(lsn, now);
 		sendTime = now;
@@ -1515,8 +1524,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 
 		/* If we have pending write here, make sure it's actually flushed */
 		if (pq_is_send_pending())
-			ProcessPendingWrites();
+			pending_writes = true;
 	}
+
+	/*
+	 * Process pending writes if any or try to send a keepalive if required. We
+	 * don't need to try sending keep alive messages at the transaction end as
+	 * that will be done at a later point in time. This is required only for
+	 * large transactions where we don't send any changes to the downstream and
+	 * the receiver can timeout due to that.
+	 */
+	if (pending_writes || (!end_xact &&
+						   now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
+															  wal_sender_timeout / 2)))
+		ProcessPendingWrites();
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index a6ef16ad5b..0a2fdf498f 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -50,6 +50,11 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		fast_forward;
 
+	/*
+	 * Is the end of transaction LSN?
+	 */
+	bool		end_xact;
+
 	OutputPluginCallbacks callbacks;
 	OutputPluginOptions options;
 
-- 
2.23.0.windows.1

