Hi all, Being interested by this feature, I did a patch review.
This features adds the GUC "max_slot_wal_keep_size". This is the maximum amount of WAL that can be kept in "pg_wal" by active slots. If the amount of WAL is superior to this limit, the slot is deactivated and its status (new filed in pg_replication_slot) is set as "lost". Patching ======== The patch v13-0003 does not apply on HEAD anymore. The patch v13-0005 applies using "git am --ignore-space-change" Other patches applies correctly. Please, find attached the v14 set of patches rebased on master. Documentation ============= The documentation explains the GUC and related columns in "pg_replication_slot". It reflects correctly the current behavior of the patch. Usability ========= The patch implement what it described. It is easy to enable and disable. The GUC name is describing correctly its purpose. This feature is useful in some HA scenario where slot are required (eg. no possible archiving), but where primary availability is more important than standbys. In "pg_replication_slots" view, the new "wal_status" field is misleading. Consider this sentence and the related behavior from documentation (catalogs.sgml): <literal>keeping</literal> means that some of them are to be removed by the next checkpoint. "keeping" appears when the current checkpoint will delete some WAL further than "current_lsn - max_slot_wal_keep_size", but still required by at least one slot. As some WAL required by some slots will be deleted quite soon, probably before anyone can react, "keeping" status is misleading here. We are already in the red zone. I would expect this "wal_status" to be: - streaming: slot lag between 0 and "max_wal_size" - keeping: slot lag between "max_wal_size" and "max_slot_wal_keep_size". the slot actually protect some WALs from being deleted - lost: slot lag superior of max_slot_wal_keep_size. The slot couldn't protect some WAL from deletion Documentation follows with: The last two states are seen only when max_slot_wal_keep_size is non-negative This is true with the current behavior. However, if "keeping" is set as soon as te slot lag is superior than "max_wal_size", this status could be useful even with "max_slot_wal_keep_size = -1". As soon as a slot is stacking WALs that should have been removed by previous checkpoint, it "keeps" them. Feature tests ============= I have played with various traffic shaping setup between nodes, to observe how columns "active", "wal_status" and "remain" behaves in regard to each others using: while true; do sleep 0.3; psql -p 5432 -AXtc " select now(), active, restart_lsn, wal_status, pg_size_pretty(remain) from pg_replication_slots where slot_name='slot_limit_st'" done The primary is created using: initdb -U postgres -D slot_limit_pr --wal-segsize=1 cat<<EOF >>slot_limit_pr/postgresql.conf port=5432 max_wal_size = 3MB min_wal_size = 2MB max_slot_wal_keep_size = 4MB logging_collector = on synchronous_commit = off EOF WAL activity is generated using a simple pgbench workload. Then, during this activity, packets on loopback are delayed using: tc qdisc add dev lo root handle 1:0 netem delay 140msec Here is how the wal_status behave. I removed the timestamps, but the record order is the original one: t|1/7B116898|streaming|1872 kB t|1/7B1A0000|lost|0 bytes t|1/7B320000|keeping|0 bytes t|1/7B780000|lost|0 bytes t|1/7BB00000|keeping|0 bytes t|1/7BE00000|keeping|0 bytes t|1/7C100000|lost|0 bytes t|1/7C400000|keeping|0 bytes t|1/7C700000|lost|0 bytes t|1/7CA40000|keeping|0 bytes t|1/7CDE0000|lost|0 bytes t|1/7D100000|keeping|0 bytes t|1/7D400000|keeping|0 bytes t|1/7D7C0000|keeping|0 bytes t|1/7DB40000|keeping|0 bytes t|1/7DE60000|lost|0 bytes t|1/7E180000|keeping|0 bytes t|1/7E500000|keeping|0 bytes t|1/7E860000|lost|0 bytes t|1/7EB80000|keeping|0 bytes [...x15] t|1/80800000|keeping|0 bytes t|1/80900000|streaming|940 kB t|1/80A00000|streaming|1964 kB When increasing the network delay to 145ms, the slot has been lost for real. Note that it has been shown as lost but active twice (during approx 0.6s) before being deactivated. t|1/85700000|streaming|2048 kB t|1/85800000|keeping|0 bytes t|1/85940000|lost|0 bytes t|1/85AC0000|lost|0 bytes f|1/85C40000|lost|0 bytes Finally, at least once the following messages appeared in primary logs **before** the "wal_status" changed from "keeping" to "streaming": WARNING: some replication slots have lost required WAL segments So the slot lost one WAL, but the standby has been able to catch-up anyway. My humble opinion about these results: * after many different tests, the status "keeping" appears only when "remain" equals 0. In current implementation, "keeping" really adds no value... * "remain" should be NULL if "max_slot_wal_keep_size=-1 or if the slot isn't active * the "lost" status should be a definitive status * it seems related, but maybe the "wal_status" should be set as "lost" only when the slot has been deactivate ? * logs should warn about a failing slot as soon as it is effectively deactivated, not before.
>From ab5bd3801cdf5e60765b937249f317bcc0533917 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Thu, 21 Dec 2017 21:20:20 +0900 Subject: [PATCH 1/6] Add WAL relief vent for replication slots Replication slot is useful to maintain replication connection in the configurations where replication is so delayed that connection is broken. On the other hand so many WAL files can fill up disk that the master downs by a long delay. This feature, which is activated by a GUC "max_slot_wal_keep_size", protects master servers from suffering disk full by limiting the number of WAL files reserved by replication slots. --- src/backend/access/transam/xlog.c | 124 ++++++++++++++---- src/backend/replication/slot.c | 57 ++++++++ src/backend/utils/misc/guc.c | 12 ++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/access/xlog.h | 1 + src/include/replication/slot.h | 1 + 6 files changed, 173 insertions(+), 23 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e08320e829..9bea2a403f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -104,6 +104,7 @@ int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; +int max_slot_wal_keep_size_mb = -1; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -871,6 +872,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); +static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); @@ -9272,6 +9274,54 @@ CreateRestartPoint(int flags) return true; } +/* + * Returns minimum segment number that the next checkpoint must leave + * considering wal_keep_segments, replication slots and + * max_slot_wal_keep_size. + * + * currLSN is the current insert location. + * minSlotLSN is the minimum restart_lsn of all active slots. + */ +static XLogSegNo +GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN) +{ + XLogSegNo currSeg; + XLogSegNo minSlotSeg; + uint64 keepSegs = 0; /* # of segments actually kept */ + + XLByteToSeg(currLSN, currSeg, wal_segment_size); + XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size); + + /* + * Calculate how many segments are kept by slots first. The second + * term of the condition is just a sanity check. + */ + if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg) + keepSegs = currSeg - minSlotSeg; + + /* Cap keepSegs by max_slot_wal_keep_size */ + if (max_slot_wal_keep_size_mb >= 0) + { + uint64 limitSegs; + + limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size); + + /* Reduce it if slots already reserves too many. */ + if (limitSegs < keepSegs) + keepSegs = limitSegs; + } + + /* but, keep at least wal_keep_segments segments if any */ + if (wal_keep_segments > 0 && keepSegs < wal_keep_segments) + keepSegs = wal_keep_segments; + + /* avoid underflow, don't go below 1 */ + if (currSeg <= keepSegs) + return 1; + + return currSeg - keepSegs; +} + /* * Retreat *logSegNo to the last segment that we need to retain because of * either wal_keep_segments or replication slots. @@ -9283,38 +9333,66 @@ CreateRestartPoint(int flags) static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) { - XLogSegNo segno; - XLogRecPtr keep; + XLogRecPtr slotminptr = InvalidXLogRecPtr; + XLogSegNo minSegNo; + XLogSegNo slotSegNo; - XLByteToSeg(recptr, segno, wal_segment_size); - keep = XLogGetReplicationSlotMinimumLSN(); + if (max_replication_slots > 0) + slotminptr = XLogGetReplicationSlotMinimumLSN(); - /* compute limit for wal_keep_segments first */ - if (wal_keep_segments > 0) - { - /* avoid underflow, don't go below 1 */ - if (segno <= wal_keep_segments) - segno = 1; - else - segno = segno - wal_keep_segments; - } + /* + * We should keep certain number of WAL segments after this checkpoint. + */ + minSegNo = GetOldestKeepSegment(recptr, slotminptr); - /* then check whether slots limit removal further */ - if (max_replication_slots > 0 && keep != InvalidXLogRecPtr) + /* + * Warn the checkpoint is going to flush the segments required by + * replication slots. + */ + if (!XLogRecPtrIsInvalid(slotminptr)) { - XLogSegNo slotSegNo; + static XLogSegNo prev_lost_segs = 0; /* avoid duplicate messages */ - XLByteToSeg(keep, slotSegNo, wal_segment_size); + XLByteToSeg(slotminptr, slotSegNo, wal_segment_size); - if (slotSegNo <= 0) - segno = 1; - else if (slotSegNo < segno) - segno = slotSegNo; + if (slotSegNo < minSegNo) + { + XLogSegNo lost_segs = minSegNo - slotSegNo; + if (prev_lost_segs != lost_segs) + { + /* We have lost a new segment, warn it.*/ + XLogRecPtr minlsn; + char *slot_names; + int nslots; + + XLogSegNoOffsetToRecPtr(minSegNo, 0, wal_segment_size, minlsn); + slot_names = + ReplicationSlotsEnumerateBehinds(minlsn, ", ", &nslots); + + /* + * Some of the affected slots could have just been removed. + * We don't need show anything here if no affected slot + * remains. + */ + if (slot_names) + { + ereport(WARNING, + (errmsg ("some replication slots have lost required WAL segments"), + errdetail_plural( + "Slot %s lost %ld segment(s).", + "Slots %s lost at most %ld segment(s).", + nslots, slot_names, lost_segs))); + } + } + prev_lost_segs = lost_segs; + } + else + prev_lost_segs = 0; } /* don't delete WAL segments newer than the calculated segment */ - if (segno < *logSegNo) - *logSegNo = segno; + if (minSegNo < *logSegNo) + *logSegNo = minSegNo; } /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 55c306e465..7f9eab6edf 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1064,6 +1064,63 @@ ReplicationSlotReserveWal(void) } } +/* + * Returns names of replication slots that their restart_lsn are behind + * specified LSN, in palloc'ed character array stuffed with slot names + * delimited by the given separator. Returns NULL if no slot matches. If + * pnslots is given, the number of the returned slots is returned there. + */ +char * +ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots) +{ + static StringInfoData retstr; + static bool retstr_initialized = false; + bool insert_separator = false; + int i; + int nslots = 0; + + Assert (separator); + if (max_replication_slots <= 0) + return NULL; + + if (!retstr_initialized) + { + initStringInfo(&retstr); + retstr_initialized = true; + } + else + resetStringInfo(&retstr); + + /* construct name list */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0 ; i < max_replication_slots ; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && s->data.restart_lsn < target) + { + if (insert_separator) + appendStringInfoString(&retstr, separator); + + /* + * Slot names consist only with lower-case letters. We don't + * bother quoting. + */ + appendStringInfoString(&retstr, NameStr(s->data.name)); + insert_separator = true; + nslots++; + } + } + LWLockRelease(ReplicationSlotControlLock); + + /* return the number of slots in the list if requested */ + if (pnslots) + *pnslots = nslots; + + /* return NULL instead of an empty string */ + return retstr.data[0] ? retstr.data : NULL; +} + /* * Flush all replication slots to disk. * diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 1208eb9a68..80245e9def 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2655,6 +2655,18 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Sets the maximum size of extra WALs kept by replication slots."), + NULL, + GUC_UNIT_MB + }, + &max_slot_wal_keep_size_mb, + -1, -1, + MAX_KILOBYTES, /* XXX: This is in megabytes, like max/min_wal_size */ + NULL, NULL, NULL + }, + { {"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING, gettext_noop("Sets the maximum time to wait for WAL replication."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 5ee5e09ddf..5da2706a99 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -286,6 +286,7 @@ #max_wal_senders = 10 # max number of walsender processes # (change requires restart) #wal_keep_segments = 0 # in logfile segments; 0 disables +#max_slot_wal_keep_size = -1 # measured in bytes; -1 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables #max_replication_slots = 10 # max number of replication slots diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 237f4e0350..e5322abdf5 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -108,6 +108,7 @@ extern int wal_segment_size; extern int min_wal_size_mb; extern int max_wal_size_mb; extern int wal_keep_segments; +extern int max_slot_wal_keep_size_mb; extern int XLOGbuffers; extern int XLogArchiveTimeout; extern int wal_retrieve_retry_interval; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8fbddea78f..e0fee0663c 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -199,6 +199,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); +extern char *ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); -- 2.20.1
>From 140829a51e13816afb59be0e11246d36d48ac9ed Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Thu, 21 Dec 2017 21:23:25 +0900 Subject: [PATCH 2/6] Add monitoring aid for max_slot_wal_keep_size Adds two columns "status" and "remain" in pg_replication_slot. Setting max_slot_wal_keep_size, replication connections may lose sync by a long delay. The "status" column shows whether the slot is reconnectable or not, or about to lose reserving WAL segments. The "remain" column shows the remaining bytes of WAL that can be advance until the slot loses required WAL records. --- contrib/test_decoding/expected/ddl.out | 4 +- contrib/test_decoding/sql/ddl.sql | 2 + src/backend/access/transam/xlog.c | 152 +++++++++++++++++++++++-- src/backend/catalog/system_views.sql | 4 +- src/backend/replication/slotfuncs.c | 16 ++- src/include/access/xlog.h | 1 + src/include/catalog/pg_proc.dat | 6 +- src/test/regress/expected/rules.out | 6 +- 8 files changed, 174 insertions(+), 17 deletions(-) diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 2c999fd3eb..cf0318f697 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -723,8 +723,8 @@ SELECT pg_drop_replication_slot('regression_slot'); (1 row) /* check that the slot is gone */ +\x SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn ------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+--------------------- (0 rows) +\x diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql index 856495c952..0f2b9992f7 100644 --- a/contrib/test_decoding/sql/ddl.sql +++ b/contrib/test_decoding/sql/ddl.sql @@ -387,4 +387,6 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ +\x SELECT * FROM pg_replication_slots; +\x diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9bea2a403f..e9ac50d057 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -872,7 +872,9 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); -static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr); +static XLogSegNo GetOldestXLogFileSegNo(void); +static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr, + XLogRecPtr targetLSN, uint64 *restBytes); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); @@ -6658,6 +6660,12 @@ StartupXLOG(void) */ StartupReplicationOrigin(); + /* + * Initialize lastRemovedSegNo looking pg_wal directory. The minimum + * segment number is 1 so wrap-around cannot happen. + */ + XLogCtl->lastRemovedSegNo = GetOldestXLogFileSegNo() - 1; + /* * Initialize unlogged LSN. On a clean shutdown, it's restored from the * control file. On recovery, all unlogged relations are blown away, so @@ -9274,6 +9282,96 @@ CreateRestartPoint(int flags) return true; } + +/* + * Finds the oldest segment number in XLOG directory. + * + * This function is intended to be used to initialize + * XLogCtl->lastRemovedSegNo. + */ +static XLogSegNo +GetOldestXLogFileSegNo(void) +{ + DIR *xldir; + struct dirent *xlde; + XLogSegNo segno = 0; + + xldir = AllocateDir(XLOGDIR); + while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL) + { + TimeLineID tli; + XLogSegNo fsegno; + + /* Ignore files that are not XLOG segments */ + if (!IsXLogFileName(xlde->d_name) && + !IsPartialXLogFileName(xlde->d_name)) + continue; + + XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size); + + /* + * Get minimum segment ignoring timeline ID, the same way with + * RemoveOldXlogFiles(). + */ + if (segno == 0 || fsegno < segno) + segno = fsegno; + } + + FreeDir(xldir); + + return segno; +} + +/* + * Returns availability of the record at given targetLSN. + * + * Returns three kinds of value in string. + * "streaming" means the WAL record at targetLSN is available. + * "keeping" means it is still available but about to be removed at the next + * checkpoint. + * "lost" means it is already removed. + * + * If restBytes is not NULL, sets the remaining LSN bytes until the segment + * for targetLSN will be removed. + */ +char * +GetLsnAvailability(XLogRecPtr targetLSN, uint64 *restBytes) +{ + XLogRecPtr currpos; + XLogRecPtr slotPtr; + XLogSegNo targetSeg; + XLogSegNo tailSeg; + XLogSegNo oldestSeg; + + Assert(!XLogRecPtrIsInvalid(targetLSN)); + Assert(restBytes); + + currpos = GetXLogWriteRecPtr(); + + SpinLockAcquire(&XLogCtl->info_lck); + oldestSeg = XLogCtl->lastRemovedSegNo; + SpinLockRelease(&XLogCtl->info_lck); + + /* oldest segment is just after the last removed segment */ + oldestSeg++; + + XLByteToSeg(targetLSN, targetSeg, wal_segment_size); + + slotPtr = XLogGetReplicationSlotMinimumLSN(); + tailSeg = GetOldestKeepSegment(currpos, slotPtr, targetLSN, restBytes); + + /* targetSeg is being reserved by slots */ + if (tailSeg <= targetSeg) + return "streaming"; + + /* targetSeg is no longer reserved but still available */ + if (oldestSeg <= targetSeg) + return "keeping"; + + /* targetSeg has gone */ + return "lost"; +} + /* * Returns minimum segment number that the next checkpoint must leave * considering wal_keep_segments, replication slots and @@ -9281,13 +9379,19 @@ CreateRestartPoint(int flags) * * currLSN is the current insert location. * minSlotLSN is the minimum restart_lsn of all active slots. + * targetLSN is used when restBytes is not NULL. + * + * If restBytes is not NULL, sets the remaining LSN bytes until the segment + * for targetLSN will be removed. */ static XLogSegNo -GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN) +GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN, + XLogRecPtr targetLSN, uint64 *restBytes) { XLogSegNo currSeg; XLogSegNo minSlotSeg; uint64 keepSegs = 0; /* # of segments actually kept */ + uint64 limitSegs = 0; /* # of maximum segments possibly kept */ XLByteToSeg(currLSN, currSeg, wal_segment_size); XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size); @@ -9302,8 +9406,6 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN) /* Cap keepSegs by max_slot_wal_keep_size */ if (max_slot_wal_keep_size_mb >= 0) { - uint64 limitSegs; - limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size); /* Reduce it if slots already reserves too many. */ @@ -9311,9 +9413,42 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN) keepSegs = limitSegs; } - /* but, keep at least wal_keep_segments segments if any */ - if (wal_keep_segments > 0 && keepSegs < wal_keep_segments) - keepSegs = wal_keep_segments; + if (wal_keep_segments > 0) + { + /* but, keep at least wal_keep_segments segments if any */ + if (keepSegs < wal_keep_segments) + keepSegs = wal_keep_segments; + + /* ditto for limitSegs */ + if (limitSegs < wal_keep_segments) + limitSegs = wal_keep_segments; + } + + /* + * If requested, calculate the remaining LSN bytes until the slot gives up + * keeping WAL records. + */ + if (restBytes) + { + uint64 fragbytes; + XLogSegNo targetSeg; + + *restBytes = 0; + + XLByteToSeg(targetLSN, targetSeg, wal_segment_size); + + /* avoid underflow */ + if (max_slot_wal_keep_size_mb >= 0 && currSeg <= targetSeg + limitSegs) + { + /* + * This slot still has all required segments. Calculate how many + * LSN bytes the slot has until it loses targetLSN. + */ + fragbytes = wal_segment_size - (currLSN % wal_segment_size); + XLogSegNoOffsetToRecPtr(targetSeg + limitSegs - currSeg, fragbytes, + wal_segment_size, *restBytes); + } + } /* avoid underflow, don't go below 1 */ if (currSeg <= keepSegs) @@ -9343,7 +9478,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) /* * We should keep certain number of WAL segments after this checkpoint. */ - minSegNo = GetOldestKeepSegment(recptr, slotminptr); + minSegNo = + GetOldestKeepSegment(recptr, slotminptr, InvalidXLogRecPtr, NULL); /* * Warn the checkpoint is going to flush the segments required by diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index ea4c85e395..6a9491e64a 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -849,7 +849,9 @@ CREATE VIEW pg_replication_slots AS L.xmin, L.catalog_xmin, L.restart_lsn, - L.confirmed_flush_lsn + L.confirmed_flush_lsn, + L.wal_status, + L.remain FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 808a6f5b83..b06ac21cd9 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -221,7 +221,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 11 +#define PG_GET_REPLICATION_SLOTS_COLS 13 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -343,6 +343,20 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; + if (restart_lsn == InvalidXLogRecPtr) + { + values[i++] = CStringGetTextDatum("unknown"); + values[i++] = LSNGetDatum(InvalidXLogRecPtr); + } + else + { + uint64 remaining_bytes; + + values[i++] = CStringGetTextDatum( + GetLsnAvailability(restart_lsn, &remaining_bytes)); + values[i++] = Int64GetDatum(remaining_bytes); + } + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } LWLockRelease(ReplicationSlotControlLock); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index e5322abdf5..3bf8432309 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -305,6 +305,7 @@ extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); extern void CreateCheckPoint(int flags); extern bool CreateRestartPoint(int flags); +extern char *GetLsnAvailability(XLogRecPtr targetLSN, uint64 *restBytes); extern void XLogPutNextOid(Oid nextOid); extern XLogRecPtr XLogRestorePoint(const char *rpName); extern void UpdateFullPageWrites(void); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 87335248a0..32dde9cd13 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9800,9 +9800,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 210e9cd146..74c44891a4 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1461,8 +1461,10 @@ pg_replication_slots| SELECT l.slot_name, l.xmin, l.catalog_xmin, l.restart_lsn, - l.confirmed_flush_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) + l.confirmed_flush_lsn, + l.wal_status, + l.remain + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.20.1
>From 335eeda6d2f74531a3b76249a2a1fe04f471065f Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Wed, 19 Dec 2018 12:43:57 +0900 Subject: [PATCH 3/6] Add primary_slot_name to init_from_backup in TAP test. It is convenient that priary_slot_name can be specified on taking a base backup. This adds a new parameter of the name to the perl function. --- src/test/perl/PostgresNode.pm | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 6019f37f91..2c7a894914 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -680,11 +680,7 @@ sub init_from_backup chmod(0700, $data_path); # Base configuration for this node - $self->append_conf( - 'postgresql.conf', - qq( -port = $port -)); + $self->append_conf('postgresql.conf', qq(port = $port)); if ($use_tcp) { $self->append_conf('postgresql.conf', "listen_addresses = '$host'"); @@ -694,6 +690,11 @@ port = $port $self->append_conf('postgresql.conf', "unix_socket_directories = '$host'"); } + $self->append_conf('postgresql.conf', qq(port = $port)); + $self->append_conf('postgresql.conf', + qq(primary_slot_name = $params{primary_slot_name})) + if defined $params{primary_slot_name}; + $self->enable_streaming($root_node) if $params{has_streaming}; $self->enable_restoring($root_node) if $params{has_restoring}; return; -- 2.20.1
>From 209a72b703ea6234df5fcecd2eff65a3f9d7a72b Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Thu, 21 Dec 2017 17:33:53 +0900 Subject: [PATCH 4/6] TAP test for the slot limit feature --- src/test/recovery/t/016_replslot_limit.pl | 184 ++++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 src/test/recovery/t/016_replslot_limit.pl diff --git a/src/test/recovery/t/016_replslot_limit.pl b/src/test/recovery/t/016_replslot_limit.pl new file mode 100644 index 0000000000..e150ca7a54 --- /dev/null +++ b/src/test/recovery/t/016_replslot_limit.pl @@ -0,0 +1,184 @@ +# Test for replication slot limit +# Ensure that max_slot_wal_keep_size limits the number of WAL files to +# be kept by replication slots. + +use strict; +use warnings; +use File::Path qw(rmtree); +use PostgresNode; +use TestLib; +use Test::More tests => 11; +use Time::HiRes qw(usleep); + +$ENV{PGDATABASE} = 'postgres'; + +# Initialize master node, setting wal-segsize to 1MB +my $node_master = get_new_node('master'); +$node_master->init(allows_streaming => 1, extra => ['--wal-segsize=1']); +$node_master->append_conf('postgresql.conf', qq( +min_wal_size = 2MB +max_wal_size = 3MB +)); +$node_master->start; +$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')"); + +# The slot state should be the state "unknown" before the first connection +my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain FROM pg_replication_slots WHERE slot_name = 'rep1'"); +is($result, "|unknown|0", 'check the state of non-reserved slot is "unknown"'); + + +# Take backup +my $backup_name = 'my_backup'; +$node_master->backup($backup_name); + +# Create a standby linking to it using the replication slot +my $node_standby = get_new_node('standby_1'); +$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1, primary_slot_name => 'rep1'); + +$node_standby->start; + +# Wait until standby has replayed enough data +my $start_lsn = $node_master->lsn('write'); +$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn); + +# Stop standby +$node_standby->stop; + + +# Preparation done, the slot is the state "streaming" now +$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain FROM pg_replication_slots WHERE slot_name = 'rep1'"); +is($result, "$start_lsn|streaming|0", 'check the catching-up state'); + +# Advance WAL by five segments (= 5MB) on master +advance_wal($node_master, 5); +$node_master->safe_psql('postgres', "CHECKPOINT;"); + +# The slot is always "safe" when max_slot_wal_keep_size is not set +$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain FROM pg_replication_slots WHERE slot_name = 'rep1'"); +is($result, "$start_lsn|streaming|0", 'check that slot is working'); + +# The standby can connect to master +$node_standby->start; + +$start_lsn = $node_master->lsn('write'); +$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn); + +$node_standby->stop; + +# Set max_slot_wal_keep_size on master +my $max_slot_wal_keep_size_mb = 3; +$node_master->append_conf('postgresql.conf', qq( +max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB +)); +$node_master->reload; + +# The slot is in safe state. The remaining bytes should be as almost +# (max_slot_wal_keep_size + 1) times large as the segment size +$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'"); +is($result, "$start_lsn|streaming|4096 kB", 'check that max_slot_wal_keep_size is working'); + +# Advance WAL again then checkpoint +advance_wal($node_master, 2); +$node_master->safe_psql('postgres', "CHECKPOINT;"); + +# The slot is still working. +$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'"); +is($result, "$start_lsn|streaming|2048 kB", 'check that remaining byte is calculated correctly'); + +# wal_keep_segments overrides max_slot_wal_keep_size +$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 6; SELECT pg_reload_conf();"); +$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'"); +is($result, "$start_lsn|streaming|5120 kB", 'check that wal_keep_segments overrides max_slot_wal_keep_size'); + +# restore wal_keep_segments +$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();"); + +# Advance WAL again without checkpoint +advance_wal($node_master, 2); + +# Slot gets to 'keeping' state +$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'"); +is($result, "$start_lsn|keeping|0 bytes", 'check that the slot state changes to "keeping"'); + +# The standby still can connect to master +$node_standby->start; + +$start_lsn = $node_master->lsn('write'); +$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn); + +$node_standby->stop; + +ok(!find_in_log($node_standby, + "requested WAL segment [0-9A-F]+ has already been removed"), + 'check that required WAL segments are still available'); + +# Advance WAL again, the slot loses some segments. +my $logstart = get_log_size($node_master); +advance_wal($node_master, 5); +$node_master->safe_psql('postgres', "CHECKPOINT;"); + +# WARNING should be issued +ok(find_in_log($node_master, + "some replication slots have lost required WAL segments\n". + ".*Slot rep1 lost 2 segment\\(s\\)\\.", + $logstart), + 'check that the warning is logged'); + +# This slot should be broken +$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'"); +is($result, "$start_lsn|lost|0 bytes", 'check that the slot state changes to "lost"'); + +# The standby no longer can connect to the master +$logstart = get_log_size($node_standby); +$node_standby->start; + +my $failed = 0; +for (my $i = 0 ; $i < 10000 ; $i++) +{ + if (find_in_log($node_standby, + "requested WAL segment [0-9A-F]+ has already been removed", + $logstart)) + { + $failed = 1; + last; + } + usleep(100_000); +} +ok($failed, 'check that replication has been broken'); + +$node_standby->stop; + +##################################### +# Advance WAL of $node by $n segments +sub advance_wal +{ + my ($node, $n) = @_; + + # Advance by $n segments (= (16 * $n) MB) on master + for (my $i = 0 ; $i < $n ; $i++) + { + $node->safe_psql('postgres', "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();"); + } +} + +# return the size of logfile of $node in bytes +sub get_log_size +{ + my ($node) = @_; + + return (stat $node->logfile)[7]; +} + +# find $pat in logfile of $node after $off-th byte +sub find_in_log +{ + my ($node, $pat, $off) = @_; + + $off = 0 unless defined $off; + my $log = TestLib::slurp_file($node->logfile); + return 0 if (length($log) <= $off); + + $log = substr($log, $off); + + return $log =~ m/$pat/; +} -- 2.20.1
>From b0301d8f50f0a511ad8fcb9e2b45ed97acfc35d8 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Thu, 11 Jan 2018 15:00:32 +0900 Subject: [PATCH 5/6] Documentation for slot-limit feature --- doc/src/sgml/catalogs.sgml | 28 ++++++++++++++++++++++++++++ doc/src/sgml/config.sgml | 23 +++++++++++++++++++++++ doc/src/sgml/high-availability.sgml | 8 +++++--- 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index f2b9d404cb..acfc80f921 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -9975,6 +9975,34 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx </entry> </row> + <row> + <entry><structfield>wal_status</structfield></entry> + <entry><type>text</type></entry> + <entry></entry> + + <entry>Availability of WAL records claimed by this + slot. <literal>streaming</literal>, <literal>keeping</literal>, + <literal>lost</literal> + or <literal>unknown</literal>. <literal>streaming</literal> means that + the claimed records are available. <literal>keeping</literal> means that + some of them are to be removed by the next checkpoint. + <literal>lost</literal> means that some of them are no longer + available. The last two states are seen only when + <xref linkend="guc-max-slot-wal-keep-size"/> is + non-negative. If <structfield>restart_lsn</structfield> is NULL, this + field is <literal>unknown</literal>. + </entry> + </row> + + <row> + <entry><structfield>remain</structfield></entry> + <entry><type>bigint</type></entry> + <entry></entry> + <entry>The amount in bytes that WAL location (LSN) can advance until + this slot may lose required WAL records. + </entry> + </row> + </tbody> </tgroup> </table> diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 84341a30e5..f203d42921 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3650,6 +3650,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows </listitem> </varlistentry> + <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size"> + <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Specify the maximum size of WAL files + that <link linkend="streaming-replication-slots">replication + slots</link> are allowed to retain in the <filename>pg_wal</filename> + directory at checkpoint time. + If <varname>max_slot_wal_keep_size</varname> is -1 (the default), + replication slots retain unlimited size of WAL files. If restart_lsn + of a replication slot gets behind more than that bytes from the + current LSN, the standby using the slot may no longer be able to + reconnect due to removal of required WAL records. You can see the WAL + availability of replication slots + in <link linkend="view-pg-replication-slots">pg_replication_slots</link>. + </para> + </listitem> + </varlistentry> + <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout"> <term><varname>wal_sender_timeout</varname> (<type>integer</type>) <indexterm> diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 543691dad4..ae8c3a2aca 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' <xref linkend="guc-archive-command"/>. However, these methods often result in retaining more WAL segments than required, whereas replication slots retain only the number of segments - known to be needed. An advantage of these methods is that they bound - the space requirement for <literal>pg_wal</literal>; there is currently no way - to do this using replication slots. + known to be needed. On the other hand, replication slots can retain so + many WAL segments that they fill up the space allocated + for <literal>pg_wal</literal>; + <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files + retained by replication slots. </para> <para> Similarly, <xref linkend="guc-hot-standby-feedback"/> -- 2.20.1
>From a4693ba0c4965d1800a31d0d9918d0a6edf40ed5 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Fri, 26 Oct 2018 10:07:05 +0900 Subject: [PATCH 6/6] Check removal of in-reading segment file. Checkpoints can recycle a segment file while it is being read by ReadRecord and that leads to an apparently odd error message during logical decoding. This patch explicitly checks that then error out immediately. Reading a recycled file is safe. Inconsistency caused by overwrites as a new segment are caught by page/record validation. So this is only for keeping consistency with the wal_status shown in pg_replication_slots. --- src/backend/access/transam/xlogreader.c | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 88be7fe022..a32e38dc7a 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -26,6 +26,7 @@ #include "replication/origin.h" #ifndef FRONTEND +#include "access/xlog.h" #include "utils/memutils.h" #endif @@ -224,7 +225,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) uint32 pageHeaderSize; bool gotheader; int readOff; - +#ifndef FRONTEND + XLogSegNo targetSegNo; +#endif /* * randAccess indicates whether to verify the previous-record pointer of * the record we're reading. We only do this if we're reading @@ -270,6 +273,22 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); targetRecOff = RecPtr % XLOG_BLCKSZ; +#ifndef FRONTEND + /* + * Although It's safe that the current segment is recycled as a new + * segment since we check the page/record header at reading, it leads to + * an apparently strange error message when logical replication, which can + * be prevented by explicitly checking if the current segment is removed. + */ + XLByteToSeg(targetPagePtr, targetSegNo, state->wal_segment_size); + if (targetSegNo <= XLogGetLastRemovedSegno()) + { + report_invalid_record(state, + "WAL segment for LSN %X/%X has been removed", + (uint32)(RecPtr >> 32), (uint32) RecPtr); + goto err; + } +#endif /* * Read the page containing the record into state->readBuf. Request enough * byte to cover the whole record header, or at least the part of it that -- 2.20.1