From 4226b471e1f88f0e5c55715c46671baf13d8c42d Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 17 Apr 2017 10:44:27 +0900
Subject: [PATCH 1/2] Add a GUC parameter apply_worker_timeout.

Terminate replication connections that are inactive longer than
the specified number of milliseconds.
---
 doc/src/sgml/config.sgml                      | 31 +++++++++++++++++++++++----
 src/backend/replication/logical/worker.c      | 11 ++++++----
 src/backend/utils/misc/guc.c                  | 12 +++++++++++
 src/backend/utils/misc/postgresql.conf.sample |  2 ++
 src/include/replication/logicalworker.h       |  2 ++
 5 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 744c5e8..f584431 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3421,9 +3421,8 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
      </para>
 
      <para>
-      Note that <varname>wal_receiver_timeout</varname> and
-      <varname>wal_retrieve_retry_interval</varname> configuration parameters
-      affect the logical replication workers as well.
+      Note that <varname>wal_retrieve_retry_interval</varname> configuration parameters
+      affects the logical replication workers as well.
      </para>
 
      <variablelist>
@@ -3474,7 +3473,31 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
-     </variablelist>
+     <varlistentry id="guc-apply-worker-timeout" xreflabel="apply_worker_timeout">
+      <term><varname>apply_worker_timeout</varname> (<type>integer</type>)        
+      <indexterm>                                                                 
+       <primary><varname>apply_worker_timeout</> configuration parameter</primary>
+      </indexterm>                                                                
+      </term>                                                                     
+      <listitem>                                                                  
+       <para>                                                                     
+        Terminate replication connections that are inactive longer                
+        than the specified number of milliseconds. This is useful for             
+        the receiving subscriber to detect a publisher node crash or network      
+        outage.                                                                   
+       </para>
+       <para>
+        A value of zero disables the timeout mechanism.  This parameter           
+        can only be set in                                                        
+        the <filename>postgresql.conf</> file or on the server command line.
+       </para>
+       <para>
+        The default value is 60 seconds.                                          
+       </para>                                                                    
+      </listitem>                                                                 
+     </varlistentry>                                                              
+
+    </variablelist>
     </sect2>
 
    </sect1>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 656d399..8d65837 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -86,6 +86,9 @@
 
 #define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
 
+/* GUC variable */
+int	apply_worker_timeout;
+
 typedef struct FlushPosition
 {
 	dlist_node node;
@@ -1150,7 +1153,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			/*
 			 * We didn't receive anything new. If we haven't heard
 			 * anything from the server for more than
-			 * wal_receiver_timeout / 2, ping the server. Also, if
+			 * apply_worker_timeout / 2, ping the server. Also, if
 			 * it's been longer than wal_receiver_status_interval
 			 * since the last update we sent, send a status update to
 			 * the master anyway, to report any progress in applying
@@ -1162,14 +1165,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * Check if time since last receive from standby has
 			 * reached the configured limit.
 			 */
-			if (wal_receiver_timeout > 0)
+			if (apply_worker_timeout > 0)
 			{
 				TimestampTz now = GetCurrentTimestamp();
 				TimestampTz timeout;
 
 				timeout =
 					TimestampTzPlusMilliseconds(last_recv_timestamp,
-												wal_receiver_timeout);
+												apply_worker_timeout);
 
 				if (now >= timeout)
 					ereport(ERROR,
@@ -1182,7 +1185,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				if (!ping_sent)
 				{
 					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-														  (wal_receiver_timeout / 2));
+														  (apply_worker_timeout / 2));
 					if (now >= timeout)
 					{
 						requestReply = true;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9ad8361..378bad2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -63,6 +63,7 @@
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
@@ -1828,6 +1829,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"apply_worker_timeout", PGC_SIGHUP, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Sets the maximum wait time to receive data from the publisher."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&apply_worker_timeout,
+		60 * 1000, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the maximum number of concurrent connections."),
 			NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 1435d92..20b5bb7 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -278,6 +278,8 @@
 
 #max_logical_replication_workers = 4	# taken from max_worker_processes
 #max_sync_workers_per_subscription = 2	# taken from max_logical_replication_workers
+#apply_worker_timeout = 60s		# time that apply worker waits for
+					# communication from publisher#
 
 
 #------------------------------------------------------------------------------
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 3e0affa..6395850 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -12,6 +12,8 @@
 #ifndef LOGICALWORKER_H
 #define LOGICALWORKER_H
 
+extern int apply_worker_timeout;
+
 extern void ApplyWorkerMain(Datum main_arg);
 
 #endif   /* LOGICALWORKER_H */
-- 
2.8.1

