From 099a56985968be9a18f46dce66b4ceada571183d Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Tue, 24 Jul 2018 10:03:34 -0400
Subject: [PATCH 2/4] Client-initiated CopyDone during transaction decoding in 
 walsender

---
 src/backend/replication/logical/logical.c       | 14 ++++++---
 src/backend/replication/logical/logicalfuncs.c  |  2 +-
 src/backend/replication/logical/reorderbuffer.c | 11 +++++--
 src/backend/replication/slotfuncs.c             |  4 +--
 src/backend/replication/walsender.c             | 42 ++++++++++++++++---------
 src/include/replication/logical.h               |  6 ++--
 src/include/replication/reorderbuffer.h         | 13 ++++++++
 7 files changed, 65 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3cd4eef..2dc02dc 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -121,7 +121,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
-					   LogicalOutputPluginWriterUpdateProgress update_progress)
+					   LogicalOutputPluginWriterUpdateProgress update_progress,
+					   ContinueDecodingCB continue_decoding_cb)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -188,6 +189,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->continue_decoding_cb = continue_decoding_cb;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -226,7 +228,8 @@ CreateInitDecodingContext(char *plugin,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress)
+						  LogicalOutputPluginWriterUpdateProgress update_progress,
+						  ContinueDecodingCB continue_decoding_cb)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -314,7 +317,7 @@ CreateInitDecodingContext(char *plugin,
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, true,
 								 read_page, prepare_write, do_write,
-								 update_progress);
+								 update_progress, continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -361,7 +364,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress)
+					  LogicalOutputPluginWriterUpdateProgress update_progress,
+					  ContinueDecodingCB continue_decoding_cb)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -412,7 +416,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
 								 fast_forward, read_page, prepare_write,
-								 do_write, update_progress);
+								 do_write, update_progress, continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 45aae71..3d8e61d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -254,7 +254,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									false,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite, NULL);
+									LogicalOutputWrite, NULL, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9b55b94..aa024fa 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1460,7 +1460,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
+			   (rb->continue_decoding_cb == NULL ||
+				rb->continue_decoding_cb()))
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1727,8 +1729,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad..82e8f37 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -137,7 +137,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
 									false,	/* do not build snapshot */
 									logical_read_local_xlog_page, NULL, NULL,
-									NULL);
+									NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
@@ -370,7 +370,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 									NIL,
 									true,	/* fast_forward */
 									logical_read_local_xlog_page,
-									NULL, NULL, NULL);
+									NULL, NULL, NULL, NULL);
 
 		/*
 		 * Start reading at the slot's restart_lsn, which we know to point to
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f624048..23ab992 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -253,6 +253,23 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+/*
+ * Return true until either the client or server side have requested that we wind
+ * up COPY BOTH mode by sending a CopyDone.
+ *
+ * If we receive a CopyDone from the client we should avoid sending any further
+ * CopyData messages and return to command mode as promptly as possible.
+ *
+ * While in the middle of sending data to a client we notice a client-initated
+ * CopyDone when WalSndWriteData() calls ProcessRepliesIfAny() and it
+ * sets streamingDoneSending.
+ */
+static
+bool IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -935,7 +952,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData,
-										WalSndUpdateProgress);
+										WalSndUpdateProgress, IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1094,7 +1111,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 												 logical_read_xlog_page,
 												 WalSndPrepareWrite,
 												 WalSndWriteData,
-												 WalSndUpdateProgress);
+												 WalSndUpdateProgress,
+												 IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1186,17 +1204,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 
 	CHECK_FOR_INTERRUPTS();
 
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	/* Try taking fast path unless we get too close to walsender timeout. */
-	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
-										  wal_sender_timeout / 2) &&
-		!pq_is_send_pending())
-	{
-		return;
-	}
 
 	/* If we have pending write here, go to slow path */
 	for (;;)
@@ -1367,7 +1374,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client, sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}
+
+		/* We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c25ac1f..4822bfc 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -100,7 +100,8 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress);
+						  LogicalOutputPluginWriterUpdateProgress update_progress,
+						  ContinueDecodingCB continue_decoding_cb);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
@@ -108,7 +109,8 @@ extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress);
+					  LogicalOutputPluginWriterUpdateProgress update_progress,
+					  ContinueDecodingCB continue_decoding_cb);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1f52f6b..0a14c81 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -326,6 +326,14 @@ typedef void (*ReorderBufferMessageCB) (
 										const char *prefix, Size sz,
 										const char *message);
 
+
+/*
+ * Callback function that allow interrupt logical replication during decoding.
+ * Function return true if decoding can be continue decode, but if function return false
+ * logical decoding will stop as soon as possible.
+ */
+typedef bool (*ContinueDecodingCB) (void);
+
 struct ReorderBuffer
 {
 	/*
@@ -365,6 +373,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback to define status of decoding. Return false if decoding not necessary continue
+	 */
+	ContinueDecodingCB continue_decoding_cb;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
2.6.4

