On Fri, Feb 18, 2011 at 12:10 PM, Robert Haas <robertmh...@gmail.com> wrote:
> IMHO, that's so broken as to be useless.
>
> I would really like to have a solution to this problem, though.
> Relying on TCP keepalives is weak.

Agreed.

I've updated the replication timeout patch that I submitted earlier.
http://archives.postgresql.org/message-id/AANLkTinSvcdAYryNfZqd0wepyh1Pf7YX6Q0KxhZjas6a%40mail.gmail.com

Since the patch also implements non-blocking send functions,
the timeout works properly even when the send buffer has
filled up.

> There are two things that I think are pretty clear.  If the receiver
> has wal_receiver_status_interval=0, then we should ignore
> replication_timeout for that connection.

The patch doesn't yet check that wal_receiver_status_interval
is set properly. I'll implement that later.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2015,2020 **** SET ENABLE_SEQSCAN TO OFF;
--- 2015,2042 ----
         </para>
        </listitem>
       </varlistentry>
+ 
+      <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
+       <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
+       <indexterm>
+        <primary><varname>replication_timeout</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Specifies the maximum time, in milliseconds, to wait for a reply
+         from the standby before terminating replication.  This is useful for
+         the primary server to detect a standby crash or network outage.
+         A value of zero (the default) turns this off.  This parameter can
+         only be set in the <filename>postgresql.conf</> file or on the server
+         command line.
+        </para>
+        <para>
+         To make the timeout work properly, <xref linkend="guc-wal-receiver-status-interval">
+         must be enabled on the standby, and its value must be less than the
+         value of <varname>replication_timeout</>.
+        </para>
+       </listitem>
+      </varlistentry>
       </variablelist>
      </sect2>
  
***************
*** 2125,2130 **** SET ENABLE_SEQSCAN TO OFF;
--- 2147,2157 ----
         the <filename>postgresql.conf</> file or on the server command line.
         The default value is 10 seconds.
        </para>
+       <para>
+        When <xref linkend="guc-replication-timeout"> is enabled on the primary,
+        <varname>wal_receiver_status_interval</> must be enabled, and its value
+        must be less than the value of <varname>replication_timeout</>.
+       </para>
        </listitem>
       </varlistentry>
  
*** a/src/backend/libpq/pqcomm.c
--- b/src/backend/libpq/pqcomm.c
***************
*** 56,61 ****
--- 56,63 ----
   *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
   *		pq_flush		- flush pending output
   *		pq_getbyte_if_available - get a byte if available without blocking
+  *		pq_putbytes_if_writable	- send bytes to connection if writable without blocking
+  *		pq_flush_if_writable	- flush pending output if writable without blocking
   *
   * message-level I/O (and old-style-COPY-OUT cruft):
   *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
***************
*** 112,117 **** static char sock_path[MAXPGPATH];
--- 114,120 ----
  
  static char PqSendBuffer[PQ_BUFFER_SIZE];
  static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
+ static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */
  
  static char PqRecvBuffer[PQ_BUFFER_SIZE];
  static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
***************
*** 128,133 **** static bool DoingCopyOut;
--- 131,137 ----
  static void pq_close(int code, Datum arg);
  static int	internal_putbytes(const char *s, size_t len);
  static int	internal_flush(void);
+ static int	internal_flush_if_writable(void);
  
  #ifdef HAVE_UNIX_SOCKETS
  static int	Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
***************
*** 1153,1158 **** internal_putbytes(const char *s, size_t len)
--- 1157,1212 ----
  }
  
  /* --------------------------------
+  *		pq_putbytes_if_writable - send bytes to connection (not flushed
+  *			until pq_flush), if writable
+  * Returns the number of bytes copied into the send buffer, fewer than len
+  * if it fills up and cannot be flushed (0 if reentrant), or EOF if trouble.
+  * --------------------------------
+  */
+ int
+ pq_putbytes_if_writable(const char *s, size_t len)
+ {
+ 	size_t		amount;
+ 	size_t		nwritten = 0;
+ 
+ 	/* Should not be called by old-style COPY OUT */
+ 	Assert(!DoingCopyOut);
+ 	/* No-op if reentrant call */
+ 	if (PqCommBusy)
+ 		return 0;
+ 	PqCommBusy = true;
+ 
+ 	while (len > 0)
+ 	{
+ 		/* If buffer is full, try to flush it out without blocking */
+ 		if (PqSendPointer >= PQ_BUFFER_SIZE)
+ 		{
+ 			int		r;
+ 
+ 			r = internal_flush_if_writable();
+ 			if (r == 0)		/* socket not writable: stop copying here */
+ 				break;
+ 			if (r == EOF)
+ 			{
+ 				PqCommBusy = false;
+ 				return r;
+ 			}
+ 		}
+ 		amount = PQ_BUFFER_SIZE - PqSendPointer;
+ 		if (amount > len)
+ 			amount = len;
+ 		memcpy(PqSendBuffer + PqSendPointer, s, amount);
+ 		PqSendPointer += amount;
+ 		s += amount;
+ 		len -= amount;
+ 		nwritten += amount;
+ 	}
+ 
+ 	PqCommBusy = false;
+ 	return (int) nwritten;
+ }
+ 
+ /* --------------------------------
   *		pq_flush		- flush pending output
   *
   *		returns 0 if OK, EOF if trouble
***************
*** 1224,1229 **** internal_flush(void)
--- 1278,1411 ----
  	return 0;
  }
  
+ /* --------------------------------
+  *		pq_flush_if_writable - flush pending output if writable
+  *
+  * Returns 1 if all pending output was sent, 0 if some remains because the
+  * socket isn't writable (or on a reentrant call), or EOF if trouble.
+  * --------------------------------
+  */
+ int
+ pq_flush_if_writable(void)
+ {
+ 	int			res;
+ 
+ 	/* No-op if reentrant call */
+ 	if (PqCommBusy)
+ 		return 0;
+ 	PqCommBusy = true;
+ 	res = internal_flush_if_writable();
+ 	PqCommBusy = false;
+ 	return res;
+ }
+ 
+ static int
+ internal_flush_if_writable(void)
+ {
+ 	static int	last_reported_send_errno = 0;	/* suppresses duplicate log messages */
+ 	/* Send PqSendBuffer[PqSendStart..PqSendPointer); 1 = done, 0 = would block, EOF = error */
+ 	char	   *bufptr = PqSendBuffer + PqSendStart;
+ 	char	   *bufend = PqSendBuffer + PqSendPointer;
+ 
+ 	while (bufptr < bufend)
+ 	{
+ 		int			r;
+ 
+ 		/* Temporarily put the socket into non-blocking mode */
+ #ifdef WIN32
+ 		pgwin32_noblock = 1;
+ #else
+ 		if (!pg_set_noblock(MyProcPort->sock))
+ 			ereport(ERROR,
+ 					(errmsg("could not set socket to non-blocking mode: %m")));
+ #endif
+ 		MyProcPort->noblock = true;
+ 		PG_TRY();
+ 		{
+ 			r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+ 
+ 			if (r < 0)
+ 			{
+ 				/*
+ 				 * Ok if no data writable without blocking or interrupted (though
+ 				 * EINTR really shouldn't happen with a non-blocking socket).
+ 				 * Report other errors.
+ 				 */
+ 				if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+ 					r = 0;
+ 				else
+ 				{
+ 					if (errno != last_reported_send_errno)
+ 					{
+ 						/*
+ 						 * Careful: an ereport() that tries to write to the
+ 						 * client would cause recursion to here, leading to
+ 						 * stack overflow and core dump!  This message must
+ 						 * go *only* to the postmaster log.
+ 						 *
+ 						 * If a client disconnects while we're in the midst
+ 						 * of output, we might write quite a bit of data before
+ 						 * we get to a safe query abort point.  So, suppress
+ 						 * duplicate log messages.
+ 						 */
+ 						last_reported_send_errno = errno;
+ 						ereport(COMMERROR,
+ 								(errcode_for_socket_access(),
+ 								 errmsg("could not send data to client: %m")));
+ 					}
+ 
+ 					/*
+ 					 * We drop the buffered data anyway so that processing can
+ 					 * continue, even though we'll probably quit soon.
+ 					 */
+ 					PqSendStart = PqSendPointer = 0;
+ 					r = EOF;
+ 				}
+ 			}
+ 			else if (r == 0)
+ 			{
+ 				/* EOF detected */
+ 				r = EOF;
+ 			}
+ 		}
+ 		PG_CATCH();
+ 		{
+ 			/*
+ 			 * The rest of the backend code assumes the socket is in blocking
+ 			 * mode, so treat failure as FATAL.
+ 			 */
+ #ifdef WIN32
+ 			pgwin32_noblock = 0;
+ #else
+ 			if (!pg_set_block(MyProcPort->sock))
+ 				ereport(FATAL,
+ 						(errmsg("could not set socket to blocking mode: %m")));
+ #endif
+ 			MyProcPort->noblock = false;
+ 			PG_RE_THROW();
+ 		}
+ 		PG_END_TRY();
+ #ifdef WIN32
+ 		pgwin32_noblock = 0;
+ #else
+ 		if (!pg_set_block(MyProcPort->sock))
+ 			ereport(FATAL,
+ 					(errmsg("could not set socket to blocking mode: %m")));
+ #endif
+ 		MyProcPort->noblock = false;
+ 
+ 		if (r == 0 || r == EOF)
+ 			return r;
+ 
+ 		last_reported_send_errno = 0;	/* reset after any successful send */
+ 		bufptr += r;
+ 		PqSendStart += r;
+ 	}
+ 
+ 	PqSendStart = PqSendPointer = 0;
+ 	return 1;
+ }
+ 
  
  /* --------------------------------
   * Message-level I/O routines begin here.
*** a/src/backend/port/unix_latch.c
--- b/src/backend/port/unix_latch.c
***************
*** 193,211 **** DisownLatch(volatile Latch *latch)
  bool
  WaitLatch(volatile Latch *latch, long timeout)
  {
! 	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
  }
  
  /*
   * Like WaitLatch, but will also return when there's data available in
!  * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
!  * was set, or 2 if the scoket became readable.
   */
  int
! WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
  {
  	struct timeval tv, *tvp = NULL;
  	fd_set		input_mask;
  	int			rc;
  	int			result = 0;
  
--- 193,214 ----
  bool
  WaitLatch(volatile Latch *latch, long timeout)
  {
! 	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
  }
  
  /*
   * Like WaitLatch, but will also return when there's data available in
!  * 'sock' for reading or writing. Returns 0 if timeout was reached,
!  * 1 if the latch was set, 2 if the socket became readable, or 3 if
!  * the socket became writable.
   */
  int
! WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
! 				  bool forWrite, long timeout)
  {
  	struct timeval tv, *tvp = NULL;
  	fd_set		input_mask;
+ 	fd_set		output_mask;
  	int			rc;
  	int			result = 0;
  
***************
*** 241,254 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
  		FD_ZERO(&input_mask);
  		FD_SET(selfpipe_readfd, &input_mask);
  		hifd = selfpipe_readfd;
! 		if (sock != PGINVALID_SOCKET)
  		{
  			FD_SET(sock, &input_mask);
  			if (sock > hifd)
  				hifd = sock;
  		}
  
! 		rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
  		if (rc < 0)
  		{
  			if (errno == EINTR)
--- 244,265 ----
  		FD_ZERO(&input_mask);
  		FD_SET(selfpipe_readfd, &input_mask);
  		hifd = selfpipe_readfd;
! 		if (sock != PGINVALID_SOCKET && forRead)
  		{
  			FD_SET(sock, &input_mask);
  			if (sock > hifd)
  				hifd = sock;
  		}
  
! 		FD_ZERO(&output_mask);
! 		if (sock != PGINVALID_SOCKET && forWrite)
! 		{
! 			FD_SET(sock, &output_mask);
! 			if (sock > hifd)
! 				hifd = sock;
! 		}
! 
! 		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
  		if (rc < 0)
  		{
  			if (errno == EINTR)
***************
*** 263,273 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
  			result = 0;
  			break;
  		}
! 		if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
  		{
  			result = 2;
  			break;		/* data available in socket */
  		}
  	}
  	waiting = false;
  
--- 274,291 ----
  			result = 0;
  			break;
  		}
! 		if (sock != PGINVALID_SOCKET && forRead &&
! 			FD_ISSET(sock, &input_mask))
  		{
  			result = 2;
  			break;		/* data available in socket */
  		}
+ 		if (sock != PGINVALID_SOCKET && forWrite &&
+ 			FD_ISSET(sock, &output_mask))
+ 		{
+ 			result = 3;
+ 			break;		/* data writable in socket */
+ 		}
  	}
  	waiting = false;
  
*** a/src/backend/port/win32/socket.c
--- b/src/backend/port/win32/socket.c
***************
*** 14,20 ****
  #include "postgres.h"
  
  /*
!  * Indicate if pgwin32_recv() should operate in non-blocking mode.
   *
   * Since the socket emulation layer always sets the actual socket to
   * non-blocking mode in order to be able to deliver signals, we must
--- 14,21 ----
  #include "postgres.h"
  
  /*
!  * Indicate if pgwin32_recv() and pgwin32_send() should operate
!  * in non-blocking mode.
   *
   * Since the socket emulation layer always sets the actual socket to
   * non-blocking mode in order to be able to deliver signals, we must
***************
*** 399,404 **** pgwin32_send(SOCKET s, char *buf, int len, int flags)
--- 400,415 ----
  			return -1;
  		}
  
+ 		if (pgwin32_noblock)
+ 		{
+ 			/*
+ 			 * No data sent, and we are in "emulated non-blocking mode", so
+ 			 * return indicating that we'd block if we were to continue.
+ 			 */
+ 			errno = EWOULDBLOCK;
+ 			return -1;
+ 		}
+ 
  		/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
  
  		if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
*** a/src/backend/port/win32_latch.c
--- b/src/backend/port/win32_latch.c
***************
*** 85,95 **** DisownLatch(volatile Latch *latch)
  bool
  WaitLatch(volatile Latch *latch, long timeout)
  {
! 	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
  }
  
  int
! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
  {
  	DWORD		rc;
  	HANDLE		events[3];
--- 85,96 ----
  bool
  WaitLatch(volatile Latch *latch, long timeout)
  {
! 	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
  }
  
  int
! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
! 				  bool forWrite, long timeout)
  {
  	DWORD		rc;
  	HANDLE		events[3];
***************
*** 103,112 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
  	events[0] = latchevent;
  	events[1] = pgwin32_signal_event;
  	numevents = 2;
! 	if (sock != PGINVALID_SOCKET)
  	{
  		sockevent = WSACreateEvent();
! 		WSAEventSelect(sock, sockevent, FD_READ);
  		events[numevents++] = sockevent;
  	}
  
--- 104,120 ----
  	events[0] = latchevent;
  	events[1] = pgwin32_signal_event;
  	numevents = 2;
! 	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
  	{
+ 		int		flags = 0;
+ 
+ 		if (forRead)
+ 			flags |= FD_READ;
+ 		if (forWrite)
+ 			flags |= FD_WRITE;
+ 
  		sockevent = WSACreateEvent();
! 		WSAEventSelect(sock, sockevent, flags);
  		events[numevents++] = sockevent;
  	}
  
***************
*** 139,146 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
  			pgwin32_dispatch_queued_signals();
  		else if (rc == WAIT_OBJECT_0 + 2)
  		{
  			Assert(sock != PGINVALID_SOCKET);
! 			result = 2;
  			break;
  		}
  		else if (rc != WAIT_OBJECT_0)
--- 147,165 ----
  			pgwin32_dispatch_queued_signals();
  		else if (rc == WAIT_OBJECT_0 + 2)
  		{
+ 			WSANETWORKEVENTS resEvents;
+ 
  			Assert(sock != PGINVALID_SOCKET);
! 
! 			ZeroMemory(&resEvents, sizeof(resEvents));
! 			if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
! 				ereport(FATAL,
! 						(errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
! 
! 			if (forRead && resEvents.lNetworkEvents & FD_READ)
! 				result = 2;
! 			if (forWrite && resEvents.lNetworkEvents & FD_WRITE)
! 				result = 3;
  			break;
  		}
  		else if (rc != WAIT_OBJECT_0)
***************
*** 148,154 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
  	}
  
  	/* Clean up the handle we created for the socket */
! 		if (sock != PGINVALID_SOCKET)
  	{
  		WSAEventSelect(sock, sockevent, 0);
  		WSACloseEvent(sockevent);
--- 167,173 ----
  	}
  
  	/* Clean up the handle we created for the socket */
! 	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
  	{
  		WSAEventSelect(sock, sockevent, 0);
  		WSACloseEvent(sockevent);
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 74,79 **** bool		am_walsender = false;		/* Am I a walsender process ? */
--- 74,91 ----
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
  int			WalSndDelay = 1000;	/* max sleep time between some actions */
+ int			replication_timeout = 0;	/* max time to wait for a standby reply */
+ 
+ /*
+  * Buffer for WAL sending
+  *
+  * WalSndOutBuffer is a work area in which the output message is constructed.
+  * It's allocated only once so we can avoid re-palloc'ing it on each cycle.
+  * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+  */
+ static char	   *WalSndOutBuffer;
+ static int		WalSndOutHead;		/* head of pending output */
+ static int		WalSndOutTail;		/* tail of pending output */
  
  /*
   * These variables are used similarly to openLogFile/Id/Seg/Off,
***************
*** 95,100 **** static XLogRecPtr sentPtr = {0, 0};
--- 107,117 ----
   */
  static StringInfoData reply_message;
  
+ /*
+  * Timestamp of the last receipt of the reply from the standby.
+  */
+ static TimestampTz last_reply_timestamp;
+ 
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
  volatile sig_atomic_t walsender_shutdown_requested = false;
***************
*** 113,119 **** static int	WalSndLoop(void);
  static void InitWalSnd(void);
  static void WalSndHandshake(void);
  static void WalSndKill(int code, Datum arg);
! static bool XLogSend(char *msgbuf, bool *caughtup);
  static void IdentifySystem(void);
  static void StartReplication(StartReplicationCmd * cmd);
  static void ProcessStandbyMessage(void);
--- 130,136 ----
  static void InitWalSnd(void);
  static void WalSndHandshake(void);
  static void WalSndKill(int code, Datum arg);
! static bool XLogSend(bool *caughtup, bool *pending);
  static void IdentifySystem(void);
  static void StartReplication(StartReplicationCmd * cmd);
  static void ProcessStandbyMessage(void);
***************
*** 467,472 **** ProcessRepliesIfAny(void)
--- 484,490 ----
  {
  	unsigned char firstchar;
  	int			r;
+ 	int		received = false;
  
  	for (;;)
  	{
***************
*** 479,487 **** ProcessRepliesIfAny(void)
  					 errmsg("unexpected EOF on standby connection")));
  			proc_exit(0);
  		}
! 		if (r == 0)
  		{
! 			/* no data available without blocking */
  			return;
  		}
  
--- 497,510 ----
  					 errmsg("unexpected EOF on standby connection")));
  			proc_exit(0);
  		}
! 		if (r == 0)	/* no data available without blocking */
  		{
! 			/*
! 			 * Save the last reply timestamp if we've received at least
! 			 * one reply.
! 			 */
! 			if (received)
! 				last_reply_timestamp = GetCurrentTimestamp();
  			return;
  		}
  
***************
*** 493,498 **** ProcessRepliesIfAny(void)
--- 516,522 ----
  				 */
  			case 'd':
  				ProcessStandbyMessage();
+ 				received = true;
  				break;
  
  				/*
***************
*** 669,683 **** ProcessStandbyHSFeedbackMessage(void)
  static int
  WalSndLoop(void)
  {
- 	char	   *output_message;
  	bool		caughtup = false;
  
  	/*
  	 * Allocate buffer that will be used for each output message.  We do this
  	 * just once to reduce palloc overhead.  The buffer must be made large
  	 * enough for maximum-sized messages.
  	 */
! 	output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
  
  	/*
  	 * Allocate buffer that will be used for processing reply messages.  As
--- 693,708 ----
  static int
  WalSndLoop(void)
  {
  	bool		caughtup = false;
+ 	bool		pending = false;
  
  	/*
  	 * Allocate buffer that will be used for each output message.  We do this
  	 * just once to reduce palloc overhead.  The buffer must be made large
  	 * enough for maximum-sized messages.
  	 */
! 	WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
! 	WalSndOutHead = WalSndOutTail = 0;
  
  	/*
  	 * Allocate buffer that will be used for processing reply messages.  As
***************
*** 685,690 **** WalSndLoop(void)
--- 710,718 ----
  	 */
  	initStringInfo(&reply_message);
  
+ 	/* Initialize the last reply timestamp */
+ 	last_reply_timestamp = GetCurrentTimestamp();
+ 
  	/* Loop forever, unless we get an error */
  	for (;;)
  	{
***************
*** 708,717 **** WalSndLoop(void)
  		 */
  		if (walsender_ready_to_stop)
  		{
! 			if (!XLogSend(output_message, &caughtup))
  				break;
  			ProcessRepliesIfAny();
! 			if (caughtup)
  				walsender_shutdown_requested = true;
  		}
  
--- 736,745 ----
  		 */
  		if (walsender_ready_to_stop)
  		{
! 			if (!XLogSend(&caughtup, &pending))
  				break;
  			ProcessRepliesIfAny();
! 			if (caughtup && !pending)
  				walsender_shutdown_requested = true;
  		}
  
***************
*** 726,735 **** WalSndLoop(void)
  		}
  
  		/*
! 		 * If we had sent all accumulated WAL in last round, nap for the
! 		 * configured time before retrying.
  		 */
! 		if (caughtup)
  		{
  			/*
  			 * Even if we wrote all the WAL that was available when we started
--- 754,764 ----
  		}
  
  		/*
! 		 * If we sent all accumulated WAL in the last round, or could not
! 		 * flush pending WAL from the output buffer because the socket was
! 		 * not writable, nap for the configured time before retrying.
  		 */
! 		if (caughtup || pending)
  		{
  			/*
  			 * Even if we wrote all the WAL that was available when we started
***************
*** 740,764 **** WalSndLoop(void)
  			 */
  			ResetLatch(&MyWalSnd->latch);
  
! 			if (!XLogSend(output_message, &caughtup))
  				break;
! 			if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
  			{
  				/*
  				 * XXX: We don't really need the periodic wakeups anymore,
  				 * WaitLatchOrSocket should reliably wake up as soon as
  				 * something interesting happens.
  				 */
  
  				/* Sleep */
  				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
! 								  WalSndDelay * 1000L);
  			}
  		}
  		else
  		{
  			/* Attempt to send the log once every loop */
! 			if (!XLogSend(output_message, &caughtup))
  				break;
  		}
  
--- 769,828 ----
  			 */
  			ResetLatch(&MyWalSnd->latch);
  
! 			if (!XLogSend(&caughtup, &pending))
  				break;
! 			if ((caughtup || pending) && !got_SIGHUP && !walsender_ready_to_stop &&
! 					!walsender_shutdown_requested)
  			{
+ 				TimestampTz	finish_time;
+ 				long		sleeptime;
+ 
  				/*
  				 * XXX: We don't really need the periodic wakeups anymore,
  				 * WaitLatchOrSocket should reliably wake up as soon as
  				 * something interesting happens.
  				 */
  
+ 				/* Reschedule replication timeout */
+ 				if (replication_timeout > 0)
+ 				{
+ 					long		secs;
+ 					int		usecs;
+ 
+ 					finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ 											replication_timeout);
+ 					TimestampDifference(GetCurrentTimestamp(),
+ 								finish_time, &secs, &usecs);
+ 					sleeptime = secs * 1000 + usecs / 1000;
+ 					if (WalSndDelay < sleeptime)
+ 						sleeptime = WalSndDelay;
+ 				}
+ 				else
+ 					sleeptime = WalSndDelay;
+ 
  				/* Sleep */
  				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
! 								  true, (WalSndOutTail > 0),
! 								  sleeptime * 1000L);
! 
! 				/* Check for replication timeout */
! 				if (replication_timeout > 0 && GetCurrentTimestamp() >= finish_time)
! 				{
! 					/*
! 					 * Since expiration of the replication timeout typically means
! 					 * a communication problem, we don't send the error message
! 					 * to the standby.
! 					 */
! 					ereport(COMMERROR,
! 							(errmsg("terminating walsender process due to replication timeout")));
! 					break;
! 				}
  			}
  		}
  		else
  		{
  			/* Attempt to send the log once every loop */
! 			if (!XLogSend(&caughtup, &pending))
  				break;
  		}
  
***************
*** 986,1009 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
   * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
   * but not yet sent to the client, and send it.
   *
-  * msgbuf is a work area in which the output message is constructed.  It's
-  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
-  * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
-  *
   * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
   * *caughtup is set to false.
   *
   * Returns true if OK, false if trouble.
   */
  static bool
! XLogSend(char *msgbuf, bool *caughtup)
  {
  	XLogRecPtr	SendRqstPtr;
  	XLogRecPtr	startptr;
! 	XLogRecPtr	endptr;
  	Size		nbytes;
  	WalDataMessageHeader msghdr;
  
  	/*
  	 * Attempt to send all data that's already been written out and fsync'd to
  	 * disk.  We cannot go further than what's been written out given the
--- 1050,1097 ----
   * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
   * but not yet sent to the client, and send it.
   *
   * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
   * *caughtup is set to false.
   *
+  * If WAL remains pending in the output buffer, *pending is set to true,
+  * otherwise *pending is set to false.
+  *
   * Returns true if OK, false if trouble.
   */
  static bool
! XLogSend(bool *caughtup, bool *pending)
  {
  	XLogRecPtr	SendRqstPtr;
  	XLogRecPtr	startptr;
! 	static XLogRecPtr	endptr;
  	Size		nbytes;
+ 	uint32		n32;
+ 	int			res;
  	WalDataMessageHeader msghdr;
  
+ 	/* Attempt to flush pending WAL in output buffer */
+ 	if (*pending)
+ 	{
+ 		if (WalSndOutHead != WalSndOutTail)
+ 		{
+ 			res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+ 										  WalSndOutTail - WalSndOutHead);
+ 			if (res == EOF)
+ 				return false;
+ 			WalSndOutHead += res;
+ 			if (WalSndOutHead != WalSndOutTail)
+ 				return true;
+ 		}
+ 
+ 		res = pq_flush_if_writable();
+ 		if (res == EOF)
+ 			return false;
+ 		if (res == 0)
+ 			return true;
+ 
+ 		goto updt;
+ 	}
+ 
  	/*
  	 * Attempt to send all data that's already been written out and fsync'd to
  	 * disk.  We cannot go further than what's been written out given the
***************
*** 1072,1084 **** XLogSend(char *msgbuf, bool *caughtup)
  	/*
  	 * OK to read and send the slice.
  	 */
! 	msgbuf[0] = 'w';
  
  	/*
  	 * Read the log directly into the output buffer to avoid extra memcpy
  	 * calls.
  	 */
! 	XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken as
--- 1160,1178 ----
  	/*
  	 * OK to read and send the slice.
  	 */
! 	WalSndOutBuffer[0] = 'd';
! 	WalSndOutBuffer[5] = 'w';
! 	WalSndOutHead = 0;
! 	WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
! 
! 	n32 = htonl((uint32) WalSndOutTail - 1);
! 	memcpy(WalSndOutBuffer + 1, &n32, 4);
  
  	/*
  	 * Read the log directly into the output buffer to avoid extra memcpy
  	 * calls.
  	 */
! 	XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken as
***************
*** 1088,1100 **** XLogSend(char *msgbuf, bool *caughtup)
  	msghdr.walEnd = SendRqstPtr;
  	msghdr.sendTime = GetCurrentTimestamp();
  
! 	memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
  
! 	pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
  
  	/* Flush pending output to the client */
! 	if (pq_flush())
  		return false;
  
  	sentPtr = endptr;
  
--- 1182,1215 ----
  	msghdr.walEnd = SendRqstPtr;
  	msghdr.sendTime = GetCurrentTimestamp();
  
! 	memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
  
! 	res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
! 	if (res == EOF)
! 		return false;
! 
! 	WalSndOutHead = res;
! 	if (WalSndOutHead != WalSndOutTail)
! 	{
! 		*caughtup = false;
! 		*pending = true;
! 		return true;
! 	}
  
  	/* Flush pending output to the client */
! 	res = pq_flush_if_writable();
! 	if (res == EOF)
  		return false;
+ 	if (res == 0)
+ 	{
+ 		*caughtup = false;
+ 		*pending = true;
+ 		return true;
+ 	}
+ 
+ updt:
+ 	WalSndOutHead = WalSndOutTail = 0;
+ 	*pending = false;
  
  	sentPtr = endptr;
  
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1847,1852 **** static struct config_int ConfigureNamesInt[] =
--- 1847,1862 ----
  	},
  
  	{
+ 		{"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+ 			gettext_noop("Sets the maximum time to wait for WAL replication."),
+ 			NULL,
+ 			GUC_UNIT_MS
+ 		},
+ 		&replication_timeout,
+ 		0, 0, INT_MAX, NULL, NULL
+ 	},
+ 
+ 	{
  		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
  			gettext_noop("Sets the delay in microseconds between transaction commit and "
  						 "flushing WAL to disk."),
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 191,196 ****
--- 191,197 ----
  #wal_sender_delay = 1s		# walsender cycle time, 1-10000 milliseconds
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
+ #replication_timeout = 0	# in milliseconds; 0 disables
  
  # - Standby Servers -
  
*** a/src/include/libpq/libpq.h
--- b/src/include/libpq/libpq.h
***************
*** 59,65 **** extern int	pq_getbyte(void);
--- 59,67 ----
  extern int	pq_peekbyte(void);
  extern int	pq_getbyte_if_available(unsigned char *c);
  extern int	pq_putbytes(const char *s, size_t len);
+ extern int	pq_putbytes_if_writable(const char *s, size_t len);
  extern int	pq_flush(void);
+ extern int	pq_flush_if_writable(void);
  extern int	pq_putmessage(char msgtype, const char *s, size_t len);
  extern void pq_startcopyout(void);
  extern void pq_endcopyout(bool errorAbort);
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 70,75 **** extern volatile sig_atomic_t walsender_ready_to_stop;
--- 70,76 ----
  /* user-settable parameters */
  extern int	WalSndDelay;
  extern int	max_wal_senders;
+ extern int	replication_timeout;
  
  extern int	WalSenderMain(void);
  extern void WalSndSignals(void);
*** a/src/include/storage/latch.h
--- b/src/include/storage/latch.h
***************
*** 40,46 **** extern void OwnLatch(volatile Latch *latch);
  extern void DisownLatch(volatile Latch *latch);
  extern bool WaitLatch(volatile Latch *latch, long timeout);
  extern int	WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
! 				  long timeout);
  extern void SetLatch(volatile Latch *latch);
  extern void ResetLatch(volatile Latch *latch);
  #define TestLatch(latch) (((volatile Latch *) latch)->is_set)
--- 40,46 ----
  extern void DisownLatch(volatile Latch *latch);
  extern bool WaitLatch(volatile Latch *latch, long timeout);
  extern int	WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
! 				  bool forRead, bool forWrite, long timeout);
  extern void SetLatch(volatile Latch *latch);
  extern void ResetLatch(volatile Latch *latch);
  #define TestLatch(latch) (((volatile Latch *) latch)->is_set)
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to