Hi, If the primary has a connected standby, the WAL files required for the standby cannot be deleted. So if it has fallen too far behind for some reasons, a disk full failure might occur on the primary. This is one of the problems that should be fixed for v9.0.
We can cope with that case by carefully monitoring the standby lag. In addition to this, I think that we should put an upper limit on the number of WAL files held in pg_xlog for the standby (i.e., the maximum delay of the standby) as a safeguard against a disk full error. The attached patch introduces new GUC 'replication_lag_segments' which specifies the maximum number of WAL files held in pg_xlog to send to the standby. The replication to the standby which falls more than the upper limit behind is automatically terminated, which would avoid a disk full erro on the primary. This GUC is also useful to hold some WAL files for the incoming standby. This would avoid the problem that a WAL file required for the standby doesn't exist in the primary at the start of replication, to some extent. The code is also available in the 'replication' branch in my git repository. git://git.postgresql.org/git/users/fujii/postgres.git branch: replication Comment? Objection? Review? Regards, -- Fujii Masao NIPPON TELEGRAPH AND TELEPHONE CORPORATION NTT Open Source Software Center
*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 1789,1794 **** archive_command = 'copy "%p" "C:\\server\\archivedir\\%f"' # Windows --- 1789,1812 ---- </para> </listitem> </varlistentry> + <varlistentry id="guc-replication-lag-segments" xreflabel="replication_lag_segments"> + <term><varname>replication_lag_segments</varname> (<type>integer</type>)</term> + <indexterm> + <primary><varname>replication_lag_segments</> configuration parameter</primary> + </indexterm> + <listitem> + <para> + Specifies the maximum number of log file segments held in <filename>pg_xlog</> + directory to send to the standby server (each segment is normally 16 megabytes). + The replication to the standby server which falls more than <varname> + replication_lag_segments</> behind is terminated. This is useful for + avoiding a disk full error on the primary and holding the segments required for + the incoming standby server. The default value is zero, which disables that + upper limit. This parameter can only be set in the <filename>postgresql.conf</> + file or on the server command line. + </para> + </listitem> + </varlistentry> </variablelist> </sect2> <sect2 id="runtime-config-standby"> *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 1725,1730 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) --- 1725,1736 ---- if (XLogArchivingActive()) XLogArchiveNotifySeg(openLogId, openLogSeg); + /* + * Check for the standbys' delay and terminate replication + * if needed. + */ + CheckStandbysDelay(LogwrtResult.Write); + Write->lastSegSwitchTime = (pg_time_t) time(NULL); /* *************** *** 7213,7240 **** CreateCheckPoint(int flags) smgrpostckpt(); /* ! * If there's connected standby servers doing XLOG streaming, don't ! * delete XLOG files that have not been streamed to all of them yet. ! * This does nothing to prevent them from being deleted when the ! * standby is disconnected (e.g because of network problems), but at ! * least it avoids an open replication connection from failing because * of that. */ if ((_logId || _logSeg) && MaxWalSenders > 0) { - XLogRecPtr oldest; uint32 log; uint32 seg; ! oldest = GetOldestWALSendPointer(); ! if (oldest.xlogid != 0 || oldest.xrecoff != 0) { ! XLByteToSeg(oldest, log, seg); ! if (log < _logId || (log == _logId && seg < _logSeg)) ! { ! _logId = log; ! _logSeg = seg; ! } } } --- 7219,7260 ---- smgrpostckpt(); /* ! * Don't delete XLOG files which could be still required for ! * connected or incoming standbys, under the given upper limit. ! * This avoids a replication connection from failing because * of that. */ if ((_logId || _logSeg) && MaxWalSenders > 0) { uint32 log; uint32 seg; + bool need_comp = true; ! if (RepLagSegs > 0) { ! /* ! * Ensure that there is no too lagged standbys before ! * deleting XLOG files. ! */ ! CheckStandbysDelay(recptr); ! XLByteToSeg(recptr, log, seg); ! PrevLogSegs(log, seg, RepLagSegs - 1); ! } ! else ! { ! XLogRecPtr oldest; ! ! oldest = GetOldestWALSendPointer(); ! if (oldest.xlogid == 0 || oldest.xrecoff == 0) ! need_comp = false; ! else ! XLByteToSeg(oldest, log, seg); ! } ! ! if (need_comp && (log < _logId || (log == _logId && seg < _logSeg))) ! { ! _logId = log; ! _logSeg = seg; } } *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 66,71 **** bool am_walsender = false; /* Am I a walsender process ? */ --- 66,72 ---- /* User-settable parameters for walsender */ int MaxWalSenders = 0; /* the maximum number of concurrent walsenders */ int WalSndDelay = 200; /* max sleep time between some actions */ + int RepLagSegs = 0; /* the maximum number of WAL files held for standby */ #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ *************** *** 86,94 **** static XLogRecPtr sentPtr = {0, 0}; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; ! static volatile sig_atomic_t shutdown_requested = false; static volatile sig_atomic_t ready_to_stop = false; /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); static void WalSndShutdownHandler(SIGNAL_ARGS); --- 87,150 ---- /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; ! static volatile sig_atomic_t got_SIGTERM = false; static volatile sig_atomic_t ready_to_stop = false; + static void ProcessWalSndInterrupts(void); + static void EnableWalSndImmediateExit(void); + static void DisableWalSndImmediateExit(void); + + /* + * About SIGTERM handling: + * + * We can't just exit(1) within SIGTERM signal handler, because the signal + * might arrive in the middle of some critical operation, like while we're + * holding a spinlock. We also can't just set a flag in signal handler and + * check it in the main loop, because we perform some blocking libpq + * operations like pq_flush(), which can take a long time to finish. + * + * We use a combined approach: When WalSndImmediateInterruptOK is true, it's + * safe for the signal handler to elog(FATAL) immediately. Otherwise it just + * sets got_SIGTERM flag, which is checked in the main loop when convenient. + * + * This is very much like what regular backends do with ImmediateInterruptOK, + * ProcessInterrupts() etc. + */ + static volatile bool WalSndImmediateInterruptOK = false; + + static void + ProcessWalSndInterrupts(void) + { + /* + * Although walsender interrupt handling doesn't use the same scheme + * as regular backends, call CHECK_FOR_INTERRUPTS() to make sure we + * receive any incoming signals on Win32. + */ + CHECK_FOR_INTERRUPTS(); + + if (got_SIGTERM) + { + WalSndImmediateInterruptOK = false; + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating replication because the standby falls too far behind"))); + } + } + + static void + EnableWalSndImmediateExit() + { + WalSndImmediateInterruptOK = true; + ProcessWalSndInterrupts(); + } + + static void + DisableWalSndImmediateExit() + { + WalSndImmediateInterruptOK = false; + ProcessWalSndInterrupts(); + } + /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); static void WalSndShutdownHandler(SIGNAL_ARGS); *************** *** 386,396 **** WalSndLoop(void) if (ready_to_stop) { XLogSend(&output_message); ! shutdown_requested = true; } /* Normal exit from the walsender is here */ ! if (shutdown_requested) { /* Inform the standby that XLOG streaming was done */ pq_puttextmessage('C', "COPY 0"); --- 442,452 ---- if (ready_to_stop) { XLogSend(&output_message); ! got_SIGTERM = true; } /* Normal exit from the walsender is here */ ! if (got_SIGTERM) { /* Inform the standby that XLOG streaming was done */ pq_puttextmessage('C', "COPY 0"); *************** *** 410,416 **** WalSndLoop(void) remain = WalSndDelay; while (remain > 0) { ! if (got_SIGHUP || shutdown_requested || ready_to_stop) break; /* --- 466,472 ---- remain = WalSndDelay; while (remain > 0) { ! if (got_SIGHUP || got_SIGTERM || ready_to_stop) break; /* *************** *** 586,591 **** XLogSend(StringInfo outMsg) --- 642,648 ---- { XLogRecPtr SendRqstPtr; char activitymsg[50]; + int res; /* use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = MyWalSnd; *************** *** 676,682 **** XLogSend(StringInfo outMsg) --- 733,741 ---- outMsg->len += nbytes; outMsg->data[outMsg->len] = '\0'; + EnableWalSndImmediateExit(); pq_putmessage('d', outMsg->data, outMsg->len); + DisableWalSndImmediateExit(); resetStringInfo(outMsg); } *************** *** 686,692 **** XLogSend(StringInfo outMsg) SpinLockRelease(&walsnd->mutex); /* Flush pending output */ ! if (pq_flush()) return false; /* Report progress of XLOG streaming in PS display */ --- 745,754 ---- SpinLockRelease(&walsnd->mutex); /* Flush pending output */ ! EnableWalSndImmediateExit(); ! res = pq_flush(); ! DisableWalSndImmediateExit(); ! if (res) return false; /* Report progress of XLOG streaming in PS display */ *************** *** 704,714 **** WalSndSigHupHandler(SIGNAL_ARGS) got_SIGHUP = true; } ! /* SIGTERM: set flag to shut down */ static void WalSndShutdownHandler(SIGNAL_ARGS) { ! shutdown_requested = true; } /* --- 766,780 ---- got_SIGHUP = true; } ! /* SIGTERM: set flag for main loop, or shutdown immediately if safe */ static void WalSndShutdownHandler(SIGNAL_ARGS) { ! got_SIGTERM = true; ! ! /* Don't joggle the elbow of proc_exit */ ! if (!proc_exit_inprogress && WalSndImmediateInterruptOK) ! ProcessWalSndInterrupts(); } /* *************** *** 844,846 **** GetOldestWALSendPointer(void) --- 910,976 ---- } return oldest; } + + /* + * Check if the standbys have fallen too far behind. + */ + void + CheckStandbysDelay(XLogRecPtr newptr) + { + uint32 old_segno; + uint32 new_segno; + uint32 old_highbits; + uint32 new_highbits; + int i; + bool found = false; + + if (RepLagSegs <= 0) + return; + + for (i = 0; i < MaxWalSenders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + XLogRecPtr oldptr; + pid_t walsndpid; + + SpinLockAcquire(&walsnd->mutex); + oldptr = walsnd->sentPtr; + walsndpid = walsnd->pid; + SpinLockRelease(&walsnd->mutex); + + if (walsndpid == 0) + continue; + + /* + * Check to see whether this standby has fallen behind more than + * the upper limit. + * + * This code is based on XLogCheckpointNeeded(). + */ + old_segno = (oldptr.xlogid % XLogSegSize) * XLogSegsPerFile + + (oldptr.xrecoff / XLogSegSize); + old_highbits = oldptr.xlogid / XLogSegSize; + if (!found) + { + new_segno = (newptr.xlogid % XLogSegSize) * XLogSegsPerFile + + (newptr.xrecoff / XLogSegSize); + new_highbits = newptr.xlogid / XLogSegSize; + found = true; + } + + if ((new_highbits != old_highbits || + new_segno >= old_segno + (uint32) (RepLagSegs - 1)) && + walsndpid == walsnd->pid) + kill(walsndpid, SIGTERM); + /* + * XXX: Should we recycle (or remove) old log files here? Otherwise + * the number of log files would continue to increase until the next + * checkpoint has recycled them. Which increases the chance of a disk + * full failure. But it's not good to recycle log files during + * acquiring WALWriteLock. + */ + + /* standby is keeping up well */ + } + } *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 1721,1726 **** static struct config_int ConfigureNamesInt[] = --- 1721,1735 ---- }, { + {"replication_lag_segments", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("Sets the maximum number of WAL files held for the standby server."), + NULL + }, + &RepLagSegs, + 0, 0, INT_MAX, NULL, NULL + }, + + { {"commit_delay", PGC_USERSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " "flushing WAL to disk."), *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 189,194 **** --- 189,195 ---- #max_wal_senders = 0 # max number of walsender processes #wal_sender_delay = 200ms # 1-10000 milliseconds + #replication_lag_segments = 0 # in logfile segments, 16MB each; 0 disables #------------------------------------------------------------------------------ *** a/src/include/access/xlog_internal.h --- b/src/include/access/xlog_internal.h *************** *** 151,156 **** typedef XLogLongPageHeaderData *XLogLongPageHeader; --- 151,170 ---- } \ } while (0) + /* Decrement an xlogid/segment pair by segs */ + #define PrevLogSegs(logId, logSeg, segs) \ + do { \ + logId -= segs / XLogSegsPerFile; \ + logSeg -= segs % XLogSegsPerFile; \ + if (logSeg < 0) \ + { \ + logId--; \ + logSeg += XLogSegsPerFile; \ + } \ + if (logId < 0) \ + logId = logSeg = 0; \ + } while (0) + /* Align a record pointer to next page */ #define NextLogPage(recptr) \ do { \ *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 39,49 **** extern bool am_walsender; --- 39,51 ---- /* user-settable parameters */ extern int WalSndDelay; + extern int RepLagSegs; extern int WalSenderMain(void); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); extern XLogRecPtr GetOldestWALSendPointer(void); + extern void CheckStandbysDelay(XLogRecPtr newptr); #endif /* _WALSENDER_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers