On 24.03.2011 15:24, Fujii Masao wrote:
On Wed, Mar 23, 2011 at 7:33 PM, Heikki Linnakangas
<heikki.linnakan...@enterprisedb.com>  wrote:
I don't much like the API for this. Walsender shouldn't need to know about
the details of the FE/BE protocol, pq_putbytes_if_available() seems too low
level to be useful.

I think a better API would be to have a non-blocking version of
pq_putmessage(). We can make the output buffer in pqcomm.c resizeable, so
that when the message doesn't fit in the output buffer in pq_putmessage(),
the buffer is enlarged instead of trying to flush it.

Attached is a patch using that approach. This is a much smaller patch, and
easier to understand.

Agreed. Thanks for improving the patch.

pq_flush_if_writable() calls internal_flush() without using a PG_TRY block.
This seems unsafe because for example pgwin32_waitforsinglesocket()
called by secure_write() can throw ERROR.

Perhaps it's time to give up on the assumption that the socket is in blocking mode except within those two functions. Attached patch adds the pq_set_nonblocking() function from your patch, and adds calls to it before all secure_read/write operations to put the socket in the right mode. There are only a few of those operations.

Should we use COMMERROR instead of ERROR if we fail to put the socket in the right mode?

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e0ebee6..3192ef7 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2019,6 +2019,28 @@ SET ENABLE_SEQSCAN TO OFF;
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
+      <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
+      <indexterm>
+       <primary><varname>replication_timeout</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Specifies the maximum time, in milliseconds, to wait for the reply
+        from the standby before terminating replication.  This is useful for
+        the primary server to detect a standby crash or network outage.
+        A value of zero turns this off.  This parameter can only be set in
+        the <filename>postgresql.conf</> file or on the server command line.
+        The default value is 60 seconds.
+       </para>
+       <para>
+        To make the timeout work properly, <xref linkend="guc-wal-receiver-status-interval">
+        must be enabled on the standby, and its value must be less than the
+        value of <varname>replication_timeout</>.
+       </para>
+      </listitem>
+     </varlistentry>
      </variablelist>
     </sect2>
 
@@ -2216,6 +2238,11 @@ SET ENABLE_SEQSCAN TO OFF;
        the <filename>postgresql.conf</> file or on the server command line.
        The default value is 10 seconds.
       </para>
+      <para>
+       When <xref linkend="guc-replication-timeout"> is enabled on the primary,
+       <varname>wal_receiver_status_interval</> must be enabled, and its value
+       must be less than the value of <varname>replication_timeout</>.
+      </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 3c7b05b..db313a8 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -55,10 +55,12 @@
  *		pq_peekbyte		- peek at next byte from connection
  *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
  *		pq_flush		- flush pending output
+ *		pq_flush_if_writable - flush pending output if writable without blocking
  *		pq_getbyte_if_available - get a byte if available without blocking
  *
  * message-level I/O (and old-style-COPY-OUT cruft):
  *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
+ *		pq_putmessage_noblock - buffer a normal message without blocking (suppressed in COPY OUT mode)
  *		pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
  *		pq_endcopyout	- end a COPY OUT transfer
  *
@@ -92,6 +94,7 @@
 #include "miscadmin.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
+#include "utils/memutils.h"
 
 /*
  * Configuration options
@@ -105,15 +108,21 @@ static char sock_path[MAXPGPATH];
 
 
 /*
- * Buffers for low-level I/O
+ * Buffers for low-level I/O.
+ *
+ * The receive buffer is fixed size. Send buffer is usually 8k, but can be
+ * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
  */
 
-#define PQ_BUFFER_SIZE 8192
+#define PQ_SEND_BUFFER_SIZE 8192
+#define PQ_RECV_BUFFER_SIZE 8192
 
-static char PqSendBuffer[PQ_BUFFER_SIZE];
+static char *PqSendBuffer;
+static int	PqSendBufferSize;	/* Size send buffer */
 static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
+static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */
 
-static char PqRecvBuffer[PQ_BUFFER_SIZE];
+static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
@@ -128,6 +137,7 @@ static bool DoingCopyOut;
 static void pq_close(int code, Datum arg);
 static int	internal_putbytes(const char *s, size_t len);
 static int	internal_flush(void);
+static void pq_set_nonblocking(bool nonblocking);
 
 #ifdef HAVE_UNIX_SOCKETS
 static int	Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
@@ -142,7 +152,9 @@ static int	Setup_AF_UNIX(void);
 void
 pq_init(void)
 {
-	PqSendPointer = PqRecvPointer = PqRecvLength = 0;
+	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
+	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
 	PqCommBusy = false;
 	DoingCopyOut = false;
 	on_proc_exit(pq_close, 0);
@@ -194,6 +206,7 @@ pq_close(int code, Datum arg)
 #endif   /* ENABLE_GSS || ENABLE_SSPI */
 
 		/* Cleanly shut down SSL layer */
+		pq_set_nonblocking(false); /* XXX: Is this required? */
 		secure_close(MyProcPort);
 
 		/*
@@ -732,6 +745,37 @@ TouchSocketFile(void)
  * --------------------------------
  */
 
+/* --------------------------------
+ *            pq_set_nonblocking - set socket blocking/non-blocking
+ *
+ * Sets the socket non-blocking if nonblocking is TRUE, or sets it
+ * blocking otherwise.
+ * --------------------------------
+ */
+static void
+pq_set_nonblocking(bool nonblocking)
+{
+	if (MyProcPort->noblock == nonblocking)
+		return;
+
+#ifdef WIN32
+	pgwin32_noblock = nonblocking ? 1 : 0;
+#else
+	if (nonblocking)
+	{
+		if (!pg_set_noblock(MyProcPort->sock))
+			ereport(ERROR,
+					(errmsg("could not set socket to non-blocking mode: %m")));
+	}
+	else
+	{
+		if (!pg_set_block(MyProcPort->sock))
+			ereport(ERROR,
+					(errmsg("could not set socket to blocking mode: %m")));
+	}
+#endif
+	MyProcPort->noblock = nonblocking;
+}
 
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
@@ -756,13 +800,15 @@ pq_recvbuf(void)
 			PqRecvLength = PqRecvPointer = 0;
 	}
 
+	pq_set_nonblocking(false);
+
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
 		int			r;
 
 		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_BUFFER_SIZE - PqRecvLength);
+						PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
 		if (r < 0)
 		{
@@ -825,7 +871,6 @@ pq_peekbyte(void)
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
 }
 
-
 /* --------------------------------
  *		pq_getbyte_if_available - get a single byte from connection,
  *			if available
@@ -845,72 +890,38 @@ pq_getbyte_if_available(unsigned char *c)
 		return 1;
 	}
 
-	/* Temporarily put the socket into non-blocking mode */
-#ifdef WIN32
-	pgwin32_noblock = 1;
-#else
-	if (!pg_set_noblock(MyProcPort->sock))
-		ereport(ERROR,
-				(errmsg("could not set socket to non-blocking mode: %m")));
-#endif
-	MyProcPort->noblock = true;
-	PG_TRY();
+	/* Put the socket into non-blocking mode */
+	pq_set_nonblocking(true);
+
+	r = secure_read(MyProcPort, c, 1);
+	if (r < 0)
 	{
-		r = secure_read(MyProcPort, c, 1);
-		if (r < 0)
+		/*
+		 * Ok if no data available without blocking or interrupted (though
+		 * EINTR really shouldn't happen with a non-blocking socket).
+		 * Report other errors.
+		 */
+		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+			r = 0;
+		else
 		{
 			/*
-			 * Ok if no data available without blocking or interrupted (though
-			 * EINTR really shouldn't happen with a non-blocking socket).
-			 * Report other errors.
+			 * Careful: an ereport() that tries to write to the client
+			 * would cause recursion to here, leading to stack overflow
+			 * and core dump!  This message must go *only* to the
+			 * postmaster log.
 			 */
-			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-				r = 0;
-			else
-			{
-				/*
-				 * Careful: an ereport() that tries to write to the client
-				 * would cause recursion to here, leading to stack overflow
-				 * and core dump!  This message must go *only* to the
-				 * postmaster log.
-				 */
-				ereport(COMMERROR,
-						(errcode_for_socket_access(),
-						 errmsg("could not receive data from client: %m")));
-				r = EOF;
-			}
-		}
-		else if (r == 0)
-		{
-			/* EOF detected */
+			ereport(COMMERROR,
+					(errcode_for_socket_access(),
+					 errmsg("could not receive data from client: %m")));
 			r = EOF;
 		}
 	}
-	PG_CATCH();
+	else if (r == 0)
 	{
-		/*
-		 * The rest of the backend code assumes the socket is in blocking
-		 * mode, so treat failure as FATAL.
-		 */
-#ifdef WIN32
-		pgwin32_noblock = 0;
-#else
-		if (!pg_set_block(MyProcPort->sock))
-			ereport(FATAL,
-					(errmsg("could not set socket to blocking mode: %m")));
-#endif
-		MyProcPort->noblock = false;
-		PG_RE_THROW();
+		/* EOF detected */
+		r = EOF;
 	}
-	PG_END_TRY();
-#ifdef WIN32
-	pgwin32_noblock = 0;
-#else
-	if (!pg_set_block(MyProcPort->sock))
-		ereport(FATAL,
-				(errmsg("could not set socket to blocking mode: %m")));
-#endif
-	MyProcPort->noblock = false;
 
 	return r;
 }
@@ -1138,10 +1149,13 @@ internal_putbytes(const char *s, size_t len)
 	while (len > 0)
 	{
 		/* If buffer is full, then flush it out */
-		if (PqSendPointer >= PQ_BUFFER_SIZE)
+		if (PqSendPointer >= PqSendBufferSize)
+		{
+			pq_set_nonblocking(false);
 			if (internal_flush())
 				return EOF;
-		amount = PQ_BUFFER_SIZE - PqSendPointer;
+		}
+		amount = PqSendBufferSize - PqSendPointer;
 		if (amount > len)
 			amount = len;
 		memcpy(PqSendBuffer + PqSendPointer, s, amount);
@@ -1167,17 +1181,25 @@ pq_flush(void)
 	if (PqCommBusy)
 		return 0;
 	PqCommBusy = true;
+	pq_set_nonblocking(false);
 	res = internal_flush();
 	PqCommBusy = false;
 	return res;
 }
 
+/* --------------------------------
+ *		internal_flush - flush pending output
+ *
+ * Returns 0 if OK (meaning everything was sent, or operation would block
+ * and the socket is in non-blocking mode), or EOF if trouble.
+ * --------------------------------
+ */
 static int
 internal_flush(void)
 {
 	static int	last_reported_send_errno = 0;
 
-	char	   *bufptr = PqSendBuffer;
+	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
 	while (bufptr < bufend)
@@ -1192,6 +1214,16 @@ internal_flush(void)
 				continue;		/* Ok if we were interrupted */
 
 			/*
+			 * Ok if no data writable without blocking, and the socket
+			 * is in non-blocking mode.
+			 */
+			if (errno == EAGAIN ||
+				errno == EWOULDBLOCK)
+			{
+				return 0;
+			}
+
+			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
 			 * dump!  This message must go *only* to the postmaster log.
@@ -1212,18 +1244,56 @@ internal_flush(void)
 			 * We drop the buffered data anyway so that processing can
 			 * continue, even though we'll probably quit soon.
 			 */
-			PqSendPointer = 0;
+			PqSendStart = PqSendPointer = 0;
 			return EOF;
 		}
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
+		PqSendStart += r;
 	}
 
-	PqSendPointer = 0;
+	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
 
+/* --------------------------------
+ *		pq_flush_if_writable - flush pending output if writable without blocking
+ *
+ * Returns 0 if OK, or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_flush_if_writable(void)
+{
+	int			res;
+
+	/* Quick exit if nothing to do */
+	if (PqSendPointer == PqSendStart)
+		return 0;
+
+	/* No-op if reentrant call */
+	if (PqCommBusy)
+		return 0;
+
+	/* Temporarily put the socket into non-blocking mode */
+	pq_set_nonblocking(true);
+
+	PqCommBusy = true;
+	res = internal_flush();
+	PqCommBusy = false;
+	return res;
+}
+
+/* --------------------------------
+ *		pq_is_send_pending	- is there any pending data in the output buffer?
+ * --------------------------------
+ */
+bool
+pq_is_send_pending(void)
+{
+	return (PqSendStart < PqSendPointer);
+}
 
 /* --------------------------------
  * Message-level I/O routines begin here.
@@ -1286,6 +1356,25 @@ fail:
 }
 
 /* --------------------------------
+ *		pq_putmessage_noblock	- like pq_putmessage, but never blocks
+ *
+ *		If the output buffer is too small to hold the message, the buffer
+ *		is enlarged.
+ */
+int
+pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+	int required = PqSendPointer + len + 5 ;
+	if (required > PqSendBufferSize)
+	{
+		PqSendBuffer = repalloc(PqSendBuffer, required);
+		PqSendBufferSize = required;
+	}
+	return pq_putmessage(msgtype, s, len);
+}
+
+
+/* --------------------------------
  *		pq_startcopyout - inform libpq that an old-style COPY OUT transfer
  *			is beginning
  * --------------------------------
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index a4f559e..32d0cb5 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -193,19 +193,21 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 /*
  * Like WaitLatch, but will also return when there's data available in
- * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
- * was set, or 2 if the scoket became readable.
+ * 'sock' for reading or writing. Returns 0 if timeout was reached,
+ * 1 if the latch was set, 2 if the socket became readable or writable.
  */
 int
-WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
+				  bool forWrite, long timeout)
 {
 	struct timeval tv, *tvp = NULL;
 	fd_set		input_mask;
+	fd_set		output_mask;
 	int			rc;
 	int			result = 0;
 
@@ -241,14 +243,22 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
 		FD_ZERO(&input_mask);
 		FD_SET(selfpipe_readfd, &input_mask);
 		hifd = selfpipe_readfd;
-		if (sock != PGINVALID_SOCKET)
+		if (sock != PGINVALID_SOCKET && forRead)
 		{
 			FD_SET(sock, &input_mask);
 			if (sock > hifd)
 				hifd = sock;
 		}
 
-		rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
+		FD_ZERO(&output_mask);
+		if (sock != PGINVALID_SOCKET && forWrite)
+		{
+			FD_SET(sock, &output_mask);
+			if (sock > hifd)
+				hifd = sock;
+		}
+
+		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
 		if (rc < 0)
 		{
 			if (errno == EINTR)
@@ -263,7 +273,9 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
 			result = 0;
 			break;
 		}
-		if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
+		if (sock != PGINVALID_SOCKET &&
+			((forRead && FD_ISSET(sock, &input_mask)) ||
+			 (forWrite && FD_ISSET(sock, &output_mask))))
 		{
 			result = 2;
 			break;		/* data available in socket */
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index 76dd6be..dbbd4a3 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -14,7 +14,8 @@
 #include "postgres.h"
 
 /*
- * Indicate if pgwin32_recv() should operate in non-blocking mode.
+ * Indicate if pgwin32_recv() and pgwin32_send() should operate
+ * in non-blocking mode.
  *
  * Since the socket emulation layer always sets the actual socket to
  * non-blocking mode in order to be able to deliver signals, we must
@@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags)
 			return -1;
 		}
 
+		if (pgwin32_noblock)
+		{
+			/*
+			 * No data sent, and we are in "emulated non-blocking mode", so
+			 * return indicating that we'd block if we were to continue.
+			 */
+			errno = EWOULDBLOCK;
+			return -1;
+		}
+
 		/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
 
 		if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index ac20c49..f42cfef 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -85,11 +85,12 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 int
-WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
+				  bool forWrite, long timeout)
 {
 	DWORD		rc;
 	HANDLE		events[3];
@@ -103,10 +104,17 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
 	events[0] = latchevent;
 	events[1] = pgwin32_signal_event;
 	numevents = 2;
-	if (sock != PGINVALID_SOCKET)
+	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
 	{
+		int		flags = 0;
+
+		if (forRead)
+			flags |= FD_READ;
+		if (forWrite)
+			flags |= FD_WRITE;
+
 		sockevent = WSACreateEvent();
-		WSAEventSelect(sock, sockevent, FD_READ);
+		WSAEventSelect(sock, sockevent, flags);
 		events[numevents++] = sockevent;
 	}
 
@@ -139,8 +147,18 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
 			pgwin32_dispatch_queued_signals();
 		else if (rc == WAIT_OBJECT_0 + 2)
 		{
+			WSANETWORKEVENTS resEvents;
+
 			Assert(sock != PGINVALID_SOCKET);
-			result = 2;
+
+			ZeroMemory(&resEvents, sizeof(resEvents));
+			if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
+				ereport(FATAL,
+						(errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
+
+			if ((forRead && resEvents.lNetworkEvents & FD_READ) ||
+				(forWrite && resEvents.lNetworkEvents & FD_WRITE))
+				result = 2;
 			break;
 		}
 		else if (rc != WAIT_OBJECT_0)
@@ -148,7 +166,7 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
 	}
 
 	/* Clean up the handle we created for the socket */
-		if (sock != PGINVALID_SOCKET)
+	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
 	{
 		WSAEventSelect(sock, sockevent, 0);
 		WSACloseEvent(sockevent);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f76b5b0..36406d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -74,6 +74,7 @@ bool		am_walsender = false;		/* Am I a walsender process ? */
 /* User-settable parameters for walsender */
 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
 int			WalSndDelay = 1000;	/* max sleep time between some actions */
+int			replication_timeout = 60 * 1000;	/* maximum time to send one WAL data message */
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -95,6 +96,11 @@ static XLogRecPtr sentPtr = {0, 0};
  */
 static StringInfoData reply_message;
 
+/*
+ * Timestamp of the last receipt of the reply from the standby.
+ */
+static TimestampTz last_reply_timestamp;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 volatile sig_atomic_t walsender_shutdown_requested = false;
@@ -113,7 +119,7 @@ static int	WalSndLoop(void);
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
-static bool XLogSend(char *msgbuf, bool *caughtup);
+static void XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd * cmd);
 static void ProcessStandbyMessage(void);
@@ -469,6 +475,7 @@ ProcessRepliesIfAny(void)
 {
 	unsigned char firstchar;
 	int			r;
+	int		received = false;
 
 	for (;;)
 	{
@@ -484,7 +491,7 @@ ProcessRepliesIfAny(void)
 		if (r == 0)
 		{
 			/* no data available without blocking */
-			return;
+			break;
 		}
 
 		/* Handle the very limited subset of commands expected in this phase */
@@ -495,6 +502,7 @@ ProcessRepliesIfAny(void)
 				 */
 			case 'd':
 				ProcessStandbyMessage();
+				received = true;
 				break;
 
 				/*
@@ -510,6 +518,12 @@ ProcessRepliesIfAny(void)
 								firstchar)));
 		}
 	}
+	/*
+	 * Save the last reply timestamp if we've received at least
+	 * one reply.
+	 */
+	if (received)
+		last_reply_timestamp = GetCurrentTimestamp();
 }
 
 /*
@@ -688,6 +702,9 @@ WalSndLoop(void)
 	 */
 	initStringInfo(&reply_message);
 
+	/* Initialize the last reply timestamp */
+	last_reply_timestamp = GetCurrentTimestamp();
+
 	/* Loop forever, unless we get an error */
 	for (;;)
 	{
@@ -706,19 +723,6 @@ WalSndLoop(void)
 			SyncRepInitConfig();
 		}
 
-		/*
-		 * When SIGUSR2 arrives, we send all outstanding logs up to the
-		 * shutdown checkpoint record (i.e., the latest record) and exit.
-		 */
-		if (walsender_ready_to_stop)
-		{
-			if (!XLogSend(output_message, &caughtup))
-				break;
-			ProcessRepliesIfAny();
-			if (caughtup)
-				walsender_shutdown_requested = true;
-		}
-
 		/* Normal exit from the walsender is here */
 		if (walsender_shutdown_requested)
 		{
@@ -730,11 +734,13 @@ WalSndLoop(void)
 		}
 
 		/*
-		 * If we had sent all accumulated WAL in last round, nap for the
-		 * configured time before retrying.
+		 * If we don't have any pending data in the output buffer, try to
+		 * send some more.
 		 */
-		if (caughtup)
+		if (!pq_is_send_pending())
 		{
+			XLogSend(output_message, &caughtup);
+
 			/*
 			 * Even if we wrote all the WAL that was available when we started
 			 * sending, more might have arrived while we were sending this
@@ -742,28 +748,79 @@ WalSndLoop(void)
 			 * received any signals from that time. Let's arm the latch
 			 * again, and after that check that we're still up-to-date.
 			 */
-			ResetLatch(&MyWalSnd->latch);
-
-			if (!XLogSend(output_message, &caughtup))
-				break;
-			if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
+			if (caughtup && !pq_is_send_pending())
 			{
-				/*
-				 * XXX: We don't really need the periodic wakeups anymore,
-				 * WaitLatchOrSocket should reliably wake up as soon as
-				 * something interesting happens.
-				 */
+				ResetLatch(&MyWalSnd->latch);
 
-				/* Sleep */
-				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-								  WalSndDelay * 1000L);
+				XLogSend(output_message, &caughtup);
 			}
 		}
-		else
+
+		/* Flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			break;
+
+		/*
+		 * When SIGUSR2 arrives, we send any outstanding logs up to the
+		 * shutdown checkpoint record (i.e., the latest record) and exit.
+		 */
+		if (walsender_ready_to_stop && !pq_is_send_pending())
 		{
-			/* Attempt to send the log once every loop */
-			if (!XLogSend(output_message, &caughtup))
+			XLogSend(output_message, &caughtup);
+			ProcessRepliesIfAny();
+			if (caughtup && !pq_is_send_pending())
+				walsender_shutdown_requested = true;
+		}
+
+		if ((caughtup || pq_is_send_pending()) &&
+			!got_SIGHUP &&
+			!walsender_shutdown_requested)
+		{
+			TimestampTz	finish_time;
+			long		sleeptime;
+
+			/* Reschedule replication timeout */
+			if (replication_timeout > 0)
+			{
+				long		secs;
+				int		usecs;
+
+				finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+														  replication_timeout);
+				TimestampDifference(GetCurrentTimestamp(),
+									finish_time, &secs, &usecs);
+				sleeptime = secs * 1000 + usecs / 1000;
+				if (WalSndDelay < sleeptime)
+					sleeptime = WalSndDelay;
+			}
+			else
+			{
+				/*
+				 * XXX: Without timeout, we don't really need the periodic
+				 * wakeups anymore, WaitLatchOrSocket should reliably wake up
+				 * as soon as something interesting happens.
+				 */
+				sleeptime = WalSndDelay;
+			}
+
+			/* Sleep */
+			WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+							  true, pq_is_send_pending(),
+							  sleeptime * 1000L);
+
+			/* Check for replication timeout */
+			if (replication_timeout > 0 &&
+				GetCurrentTimestamp() >= finish_time)
+			{
+				/*
+				 * Since typically expiration of replication timeout means
+				 * communication problem, we don't send the error message
+				 * to the standby.
+				 */
+				ereport(COMMERROR,
+						(errmsg("terminating walsender process due to replication timeout")));
 				break;
+			}
 		}
 
 		/*
@@ -993,7 +1050,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 
 /*
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
- * but not yet sent to the client, and send it.
+ * but not yet sent to the client, and buffer it in the libpq output
+ * buffer.
  *
  * msgbuf is a work area in which the output message is constructed.  It's
  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
@@ -1001,10 +1059,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  *
  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
  * *caughtup is set to false.
- *
- * Returns true if OK, false if trouble.
+
  */
-static bool
+static void
 XLogSend(char *msgbuf, bool *caughtup)
 {
 	XLogRecPtr	SendRqstPtr;
@@ -1027,7 +1084,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 	if (XLByteLE(SendRqstPtr, sentPtr))
 	{
 		*caughtup = true;
-		return true;
+		return;
 	}
 
 	/*
@@ -1099,11 +1156,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 
 	memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
 
-	pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
-
-	/* Flush pending output to the client */
-	if (pq_flush())
-		return false;
+	pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
 
 	sentPtr = endptr;
 
@@ -1127,7 +1180,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 		set_ps_display(activitymsg, false);
 	}
 
-	return true;
+	return;
 }
 
 /* SIGHUP: set flag to re-read config file at next convenient time */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9ca1329..b49bdae 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1856,6 +1856,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+			gettext_noop("Sets the maximum time to wait for WAL replication."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&replication_timeout,
+		60 * 1000, 0, INT_MAX, NULL, NULL
+	},
+
+	{
 		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
 						 "flushing WAL to disk."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ed70223..4348185 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -200,6 +200,7 @@
 #wal_sender_delay = 1s		# walsender cycle time, 1-10000 milliseconds
 #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
 #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
+#replication_timeout = 60s # in milliseconds, 0 is disabled
 
 # - Standby Servers -
 
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 8ecab6d..b20b0c2 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -60,7 +60,10 @@ extern int	pq_peekbyte(void);
 extern int	pq_getbyte_if_available(unsigned char *c);
 extern int	pq_putbytes(const char *s, size_t len);
 extern int	pq_flush(void);
+extern int	pq_flush_if_writable(void);
+extern bool	pq_is_send_pending(void);
 extern int	pq_putmessage(char msgtype, const char *s, size_t len);
+extern int	pq_putmessage_noblock(char msgtype, const char *s, size_t len);
 extern void pq_startcopyout(void);
 extern void pq_endcopyout(bool errorAbort);
 
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 150a71f..2670a2e 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -98,6 +98,7 @@ extern volatile sig_atomic_t walsender_ready_to_stop;
 /* user-settable parameters */
 extern int	WalSndDelay;
 extern int	max_wal_senders;
+extern int	replication_timeout;
 
 extern int	WalSenderMain(void);
 extern void WalSndSignals(void);
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 31744ff..f64e13b 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -40,7 +40,7 @@ extern void OwnLatch(volatile Latch *latch);
 extern void DisownLatch(volatile Latch *latch);
 extern bool WaitLatch(volatile Latch *latch, long timeout);
 extern int	WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
-				  long timeout);
+				  bool forRead, bool forWrite, long timeout);
 extern void SetLatch(volatile Latch *latch);
 extern void ResetLatch(volatile Latch *latch);
 #define TestLatch(latch) (((volatile Latch *) latch)->is_set)
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to