From ad251e3d796c185fba9ce77058c2f8d41219c478 Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Wed, 18 Jan 2023 13:45:32 +0800
Subject: [PATCH v6] Fix the logical replication timeout during processing of
 DDL.

When there is a DDL in a transaction that generates lots of temporary
data due to rewrite rules, this temporary data will not be processed
by the pgoutput plugin. This means it is possible for a timeout to
occur if a sufficiently long time elapses since the last pgoutput
message. A previous commit (f95d53e) fixed a similar scenario in this
area, but that only fixed timeouts for DML going through pgoutput, so
it did not address this DDL timeout case.

To fix this, we introduced a new ReorderBuffer callback -
'ReorderBufferUpdateProgressTxnCB'. This callback is called to try to update
the process whenever more than CHANGES_THRESHOLD changes are encountered while
sending data of a transaction (and its subtransactions) to the output plugin.
---
 src/backend/replication/logical/logical.c     | 60 +++++++++++++++++++
 .../replication/logical/reorderbuffer.c       | 19 ++++++
 src/backend/replication/pgoutput/pgoutput.c   | 54 ++---------------
 src/include/replication/reorderbuffer.h       | 12 ++++
 src/tools/pgindent/typedefs.list              |  1 +
 5 files changed, 98 insertions(+), 48 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1a58dd7649..ad4523d419 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
+/* update progress txn callback */
+static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
+										   ReorderBufferTXN *txn,
+										   ReorderBufferChange *change);
+
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
 
 /*
@@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
 	ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
 
+	/*
+	 * Callback to support updating progress during sending data of a
+	 * transaction (and its subtransactions) to the output plugin.
+	 */
+	ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -1584,6 +1595,55 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+/*
+ * Update progress callback while processing a transaction.
+ *
+ * Try send a keepalive message during transaction processing.
+ *
+ * This is done because if we don't send any change to the downstream for a
+ * long time (exceeds the wal_receiver_timeout of standby), then it can
+ * timeout. This can happen for large DDL, or for large transactions when all
+ * or most of the changes are either not published or got filtered out.
+ */
+static void
+update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   ReorderBufferChange *change)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "update_progress_txn";
+	state.report_location = change->lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * Report this change's lsn so replies from clients can give an up-to-date
+	 * answer. This won't ever be enough (and shouldn't be!) to confirm
+	 * receipt of this transaction, but it might allow another transaction's
+	 * commit to be confirmed with one message.
+	 */
+	ctx->write_location = change->lsn;
+
+	ctx->end_xact = false;
+
+	OutputPluginUpdateProgress(ctx, false);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 54ee824e6c..781665a015 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2130,6 +2130,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			Relation	relation = NULL;
 			Oid			reloid;
 
+			/*
+			 * Static variable used to accumulate the number of changes while
+			 * processing txn.
+			 */
+			static int	changes_count = 0;
+
 			CHECK_FOR_INTERRUPTS();
 
 			/*
@@ -2446,6 +2452,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
 			}
+
+			/*
+			 * Sending keepalive messages after every change has some
+			 * overhead, but testing showed there is no noticeable overhead if
+			 * keepalive is only sent after every ~100 changes.
+			 */
+#define CHANGES_THRESHOLD 100
+
+			if (++changes_count >= CHANGES_THRESHOLD)
+			{
+				rb->update_progress_txn(rb, txn, change);
+				changes_count = 0;
+			}
 		}
 
 		/* speculative insertion record must be freed by now */
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1a80d67bb9..0ba6b6b39c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
-static void update_replication_progress(LogicalDecodingContext *ctx,
-										bool skipped_xact);
 
 /*
  * Only 3 publication actions are used for row filtering ("insert", "update",
@@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * from this transaction has been sent to the downstream.
 	 */
 	sent_begin_txn = txndata->sent_begin_txn;
-	update_replication_progress(ctx, !sent_begin_txn);
+	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
 	pfree(txndata);
 	txn->output_plugin_private = NULL;
 
@@ -625,7 +623,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -639,7 +637,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
-	update_replication_progress(ctx, false);
-
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	if (!data->messages)
 		return;
 
@@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
@@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
 		}
 	}
 }
-
-/*
- * Try to update progress and send a keepalive message if too many changes were
- * processed.
- *
- * For a large transaction, if we don't send any change to the downstream for a
- * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
- * This can happen when all or most of the changes are either not published or
- * got filtered out.
- */
-static void
-update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
-{
-	static int	changes_count = 0;
-
-	/*
-	 * We don't want to try sending a keepalive message after processing each
-	 * change as that can have overhead. Tests revealed that there is no
-	 * noticeable overhead in doing it after continuously processing 100 or so
-	 * changes.
-	 */
-#define CHANGES_THRESHOLD 100
-
-	/*
-	 * If we are at the end of transaction LSN, update progress tracking.
-	 * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
-	 * try to send a keepalive message if required.
-	 */
-	if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
-	{
-		OutputPluginUpdateProgress(ctx, skipped_xact);
-		changes_count = 0;
-	}
-}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f6c4dd75db..3641f03326 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -525,6 +525,12 @@ typedef void (*ReorderBufferStreamTruncateCB) (
 											   Relation relations[],
 											   ReorderBufferChange *change);
 
+/* update progress txn callback signature */
+typedef void (*ReorderBufferUpdateProgressTxnCB) (
+												  ReorderBuffer *rb,
+												  ReorderBufferTXN *txn,
+												  ReorderBufferChange *change);
+
 struct ReorderBuffer
 {
 	/*
@@ -588,6 +594,12 @@ struct ReorderBuffer
 	ReorderBufferStreamMessageCB stream_message;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
+	/*
+	 * Callback to be called when updating progress during sending data of a
+	 * transaction (and its subtransactions) to the output plugin.
+	 */
+	ReorderBufferUpdateProgressTxnCB update_progress_txn;
+
 	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 09316039e4..92f6e7c5ad 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2309,6 +2309,7 @@ ReorderBufferToastEnt
 ReorderBufferTupleBuf
 ReorderBufferTupleCidEnt
 ReorderBufferTupleCidKey
+ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
 ReparameterizeForeignPathByChild_function
-- 
2.23.0.windows.1

