From 75cfe5e958017e6db33bc3a31145765631eca3ca Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 8 Feb 2023 11:57:37 -0800
Subject: [PATCH v4] Rework LogicalOutputPluginWriterUpdateProgress

Currently whenever we need to add a callback to the output plugin, the output
plugin needs to confirm whether it needs to call WalSndUpdateProgress or not. As
the number of callbacks increases, this framework becomes less maintainable. So
we decided to improve this by calling WalSndUpdateProgress from the common code,
rather than in the output plugins.

We introduced LogicalDecodingContext->did_write to confirm whether the output
plugins actually sends data or not. We use this flag to check if we should send
a keep-alive message to the subscriber prevent delays, when an empty transaction
is skipped in synchronous logical replication. We also renamed the function
WalSndUpdateProgress to WalSndUpdateProgressAndKeepalive because it now not only
updates progress, but also sends keepalive messages to the subscriber if
necessary.

Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.

In commit 8c58624, in order to fix logical replication timeout during large
DDLs, we tried to send the keepalive message to the subscriber for all change
types in the function ReorderBufferProcessTXN. Now, we only handle the
timeout-related issues in the common change type (see function
is_skip_threshold_change).

BTW, fixed an error check for synchronization when skipping empty transactions
in function WalSndUpdateProgressAndKeepalive.
---
 src/backend/replication/logical/decode.c      |   3 +
 src/backend/replication/logical/logical.c     | 243 +++++++++++-------
 .../replication/logical/reorderbuffer.c       |  33 +--
 src/backend/replication/pgoutput/pgoutput.c   |  10 -
 src/backend/replication/walsender.c           |  49 ++--
 src/include/replication/logical.h             |  23 +-
 src/include/replication/output_plugin.h       |   1 -
 src/include/replication/reorderbuffer.h       |  12 -
 src/tools/pgindent/typedefs.list              |   2 +-
 9 files changed, 207 insertions(+), 169 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..53cfc13a93 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -666,6 +666,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 		}
 		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
 
+		UpdateDecodingProgressAndKeepalive(ctx, xid, buf->endptr, true);
 		return;
 	}
 
@@ -763,6 +764,8 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	{
 		ReorderBufferSkipPrepare(ctx->reorder, xid);
 		ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
+
+		UpdateDecodingProgressAndKeepalive(ctx, xid, buf->endptr, true);
 		return;
 	}
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a6..730eafb32e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,10 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
-/* callback to update txn's progress */
-static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
-										   ReorderBufferTXN *txn,
-										   XLogRecPtr lsn);
+
+static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+									   bool finished_xact);
+
+static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
 
@@ -156,7 +157,7 @@ StartupDecodingContext(List *output_plugin_options,
 					   XLogReaderRoutine *xl_routine,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
-					   LogicalOutputPluginWriterUpdateProgress update_progress)
+					   LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -283,16 +284,10 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
 	ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
 
-	/*
-	 * Callback to support updating progress during sending data of a
-	 * transaction (and its subtransactions) to the output plugin.
-	 */
-	ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
-
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
-	ctx->update_progress = update_progress;
+	ctx->update_progress_and_keepalive = update_progress_and_keepalive;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -316,7 +311,7 @@ StartupDecodingContext(List *output_plugin_options,
  *		marking WAL reserved beforehand.  In that scenario, it's up to the
  *		caller to guarantee that WAL remains available.
  * xl_routine -- XLogReaderRoutine for underlying XLogReader
- * prepare_write, do_write, update_progress --
+ * prepare_write, do_write, update_progress_and_keepalive --
  *		callbacks that perform the use-case dependent, actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
 						  XLogReaderRoutine *xl_routine,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress)
+						  LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -435,7 +430,7 @@ CreateInitDecodingContext(const char *plugin,
 	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
 								 xl_routine, prepare_write, do_write,
-								 update_progress);
+								 update_progress_and_keepalive);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -475,7 +470,7 @@ CreateInitDecodingContext(const char *plugin,
  * xl_routine
  *		XLogReaderRoutine used by underlying xlogreader
  *
- * prepare_write, do_write, update_progress
+ * prepare_write, do_write, update_progress_and_keepalive
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  XLogReaderRoutine *xl_routine,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress)
+					  LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -547,7 +542,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
 								 fast_forward, xl_routine, prepare_write,
-								 do_write, update_progress);
+								 do_write, update_progress_and_keepalive);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -662,7 +657,8 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
 	if (!ctx->accept_writes)
-		elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+		elog(ERROR, "writes are only accepted in output plugin callbacks, "
+			 "except startup, shutdown, filter_by_origin, and filter_prepare.");
 
 	ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
 	ctx->prepared_write = true;
@@ -679,20 +675,7 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 
 	ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
 	ctx->prepared_write = false;
-}
-
-/*
- * Update progress tracking (if supported).
- */
-void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
-						   bool skipped_xact)
-{
-	if (!ctx->update_progress)
-		return;
-
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
-						 skipped_xact);
+	ctx->did_write = true;
 }
 
 /*
@@ -759,7 +742,6 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 
 	/* set output state */
 	ctx->accept_writes = false;
-	ctx->end_xact = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -787,7 +769,6 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
 
 	/* set output state */
 	ctx->accept_writes = false;
-	ctx->end_xact = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.shutdown_cb(ctx);
@@ -823,7 +804,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->first_lsn;
-	ctx->end_xact = false;
+	ctx->did_write = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.begin_cb(ctx, txn);
@@ -855,13 +836,15 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 /*
@@ -896,7 +879,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->first_lsn;
-	ctx->end_xact = false;
+	ctx->did_write = false;
 
 	/*
 	 * If the plugin supports two-phase commits then begin prepare callback is
@@ -941,7 +924,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/*
 	 * If the plugin supports two-phase commits then prepare callback is
@@ -958,6 +941,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -986,7 +971,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/*
 	 * If the plugin support two-phase commits then commit prepared callback
@@ -1003,6 +988,8 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -1032,11 +1019,13 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/*
 	 * If the plugin support two-phase commits then rollback prepared callback
 	 * is mandatory
+	 *
+	 * FIXME: This should have been caught much earlier.
 	 */
 	if (ctx->callbacks.rollback_prepared_cb == NULL)
 		ereport(ERROR,
@@ -1050,6 +1039,8 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -1074,6 +1065,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this change's lsn so replies from clients can give an up-to-date
@@ -1083,12 +1075,13 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
-	ctx->end_xact = false;
-
 	ctx->callbacks.change_cb(ctx, txn, relation, change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 static void
@@ -1102,7 +1095,7 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	Assert(!ctx->fast_forward);
 
 	if (!ctx->callbacks.truncate_cb)
-		return;
+		goto out;
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1116,6 +1109,7 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this change's lsn so replies from clients can give an up-to-date
@@ -1125,12 +1119,14 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
-	ctx->end_xact = false;
-
 	ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+out:
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 bool
@@ -1154,7 +1150,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
 
 	/* set output state */
 	ctx->accept_writes = false;
-	ctx->end_xact = false;
 
 	/* do the actual work: call callback */
 	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
@@ -1185,7 +1180,6 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 
 	/* set output state */
 	ctx->accept_writes = false;
-	ctx->end_xact = false;
 
 	/* do the actual work: call callback */
 	ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -1208,7 +1202,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	Assert(!ctx->fast_forward);
 
 	if (ctx->callbacks.message_cb == NULL)
-		return;
+		goto out;
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1223,7 +1217,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
 	ctx->write_location = message_lsn;
-	ctx->end_xact = false;
+	ctx->did_write = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1231,6 +1225,10 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+out:
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 static void
@@ -1258,6 +1256,7 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this message's lsn so replies from clients can give an
@@ -1267,8 +1266,6 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = first_lsn;
 
-	ctx->end_xact = false;
-
 	/* in streaming mode, stream_start_cb is required */
 	if (ctx->callbacks.stream_start_cb == NULL)
 		ereport(ERROR,
@@ -1280,6 +1277,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	/* don't call update progress, we didn't really make any */
 }
 
 static void
@@ -1307,6 +1306,7 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this message's lsn so replies from clients can give an
@@ -1316,8 +1316,6 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = last_lsn;
 
-	ctx->end_xact = false;
-
 	/* in streaming mode, stream_stop_cb is required */
 	if (ctx->callbacks.stream_stop_cb == NULL)
 		ereport(ERROR,
@@ -1329,6 +1327,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	/* don't call update progress, we didn't really make any */
 }
 
 static void
@@ -1357,7 +1357,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = abort_lsn;
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/* in streaming mode, stream_abort_cb is required */
 	if (ctx->callbacks.stream_abort_cb == NULL)
@@ -1370,6 +1370,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
 }
 
 static void
@@ -1402,7 +1404,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn;
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/* in streaming mode with two-phase commits, stream_prepare_cb is required */
 	if (ctx->callbacks.stream_prepare_cb == NULL)
@@ -1415,6 +1417,8 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -1443,7 +1447,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn;
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/* in streaming mode, stream_commit_cb is required */
 	if (ctx->callbacks.stream_commit_cb == NULL)
@@ -1456,6 +1460,8 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -1483,6 +1489,7 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this change's lsn so replies from clients can give an up-to-date
@@ -1492,8 +1499,6 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
-	ctx->end_xact = false;
-
 	/* in streaming mode, stream_change_cb is required */
 	if (ctx->callbacks.stream_change_cb == NULL)
 		ereport(ERROR,
@@ -1505,6 +1510,9 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 static void
@@ -1523,7 +1531,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* this callback is optional */
 	if (ctx->callbacks.stream_message_cb == NULL)
-		return;
+		goto out;
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1538,7 +1546,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
 	ctx->write_location = message_lsn;
-	ctx->end_xact = false;
+	ctx->did_write = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1546,6 +1554,10 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+out:
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 static void
@@ -1564,7 +1576,7 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* this callback is optional */
 	if (!ctx->callbacks.stream_truncate_cb)
-		return;
+		goto out;
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1578,6 +1590,7 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this change's lsn so replies from clients can give an up-to-date
@@ -1587,51 +1600,72 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
-	ctx->end_xact = false;
-
 	ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+out:
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
-static void
-update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
-							   XLogRecPtr lsn)
+/*
+ * Helper function to check whether a large number of changes have been skipped
+ * continuously.
+ */
+static bool
+is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx)
 {
-	LogicalDecodingContext *ctx = cache->private_data;
-	LogicalErrorCallbackState state;
-	ErrorContextCallback errcallback;
-
-	Assert(!ctx->fast_forward);
-
-	/* Push callback + info on the error context stack */
-	state.ctx = ctx;
-	state.callback_name = "update_progress_txn";
-	state.report_location = lsn;
-	errcallback.callback = output_plugin_error_callback;
-	errcallback.arg = (void *) &state;
-	errcallback.previous = error_context_stack;
-	error_context_stack = &errcallback;
+	/*
+	 * The counter for accumulating the number of consecutively skipped
+	 * changes.
+	 *
+	 * XXX This counter is not reset at the end of a transaction. The worst
+	 * case this leads to is a delay in sending the keepalive message for the
+	 * next transaction. But testing shows that using CHANGES_THRESHOLD (see
+	 * below) is safe enough.
+	 */
+	static int	changes_count = 0;
 
-	/* set output state */
-	ctx->accept_writes = false;
-	ctx->write_xid = txn->xid;
+	/* If the change was published, reset the counter and return false */
+	if (ctx->did_write)
+	{
+		changes_count = 0;
+		return false;
+	}
 
 	/*
-	 * Report this change's lsn so replies from clients can give an up-to-date
-	 * answer. This won't ever be enough (and shouldn't be!) to confirm
-	 * receipt of this transaction, but it might allow another transaction's
-	 * commit to be confirmed with one message.
+	 * It is possible that the data is not sent to downstream for a long time
+	 * either because the output plugin filtered it or there is a DDL that
+	 * generates a lot of data that is not processed by the plugin. So, in
+	 * such cases, the downstream can timeout. To avoid that we try to send a
+	 * keepalive message if required.  Trying to send a keepalive message
+	 * after every change has some overhead, but testing showed there is no
+	 * noticeable overhead if we do it after every ~100 changes.
 	 */
-	ctx->write_location = lsn;
+#define CHANGES_THRESHOLD 100
+	if (++changes_count >= CHANGES_THRESHOLD)
+	{
+		changes_count = 0;
+		return true;
+	}
 
-	ctx->end_xact = false;
+	return false;
+}
 
-	OutputPluginUpdateProgress(ctx, false);
+/*
+ * Update progress tracking and send keep alive (if required).
+ */
+static void
+UpdateProgressAndKeepalive(LogicalDecodingContext *ctx, bool finished_xact)
+{
+	if (!ctx->update_progress_and_keepalive)
+		return;
 
-	/* Pop the error context stack */
-	error_context_stack = errcallback.previous;
+	ctx->update_progress_and_keepalive(ctx, ctx->write_location,
+									   ctx->write_xid, ctx->did_write,
+									   finished_xact);
 }
 
 /*
@@ -1922,3 +1956,26 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
 }
+
+/*
+ * UpdateDecodingProgressAndKeepalive
+ *
+ * During the logical decoding process, when no data is sent to the subscriber
+ * due to skipping the entire transaction or processing unlogged table data
+ * and temporary data, try to update progress and send a keepalive message.
+ */
+void
+UpdateDecodingProgressAndKeepalive(LogicalDecodingContext *ctx,
+								   TransactionId xid,
+								   XLogRecPtr lsn,
+								   bool finished_xact)
+{
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->write_xid = xid;
+	ctx->write_location = lsn;
+	ctx->did_write = false;
+
+	if (finished_xact || is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, finished_xact);
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2d17c551a8..161531db0e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2110,8 +2110,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	PG_TRY();
 	{
 		ReorderBufferChange *change;
-		int			changes_count = 0;	/* used to accumulate the number of
-										 * changes */
 
 		if (using_subtxn)
 			BeginInternalSubTransaction(streaming ? "stream" : "replay");
@@ -2226,14 +2224,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										 MAIN_FORKNUM));
 
 					if (!RelationIsLogicallyLogged(relation))
+					{
+						UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *) rb->private_data,
+														   txn->xid,
+														   change->lsn, false);
 						goto change_done;
+					}
 
 					/*
 					 * Ignore temporary heaps created during DDL unless the
 					 * plugin has asked for them.
 					 */
 					if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+					{
+						UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *) rb->private_data,
+														   txn->xid,
+														   change->lsn, false);
 						goto change_done;
+					}
 
 					/*
 					 * For now ignore sequence changes entirely. Most of the
@@ -2452,24 +2460,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
 			}
-
-			/*
-			 * It is possible that the data is not sent to downstream for a
-			 * long time either because the output plugin filtered it or there
-			 * is a DDL that generates a lot of data that is not processed by
-			 * the plugin. So, in such cases, the downstream can timeout. To
-			 * avoid that we try to send a keepalive message if required.
-			 * Trying to send a keepalive message after every change has some
-			 * overhead, but testing showed there is no noticeable overhead if
-			 * we do it after every ~100 changes.
-			 */
-#define CHANGES_THRESHOLD 100
-
-			if (++changes_count >= CHANGES_THRESHOLD)
-			{
-				rb->update_progress_txn(rb, txn, change->lsn);
-				changes_count = 0;
-			}
 		}
 
 		/* speculative insertion record must be freed by now */
@@ -2926,6 +2916,9 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 			ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
 											   txn->invalidations);
 	}
+	else
+		UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
+										   xid, lsn, (txn->toptxn == NULL));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 0df1acbb7a..5cc92a24ad 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -592,7 +592,6 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * from this transaction has been sent to the downstream.
 	 */
 	sent_begin_txn = txndata->sent_begin_txn;
-	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
 	pfree(txndata);
 	txn->output_plugin_private = NULL;
 
@@ -631,8 +630,6 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx, false);
-
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
@@ -645,8 +642,6 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx, false);
-
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
@@ -661,8 +656,6 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx, false);
-
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
 									   prepare_time);
@@ -1903,8 +1896,6 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx, false);
-
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
@@ -1924,7 +1915,6 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75e8363e24..6d00a54ccc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -242,7 +242,7 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
-static void ProcessPendingWrites(void);
+static void WalSndSendPending(void);
 static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
 static void WalSndKeepaliveIfNecessary(void);
 static void WalSndCheckTimeOut(void);
@@ -250,8 +250,11 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-								 bool skipped_xact);
+static void WalSndUpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+											 XLogRecPtr lsn,
+											 TransactionId xid,
+											 bool did_write,
+											 bool finished_xact);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1126,7 +1129,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 												   .segment_open = WalSndSegmentOpen,
 												   .segment_close = wal_segment_close),
 										WalSndPrepareWrite, WalSndWriteData,
-										WalSndUpdateProgress);
+										WalSndUpdateProgressAndKeepalive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1285,7 +1288,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 										 .segment_open = WalSndSegmentOpen,
 										 .segment_close = wal_segment_close),
 							  WalSndPrepareWrite, WalSndWriteData,
-							  WalSndUpdateProgress);
+							  WalSndUpdateProgressAndKeepalive);
 	xlogreader = logical_decoding_ctx->reader;
 
 	WalSndSetState(WALSNDSTATE_CATCHUP);
@@ -1394,15 +1397,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		WalSndShutdown();
 
 	/* Try taking fast path unless we get too close to walsender timeout. */
-	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
-										  wal_sender_timeout / 2) &&
+	if ((wal_sender_timeout <= 0 ||
+		 now < TimestampTzPlusMilliseconds(last_reply_timestamp,
+										  wal_sender_timeout / 2)) &&
 		!pq_is_send_pending())
 	{
 		return;
 	}
 
 	/* If we have pending write here, go to slow path */
-	ProcessPendingWrites();
+	WalSndSendPending();
 }
 
 /*
@@ -1410,7 +1414,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * side and check timeouts during that.
  */
 static void
-ProcessPendingWrites(void)
+WalSndSendPending(void)
 {
 	for (;;)
 	{
@@ -1457,20 +1461,21 @@ ProcessPendingWrites(void)
 }
 
 /*
- * LogicalDecodingContext 'update_progress' callback.
+ * LogicalDecodingContext 'update_progress_and_keepalive' callback.
  *
  * Write the current position to the lag tracker (see XLogSendPhysical).
  *
- * When skipping empty transactions, send a keepalive message if necessary.
+ * When a transaction is skipped or the data is not sent to subscriber for a
+ * long time, send a keepalive message if necessary.
  */
 static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-					 bool skipped_xact)
+WalSndUpdateProgressAndKeepalive(LogicalDecodingContext *ctx, XLogRecPtr lsn,
+								 TransactionId xid, bool did_write,
+								 bool finished_xact)
 {
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
 	bool		pending_writes = false;
-	bool		end_xact = ctx->end_xact;
 
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
@@ -1481,8 +1486,9 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 	 * transaction LSN.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (end_xact && TimestampDifferenceExceeds(sendTime, now,
-											   WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+	if (finished_xact &&
+		TimestampDifferenceExceeds(sendTime, now,
+								   WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
 	{
 		LagTrackerWrite(lsn, now);
 		sendTime = now;
@@ -1496,8 +1502,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 	 * the worst case we will just send an extra keepalive message when it is
 	 * really not required.
 	 */
-	if (skipped_xact &&
-		SyncRepRequested() &&
+	if (finished_xact && !did_write &&
 		((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
 	{
 		WalSndKeepalive(false, lsn);
@@ -1518,10 +1523,10 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 	 * for large transactions where we don't send any changes to the
 	 * downstream and the receiver can timeout due to that.
 	 */
-	if (pending_writes || (!end_xact &&
+	if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
 						   now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
 															  wal_sender_timeout / 2)))
-		ProcessPendingWrites();
+		WalSndSendPending();
 }
 
 /*
@@ -3054,8 +3059,8 @@ XLogSendLogical(void)
 	{
 		/*
 		 * Note the lack of any call to LagTrackerWrite() which is handled by
-		 * WalSndUpdateProgress which is called by output plugin through
-		 * logical decoding write api.
+		 * WalSndUpdateProgressAndKeepalive which is called by output plugin
+		 * through logical decoding write api.
 		 */
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea0..cf9acd1fc7 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -24,11 +24,11 @@ typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *l
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
-typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
-														 XLogRecPtr Ptr,
-														 TransactionId xid,
-														 bool skipped_xact
-);
+typedef void (*LogicalOutputPluginWriterUpdateProgressAndKeepalive) (struct LogicalDecodingContext *lr,
+																	 XLogRecPtr Ptr,
+																	 TransactionId xid,
+																	 bool did_write,
+																	 bool finished_xact);
 
 typedef struct LogicalDecodingContext
 {
@@ -63,7 +63,7 @@ typedef struct LogicalDecodingContext
 	 */
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
-	LogicalOutputPluginWriterUpdateProgress update_progress;
+	LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive;
 
 	/*
 	 * Output buffer.
@@ -105,10 +105,9 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		accept_writes;
 	bool		prepared_write;
+	bool		did_write;
 	XLogRecPtr	write_location;
 	TransactionId write_xid;
-	/* Are we processing the end LSN of a transaction? */
-	bool		end_xact;
 } LogicalDecodingContext;
 
 
@@ -121,14 +120,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
 														 XLogReaderRoutine *xl_routine,
 														 LogicalOutputPluginWriterPrepareWrite prepare_write,
 														 LogicalOutputPluginWriterWrite do_write,
-														 LogicalOutputPluginWriterUpdateProgress update_progress);
+														 LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 List *output_plugin_options,
 													 bool fast_forward,
 													 XLogReaderRoutine *xl_routine,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
-													 LogicalOutputPluginWriterUpdateProgress update_progress);
+													 LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -144,5 +143,9 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void UpdateDecodingProgressAndKeepalive(LogicalDecodingContext *ctx,
+											   TransactionId xid,
+											   XLogRecPtr lsn,
+											   bool finished_xact);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2d89d26586..b9358e1544 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -245,6 +245,5 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact);
 
 #endif							/* OUTPUT_PLUGIN_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 215d1494e9..e5db041df1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -526,12 +526,6 @@ typedef void (*ReorderBufferStreamTruncateCB) (
 											   Relation relations[],
 											   ReorderBufferChange *change);
 
-/* update progress txn callback signature */
-typedef void (*ReorderBufferUpdateProgressTxnCB) (
-												  ReorderBuffer *rb,
-												  ReorderBufferTXN *txn,
-												  XLogRecPtr lsn);
-
 struct ReorderBuffer
 {
 	/*
@@ -595,12 +589,6 @@ struct ReorderBuffer
 	ReorderBufferStreamMessageCB stream_message;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
-	/*
-	 * Callback to be called when updating progress during sending data of a
-	 * transaction (and its subtransactions) to the output plugin.
-	 */
-	ReorderBufferUpdateProgressTxnCB update_progress_txn;
-
 	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 86a9303bf5..53fd0fe621 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1469,7 +1469,7 @@ LogicalDecodingContext
 LogicalErrorCallbackState
 LogicalOutputPluginInit
 LogicalOutputPluginWriterPrepareWrite
-LogicalOutputPluginWriterUpdateProgress
+LogicalOutputPluginWriterUpdateProgressAndKeepalive
 LogicalOutputPluginWriterWrite
 LogicalRepBeginData
 LogicalRepCommitData
-- 
2.39.1.windows.1

