Hi,

If the primary has a connected standby, the WAL files required for
the standby cannot be deleted. So if the standby has fallen too far
behind for some reason, a disk full failure might occur on the primary.
This is one of the problems that should be fixed for v9.0.

We can cope with that case by carefully monitoring the standby lag.
In addition to this, I think that we should put an upper limit on
the number of WAL files held in pg_xlog for the standby (i.e.,
the maximum delay of the standby) as a safeguard against a disk
full error.

The attached patch introduces a new GUC 'replication_lag_segments'
which specifies the maximum number of WAL files held in pg_xlog
to send to the standby. Replication to a standby which falls
further behind than that upper limit is automatically terminated,
which avoids a disk full error on the primary.

This GUC is also useful for retaining some WAL files for an incoming
standby. To some extent, this avoids the problem that a WAL file
required by the standby no longer exists on the primary when
replication starts.

The code is also available in the 'replication' branch in my
git repository.

    git://git.postgresql.org/git/users/fujii/postgres.git
    branch: replication

Comments? Objections? Reviews?

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1789,1794 **** archive_command = 'copy "%p" "C:\\server\\archivedir\\%f"'  # Windows
--- 1789,1812 ----
         </para>
         </listitem>
        </varlistentry>
+       <varlistentry id="guc-replication-lag-segments" xreflabel="replication_lag_segments">
+        <term><varname>replication_lag_segments</varname> (<type>integer</type>)</term>
+        <indexterm>
+         <primary><varname>replication_lag_segments</> configuration parameter</primary>
+        </indexterm>
+        <listitem>
+        <para>
+         Specifies the maximum number of log file segments held in the <filename>pg_xlog</>
+         directory to send to the standby server (each segment is normally 16 megabytes).
+         Replication to a standby server which falls more than
+         <varname>replication_lag_segments</> segments behind is terminated. This is useful for
+         avoiding a disk full error on the primary and holding the segments required for
+         an incoming standby server. The default value is zero, which disables the
+         upper limit. This parameter can only be set in the <filename>postgresql.conf</>
+         file or on the server command line.
+        </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
      </sect2>
      <sect2 id="runtime-config-standby">
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 1725,1730 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
--- 1725,1736 ----
  				if (XLogArchivingActive())
  					XLogArchiveNotifySeg(openLogId, openLogSeg);
  
+ 				/*
+ 				 * Check for the standbys' delay and terminate replication
+ 				 * if needed.
+ 				 */
+ 				CheckStandbysDelay(LogwrtResult.Write);
+ 
  				Write->lastSegSwitchTime = (pg_time_t) time(NULL);
  
  				/*
***************
*** 7213,7240 **** CreateCheckPoint(int flags)
  	smgrpostckpt();
  
  	/*
! 	 * If there's connected standby servers doing XLOG streaming, don't
! 	 * delete XLOG files that have not been streamed to all of them yet.
! 	 * This does nothing to prevent them from being deleted when the
! 	 * standby is disconnected (e.g because of network problems), but at
! 	 * least it avoids an open replication connection from failing because
  	 * of that.
  	 */
  	if ((_logId || _logSeg) && MaxWalSenders > 0)
  	{
- 		XLogRecPtr oldest;
  		uint32	log;
  		uint32	seg;
  
! 		oldest = GetOldestWALSendPointer();
! 		if (oldest.xlogid != 0 || oldest.xrecoff != 0)
  		{
! 			XLByteToSeg(oldest, log, seg);
! 			if (log < _logId || (log == _logId && seg < _logSeg))
! 			{
! 				_logId	= log;
! 				_logSeg	= seg;
! 			}
  		}
  	}
  
--- 7219,7260 ----
  	smgrpostckpt();
  
  	/*
! 	 * Don't delete XLOG files which could still be required by
! 	 * connected or incoming standbys, up to the given upper limit.
! 	 * This prevents a replication connection from failing because
  	 * of that.
  	 */
  	if ((_logId || _logSeg) && MaxWalSenders > 0)
  	{
  		uint32	log;
  		uint32	seg;
+ 		bool	need_comp = true;
  
! 		if (RepLagSegs > 0)
  		{
! 			/*
! 			 * Ensure that there are no overly lagged standbys before
! 			 * deleting XLOG files.
! 			 */
! 			CheckStandbysDelay(recptr);
! 			XLByteToSeg(recptr, log, seg);
! 			PrevLogSegs(log, seg, RepLagSegs - 1);
! 		}
! 		else
! 		{
! 			XLogRecPtr oldest;
! 
! 			oldest = GetOldestWALSendPointer();
! 			if (oldest.xlogid == 0 && oldest.xrecoff == 0)	/* both zero: no position */
! 				need_comp = false;	/* nothing to hold WAL back for */
! 			else
! 				XLByteToSeg(oldest, log, seg);
! 		}
! 
! 		if (need_comp && (log < _logId || (log == _logId && seg < _logSeg)))
! 		{
! 			_logId	= log;
! 			_logSeg	= seg;
  		}
  	}
  
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 66,71 **** bool	am_walsender	= false;	/* Am I a walsender process ? */
--- 66,72 ----
  /* User-settable parameters for walsender */
  int	MaxWalSenders = 0;		/* the maximum number of concurrent walsenders */
  int	WalSndDelay	= 200;		/* max sleep time between some actions */
+ int	RepLagSegs	= 0;		/* the maximum number of WAL files held for standby */
  
  #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
  
***************
*** 86,94 **** static XLogRecPtr sentPtr = {0, 0};
  
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
! static volatile sig_atomic_t shutdown_requested = false;
  static volatile sig_atomic_t ready_to_stop = false;
  
  /* Signal handlers */
  static void WalSndSigHupHandler(SIGNAL_ARGS);
  static void WalSndShutdownHandler(SIGNAL_ARGS);
--- 87,150 ----
  
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
! static volatile sig_atomic_t got_SIGTERM = false;
  static volatile sig_atomic_t ready_to_stop = false;
  
+ static void ProcessWalSndInterrupts(void);
+ static void EnableWalSndImmediateExit(void);
+ static void DisableWalSndImmediateExit(void);
+ 
+ /*
+  * About SIGTERM handling:
+  *
+  * We can't just exit(1) within SIGTERM signal handler, because the signal
+  * might arrive in the middle of some critical operation, like while we're
+  * holding a spinlock. We also can't just set a flag in signal handler and
+  * check it in the main loop, because we perform some blocking libpq
+  * operations like pq_flush(), which can take a long time to finish.
+  *
+  * We use a combined approach: When WalSndImmediateInterruptOK is true, it's
+  * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
+  * sets got_SIGTERM flag, which is checked in the main loop when convenient.
+  *
+  * This is very much like what regular backends do with ImmediateInterruptOK,
+  * ProcessInterrupts() etc.
+  */
+ static volatile bool WalSndImmediateInterruptOK = false;
+ 
+ static void
+ ProcessWalSndInterrupts(void)
+ {
+ 	/*
+ 	 * Exit with FATAL if SIGTERM has been flagged.  Walsender doesn't use
+ 	 * the regular backend interrupt scheme, but we still call
+ 	 * CHECK_FOR_INTERRUPTS() to receive any incoming signals on Win32.
+ 	 */
+ 	CHECK_FOR_INTERRUPTS();
+ 	/* NOTE(review): any SIGTERM sets got_SIGTERM, so the message below may mislead */
+ 	if (got_SIGTERM)
+ 	{
+ 		WalSndImmediateInterruptOK = false;	/* keep the SIGTERM handler from re-entering */
+ 		ereport(FATAL,
+ 				(errcode(ERRCODE_ADMIN_SHUTDOWN),
+ 				 errmsg("terminating replication because the standby falls too far behind")));
+ 	}
+ }
+ 
+ static void
+ EnableWalSndImmediateExit(void)
+ {
+ 	WalSndImmediateInterruptOK = true;	/* SIGTERM handler may now exit at once */
+ 	ProcessWalSndInterrupts();		/* act on a SIGTERM that was already flagged */
+ }
+ 
+ static void
+ DisableWalSndImmediateExit(void)
+ {
+ 	WalSndImmediateInterruptOK = false;	/* SIGTERM handler only sets the flag again */
+ 	ProcessWalSndInterrupts();		/* act on a SIGTERM flagged in the meantime */
+ }
+ 
  /* Signal handlers */
  static void WalSndSigHupHandler(SIGNAL_ARGS);
  static void WalSndShutdownHandler(SIGNAL_ARGS);
***************
*** 386,396 **** WalSndLoop(void)
  		if (ready_to_stop)
  		{
  			XLogSend(&output_message);
! 			shutdown_requested = true;
  		}
  
  		/* Normal exit from the walsender is here */
! 		if (shutdown_requested)
  		{
  			/* Inform the standby that XLOG streaming was done */
  			pq_puttextmessage('C', "COPY 0");
--- 442,452 ----
  		if (ready_to_stop)
  		{
  			XLogSend(&output_message);
! 			got_SIGTERM = true;
  		}
  
  		/* Normal exit from the walsender is here */
! 		if (got_SIGTERM)
  		{
  			/* Inform the standby that XLOG streaming was done */
  			pq_puttextmessage('C', "COPY 0");
***************
*** 410,416 **** WalSndLoop(void)
  		remain = WalSndDelay;
  		while (remain > 0)
  		{
! 			if (got_SIGHUP || shutdown_requested || ready_to_stop)
  				break;
  
  			/*
--- 466,472 ----
  		remain = WalSndDelay;
  		while (remain > 0)
  		{
! 			if (got_SIGHUP || got_SIGTERM || ready_to_stop)
  				break;
  
  			/*
***************
*** 586,591 **** XLogSend(StringInfo outMsg)
--- 642,648 ----
  {
  	XLogRecPtr	SendRqstPtr;
  	char	activitymsg[50];
+ 	int	res;
  	/* use volatile pointer to prevent code rearrangement */
  	volatile WalSnd *walsnd = MyWalSnd;
  
***************
*** 676,682 **** XLogSend(StringInfo outMsg)
--- 733,741 ----
  		outMsg->len += nbytes;
  		outMsg->data[outMsg->len] = '\0';
  
+ 		EnableWalSndImmediateExit();
  		pq_putmessage('d', outMsg->data, outMsg->len);
+ 		DisableWalSndImmediateExit();
  		resetStringInfo(outMsg);
  	}
  
***************
*** 686,692 **** XLogSend(StringInfo outMsg)
  	SpinLockRelease(&walsnd->mutex);
  
  	/* Flush pending output */
! 	if (pq_flush())
  		return false;
  
  	/* Report progress of XLOG streaming in PS display */
--- 745,754 ----
  	SpinLockRelease(&walsnd->mutex);
  
  	/* Flush pending output */
! 	EnableWalSndImmediateExit();
! 	res = pq_flush();
! 	DisableWalSndImmediateExit();
! 	if (res)
  		return false;
  
  	/* Report progress of XLOG streaming in PS display */
***************
*** 704,714 **** WalSndSigHupHandler(SIGNAL_ARGS)
  	got_SIGHUP = true;
  }
  
! /* SIGTERM: set flag to shut down */
  static void
  WalSndShutdownHandler(SIGNAL_ARGS)
  {
! 	shutdown_requested = true;
  }
  
  /*
--- 766,780 ----
  	got_SIGHUP = true;
  }
  
! /* SIGTERM: set flag for main loop, or shut down immediately if safe */
  static void
  WalSndShutdownHandler(SIGNAL_ARGS)
  {
! 	got_SIGTERM = true;
! 
! 	/* Exit now only inside an immediate-exit window; don't joggle the elbow of proc_exit */
! 	if (!proc_exit_inprogress && WalSndImmediateInterruptOK)
! 		ProcessWalSndInterrupts();
  }
  
  /*
***************
*** 844,846 **** GetOldestWALSendPointer(void)
--- 910,976 ----
  	}
  	return oldest;
  }
+ 
+ /*
+  * Check if the standbys have fallen too far behind.
+  *
+  * Sends SIGTERM to each walsender with a nonzero pid whose sentPtr is too
+  * far behind newptr. Does nothing when replication_lag_segments is zero.
+  */
+ void
+ CheckStandbysDelay(XLogRecPtr newptr)
+ {
+ 	uint32	new_segno;
+ 	uint32	new_highbits;
+ 	int	i;
+ 
+ 	if (RepLagSegs <= 0)
+ 		return;
+ 
+ 	/*
+ 	 * newptr is the same for every walsender, so convert it only once,
+ 	 * outside the loop. The segment arithmetic is based on
+ 	 * XLogCheckpointNeeded().
+ 	 */
+ 	new_segno = (newptr.xlogid % XLogSegSize) * XLogSegsPerFile +
+ 		(newptr.xrecoff / XLogSegSize);
+ 	new_highbits = newptr.xlogid / XLogSegSize;
+ 
+ 	for (i = 0; i < MaxWalSenders; i++)
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile WalSnd	*walsnd = &WalSndCtl->walsnds[i];
+ 		XLogRecPtr	oldptr;
+ 		pid_t	walsndpid;
+ 		uint32	old_segno;
+ 		uint32	old_highbits;
+ 
+ 		SpinLockAcquire(&walsnd->mutex);
+ 		oldptr = walsnd->sentPtr;
+ 		walsndpid = walsnd->pid;
+ 		SpinLockRelease(&walsnd->mutex);
+ 
+ 		if (walsndpid == 0)
+ 			continue;
+ 
+ 		old_segno = (oldptr.xlogid % XLogSegSize) * XLogSegsPerFile +
+ 			(oldptr.xrecoff / XLogSegSize);
+ 		old_highbits = oldptr.xlogid / XLogSegSize;
+ 
+ 		/*
+ 		 * Terminate this walsender if its standby has fallen behind more than
+ 		 * the upper limit. The pid is re-read just before signalling,
+ 		 * presumably to avoid hitting a slot that was reused meanwhile.
+ 		 *
+ 		 * XXX: Should we recycle (or remove) old log files here? Otherwise
+ 		 * the number of log files would continue to increase until the next
+ 		 * checkpoint has recycled them, which increases the chance of a disk
+ 		 * full failure. But it's not good to recycle log files while
+ 		 * holding WALWriteLock.
+ 		 */
+ 		if ((new_highbits != old_highbits ||
+ 			 new_segno >= old_segno + (uint32) (RepLagSegs - 1)) &&
+ 			walsndpid == walsnd->pid)
+ 			kill(walsndpid, SIGTERM);
+ 	}
+ }
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1721,1726 **** static struct config_int ConfigureNamesInt[] =
--- 1721,1735 ----
  	},
  
  	{
+ 		{"replication_lag_segments", PGC_SIGHUP, WAL_REPLICATION,
+ 			gettext_noop("Sets the maximum number of WAL files held for the standby server."),
+ 			NULL
+ 		},
+ 		&RepLagSegs,
+ 		0, 0, INT_MAX, NULL, NULL
+ 	},
+ 
+ 	{
  		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
  			gettext_noop("Sets the delay in microseconds between transaction commit and "
  						 "flushing WAL to disk."),
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 189,194 ****
--- 189,195 ----
  
  #max_wal_senders = 0		# max number of walsender processes
  #wal_sender_delay = 200ms	# 1-10000 milliseconds
+ #replication_lag_segments = 0	# in logfile segments, 16MB each; 0 disables
  
  
  #------------------------------------------------------------------------------
*** a/src/include/access/xlog_internal.h
--- b/src/include/access/xlog_internal.h
***************
*** 151,156 **** typedef XLogLongPageHeaderData *XLogLongPageHeader;
--- 151,170 ----
  		} \
  	} while (0)
  
+ /*
+  * Decrement an xlogid/segment pair by segs, clamping at 0/0.
+  * logId and logSeg are unsigned at the call site, so the subtraction is
+  * done in 64-bit arithmetic instead of testing the results for "< 0".
+  */
+ #define PrevLogSegs(logId, logSeg, segs)	\
+ 	do {	\
+ 		uint64	_cur = (uint64) (logId) * XLogSegsPerFile + (logSeg);	\
+ 		uint64	_dec = (uint64) (segs);	\
+ 		_cur = (_cur > _dec) ? (_cur - _dec) : 0;	\
+ 		(logId) = (uint32) (_cur / XLogSegsPerFile);	\
+ 		(logSeg) = (uint32) (_cur % XLogSegsPerFile);	\
+ 	} while (0)
+ 
  /* Align a record pointer to next page */
  #define NextLogPage(recptr)	\
  	do {	\
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 39,49 **** extern bool	am_walsender;
--- 39,51 ----
  
  /* user-settable parameters */
  extern int	WalSndDelay;
+ extern int	RepLagSegs;
  
  extern int WalSenderMain(void);
  extern void WalSndSignals(void);
  extern Size WalSndShmemSize(void);
  extern void WalSndShmemInit(void);
  extern XLogRecPtr GetOldestWALSendPointer(void);
+ extern void CheckStandbysDelay(XLogRecPtr newptr);
  
  #endif	/* _WALSENDER_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to