New patches attached. 0001 and 0002 are unchanged.
0003 contains improvements for pg_recvlogical: after receiving SIGINT,
pg_recvlogical now sends a CopyDone message to PostgreSQL and waits for the
server's CopyDone. For backward compatibility, a second SIGINT makes
pg_recvlogical stop immediately, without waiting for CopyDone from the
server.
0004 contains regression tests for the scenarios fixed by the 0001 and 0002
patches.

2016-09-16 10:11 GMT+03:00 Vladimir Gordiychuk <fol...@gmail.com>:

> >Have you had a chance to look at adding the tests we discussed?
>
> Not yet. I plane do it on this weekends
>
> 2016-09-16 4:37 GMT+03:00 Craig Ringer <cr...@2ndquadrant.com>:
>
>> On 9 September 2016 at 12:03, Craig Ringer <cr...@2ndquadrant.com> wrote:
>>
>> > Setting "waiting on author" in CF per discussion of the need for tests.
>>
>> Have you had a chance to look at adding the tests we discussed?
>>
>> --
>>  Craig Ringer                   http://www.2ndQuadrant.com/
>>  PostgreSQL Development, 24x7 Support, Training & Services
>>
>
>
From 97da2c500f8f8edc69bcd520096c686e99e52612 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 10 May 2016 10:34:10 +0800
Subject: [PATCH 1/4] Respect client-initiated CopyDone in walsender

The walsender never looked for CopyDone sent by the client unless it
had already decided it was done sending data and dispatched its own
CopyDone message.

Check for client-initiated CopyDone when in COPY BOTH mode, returning to
command mode. In logical decoding this will allow the client to end a logical
decoding session between transactions without just unilaterally closing its
connection.

This change does not allow a client to end COPY BOTH session in the middle of
processing a logical decoding block.

TODO effect on physical walsender?
---
 src/backend/replication/walsender.c | 32 +++++++++++++++++++++++++++-----
 1 file changed, 27 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ea2a5c..c43310c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -759,6 +759,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
+	/*
+	 * If the client sent CopyDone while we were waiting,
+	 * bail out so we can wind up the decoding session.
+	 */
+	if (streamingDoneSending)
+		return -1;
+
 	/* more than one block available */
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;
@@ -1220,8 +1227,11 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
 		 */
-		if (walsender_ready_to_stop)
+		if (walsender_ready_to_stop || streamingDoneReceiving)
 			break;
 
 		/*
@@ -1787,7 +1797,14 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -1850,10 +1867,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit we shouldn't enqueue more
+		 * data to libpq. We need to finish writing out whatever we already
+		 * have in libpq's send buffer to maintain protocol sync so we still
+		 * need to loop until it's flushed.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -2909,7 +2931,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
1.9.1

From fcdcfe1aedfb3c7ef90c78c7d8acb4ca99a2ffdc Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Wed, 7 Sep 2016 00:39:18 +0300
Subject: [PATCH 2/4] Client-initiated CopyDone during transaction decoding in
 walsender

The walsender never looked for client messages while a transaction was being
decoded by ReorderBufferCommit. This affects long transactions: even if the
client tries to interrupt streaming with a CopyDone message, the server keeps
sending CopyData messages and generating network traffic. The client can also
request a reply from the server by sending a keepalive message with the
reply-requested flag set.

This patch checks for client messages each time a CopyData message is sent, and
interrupts transaction decoding as soon as possible once CopyDone has been
received from the client (ContinueDecodingCB).
---
 src/backend/replication/logical/logical.c       | 15 +++++---
 src/backend/replication/logical/logicalfuncs.c  |  3 +-
 src/backend/replication/logical/reorderbuffer.c |  9 +++--
 src/backend/replication/slotfuncs.c             |  2 +-
 src/backend/replication/walsender.c             | 50 +++++++++++++++----------
 src/include/replication/logical.h               |  6 ++-
 src/include/replication/reorderbuffer.h         | 12 ++++++
 7 files changed, 66 insertions(+), 31 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1512be5..fcb1ffc 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   ContinueDecodingCB continue_decondig_cb)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -180,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->continue_decoding_cb = continue_decondig_cb;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -212,7 +214,8 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  ContinueDecodingCB continue_decondig_cb)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -288,7 +291,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write, continue_decondig_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -328,7 +331,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  ContinueDecodingCB continue_decondig_cb)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -378,7 +382,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 continue_decondig_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 9c7be2d..767d6ce 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -249,7 +249,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite,
+									NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9b430b9..62f42e8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1417,7 +1417,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb()))
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1643,8 +1643,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
+		{
+			/* call commit callback, unless decoding was interrupted */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f908761..976bc0c 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(
 									NameStr(*plugin), NIL,
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c43310c..d85ad16 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -218,6 +218,19 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+/*
+ * IsStreamingActive
+ *
+ * While streaming WAL in Copy mode, either the client or the server can
+ * initiate the end of streaming by sending CopyDone. We should not send any
+ * more CopyData messages after that.
+ *
+ * Returns false once we have sent CopyDone ourselves (e.g. disconnect by
+ * timeout) or have received CopyDone from the client; returns true while
+ * neither has happened.
+ */
+static bool IsStreamingActive(void);
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -823,7 +836,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1002,7 +1016,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_decoding_ctx = CreateDecodingContext(
 											   cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData, IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1093,14 +1107,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	if (!pq_is_send_pending())
-		return;
-
 	for (;;)
 	{
 		int			wakeEvents;
@@ -1235,7 +1241,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client or sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}
+
+		/* We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
@@ -1797,14 +1810,7 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/*
- * Main loop of walsender process that streams the WAL over Copy messages.
- *
- * The send_data callback must enqueue complete CopyData messages to libpq
- * using pq_putmessage_noblock or similar, since the walsender loop may send
- * CopyDone then exit and return to command mode in response to a client
- * CopyDone between calls to send_data.
- */
+/* Main loop of walsender process that streams the WAL over Copy messages. */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -2951,3 +2957,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 			WalSndShutdown();
 	}
 }
+
+static bool
+IsStreamingActive(void)
+{
+	return !(streamingDoneReceiving || streamingDoneSending);	/* no CopyDone received or sent yet */
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 947000e..0ade384 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -81,13 +81,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  ContinueDecodingCB continue_decondig_cb);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  ContinueDecodingCB continue_decondig_cb);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9e209ae..c2e0eea 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -292,6 +292,13 @@ typedef void (*ReorderBufferMessageCB) (
 												 const char *prefix, Size sz,
 													const char *message);
 
+/*
+ * Callback that allows logical decoding to be interrupted. It returns true
+ * if decoding may continue; if it returns false, logical decoding will stop
+ * as soon as possible.
+ */
+typedef bool (*ContinueDecodingCB) (void);
+
 struct ReorderBuffer
 {
 	/*
@@ -321,6 +328,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback reporting whether decoding should continue; false means stop.
+	 */
+	ContinueDecodingCB continue_decoding_cb;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
1.9.1

From 5eb8d64427b1a50cacac1ff35eff2ef13c528de3 Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Mon, 19 Sep 2016 01:50:05 +0300
Subject: [PATCH 3/4] Add ability for pg_recvlogical to safely stop replication

Now pg_recvlogical, on the first SIGINT (Ctrl+C), sends a CopyDone message
to PostgreSQL and waits for the CopyDone reply. After the CopyDone messages
have been exchanged, the replication slot is guaranteed to have stopped safely
and to no longer be in the active state.

For backward compatibility, the ability to forcibly interrupt replication
without waiting for the CopyDone exchange is kept. To force-stop replication
as before, send SIGINT (Ctrl+C) twice.
---
 src/bin/pg_basebackup/pg_recvlogical.c             | 435 +++++++++++++--------
 src/test/recovery/t/001_stream_rep.pl              |  63 ---
 src/test/recovery/t/002_archiving.pl               |  53 ---
 src/test/recovery/t/003_recovery_targets.pl        | 146 -------
 src/test/recovery/t/004_timeline_switch.pl         |  75 ----
 src/test/recovery/t/005_replay_delay.pl            |  69 ----
 src/test/recovery/t/006_logical_decoding.pl        |  40 --
 src/test/recovery/t/007_sync_rep.pl                | 174 ---------
 src/test/recovery/t/previous/001_stream_rep.pl     |  63 +++
 src/test/recovery/t/previous/002_archiving.pl      |  53 +++
 .../recovery/t/previous/003_recovery_targets.pl    | 146 +++++++
 .../recovery/t/previous/004_timeline_switch.pl     |  75 ++++
 src/test/recovery/t/previous/005_replay_delay.pl   |  69 ++++
 .../recovery/t/previous/006_logical_decoding.pl    |  40 ++
 src/test/recovery/t/previous/007_sync_rep.pl       | 174 +++++++++
 15 files changed, 884 insertions(+), 791 deletions(-)
 delete mode 100644 src/test/recovery/t/001_stream_rep.pl
 delete mode 100644 src/test/recovery/t/002_archiving.pl
 delete mode 100644 src/test/recovery/t/003_recovery_targets.pl
 delete mode 100644 src/test/recovery/t/004_timeline_switch.pl
 delete mode 100644 src/test/recovery/t/005_replay_delay.pl
 delete mode 100644 src/test/recovery/t/006_logical_decoding.pl
 delete mode 100644 src/test/recovery/t/007_sync_rep.pl
 create mode 100644 src/test/recovery/t/previous/001_stream_rep.pl
 create mode 100644 src/test/recovery/t/previous/002_archiving.pl
 create mode 100644 src/test/recovery/t/previous/003_recovery_targets.pl
 create mode 100644 src/test/recovery/t/previous/004_timeline_switch.pl
 create mode 100644 src/test/recovery/t/previous/005_replay_delay.pl
 create mode 100644 src/test/recovery/t/previous/006_logical_decoding.pl
 create mode 100644 src/test/recovery/t/previous/007_sync_rep.pl

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 4c6cf70..07a7ed6 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -50,8 +50,12 @@ static const char *plugin = "test_decoding";
 /* Global State */
 static int	outfd = -1;
 static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t force_time_to_abort = false;
 static volatile sig_atomic_t output_reopen = false;
+static bool copyDoneSend = false;
+static bool copyDoneReceive = false;
 static bool output_isfile;
+static int64 last_status_time;
 static int64 output_last_fsync = -1;
 static bool output_needs_fsync = false;
 static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
@@ -195,6 +199,229 @@ OutputFsync(int64 now)
 	return true;
 }
 
+/*
+ * Handle a keepalive ('k') message carried in a CopyData message.
+ * Returns false if the message is too short, or if fsync or feedback fails.
+ */
+static bool
+ProcessKeepalive(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int			pos;
+	bool		replyRequested;
+	XLogRecPtr	walEnd;
+
+	/*
+	 * Parse the keepalive message, enclosed in the CopyData message.
+	 * We note walEnd and check if the server requested a reply, and
+	 * ignore the rest.
+	 */
+	pos = 1;			/* skip msgtype 'k' */
+	walEnd = fe_recvint64(&msgBuf[pos]);
+	output_written_lsn = Max(walEnd, output_written_lsn);
+
+	pos += 8;			/* read walEnd */
+	pos += 8;			/* skip sendTime */
+
+	if (msgLength < pos + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return false;	/* not -1: nonzero would read as success */
+	}
+	replyRequested = msgBuf[pos];
+
+	/* If the server requested an immediate reply, send one. */
+	if (replyRequested)
+	{
+		int64		now = feGetCurrentTimestamp();
+
+		/* fsync data, so we send a recent flush pointer */
+		if (!OutputFsync(now))
+			return false;
+
+		if (!sendFeedback(conn, now, true, false))
+			return false;
+
+		last_status_time = now;
+	}
+
+	return true;
+}
+
+/*
+ * Handle an XLogData ('w') message: raise output_written_lsn if needed and
+ * append the payload followed by a newline to outfd.  Returns false on error.
+ */
+static bool
+ProcessXLogData(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int			hdr_len;
+	int			bytes_left;
+	int			bytes_written = 0;
+	XLogRecPtr	dataStart;
+
+	/*
+	 * Read the header of the XLogData message, enclosed in the CopyData
+	 * message. We only need the WAL location field (dataStart), the rest
+	 * of the header is ignored.
+	 */
+	hdr_len = 1 + 8 + 8 + 8;	/* msgtype 'w', dataStart, walEnd, sendTime */
+	if (msgLength < hdr_len + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return false;
+	}
+
+	/* Extract WAL location for this block */
+	dataStart = fe_recvint64(&msgBuf[1]);
+	output_written_lsn = Max(dataStart, output_written_lsn);
+
+	bytes_left = msgLength - hdr_len;
+
+	/* signal that a fsync is needed */
+	output_needs_fsync = true;
+
+	while (bytes_left)
+	{
+		int			ret;
+
+		ret = write(outfd, msgBuf + hdr_len + bytes_written, bytes_left);
+
+		if (ret < 0)
+		{
+			fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+					progname, bytes_left, outfile,
+					strerror(errno));
+			return false;
+		}
+
+		/* Write was successful, advance our position */
+		bytes_written += ret;
+		bytes_left -= ret;
+	}
+
+	if (write(outfd, "\n", 1) != 1)
+	{
+		fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+				progname, 1, outfile,
+				strerror(errno));
+		return false;
+	}
+
+	return true;
+}
+
+/* Mark CopyDone as received; send our own copy-end packet if not yet sent. */
+static bool
+ProcessCopyDone(PGconn *conn)
+{
+	copyDoneReceive = true;
+	if (copyDoneSend)
+		return true;
+
+	if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+	{
+		fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+				progname, PQerrorMessage(conn));
+		return false;
+	}
+
+	copyDoneSend = true;
+	return true;
+}
+
+/*
+ * Dispatch a received message by its type byte; false on failure or unknown type.
+ */
+static bool
+ProcessReceiveMsg(PGconn *conn, unsigned char type, char *msgBuf, int msgLength)
+{
+	switch (type)
+	{
+		case 'k':
+			return ProcessKeepalive(conn, msgBuf, msgLength);
+		case 'w':
+			return ProcessXLogData(conn, msgBuf, msgLength);
+		case 'c':
+			return ProcessCopyDone(conn);
+		default:
+			break;
+	}
+
+	fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+			progname, type);
+	return false;
+}
+
+/*
+ * Wait in select() for socket input, bounded by the keepalive/fsync deadlines.
+ * Returns select()'s result, or -1 (after reporting) if the socket is invalid.
+ */
+static int
+WaitSocketActivity(PGconn *conn, int64 now)
+{
+	/*
+	 * In async mode, and no data available. We block on reading but
+	 * not more than the specified timeout, so that the caller can still
+	 * send its periodic status message and fsync the output in time.
+	 */
+	fd_set		input_mask;
+	int64		message_target = 0;
+	int64		fsync_target = 0;
+	struct timeval timeout;
+	struct timeval *timeoutptr = NULL;
+
+	if (PQsocket(conn) < 0)
+	{
+		fprintf(stderr,
+				_("%s: invalid socket: %s"),
+				progname, PQerrorMessage(conn));
+		return -1;
+	}
+
+	FD_ZERO(&input_mask);
+	FD_SET(PQsocket(conn), &input_mask);
+
+	/* Compute when we need to wakeup to send a keepalive message. */
+	if (standby_message_timeout)
+		message_target = last_status_time + (standby_message_timeout - 1) *
+			((int64) 1000);
+
+	/* Compute when we need to wakeup to fsync the output file. */
+	if (fsync_interval > 0 && output_needs_fsync)
+		fsync_target = output_last_fsync + (fsync_interval - 1) *
+			((int64) 1000);
+
+	/* Now compute when to wakeup. */
+	if (message_target > 0 || fsync_target > 0)
+	{
+		int64		targettime;
+		long		secs;
+		int			usecs;
+
+		targettime = message_target;
+
+		if (fsync_target > 0 && fsync_target < targettime)
+			targettime = fsync_target;
+
+		feTimestampDifference(now,
+							  targettime,
+							  &secs,
+							  &usecs);
+		if (secs <= 0)
+			timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+		else
+			timeout.tv_sec = secs;
+		timeout.tv_usec = usecs;
+		timeoutptr = &timeout;
+	}
+
+	return select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+}
+
 /*
  * Start the log streaming
  */
@@ -203,12 +430,12 @@ StreamLogicalLog(void)
 {
 	PGresult   *res;
 	char	   *copybuf = NULL;
-	int64		last_status = -1;
 	int			i;
 	PQExpBuffer query;
 
 	output_written_lsn = InvalidXLogRecPtr;
 	output_fsync_lsn = InvalidXLogRecPtr;
+	last_status_time = -1;
 
 	query = createPQExpBuffer();
 
@@ -271,13 +498,10 @@ StreamLogicalLog(void)
 				_("%s: streaming initiated\n"),
 				progname);
 
-	while (!time_to_abort)
+	while (!force_time_to_abort && !copyDoneReceive)
 	{
 		int			r;
-		int			bytes_left;
-		int			bytes_written;
 		int64		now;
-		int			hdr_len;
 
 		if (copybuf != NULL)
 		{
@@ -299,14 +523,35 @@ StreamLogicalLog(void)
 		}
 
 		if (standby_message_timeout > 0 &&
-			feTimestampDifferenceExceeds(last_status, now,
+			feTimestampDifferenceExceeds(last_status_time, now,
 										 standby_message_timeout))
 		{
 			/* Time to send feedback! */
 			if (!sendFeedback(conn, now, true, false))
 				goto error;
 
-			last_status = now;
+			last_status_time = now;
+		}
+
+		if (time_to_abort && !copyDoneSend)
+		{
+			if (verbose)
+			{
+				fprintf(stderr,
+						_("%s: stopping write up to %X/%X, flush to %X/%X (slot %s)\n"),
+						progname,
+						(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
+						(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
+						replication_slot);
+			}
+
+			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+			{
+				fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+						progname, PQerrorMessage(conn));
+				goto error;
+			}
+			copyDoneSend = true;
 		}
 
 		/* got SIGHUP, close output file */
@@ -349,64 +594,9 @@ StreamLogicalLog(void)
 		r = PQgetCopyData(conn, &copybuf, 1);
 		if (r == 0)
 		{
-			/*
-			 * In async mode, and no data available. We block on reading but
-			 * not more than the specified timeout, so that we can send a
-			 * response back to the client.
-			 */
-			fd_set		input_mask;
-			int64		message_target = 0;
-			int64		fsync_target = 0;
-			struct timeval timeout;
-			struct timeval *timeoutptr = NULL;
-
-			if (PQsocket(conn) < 0)
-			{
-				fprintf(stderr,
-						_("%s: invalid socket: %s"),
-						progname, PQerrorMessage(conn));
-				goto error;
-			}
+			int readyMsg = WaitSocketActivity(conn, now);
 
-			FD_ZERO(&input_mask);
-			FD_SET(PQsocket(conn), &input_mask);
-
-			/* Compute when we need to wakeup to send a keepalive message. */
-			if (standby_message_timeout)
-				message_target = last_status + (standby_message_timeout - 1) *
-					((int64) 1000);
-
-			/* Compute when we need to wakeup to fsync the output file. */
-			if (fsync_interval > 0 && output_needs_fsync)
-				fsync_target = output_last_fsync + (fsync_interval - 1) *
-					((int64) 1000);
-
-			/* Now compute when to wakeup. */
-			if (message_target > 0 || fsync_target > 0)
-			{
-				int64		targettime;
-				long		secs;
-				int			usecs;
-
-				targettime = message_target;
-
-				if (fsync_target > 0 && fsync_target < targettime)
-					targettime = fsync_target;
-
-				feTimestampDifference(now,
-									  targettime,
-									  &secs,
-									  &usecs);
-				if (secs <= 0)
-					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-				else
-					timeout.tv_sec = secs;
-				timeout.tv_usec = usecs;
-				timeoutptr = &timeout;
-			}
-
-			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-			if (r == 0 || (r < 0 && errno == EINTR))
+			if (readyMsg == 0 || (readyMsg < 0 && errno == EINTR))
 			{
 				/*
 				 * Got a timeout or signal. Continue the loop and either
@@ -415,7 +605,7 @@ StreamLogicalLog(void)
 				 */
 				continue;
 			}
-			else if (r < 0)
+			else if (readyMsg < 0)
 			{
 				fprintf(stderr, _("%s: select() failed: %s\n"),
 						progname, strerror(errno));
@@ -430,6 +620,7 @@ StreamLogicalLog(void)
 						progname, PQerrorMessage(conn));
 				goto error;
 			}
+
 			continue;
 		}
 
@@ -445,113 +636,8 @@ StreamLogicalLog(void)
 			goto error;
 		}
 
-		/* Check the message type. */
-		if (copybuf[0] == 'k')
+		if(!ProcessReceiveMsg(conn, copybuf[0], copybuf, r))
 		{
-			int			pos;
-			bool		replyRequested;
-			XLogRecPtr	walEnd;
-
-			/*
-			 * Parse the keepalive message, enclosed in the CopyData message.
-			 * We just check if the server requested a reply, and ignore the
-			 * rest.
-			 */
-			pos = 1;			/* skip msgtype 'k' */
-			walEnd = fe_recvint64(&copybuf[pos]);
-			output_written_lsn = Max(walEnd, output_written_lsn);
-
-			pos += 8;			/* read walEnd */
-
-			pos += 8;			/* skip sendTime */
-
-			if (r < pos + 1)
-			{
-				fprintf(stderr, _("%s: streaming header too small: %d\n"),
-						progname, r);
-				goto error;
-			}
-			replyRequested = copybuf[pos];
-
-			/* If the server requested an immediate reply, send one. */
-			if (replyRequested)
-			{
-				/* fsync data, so we send a recent flush pointer */
-				if (!OutputFsync(now))
-					goto error;
-
-				now = feGetCurrentTimestamp();
-				if (!sendFeedback(conn, now, true, false))
-					goto error;
-				last_status = now;
-			}
-			continue;
-		}
-		else if (copybuf[0] != 'w')
-		{
-			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-					progname, copybuf[0]);
-			goto error;
-		}
-
-
-		/*
-		 * Read the header of the XLogData message, enclosed in the CopyData
-		 * message. We only need the WAL location field (dataStart), the rest
-		 * of the header is ignored.
-		 */
-		hdr_len = 1;			/* msgtype 'w' */
-		hdr_len += 8;			/* dataStart */
-		hdr_len += 8;			/* walEnd */
-		hdr_len += 8;			/* sendTime */
-		if (r < hdr_len + 1)
-		{
-			fprintf(stderr, _("%s: streaming header too small: %d\n"),
-					progname, r);
-			goto error;
-		}
-
-		/* Extract WAL location for this block */
-		{
-			XLogRecPtr	temp = fe_recvint64(&copybuf[1]);
-
-			output_written_lsn = Max(temp, output_written_lsn);
-		}
-
-		bytes_left = r - hdr_len;
-		bytes_written = 0;
-
-		/* signal that a fsync is needed */
-		output_needs_fsync = true;
-
-		while (bytes_left)
-		{
-			int			ret;
-
-			ret = write(outfd,
-						copybuf + hdr_len + bytes_written,
-						bytes_left);
-
-			if (ret < 0)
-			{
-				fprintf(stderr,
-				  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
-						progname, bytes_left, outfile,
-						strerror(errno));
-				goto error;
-			}
-
-			/* Write was successful, advance our position */
-			bytes_written += ret;
-			bytes_left -= ret;
-		}
-
-		if (write(outfd, "\n", 1) != 1)
-		{
-			fprintf(stderr,
-				  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
-					progname, 1, outfile,
-					strerror(errno));
 			goto error;
 		}
 	}
@@ -601,6 +687,13 @@ error:
 static void
 sigint_handler(int signum)
 {
+    /* For backward compatibility, a second SIGINT forces replication to
+     * stop without waiting for CopyDone from the server.
+     */
+	if (time_to_abort)
+	{
+		force_time_to_abort = true;
+	}
 	time_to_abort = true;
 }
 
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
deleted file mode 100644
index fd71095..0000000
--- a/src/test/recovery/t/001_stream_rep.pl
+++ /dev/null
@@ -1,63 +0,0 @@
-# Minimal test testing streaming replication
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 4;
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->start;
-my $backup_name = 'my_backup';
-
-# Take backup
-$node_master->backup($backup_name);
-
-# Create streaming standby linking to master
-my $node_standby_1 = get_new_node('standby_1');
-$node_standby_1->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_1->start;
-
-# Take backup of standby 1 (not mandatory, but useful to check if
-# pg_basebackup works on a standby).
-$node_standby_1->backup($backup_name);
-
-# Create second standby node linking to standby 1
-my $node_standby_2 = get_new_node('standby_2');
-$node_standby_2->init_from_backup($node_standby_1, $backup_name,
-	has_streaming => 1);
-$node_standby_2->start;
-
-# Create some content on master and check its presence in standby 1
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
-
-# Wait for standbys to catch up
-my $applname_1 = $node_standby_1->name;
-my $applname_2 = $node_standby_2->name;
-my $caughtup_query =
-"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
-$node_master->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 1 to catch up";
-$caughtup_query =
-"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 2 to catch up";
-
-my $result =
-  $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-print "standby 1: $result\n";
-is($result, qq(1002), 'check streamed content on standby 1');
-
-$result =
-  $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-print "standby 2: $result\n";
-is($result, qq(1002), 'check streamed content on standby 2');
-
-# Check that only READ-only queries can run on standbys
-is($node_standby_1->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
-	3, 'read-only queries on standby 1');
-is($node_standby_2->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
-	3, 'read-only queries on standby 2');
diff --git a/src/test/recovery/t/002_archiving.pl b/src/test/recovery/t/002_archiving.pl
deleted file mode 100644
index fc2bf7e..0000000
--- a/src/test/recovery/t/002_archiving.pl
+++ /dev/null
@@ -1,53 +0,0 @@
-# test for archiving with hot standby
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 1;
-use File::Copy;
-
-# Initialize master node, doing archives
-my $node_master = get_new_node('master');
-$node_master->init(
-	has_archiving    => 1,
-	allows_streaming => 1);
-my $backup_name = 'my_backup';
-
-# Start it
-$node_master->start;
-
-# Take backup for slave
-$node_master->backup($backup_name);
-
-# Initialize standby node from backup, fetching WAL from archives
-my $node_standby = get_new_node('standby');
-$node_standby->init_from_backup($node_master, $backup_name,
-	has_restoring => 1);
-$node_standby->append_conf(
-	'postgresql.conf', qq(
-wal_retrieve_retry_interval = '100ms'
-));
-$node_standby->start;
-
-# Create some content on master
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-my $current_lsn =
-  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
-
-# Force archiving of WAL file to make it present on master
-$node_master->safe_psql('postgres', "SELECT pg_switch_xlog()");
-
-# Add some more content, it should not be present on standby
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-
-# Wait until necessary replay has been done on standby
-my $caughtup_query =
-  "SELECT '$current_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
-
-my $result =
-  $node_standby->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-is($result, qq(1000), 'check content from archives');
diff --git a/src/test/recovery/t/003_recovery_targets.pl b/src/test/recovery/t/003_recovery_targets.pl
deleted file mode 100644
index a82545b..0000000
--- a/src/test/recovery/t/003_recovery_targets.pl
+++ /dev/null
@@ -1,146 +0,0 @@
-# Test for recovery targets: name, timestamp, XID
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 9;
-
-# Create and test a standby from given backup, with a certain
-# recovery target.
-sub test_recovery_standby
-{
-	my $test_name       = shift;
-	my $node_name       = shift;
-	my $node_master     = shift;
-	my $recovery_params = shift;
-	my $num_rows        = shift;
-	my $until_lsn       = shift;
-
-	my $node_standby = get_new_node($node_name);
-	$node_standby->init_from_backup($node_master, 'my_backup',
-		has_restoring => 1);
-
-	foreach my $param_item (@$recovery_params)
-	{
-		$node_standby->append_conf(
-			'recovery.conf',
-			qq($param_item
-));
-	}
-
-	$node_standby->start;
-
-	# Wait until standby has replayed enough data
-	my $caughtup_query =
-	  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-	$node_standby->poll_query_until('postgres', $caughtup_query)
-	  or die "Timed out while waiting for standby to catch up";
-
-	# Create some content on master and check its presence in standby
-	my $result =
-	  $node_standby->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-	is($result, qq($num_rows), "check standby content for $test_name");
-
-	# Stop standby node
-	$node_standby->teardown_node;
-}
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(has_archiving => 1, allows_streaming => 1);
-
-# Start it
-$node_master->start;
-
-# Create data before taking the backup, aimed at testing
-# recovery_target = 'immediate'
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-my $lsn1 =
-  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
-
-# Take backup from which all operations will be run
-$node_master->backup('my_backup');
-
-# Insert some data with used as a replay reference, with a recovery
-# target TXID.
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-my $ret = $node_master->safe_psql('postgres',
-	"SELECT pg_current_xlog_location(), txid_current();");
-my ($lsn2, $recovery_txid) = split /\|/, $ret;
-
-# More data, with recovery target timestamp
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(2001,3000))");
-$ret = $node_master->safe_psql('postgres',
-	"SELECT pg_current_xlog_location(), now();");
-my ($lsn3, $recovery_time) = split /\|/, $ret;
-
-# Even more data, this time with a recovery target name
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(3001,4000))");
-my $recovery_name = "my_target";
-my $lsn4 =
-  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
-$node_master->safe_psql('postgres',
-	"SELECT pg_create_restore_point('$recovery_name');");
-
-# And now for a recovery target LSN
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(4001,5000))");
-my $recovery_lsn = $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location()");
-my $lsn5 =
-  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
-
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(5001,6000))");
-
-# Force archiving of WAL file
-$node_master->safe_psql('postgres', "SELECT pg_switch_xlog()");
-
-# Test recovery targets
-my @recovery_params = ("recovery_target = 'immediate'");
-test_recovery_standby('immediate target',
-	'standby_1', $node_master, \@recovery_params, "1000", $lsn1);
-@recovery_params = ("recovery_target_xid = '$recovery_txid'");
-test_recovery_standby('XID', 'standby_2', $node_master, \@recovery_params,
-	"2000", $lsn2);
-@recovery_params = ("recovery_target_time = '$recovery_time'");
-test_recovery_standby('time', 'standby_3', $node_master, \@recovery_params,
-	"3000", $lsn3);
-@recovery_params = ("recovery_target_name = '$recovery_name'");
-test_recovery_standby('name', 'standby_4', $node_master, \@recovery_params,
-	"4000", $lsn4);
-@recovery_params = ("recovery_target_lsn = '$recovery_lsn'");
-test_recovery_standby('LSN', 'standby_5', $node_master, \@recovery_params,
-	"5000", $lsn5);
-
-# Multiple targets
-# Last entry has priority (note that an array respects the order of items
-# not hashes).
-@recovery_params = (
-	"recovery_target_name = '$recovery_name'",
-	"recovery_target_xid  = '$recovery_txid'",
-	"recovery_target_time = '$recovery_time'");
-test_recovery_standby('name + XID + time',
-	'standby_6', $node_master, \@recovery_params, "3000", $lsn3);
-@recovery_params = (
-	"recovery_target_time = '$recovery_time'",
-	"recovery_target_name = '$recovery_name'",
-	"recovery_target_xid  = '$recovery_txid'");
-test_recovery_standby('time + name + XID',
-	'standby_7', $node_master, \@recovery_params, "2000", $lsn2);
-@recovery_params = (
-	"recovery_target_xid  = '$recovery_txid'",
-	"recovery_target_time = '$recovery_time'",
-	"recovery_target_name = '$recovery_name'");
-test_recovery_standby('XID + time + name',
-	'standby_8', $node_master, \@recovery_params, "4000", $lsn4);
-@recovery_params = (
-	"recovery_target_xid  = '$recovery_txid'",
-	"recovery_target_time = '$recovery_time'",
-	"recovery_target_name = '$recovery_name'",
-	"recovery_target_lsn = '$recovery_lsn'",);
-test_recovery_standby('XID + time + name + LSN',
-	'standby_9', $node_master, \@recovery_params, "5000", $lsn5);
diff --git a/src/test/recovery/t/004_timeline_switch.pl b/src/test/recovery/t/004_timeline_switch.pl
deleted file mode 100644
index 3ee8df2..0000000
--- a/src/test/recovery/t/004_timeline_switch.pl
+++ /dev/null
@@ -1,75 +0,0 @@
-# Test for timeline switch
-# Ensure that a cascading standby is able to follow a newly-promoted standby
-# on a new timeline.
-use strict;
-use warnings;
-use File::Path qw(rmtree);
-use PostgresNode;
-use TestLib;
-use Test::More tests => 1;
-
-$ENV{PGDATABASE} = 'postgres';
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->start;
-
-# Take backup
-my $backup_name = 'my_backup';
-$node_master->backup($backup_name);
-
-# Create two standbys linking to it
-my $node_standby_1 = get_new_node('standby_1');
-$node_standby_1->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_1->start;
-my $node_standby_2 = get_new_node('standby_2');
-$node_standby_2->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_2->start;
-
-# Create some content on master
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-my $until_lsn =
-  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
-
-# Wait until standby has replayed enough data on standby 1
-my $caughtup_query =
-  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
-
-# Stop and remove master, and promote standby 1, switching it to a new timeline
-$node_master->teardown_node;
-$node_standby_1->promote;
-
-# Switch standby 2 to replay from standby 1
-rmtree($node_standby_2->data_dir . '/recovery.conf');
-my $connstr_1 = $node_standby_1->connstr;
-$node_standby_2->append_conf(
-	'recovery.conf', qq(
-primary_conninfo='$connstr_1'
-standby_mode=on
-recovery_target_timeline='latest'
-));
-$node_standby_2->restart;
-
-# Insert some data in standby 1 and check its presence in standby 2
-# to ensure that the timeline switch has been done. Standby 1 needs
-# to exit recovery first before moving on with the test.
-$node_standby_1->poll_query_until('postgres',
-	"SELECT pg_is_in_recovery() <> true");
-$node_standby_1->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-$until_lsn = $node_standby_1->safe_psql('postgres',
-	"SELECT pg_current_xlog_location();");
-$caughtup_query =
-  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_2->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
-
-my $result =
-  $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-is($result, qq(2000), 'check content of standby 2');
diff --git a/src/test/recovery/t/005_replay_delay.pl b/src/test/recovery/t/005_replay_delay.pl
deleted file mode 100644
index 640295b..0000000
--- a/src/test/recovery/t/005_replay_delay.pl
+++ /dev/null
@@ -1,69 +0,0 @@
-# Checks for recovery_min_apply_delay
-use strict;
-use warnings;
-
-use PostgresNode;
-use TestLib;
-use Test::More tests => 1;
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->start;
-
-# And some content
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1, 10) AS a");
-
-# Take backup
-my $backup_name = 'my_backup';
-$node_master->backup($backup_name);
-
-# Create streaming standby from backup
-my $node_standby = get_new_node('standby');
-my $delay        = 3;
-$node_standby->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby->append_conf(
-	'recovery.conf', qq(
-recovery_min_apply_delay = '${delay}s'
-));
-$node_standby->start;
-
-# Make new content on master and check its presence in standby depending
-# on the delay applied above. Before doing the insertion, get the
-# current timestamp that will be used as a comparison base. Even on slow
-# machines, this allows to have a predictable behavior when comparing the
-# delay between data insertion moment on master and replay time on standby.
-my $master_insert_time = time();
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(11, 20))");
-
-# Now wait for replay to complete on standby. We're done waiting when the
-# slave has replayed up to the previously saved master LSN.
-my $until_lsn =
-  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location()");
-
-my $remaining = 90;
-while ($remaining-- > 0)
-{
-
-	# Done waiting?
-	my $replay_status = $node_standby->safe_psql('postgres',
-		"SELECT (pg_last_xlog_replay_location() - '$until_lsn'::pg_lsn) >= 0"
-	);
-	last if $replay_status eq 't';
-
-	# No, sleep some more.
-	my $sleep = $master_insert_time + $delay - time();
-	$sleep = 1 if $sleep < 1;
-	sleep $sleep;
-}
-
-die "Maximum number of attempts reached ($remaining remain)"
-  if $remaining < 0;
-
-# This test is successful if and only if the LSN has been applied with at least
-# the configured apply delay.
-ok(time() - $master_insert_time >= $delay,
-	"standby applies WAL only after replication delay");
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
deleted file mode 100644
index b80a9a9..0000000
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ /dev/null
@@ -1,40 +0,0 @@
-# Testing of logical decoding using SQL interface and/or pg_recvlogical
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 2;
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->append_conf(
-		'postgresql.conf', qq(
-max_replication_slots = 4
-wal_level = logical
-));
-$node_master->start;
-my $backup_name = 'master_backup';
-
-$node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
-
-$node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]);
-
-$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
-
-# Basic decoding works
-my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
-is(scalar(split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
-
-# If we immediately crash the server we might lose the progress we just made
-# and replay the same changes again. But a clean shutdown should never repeat
-# the same changes when we use the SQL decoding interface.
-$node_master->restart('fast');
-
-# There are no new writes, so the result should be empty.
-$result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
-chomp($result);
-is($result, '', 'Decoding after fast restart repeats no rows');
-
-# done with the node
-$node_master->stop;
diff --git a/src/test/recovery/t/007_sync_rep.pl b/src/test/recovery/t/007_sync_rep.pl
deleted file mode 100644
index 0c87226..0000000
--- a/src/test/recovery/t/007_sync_rep.pl
+++ /dev/null
@@ -1,174 +0,0 @@
-# Minimal test testing synchronous replication sync_state transition
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 8;
-
-# Query checking sync_priority and sync_state of each standby
-my $check_sql =
-"SELECT application_name, sync_priority, sync_state FROM pg_stat_replication ORDER BY application_name;";
-
-# Check that sync_state of each standby is expected.
-# If $setting is given, synchronous_standby_names is set to it and
-# the configuration file is reloaded before the test.
-sub test_sync_state
-{
-	my ($self, $expected, $msg, $setting) = @_;
-
-	if (defined($setting))
-	{
-		$self->psql('postgres',
-			"ALTER SYSTEM SET synchronous_standby_names = '$setting';");
-		$self->reload;
-	}
-
-	my $timeout_max = 30;
-	my $timeout     = 0;
-	my $result;
-
-	# A reload may take some time to take effect on busy machines,
-	# hence use a loop with a timeout to give some room for the test
-	# to pass.
-	while ($timeout < $timeout_max)
-	{
-		$result = $self->safe_psql('postgres', $check_sql);
-
-		last if ($result eq $expected);
-
-		$timeout++;
-		sleep 1;
-	}
-
-	is($result, $expected, $msg);
-}
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->start;
-my $backup_name = 'master_backup';
-
-# Take backup
-$node_master->backup($backup_name);
-
-# Create standby1 linking to master
-my $node_standby_1 = get_new_node('standby1');
-$node_standby_1->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_1->start;
-
-# Create standby2 linking to master
-my $node_standby_2 = get_new_node('standby2');
-$node_standby_2->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_2->start;
-
-# Create standby3 linking to master
-my $node_standby_3 = get_new_node('standby3');
-$node_standby_3->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_3->start;
-
-# Check that sync_state is determined correctly when
-# synchronous_standby_names is specified in old syntax.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|2|potential
-standby3|0|async),
-	'old syntax of synchronous_standby_names',
-	'standby1,standby2');
-
-# Check that all the standbys are considered as either sync or
-# potential when * is specified in synchronous_standby_names.
-# Note that standby1 is chosen as sync standby because
-# it's stored in the head of WalSnd array which manages
-# all the standbys though they have the same priority.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|1|potential
-standby3|1|potential),
-	'asterisk in synchronous_standby_names',
-	'*');
-
-# Stop and start standbys to rearrange the order of standbys
-# in WalSnd array. Now, if standbys have the same priority,
-# standby2 is selected preferentially and standby3 is next.
-$node_standby_1->stop;
-$node_standby_2->stop;
-$node_standby_3->stop;
-
-$node_standby_2->start;
-$node_standby_3->start;
-
-# Specify 2 as the number of sync standbys.
-# Check that two standbys are in 'sync' state.
-test_sync_state(
-	$node_master, qq(standby2|2|sync
-standby3|3|sync),
-	'2 synchronous standbys',
-	'2(standby1,standby2,standby3)');
-
-# Start standby1
-$node_standby_1->start;
-
-# Create standby4 linking to master
-my $node_standby_4 = get_new_node('standby4');
-$node_standby_4->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_4->start;
-
-# Check that standby1 and standby2 whose names appear earlier in
-# synchronous_standby_names are considered as sync. Also check that
-# standby3 appearing later represents potential, and standby4 is
-# in 'async' state because it's not in the list.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|2|sync
-standby3|3|potential
-standby4|0|async),
-	'2 sync, 1 potential, and 1 async');
-
-# Check that sync_state of each standby is determined correctly
-# when num_sync exceeds the number of names of potential sync standbys
-# specified in synchronous_standby_names.
-test_sync_state(
-	$node_master, qq(standby1|0|async
-standby2|4|sync
-standby3|3|sync
-standby4|1|sync),
-	'num_sync exceeds the num of potential sync standbys',
-	'6(standby4,standby0,standby3,standby2)');
-
-# The setting that * comes before another standby name is acceptable
-# but does not make sense in most cases. Check that sync_state is
-# chosen properly even in case of that setting.
-# The priority of standby2 should be 2 because it matches * first.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|2|sync
-standby3|2|potential
-standby4|2|potential),
-	'asterisk comes before another standby name',
-	'2(standby1,*,standby2)');
-
-# Check that the setting of '2(*)' chooses standby2 and standby3 that are stored
-# earlier in WalSnd array as sync standbys.
-test_sync_state(
-	$node_master, qq(standby1|1|potential
-standby2|1|sync
-standby3|1|sync
-standby4|1|potential),
-	'multiple standbys having the same priority are chosen as sync',
-	'2(*)');
-
-# Stop Standby3 which is considered in 'sync' state.
-$node_standby_3->stop;
-
-# Check that the state of standby1 stored earlier in WalSnd array than
-# standby4 is transited from potential to sync.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|1|sync
-standby4|1|potential),
-	'potential standby found earlier in array is promoted to sync');
diff --git a/src/test/recovery/t/previous/001_stream_rep.pl b/src/test/recovery/t/previous/001_stream_rep.pl
new file mode 100644
index 0000000..fd71095
--- /dev/null
+++ b/src/test/recovery/t/previous/001_stream_rep.pl
@@ -0,0 +1,63 @@
+# Minimal test testing streaming replication
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+my $backup_name = 'my_backup';
+
+# Take backup
+$node_master->backup($backup_name);
+
+# Create streaming standby linking to master
+my $node_standby_1 = get_new_node('standby_1');
+$node_standby_1->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby_1->start;
+
+# Take backup of standby 1 (not mandatory, but useful to check if
+# pg_basebackup works on a standby).
+$node_standby_1->backup($backup_name);
+
+# Create second standby node linking to standby 1
+my $node_standby_2 = get_new_node('standby_2');
+$node_standby_2->init_from_backup($node_standby_1, $backup_name,
+	has_streaming => 1);
+$node_standby_2->start;
+
+# Create some content on master and check its presence in standby 1
+$node_master->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
+
+# Wait for standbys to catch up
+my $applname_1 = $node_standby_1->name;
+my $applname_2 = $node_standby_2->name;
+my $caughtup_query =
+"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
+$node_master->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for standby 1 to catch up";
+$caughtup_query =
+"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
+$node_standby_1->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for standby 2 to catch up";
+
+my $result =
+  $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
+print "standby 1: $result\n";
+is($result, qq(1002), 'check streamed content on standby 1');
+
+$result =
+  $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
+print "standby 2: $result\n";
+is($result, qq(1002), 'check streamed content on standby 2');
+
+# Check that only READ-only queries can run on standbys
+is($node_standby_1->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
+	3, 'read-only queries on standby 1');
+is($node_standby_2->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
+	3, 'read-only queries on standby 2');
diff --git a/src/test/recovery/t/previous/002_archiving.pl b/src/test/recovery/t/previous/002_archiving.pl
new file mode 100644
index 0000000..fc2bf7e
--- /dev/null
+++ b/src/test/recovery/t/previous/002_archiving.pl
@@ -0,0 +1,53 @@
+# test for archiving with hot standby
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+use File::Copy;
+
+# Initialize master node, doing archives
+my $node_master = get_new_node('master');
+$node_master->init(
+	has_archiving    => 1,
+	allows_streaming => 1);
+my $backup_name = 'my_backup';
+
+# Start it
+$node_master->start;
+
+# Take backup for slave
+$node_master->backup($backup_name);
+
+# Initialize standby node from backup, fetching WAL from archives
+my $node_standby = get_new_node('standby');
+$node_standby->init_from_backup($node_master, $backup_name,
+	has_restoring => 1);
+$node_standby->append_conf(
+	'postgresql.conf', qq(
+wal_retrieve_retry_interval = '100ms'
+));
+$node_standby->start;
+
+# Create some content on master
+$node_master->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
+my $current_lsn =
+  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
+
+# Force archiving of WAL file to make it present on master
+$node_master->safe_psql('postgres', "SELECT pg_switch_xlog()");
+
+# Add some more content, it should not be present on standby
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
+
+# Wait until necessary replay has been done on standby
+my $caughtup_query =
+  "SELECT '$current_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
+$node_standby->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for standby to catch up";
+
+my $result =
+  $node_standby->safe_psql('postgres', "SELECT count(*) FROM tab_int");
+is($result, qq(1000), 'check content from archives');
diff --git a/src/test/recovery/t/previous/003_recovery_targets.pl b/src/test/recovery/t/previous/003_recovery_targets.pl
new file mode 100644
index 0000000..a82545b
--- /dev/null
+++ b/src/test/recovery/t/previous/003_recovery_targets.pl
@@ -0,0 +1,146 @@
+# Test for recovery targets: name, timestamp, XID
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 9;
+
+# Create and test a standby from given backup, with a certain
+# recovery target.
+sub test_recovery_standby
+{
+	my $test_name       = shift;
+	my $node_name       = shift;
+	my $node_master     = shift;
+	my $recovery_params = shift;
+	my $num_rows        = shift;
+	my $until_lsn       = shift;
+
+	my $node_standby = get_new_node($node_name);
+	$node_standby->init_from_backup($node_master, 'my_backup',
+		has_restoring => 1);
+
+	foreach my $param_item (@$recovery_params)
+	{
+		$node_standby->append_conf(
+			'recovery.conf',
+			qq($param_item
+));
+	}
+
+	$node_standby->start;
+
+	# Wait until standby has replayed enough data
+	my $caughtup_query =
+	  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
+	$node_standby->poll_query_until('postgres', $caughtup_query)
+	  or die "Timed out while waiting for standby to catch up";
+
+	# Create some content on master and check its presence in standby
+	my $result =
+	  $node_standby->safe_psql('postgres', "SELECT count(*) FROM tab_int");
+	is($result, qq($num_rows), "check standby content for $test_name");
+
+	# Stop standby node
+	$node_standby->teardown_node;
+}
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(has_archiving => 1, allows_streaming => 1);
+
+# Start it
+$node_master->start;
+
+# Create data before taking the backup, aimed at testing
+# recovery_target = 'immediate'
+$node_master->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
+my $lsn1 =
+  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
+
+# Take backup from which all operations will be run
+$node_master->backup('my_backup');
+
+# Insert some data with used as a replay reference, with a recovery
+# target TXID.
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
+my $ret = $node_master->safe_psql('postgres',
+	"SELECT pg_current_xlog_location(), txid_current();");
+my ($lsn2, $recovery_txid) = split /\|/, $ret;
+
+# More data, with recovery target timestamp
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(2001,3000))");
+$ret = $node_master->safe_psql('postgres',
+	"SELECT pg_current_xlog_location(), now();");
+my ($lsn3, $recovery_time) = split /\|/, $ret;
+
+# Even more data, this time with a recovery target name
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(3001,4000))");
+my $recovery_name = "my_target";
+my $lsn4 =
+  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
+$node_master->safe_psql('postgres',
+	"SELECT pg_create_restore_point('$recovery_name');");
+
+# And now for a recovery target LSN
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(4001,5000))");
+my $recovery_lsn = $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location()");
+my $lsn5 =
+  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(5001,6000))");
+
+# Force archiving of WAL file
+$node_master->safe_psql('postgres', "SELECT pg_switch_xlog()");
+
+# Test recovery targets
+my @recovery_params = ("recovery_target = 'immediate'");
+test_recovery_standby('immediate target',
+	'standby_1', $node_master, \@recovery_params, "1000", $lsn1);
+@recovery_params = ("recovery_target_xid = '$recovery_txid'");
+test_recovery_standby('XID', 'standby_2', $node_master, \@recovery_params,
+	"2000", $lsn2);
+@recovery_params = ("recovery_target_time = '$recovery_time'");
+test_recovery_standby('time', 'standby_3', $node_master, \@recovery_params,
+	"3000", $lsn3);
+@recovery_params = ("recovery_target_name = '$recovery_name'");
+test_recovery_standby('name', 'standby_4', $node_master, \@recovery_params,
+	"4000", $lsn4);
+@recovery_params = ("recovery_target_lsn = '$recovery_lsn'");
+test_recovery_standby('LSN', 'standby_5', $node_master, \@recovery_params,
+	"5000", $lsn5);
+
+# Multiple targets
+# Last entry has priority (note that an array respects the order of items
+# not hashes).
+@recovery_params = (
+	"recovery_target_name = '$recovery_name'",
+	"recovery_target_xid  = '$recovery_txid'",
+	"recovery_target_time = '$recovery_time'");
+test_recovery_standby('name + XID + time',
+	'standby_6', $node_master, \@recovery_params, "3000", $lsn3);
+@recovery_params = (
+	"recovery_target_time = '$recovery_time'",
+	"recovery_target_name = '$recovery_name'",
+	"recovery_target_xid  = '$recovery_txid'");
+test_recovery_standby('time + name + XID',
+	'standby_7', $node_master, \@recovery_params, "2000", $lsn2);
+@recovery_params = (
+	"recovery_target_xid  = '$recovery_txid'",
+	"recovery_target_time = '$recovery_time'",
+	"recovery_target_name = '$recovery_name'");
+test_recovery_standby('XID + time + name',
+	'standby_8', $node_master, \@recovery_params, "4000", $lsn4);
+@recovery_params = (
+	"recovery_target_xid  = '$recovery_txid'",
+	"recovery_target_time = '$recovery_time'",
+	"recovery_target_name = '$recovery_name'",
+	"recovery_target_lsn = '$recovery_lsn'",);
+test_recovery_standby('XID + time + name + LSN',
+	'standby_9', $node_master, \@recovery_params, "5000", $lsn5);
diff --git a/src/test/recovery/t/previous/004_timeline_switch.pl b/src/test/recovery/t/previous/004_timeline_switch.pl
new file mode 100644
index 0000000..3ee8df2
--- /dev/null
+++ b/src/test/recovery/t/previous/004_timeline_switch.pl
@@ -0,0 +1,75 @@
+# Test for timeline switch
+# Ensure that a cascading standby is able to follow a newly-promoted standby
+# on a new timeline.
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create two standbys linking to it
+my $node_standby_1 = get_new_node('standby_1');
+$node_standby_1->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby_1->start;
+my $node_standby_2 = get_new_node('standby_2');
+$node_standby_2->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby_2->start;
+
+# Create some content on master
+$node_master->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
+my $until_lsn =
+  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
+
+# Wait until standby 1 has replayed enough data
+my $caughtup_query =
+  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
+$node_standby_1->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for standby to catch up";
+
+# Stop and remove master, and promote standby 1, switching it to a new timeline
+$node_master->teardown_node;
+$node_standby_1->promote;
+
+# Switch standby 2 to replay from standby 1
+rmtree($node_standby_2->data_dir . '/recovery.conf');
+my $connstr_1 = $node_standby_1->connstr;
+$node_standby_2->append_conf(
+	'recovery.conf', qq(
+primary_conninfo='$connstr_1'
+standby_mode=on
+recovery_target_timeline='latest'
+));
+$node_standby_2->restart;
+
+# Insert some data in standby 1 and check its presence in standby 2
+# to ensure that the timeline switch has been done. Standby 1 needs
+# to exit recovery first before moving on with the test.
+$node_standby_1->poll_query_until('postgres',
+	"SELECT pg_is_in_recovery() <> true");
+$node_standby_1->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
+$until_lsn = $node_standby_1->safe_psql('postgres',
+	"SELECT pg_current_xlog_location();");
+$caughtup_query =
+  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
+$node_standby_2->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for standby to catch up";
+
+my $result =
+  $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
+is($result, qq(2000), 'check content of standby 2');
diff --git a/src/test/recovery/t/previous/005_replay_delay.pl b/src/test/recovery/t/previous/005_replay_delay.pl
new file mode 100644
index 0000000..640295b
--- /dev/null
+++ b/src/test/recovery/t/previous/005_replay_delay.pl
@@ -0,0 +1,69 @@
+# Checks for recovery_min_apply_delay
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# Add some content
+$node_master->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1, 10) AS a");
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create streaming standby from backup
+my $node_standby = get_new_node('standby');
+my $delay        = 3;
+$node_standby->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf(
+	'recovery.conf', qq(
+recovery_min_apply_delay = '${delay}s'
+));
+$node_standby->start;
+
+# Make new content on master and check its presence in standby depending
+# on the delay applied above. Before doing the insertion, get the
+# current timestamp that will be used as a comparison base. Even on slow
+# machines, this gives predictable behavior when comparing the
+# delay between data insertion moment on master and replay time on standby.
+my $master_insert_time = time();
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(11, 20))");
+
+# Now wait for replay to complete on standby. We're done waiting when the
+# standby has replayed up to the previously saved master LSN.
+my $until_lsn =
+  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location()");
+
+my $remaining = 90;
+while ($remaining-- > 0)
+{
+
+	# Done waiting?
+	my $replay_status = $node_standby->safe_psql('postgres',
+		"SELECT (pg_last_xlog_replay_location() - '$until_lsn'::pg_lsn) >= 0"
+	);
+	last if $replay_status eq 't';
+
+	# No, sleep some more.
+	my $sleep = $master_insert_time + $delay - time();
+	$sleep = 1 if $sleep < 1;
+	sleep $sleep;
+}
+
+die "Maximum number of attempts reached ($remaining remain)"
+  if $remaining < 0;
+
+# This test is successful if and only if the LSN has been applied with at least
+# the configured apply delay.
+ok(time() - $master_insert_time >= $delay,
+	"standby applies WAL only after replication delay");
diff --git a/src/test/recovery/t/previous/006_logical_decoding.pl b/src/test/recovery/t/previous/006_logical_decoding.pl
new file mode 100644
index 0000000..b80a9a9
--- /dev/null
+++ b/src/test/recovery/t/previous/006_logical_decoding.pl
@@ -0,0 +1,40 @@
+# Testing of logical decoding using SQL interface and/or pg_recvlogical
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->append_conf(
+		'postgresql.conf', qq(
+max_replication_slots = 4
+wal_level = logical
+));
+$node_master->start;
+my $backup_name = 'master_backup';
+
+$node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+$node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]);
+
+$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
+
+# Basic decoding works.  Count the result lines via a list assignment: a bare split in scalar context splits into @_ on older Perl versions.
+my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
+is(scalar(my @decoded_rows = split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
+
+# If we immediately crash the server we might lose the progress we just made
+# and replay the same changes again. But a clean shutdown should never repeat
+# the same changes when we use the SQL decoding interface.
+$node_master->restart('fast');
+
+# There are no new writes, so the result should be empty.
+$result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
+chomp($result);
+is($result, '', 'Decoding after fast restart repeats no rows');
+
+# done with the node
+$node_master->stop;
diff --git a/src/test/recovery/t/previous/007_sync_rep.pl b/src/test/recovery/t/previous/007_sync_rep.pl
new file mode 100644
index 0000000..0c87226
--- /dev/null
+++ b/src/test/recovery/t/previous/007_sync_rep.pl
@@ -0,0 +1,174 @@
+# Minimal test testing synchronous replication sync_state transition
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+
+# Query checking sync_priority and sync_state of each standby
+my $check_sql =
+"SELECT application_name, sync_priority, sync_state FROM pg_stat_replication ORDER BY application_name;";
+
+# Verify that pg_stat_replication on $self reports the application_name,
+# sync_priority and sync_state rows given in $expected.  When $setting is
+# defined, it is first applied as synchronous_standby_names and reloaded.
+sub test_sync_state
+{
+	my ($self, $expected, $msg, $setting) = @_;
+
+	my $max_attempts = 30;
+	my $result;
+
+	if (defined $setting)
+	{
+		my $alter_sql =
+		  "ALTER SYSTEM SET synchronous_standby_names = '$setting';";
+		$self->psql('postgres', $alter_sql);
+		$self->reload;
+	}
+
+	# The reload can take a while to become visible on a busy machine, so
+	# retry the check once per second for a bounded number of attempts
+	# before reporting the outcome.
+	foreach my $attempt (1 .. $max_attempts)
+	{
+		$result = $self->safe_psql('postgres', $check_sql);
+		last if $result eq $expected;
+
+		sleep 1;
+	}
+
+	# Report whatever was observed last; if the attempts ran out this is
+	# where the mismatch shows up as a test failure.
+	is($result, $expected, $msg);
+}
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+my $backup_name = 'master_backup';
+
+# Take backup
+$node_master->backup($backup_name);
+
+# Create standby1 linking to master
+my $node_standby_1 = get_new_node('standby1');
+$node_standby_1->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby_1->start;
+
+# Create standby2 linking to master
+my $node_standby_2 = get_new_node('standby2');
+$node_standby_2->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby_2->start;
+
+# Create standby3 linking to master
+my $node_standby_3 = get_new_node('standby3');
+$node_standby_3->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby_3->start;
+
+# Check that sync_state is determined correctly when
+# synchronous_standby_names is specified in old syntax.
+test_sync_state(
+	$node_master, qq(standby1|1|sync
+standby2|2|potential
+standby3|0|async),
+	'old syntax of synchronous_standby_names',
+	'standby1,standby2');
+
+# Check that all the standbys are considered as either sync or
+# potential when * is specified in synchronous_standby_names.
+# Note that standby1 is chosen as sync standby because
+# it's stored in the head of WalSnd array which manages
+# all the standbys though they have the same priority.
+test_sync_state(
+	$node_master, qq(standby1|1|sync
+standby2|1|potential
+standby3|1|potential),
+	'asterisk in synchronous_standby_names',
+	'*');
+
+# Stop and start standbys to rearrange the order of standbys
+# in WalSnd array. Now, if standbys have the same priority,
+# standby2 is selected preferentially and standby3 is next.
+$node_standby_1->stop;
+$node_standby_2->stop;
+$node_standby_3->stop;
+
+$node_standby_2->start;
+$node_standby_3->start;
+
+# Specify 2 as the number of sync standbys.
+# Check that two standbys are in 'sync' state.
+test_sync_state(
+	$node_master, qq(standby2|2|sync
+standby3|3|sync),
+	'2 synchronous standbys',
+	'2(standby1,standby2,standby3)');
+
+# Start standby1
+$node_standby_1->start;
+
+# Create standby4 linking to master
+my $node_standby_4 = get_new_node('standby4');
+$node_standby_4->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby_4->start;
+
+# Check that standby1 and standby2 whose names appear earlier in
+# synchronous_standby_names are considered as sync. Also check that
+# standby3 appearing later represents potential, and standby4 is
+# in 'async' state because it's not in the list.
+test_sync_state(
+	$node_master, qq(standby1|1|sync
+standby2|2|sync
+standby3|3|potential
+standby4|0|async),
+	'2 sync, 1 potential, and 1 async');
+
+# Check that sync_state of each standby is determined correctly
+# when num_sync exceeds the number of names of potential sync standbys
+# specified in synchronous_standby_names.
+test_sync_state(
+	$node_master, qq(standby1|0|async
+standby2|4|sync
+standby3|3|sync
+standby4|1|sync),
+	'num_sync exceeds the num of potential sync standbys',
+	'6(standby4,standby0,standby3,standby2)');
+
+# The setting that * comes before another standby name is acceptable
+# but does not make sense in most cases. Check that sync_state is
+# chosen properly even in case of that setting.
+# The priority of standby2 should be 2 because it matches * first.
+test_sync_state(
+	$node_master, qq(standby1|1|sync
+standby2|2|sync
+standby3|2|potential
+standby4|2|potential),
+	'asterisk comes before another standby name',
+	'2(standby1,*,standby2)');
+
+# Check that the setting of '2(*)' chooses standby2 and standby3 that are stored
+# earlier in WalSnd array as sync standbys.
+test_sync_state(
+	$node_master, qq(standby1|1|potential
+standby2|1|sync
+standby3|1|sync
+standby4|1|potential),
+	'multiple standbys having the same priority are chosen as sync',
+	'2(*)');
+
+# Stop Standby3 which is considered in 'sync' state.
+$node_standby_3->stop;
+
+# Check that standby1, which is stored earlier in the WalSnd array than
+# standby4, transitions from potential to sync.
+test_sync_state(
+	$node_master, qq(standby1|1|sync
+standby2|1|sync
+standby4|1|potential),
+	'potential standby found earlier in array is promoted to sync');
-- 
1.9.1

From 71959a336481d4052c070d1ccda8e7ec08e55e7a Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Mon, 19 Sep 2016 01:56:15 +0300
Subject: [PATCH 4/4] Add tests for stop replication protocol

---
 src/test/recovery/t/008_pg_recvlogical.pl | 152 ++++++++++++++++++++++++++++++
 1 file changed, 152 insertions(+)
 create mode 100644 src/test/recovery/t/008_pg_recvlogical.pl

diff --git a/src/test/recovery/t/008_pg_recvlogical.pl b/src/test/recovery/t/008_pg_recvlogical.pl
new file mode 100644
index 0000000..65d3609
--- /dev/null
+++ b/src/test/recovery/t/008_pg_recvlogical.pl
@@ -0,0 +1,152 @@
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+use IPC::Run qw( start timeout ) ;
+
+
+my $verbose = 1;
+
+# Launch pg_recvlogical as a background process streaming from $slotname on
+# $node and return its IPC::Run harness.  Recognized %params: timeout
+# (seconds), option / extra_params (array ref of extra command-line args).
+sub start_pg_receivexlog
+{
+    my ($node, $slotname, %params) = @_;
+    my $stdout = my $stderr = '';
+    my $timeout_exception = 'pg_recvlogical timed out';
+
+    my @cmd = ("pg_recvlogical", "--verbose", "-S", "$slotname", "--no-loop", "--dbname", $node->connstr, "--start", "-f", "-");
+
+    # Callers spell the extra-argument key both ways.  Entries written as
+    # '-o name=value' are split on whitespace so that each word becomes its
+    # own argv element instead of one element containing a space.
+    foreach my $key (qw(option extra_params))
+    {
+        push @cmd, map { split(' ', $_) } @{ $params{$key} }
+            if defined $params{$key};
+    }
+
+    diag "Running '@cmd'" if $verbose;
+
+    # Only hand a timer to IPC::Run when one was requested, never undef.
+    my @ipcrun_opts = (\@cmd, '<', \undef, '2>', \$stderr, '>', \$stdout);
+    push @ipcrun_opts,
+        IPC::Run::timeout($params{timeout}, exception => $timeout_exception)
+        if defined $params{timeout};
+    my $proc = start @ipcrun_opts;
+    die $! unless defined($proc);
+
+    # Give the client a moment to connect before the caller starts polling.
+    sleep 5;
+
+    # NOTE(review): nothing pumps the harness before this point, so both
+    # buffers may still be empty here -- confirm against IPC::Run.
+    foreach my $out ([ 'standard out', $stdout ], [ 'standard error', $stderr ])
+    {
+        next if !$verbose || $out->[1] eq "";
+        diag "#### Begin $out->[0]\n", $out->[1], "\n#### End $out->[0]\n";
+    }
+    return $proc;
+}
+
+sub wait_for_start_streaming
+{
+    my ($node, $slotname) = @_;
+    # Poll pg_replication_slots on $node until $slotname is active; die if the poll gives up.
+    diag "waiting for " . $node->name . " start streaming by slot ".$slotname if $verbose;
+    $node->poll_query_until('postgres', "select active from pg_replication_slots where slot_name = '$slotname';") or die "Timed out while waiting for slot $slotname to start streaming";
+}
+
+sub wait_for_stop_streaming
+{
+    my ($node, $slotname) = @_;
+    # Poll pg_replication_slots on $node until $slotname is no longer active; die if the poll gives up.
+    diag "waiting for " . $node->name . " streaming by slot ".$slotname." will be stopped" if $verbose;
+    $node->poll_query_until('postgres', "select not(active) from pg_replication_slots where slot_name = '$slotname';") or die "Timed out while waiting for slot $slotname to stop streaming";
+}
+
+sub create_logical_replication_slot
+{
+    my ($node, $slotname, $outplugin) = @_;
+    # Drop any existing slot of the same name first, then create it afresh.
+    my $drop_sql =
+        "select pg_drop_replication_slot('$slotname') where exists (select 1 from pg_replication_slots where slot_name = '$slotname');";
+    my $create_sql =
+        "SELECT pg_create_logical_replication_slot('$slotname', '$outplugin');";
+
+    $node->safe_psql("postgres", $drop_sql);
+    $node->safe_psql('postgres', $create_sql);
+}
+
+my ($proc);    # handle returned by start_pg_receivexlog; reassigned by each test case
+
+# Initialize master node with streaming and archiving, then append the settings used by the logical decoding tests
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");    # the slots below are logical slots
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 12\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 12\n");
+$node_master->append_conf('postgresql.conf', "max_connections = 20\n");
+$node_master->dump_info;
+$node_master->start;
+
+
+# Test case 1: the client initiates a stop of logical replication while the database has no new changes (calm state)
+{
+    my $slotname = 'calm_state_slot';
+
+    create_logical_replication_slot($node_master, $slotname, "test_decoding");
+    $proc = start_pg_receivexlog(
+        $node_master,
+        $slotname,
+        timeout => 60,
+        extra_params => ['-o include-xids=false', '-o skip-empty-xacts=true']
+    );
+
+    wait_for_start_streaming($node_master, $slotname);
+
+    my $cancelTime = time();
+    $proc->signal("SIGINT");
+    wait_for_stop_streaming($node_master, $slotname);
+
+    ok((time() - $cancelTime) <= 1,
+        "on a calm database, logical replication stops as fast as possible after receiving CopyDone from the client");
+    # Drain the client's output pipes and reap it so it does not outlive this test case.
+    $proc->finish;
+}
+
+
+# Test case 2: the client initiates a stop of logical replication while a huge transaction (200000 inserted rows) is being decoded
+{
+    my $slotname = 'huge_tx_state_slot';
+
+    create_logical_replication_slot($node_master, $slotname, "test_decoding");
+
+    $node_master->safe_psql('postgres',
+        "create table test_logic_table(pk serial primary key, name varchar(100));");
+
+    diag 'Insert huge amount of data into table test_logic_table' if $verbose;
+    $node_master->safe_psql('postgres',
+        "insert into test_logic_table select id, md5(random()::text) as name from generate_series(1, 200000) as id;");
+
+    $proc = start_pg_receivexlog(
+        $node_master,
+        $slotname,
+        timeout => 30,
+        extra_params => ['-o include-xids=false', '-o skip-empty-xacts=true']
+    );
+
+    wait_for_start_streaming($node_master, $slotname);
+
+    my $cancelTime = time();
+    $proc->signal("SIGINT");
+    wait_for_stop_streaming($node_master, $slotname);
+
+    my $spendTime = time() - $cancelTime;
+    ok($spendTime <= 1,
+        "while decoding a huge transaction the client can initiate CopyDone and the database should stop replication as fast as possible without continuing to decode the huge transaction, spent time $spendTime");
+    # Drain the client's output pipes (stdout is not read before this point) and reap the process.
+    $proc->finish;
+}
-- 
1.9.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to