On 16.03.2011 11:11, Fujii Masao wrote:
On Wed, Mar 16, 2011 at 4:49 PM, Fujii Masao<masao.fu...@gmail.com>  wrote:
Agreed. I'll change the patch.

Done. I attached the updated patch.

I don't much like the API for this. Walsender shouldn't need to know about the details of the FE/BE protocol; pq_putbytes_if_available() seems too low-level to be useful.

I think a better API would be to have a non-blocking version of pq_putmessage(). We can make the output buffer in pqcomm.c resizable, so that when the message doesn't fit in the output buffer in pq_putmessage(), the buffer is enlarged instead of being flushed.

Attached is a patch using that approach. This is a much smaller patch, and easier to understand. I'm not totally happy with the walsender main loop; it seems to work as it is, but the logic has become quite complicated. Ideas on how to simplify it are welcome.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e0ebee6..3192ef7 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2019,6 +2019,28 @@ SET ENABLE_SEQSCAN TO OFF;
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
+      <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
+      <indexterm>
+       <primary><varname>replication_timeout</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Specifies the maximum time, in milliseconds, to wait for a reply
+        from the standby before terminating replication.  This is useful for
+        the primary server to detect a standby crash or network outage.
+        A value of zero turns this off.  This parameter can only be set in
+        the <filename>postgresql.conf</> file or on the server command line.
+        The default value is 60 seconds.
+       </para>
+       <para>
+        To make the timeout work properly, <xref linkend="guc-wal-receiver-status-interval">
+        must be enabled on the standby, and its value must be less than the
+        value of <varname>replication_timeout</>.
+       </para>
+      </listitem>
+     </varlistentry>
      </variablelist>
     </sect2>
 
@@ -2216,6 +2238,11 @@ SET ENABLE_SEQSCAN TO OFF;
        the <filename>postgresql.conf</> file or on the server command line.
        The default value is 10 seconds.
       </para>
+      <para>
+       When <xref linkend="guc-replication-timeout"> is enabled on the primary,
+       <varname>wal_receiver_status_interval</> must be enabled, and its value
+       must be less than the value of <varname>replication_timeout</>.
+      </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 3c7b05b..b6dc8cc 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -56,9 +56,11 @@
  *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
  *		pq_flush		- flush pending output
  *		pq_getbyte_if_available - get a byte if available without blocking
+ *		pq_flush_if_writable	- flush pending output if writable without blocking
  *
  * message-level I/O (and old-style-COPY-OUT cruft):
  *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
+ *		pq_putmessage_noblock - buffer a normal message without blocking (suppressed in COPY OUT mode)
  *		pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
  *		pq_endcopyout	- end a COPY OUT transfer
  *
@@ -92,6 +94,7 @@
 #include "miscadmin.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
+#include "utils/memutils.h"
 
 /*
  * Configuration options
@@ -108,12 +111,15 @@ static char sock_path[MAXPGPATH];
  * Buffers for low-level I/O
  */
 
-#define PQ_BUFFER_SIZE 8192
+#define PQ_SEND_BUFFER_SIZE 8192
+#define PQ_RECV_BUFFER_SIZE 8192
 
-static char PqSendBuffer[PQ_BUFFER_SIZE];
+static char *PqSendBuffer;
+static int	PqSendBufferSize;
 static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
+static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */
 
-static char PqRecvBuffer[PQ_BUFFER_SIZE];
+static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
@@ -142,7 +148,9 @@ static int	Setup_AF_UNIX(void);
 void
 pq_init(void)
 {
-	PqSendPointer = PqRecvPointer = PqRecvLength = 0;
+	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
+	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
 	PqCommBusy = false;
 	DoingCopyOut = false;
 	on_proc_exit(pq_close, 0);
@@ -762,7 +770,7 @@ pq_recvbuf(void)
 		int			r;
 
 		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_BUFFER_SIZE - PqRecvLength);
+						PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
 		if (r < 0)
 		{
@@ -1138,10 +1146,10 @@ internal_putbytes(const char *s, size_t len)
 	while (len > 0)
 	{
 		/* If buffer is full, then flush it out */
-		if (PqSendPointer >= PQ_BUFFER_SIZE)
+		if (PqSendPointer >= PqSendBufferSize)
 			if (internal_flush())
 				return EOF;
-		amount = PQ_BUFFER_SIZE - PqSendPointer;
+		amount = PqSendBufferSize - PqSendPointer;
 		if (amount > len)
 			amount = len;
 		memcpy(PqSendBuffer + PqSendPointer, s, amount);
@@ -1172,12 +1180,19 @@ pq_flush(void)
 	return res;
 }
 
+/* --------------------------------
+ *		internal_flush - flush pending output
+ *
+ * Returns 0 if OK (meaning everything was sent, or operation would block
+ * and the socket is in non-blocking mode), or EOF if trouble.
+ * --------------------------------
+ */
 static int
 internal_flush(void)
 {
 	static int	last_reported_send_errno = 0;
 
-	char	   *bufptr = PqSendBuffer;
+	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
 	while (bufptr < bufend)
@@ -1192,6 +1207,16 @@ internal_flush(void)
 				continue;		/* Ok if we were interrupted */
 
 			/*
+			 * Ok if no data writable without blocking, and the socket
+			 * is in non-blocking mode.
+			 */
+			if (errno == EAGAIN ||
+				errno == EWOULDBLOCK)
+			{
+				return 0;
+			}
+
+			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
 			 * dump!  This message must go *only* to the postmaster log.
@@ -1212,18 +1237,74 @@ internal_flush(void)
 			 * We drop the buffered data anyway so that processing can
 			 * continue, even though we'll probably quit soon.
 			 */
-			PqSendPointer = 0;
+			PqSendStart = PqSendPointer = 0;
 			return EOF;
 		}
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
+		PqSendStart += r;
 	}
 
-	PqSendPointer = 0;
+	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
 
+/* --------------------------------
+ *		pq_flush_if_writable - flush pending output if writable without blocking
+ *
+ * Returns 0 if OK (unsent output stays buffered), or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_flush_if_writable(void)
+{
+	int			res;
+
+	/* Quick exit if nothing to do */
+	if (PqSendPointer == PqSendStart)
+		return 0;
+
+	/* No-op if reentrant call */
+	if (PqCommBusy)
+		return 0;
+
+	/* Temporarily put the socket into non-blocking mode */
+#ifdef WIN32
+	pgwin32_noblock = 1;
+#else
+	if (!pg_set_noblock(MyProcPort->sock))
+		ereport(ERROR,
+				(errmsg("could not set socket to non-blocking mode: %m")));
+#endif
+	MyProcPort->noblock = true;
+
+	/* Set PqCommBusy only now, so an ERROR above can't leave it set */
+	PqCommBusy = true;
+	res = internal_flush();
+
+#ifdef WIN32
+	pgwin32_noblock = 0;
+#else
+	if (!pg_set_block(MyProcPort->sock))
+		ereport(FATAL,
+				(errmsg("could not set socket to blocking mode: %m")));
+#endif
+	MyProcPort->noblock = false;
+
+	PqCommBusy = false;
+	return res;
+}
+
+/* --------------------------------
+ *		pq_is_send_pending	- is there output buffered but not yet sent?
+ * --------------------------------
+ */
+bool
+pq_is_send_pending(void)
+{
+	return (PqSendStart < PqSendPointer);
+}
 
 /* --------------------------------
  * Message-level I/O routines begin here.
@@ -1286,6 +1367,36 @@ fail:
 }
 
 /* --------------------------------
+ *		pq_putmessage_noblock	- like pq_putmessage, but never blocks
+ *
+ *		If the output buffer is too small to hold the message, the buffer
+ *		is enlarged instead of being flushed.  The caller is responsible
+ *		for sending the buffered data later, e.g. with pq_flush_if_writable().
+ * --------------------------------
+ */
+int
+pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+	int			res;
+	size_t		required;
+
+	/*
+	 * Ensure there's room for the message type byte and the 4-byte length
+	 * word as well as the message body, so that pq_putmessage() never has
+	 * to flush the buffer (which could block).
+	 */
+	required = PqSendPointer + 1 + 4 + len;
+	if (required > (size_t) PqSendBufferSize)
+	{
+		PqSendBuffer = repalloc(PqSendBuffer, required);
+		PqSendBufferSize = required;
+	}
+	res = pq_putmessage(msgtype, s, len);
+	Assert(res == 0);			/* can't fail when the message fits */
+	return res;
+}
+
+/* --------------------------------
  *		pq_startcopyout - inform libpq that an old-style COPY OUT transfer
  *			is beginning
  * --------------------------------
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index a4f559e..32d0cb5 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -193,19 +193,21 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 /*
  * Like WaitLatch, but will also return when there's data available in
- * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
- * was set, or 2 if the scoket became readable.
+ * 'sock' for reading (forRead), or when it is writable (forWrite). Returns
+ * 0 on timeout, 1 if the latch was set, 2 if the socket became ready.
  */
 int
-WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
+				  bool forWrite, long timeout)
 {
 	struct timeval tv, *tvp = NULL;
 	fd_set		input_mask;
+	fd_set		output_mask;
 	int			rc;
 	int			result = 0;
 
@@ -241,14 +243,22 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
 		FD_ZERO(&input_mask);
 		FD_SET(selfpipe_readfd, &input_mask);
 		hifd = selfpipe_readfd;
-		if (sock != PGINVALID_SOCKET)
+		if (sock != PGINVALID_SOCKET && forRead)
 		{
 			FD_SET(sock, &input_mask);
 			if (sock > hifd)
 				hifd = sock;
 		}
 
-		rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
+		FD_ZERO(&output_mask);
+		if (sock != PGINVALID_SOCKET && forWrite)
+		{
+			FD_SET(sock, &output_mask);
+			if (sock > hifd)
+				hifd = sock;
+		}
+
+		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
 		if (rc < 0)
 		{
 			if (errno == EINTR)
@@ -263,7 +273,9 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
 			result = 0;
 			break;
 		}
-		if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
+		if (sock != PGINVALID_SOCKET &&
+			((forRead && FD_ISSET(sock, &input_mask)) ||
+			 (forWrite && FD_ISSET(sock, &output_mask))))
 		{
 			result = 2;
 			break;		/* data available in socket */
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index 76dd6be..dbbd4a3 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -14,7 +14,8 @@
 #include "postgres.h"
 
 /*
- * Indicate if pgwin32_recv() should operate in non-blocking mode.
+ * Indicate if pgwin32_recv() and pgwin32_send() should operate
+ * in non-blocking mode.
  *
  * Since the socket emulation layer always sets the actual socket to
  * non-blocking mode in order to be able to deliver signals, we must
@@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags)
 			return -1;
 		}
 
+		if (pgwin32_noblock)
+		{
+			/*
+			 * No data sent, and we are in "emulated non-blocking mode", so
+			 * return indicating that we'd block if we were to continue.
+			 */
+			errno = EWOULDBLOCK;
+			return -1;
+		}
+
 		/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
 
 		if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index ac20c49..f42cfef 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -85,11 +85,12 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 int
-WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
+				  bool forWrite, long timeout)
 {
 	DWORD		rc;
 	HANDLE		events[3];
@@ -103,10 +104,17 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
 	events[0] = latchevent;
 	events[1] = pgwin32_signal_event;
 	numevents = 2;
-	if (sock != PGINVALID_SOCKET)
+	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
 	{
+		int		flags = 0;
+
+		if (forRead)
+			flags |= FD_READ;
+		if (forWrite)
+			flags |= FD_WRITE;
+
 		sockevent = WSACreateEvent();
-		WSAEventSelect(sock, sockevent, FD_READ);
+		WSAEventSelect(sock, sockevent, flags);
 		events[numevents++] = sockevent;
 	}
 
@@ -139,8 +147,18 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
 			pgwin32_dispatch_queued_signals();
 		else if (rc == WAIT_OBJECT_0 + 2)
 		{
+			WSANETWORKEVENTS resEvents;
+
 			Assert(sock != PGINVALID_SOCKET);
-			result = 2;
+
+			ZeroMemory(&resEvents, sizeof(resEvents));
+			if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
+				ereport(FATAL,
+						(errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
+
+			if ((forRead && resEvents.lNetworkEvents & FD_READ) ||
+				(forWrite && resEvents.lNetworkEvents & FD_WRITE))
+				result = 2;
 			break;
 		}
 		else if (rc != WAIT_OBJECT_0)
@@ -148,7 +166,7 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
 	}
 
 	/* Clean up the handle we created for the socket */
-		if (sock != PGINVALID_SOCKET)
+	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
 	{
 		WSAEventSelect(sock, sockevent, 0);
 		WSACloseEvent(sockevent);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f76b5b0..36406d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -74,6 +74,7 @@ bool		am_walsender = false;		/* Am I a walsender process ? */
 /* User-settable parameters for walsender */
 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
 int			WalSndDelay = 1000;	/* max sleep time between some actions */
+int			replication_timeout = 60 * 1000;	/* max time to wait for a standby reply, in ms */
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -95,6 +96,11 @@ static XLogRecPtr sentPtr = {0, 0};
  */
 static StringInfoData reply_message;
 
+/*
+ * Timestamp of the last receipt of the reply from the standby.
+ */
+static TimestampTz last_reply_timestamp;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 volatile sig_atomic_t walsender_shutdown_requested = false;
@@ -113,7 +119,7 @@ static int	WalSndLoop(void);
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
-static bool XLogSend(char *msgbuf, bool *caughtup);
+static void XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd * cmd);
 static void ProcessStandbyMessage(void);
@@ -469,6 +475,7 @@ ProcessRepliesIfAny(void)
 {
 	unsigned char firstchar;
 	int			r;
+	int		received = false;
 
 	for (;;)
 	{
@@ -484,7 +491,7 @@ ProcessRepliesIfAny(void)
 		if (r == 0)
 		{
 			/* no data available without blocking */
-			return;
+			break;
 		}
 
 		/* Handle the very limited subset of commands expected in this phase */
@@ -495,6 +502,7 @@ ProcessRepliesIfAny(void)
 				 */
 			case 'd':
 				ProcessStandbyMessage();
+				received = true;
 				break;
 
 				/*
@@ -510,6 +518,12 @@ ProcessRepliesIfAny(void)
 								firstchar)));
 		}
 	}
+	/*
+	 * Save the last reply timestamp if we've received at least
+	 * one reply.
+	 */
+	if (received)
+		last_reply_timestamp = GetCurrentTimestamp();
 }
 
 /*
@@ -688,6 +702,9 @@ WalSndLoop(void)
 	 */
 	initStringInfo(&reply_message);
 
+	/* Initialize the last reply timestamp */
+	last_reply_timestamp = GetCurrentTimestamp();
+
 	/* Loop forever, unless we get an error */
 	for (;;)
 	{
@@ -706,19 +723,6 @@ WalSndLoop(void)
 			SyncRepInitConfig();
 		}
 
-		/*
-		 * When SIGUSR2 arrives, we send all outstanding logs up to the
-		 * shutdown checkpoint record (i.e., the latest record) and exit.
-		 */
-		if (walsender_ready_to_stop)
-		{
-			if (!XLogSend(output_message, &caughtup))
-				break;
-			ProcessRepliesIfAny();
-			if (caughtup)
-				walsender_shutdown_requested = true;
-		}
-
 		/* Normal exit from the walsender is here */
 		if (walsender_shutdown_requested)
 		{
@@ -730,11 +734,13 @@ WalSndLoop(void)
 		}
 
 		/*
-		 * If we had sent all accumulated WAL in last round, nap for the
-		 * configured time before retrying.
+		 * If we don't have any pending data in the output buffer, try to
+		 * send some more.
 		 */
-		if (caughtup)
+		if (!pq_is_send_pending())
 		{
+			XLogSend(output_message, &caughtup);
+
 			/*
 			 * Even if we wrote all the WAL that was available when we started
 			 * sending, more might have arrived while we were sending this
@@ -742,28 +748,79 @@ WalSndLoop(void)
 			 * received any signals from that time. Let's arm the latch
 			 * again, and after that check that we're still up-to-date.
 			 */
-			ResetLatch(&MyWalSnd->latch);
-
-			if (!XLogSend(output_message, &caughtup))
-				break;
-			if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
+			if (caughtup && !pq_is_send_pending())
 			{
-				/*
-				 * XXX: We don't really need the periodic wakeups anymore,
-				 * WaitLatchOrSocket should reliably wake up as soon as
-				 * something interesting happens.
-				 */
+				ResetLatch(&MyWalSnd->latch);
 
-				/* Sleep */
-				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-								  WalSndDelay * 1000L);
+				XLogSend(output_message, &caughtup);
 			}
 		}
-		else
+
+		/* Flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			break;
+
+		/*
+		 * When SIGUSR2 arrives, we send any outstanding logs up to the
+		 * shutdown checkpoint record (i.e., the latest record) and exit.
+		 */
+		if (walsender_ready_to_stop && !pq_is_send_pending())
 		{
-			/* Attempt to send the log once every loop */
-			if (!XLogSend(output_message, &caughtup))
+			XLogSend(output_message, &caughtup);
+			ProcessRepliesIfAny();
+			if (caughtup && !pq_is_send_pending())
+				walsender_shutdown_requested = true;
+		}
+
+		if ((caughtup || pq_is_send_pending()) &&
+			!got_SIGHUP &&
+			!walsender_shutdown_requested)
+		{
+			TimestampTz	finish_time;
+			long		sleeptime;
+
+			/* Reschedule replication timeout */
+			if (replication_timeout > 0)
+			{
+				long		secs;
+				int		usecs;
+
+				finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+														  replication_timeout);
+				TimestampDifference(GetCurrentTimestamp(),
+									finish_time, &secs, &usecs);
+				sleeptime = secs * 1000 + usecs / 1000;
+				if (WalSndDelay < sleeptime)
+					sleeptime = WalSndDelay;
+			}
+			else
+			{
+				/*
+				 * XXX: Without timeout, we don't really need the periodic
+				 * wakeups anymore, WaitLatchOrSocket should reliably wake up
+				 * as soon as something interesting happens.
+				 */
+				sleeptime = WalSndDelay;
+			}
+
+			/* Sleep */
+			WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+							  true, pq_is_send_pending(),
+							  sleeptime * 1000L);
+
+			/* Check for replication timeout */
+			if (replication_timeout > 0 &&
+				GetCurrentTimestamp() >= finish_time)
+			{
+				/*
+				 * Since typically expiration of replication timeout means
+				 * communication problem, we don't send the error message
+				 * to the standby.
+				 */
+				ereport(COMMERROR,
+						(errmsg("terminating walsender process due to replication timeout")));
 				break;
+			}
 		}
 
 		/*
@@ -993,7 +1050,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 
 /*
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
- * but not yet sent to the client, and send it.
+ * but not yet sent to the client, and buffer it in the libpq output
+ * buffer.
  *
  * msgbuf is a work area in which the output message is constructed.  It's
  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
@@ -1001,10 +1059,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  *
  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
  * *caughtup is set to false.
- *
- * Returns true if OK, false if trouble.
+
  */
-static bool
+static void
 XLogSend(char *msgbuf, bool *caughtup)
 {
 	XLogRecPtr	SendRqstPtr;
@@ -1027,7 +1084,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 	if (XLByteLE(SendRqstPtr, sentPtr))
 	{
 		*caughtup = true;
-		return true;
+		return;
 	}
 
 	/*
@@ -1099,11 +1156,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 
 	memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
 
-	pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
-
-	/* Flush pending output to the client */
-	if (pq_flush())
-		return false;
+	pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
 
 	sentPtr = endptr;
 
@@ -1127,7 +1180,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 		set_ps_display(activitymsg, false);
 	}
 
-	return true;
+	return;
 }
 
 /* SIGHUP: set flag to re-read config file at next convenient time */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9ca1329..b49bdae 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1856,6 +1856,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+			gettext_noop("Sets the maximum time to wait for a standby reply before terminating replication."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&replication_timeout,
+		60 * 1000, 0, INT_MAX, NULL, NULL
+	},
+
+	{
 		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
 						 "flushing WAL to disk."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ed70223..4348185 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -200,6 +200,7 @@
 #wal_sender_delay = 1s		# walsender cycle time, 1-10000 milliseconds
 #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
 #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
+#replication_timeout = 60s	# in milliseconds; 0 disables
 
 # - Standby Servers -
 
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 8ecab6d..b20b0c2 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -60,7 +60,10 @@ extern int	pq_peekbyte(void);
 extern int	pq_getbyte_if_available(unsigned char *c);
 extern int	pq_putbytes(const char *s, size_t len);
 extern int	pq_flush(void);
+extern int	pq_flush_if_writable(void);
+extern bool	pq_is_send_pending(void);
 extern int	pq_putmessage(char msgtype, const char *s, size_t len);
+extern int	pq_putmessage_noblock(char msgtype, const char *s, size_t len);
 extern void pq_startcopyout(void);
 extern void pq_endcopyout(bool errorAbort);
 
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 150a71f..2670a2e 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -98,6 +98,7 @@ extern volatile sig_atomic_t walsender_ready_to_stop;
 /* user-settable parameters */
 extern int	WalSndDelay;
 extern int	max_wal_senders;
+extern int	replication_timeout;
 
 extern int	WalSenderMain(void);
 extern void WalSndSignals(void);
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 31744ff..f64e13b 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -40,7 +40,7 @@ extern void OwnLatch(volatile Latch *latch);
 extern void DisownLatch(volatile Latch *latch);
 extern bool WaitLatch(volatile Latch *latch, long timeout);
 extern int	WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
-				  long timeout);
+				  bool forRead, bool forWrite, long timeout);
 extern void SetLatch(volatile Latch *latch);
 extern void ResetLatch(volatile Latch *latch);
 #define TestLatch(latch) (((volatile Latch *) latch)->is_set)
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to