Hello, this is a rebased version. Along with the rebase, it changes the meaning of the monitoring values.
In the previous version, the "live" column mysteriously predicted whether the necessary segments would be kept or lost by the next checkpoint, and "distance" offered a still more mysterious value. In this version the meaning of the two columns is clear and informative. pg_replication_slots - live: true if the slot has not lost any necessary segments. - distance: how many bytes the LSN can advance before the margin defined by max_slot_wal_keep_size (and wal_keep_segments) is exhausted, or how many bytes of WAL after restart_lsn this slot has lost. There is a case where live = t and distance = 0: the slot currently has all the necessary segments but will start to lose them within at most two checkpoints. regards, -- Kyotaro Horiguchi NTT Open Source Software Center
>From 57eaa2b878d30bfcebb093cca0e772fe7a9bff0e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 28 Feb 2017 11:39:48 +0900
Subject: [PATCH 1/2] Add WAL relief vent for replication slots

Adds a capability to limit the number of segments kept by replication
slots by a GUC variable.
---
 src/backend/access/transam/xlog.c             | 24 ++++++++++++++++++++++++
 src/backend/utils/misc/guc.c                  | 11 +++++++++++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/access/xlog.h                     |  1 +
 4 files changed, 37 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index dd028a1..f79cefb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = 0;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -9432,9 +9433,32 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
 	{
 		XLogSegNo	slotSegNo;
+		int			slotlimitsegs = ConvertToXSegs(max_slot_wal_keep_size_mb);
 
 		XLByteToSeg(keep, slotSegNo, wal_segment_size);
 
+		/*
+		 * Ignore the slots if they would keep too many WAL segments. The
+		 * max_slot_wal_keep_size limit is counted on top of wal_keep_segments.
+		 */
+		if (max_slot_wal_keep_size_mb > 0 && slotSegNo + slotlimitsegs < segno)
+		{
+			segno = segno - slotlimitsegs;	/* positive: segno > slotlimitsegs */
+
+			/*
+			 * Warn only if this checkpoint removes segments the slots need.
+			 * We assume here that *logSegNo is the calculated keep location.
+			 */
+			if (slotSegNo < *logSegNo)
+				ereport(WARNING,
+						(errmsg("restart LSN of replication slots is ignored by checkpoint"),
+						 errdetail("Some replication slots have lost required WAL segments to continue by up to %lu segments.",
+								   (unsigned long) ((segno < *logSegNo ? segno : *logSegNo) - slotSegNo))));
+
+			/* emergency vent */
+			slotSegNo = segno;
+		}
+
 		if (slotSegNo <= 0)
 			segno = 1;
 		else if (slotSegNo < segno)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 65372d7..511023a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2368,6 +2368,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+			NULL,
+			GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
 			NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 368b280..e76c73a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -234,6 +234,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = 0	# in megabytes; 0 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0f2b8bd..f0c0255 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
-- 
2.9.2
>From 37749d46ba97de38e4593f141bb8a82c67fc0af5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 7 Sep 2017 19:13:22 +0900
Subject: [PATCH 2/2] Add monitoring aid for max_slot_wal_keep_size.

Adds two columns "live" and "distance" to pg_replication_slots.
When max_slot_wal_keep_size is set, long-disconnected slots may lose
sync. The two columns show how long a slot can live on, or how many
bytes a slot has lost, if max_slot_wal_keep_size is set.
---
 src/backend/access/transam/xlog.c    | 137 ++++++++++++++++++++++++++++++++++-
 src/backend/catalog/system_views.sql |   4 +-
 src/backend/replication/slotfuncs.c  |  16 +++-
 src/include/access/xlog.h            |   1 +
 src/include/catalog/pg_proc.h        |   2 +-
 src/test/regress/expected/rules.out  |   6 +-
 6 files changed, 160 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f79cefb..a9203ff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9402,6 +9402,140 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+
+/*
+ * Returns the oldest segment number found in XLOG directory, or 0 if none.
+ */
+static XLogSegNo
+GetOldestXLogFileSegNo(void)
+{
+	DIR		   *xldir;
+	struct dirent *xlde;
+	XLogSegNo	segno = 0;
+
+	xldir = AllocateDir(XLOGDIR);
+	if (xldir == NULL)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open write-ahead log directory \"%s\": %m",
+						XLOGDIR)));
+
+	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+	{
+		TimeLineID	tli;
+		XLogSegNo	fsegno;
+
+		/* Ignore files that are not XLOG segments */
+		if (!IsXLogFileName(xlde->d_name) &&
+			!IsPartialXLogFileName(xlde->d_name))
+			continue;
+
+		XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+		/* get minimum segment, ignoring timeline ID */
+		if (segno == 0 || fsegno < segno)
+			segno = fsegno;
+	}
+	FreeDir(xldir);
+	return segno;
+}
+
+/*
+ * Check if the record on the given restartLSN is still present in XLOG files.
+ *
+ * Returns true if it is present. If distance is not NULL, it receives either
+ * the number of bytes the write position can advance before the margin given
+ * by max_slot_wal_keep_size_mb and wal_keep_segments is exhausted (when true),
+ * or the number of bytes lost after restartLSN (when false).
+ *
+ * A true result with *distance == 0 means that restartLSN will be lost within
+ * at most two checkpoints.
+ */
+bool
+IsLsnStillAvailable(XLogRecPtr restartLSN, uint64 *distance)
+{
+	XLogRecPtr	currpos;
+	XLogSegNo	currSeg;
+	XLogSegNo	restartSeg;
+	XLogSegNo	tailSeg;
+	XLogSegNo	oldestSeg;
+	uint64		keepSegs;
+	uint64		restbytes;
+
+
+	currpos = GetXLogWriteRecPtr();
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	oldestSeg = XLogCtl->lastRemovedSegNo;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * oldestSeg is zero before at least one segment has been removed since
+	 * startup. Use oldest segno taken from file names.
+	 */
+	if (oldestSeg == 0)
+	{
+		static XLogSegNo oldestFileSeg = 0;
+
+		if (oldestFileSeg == 0)
+			oldestFileSeg = GetOldestXLogFileSegNo();
+		/* give it the same meaning as lastRemovedSegNo here */
+		oldestSeg = oldestFileSeg - 1;
+	}
+
+	/* oldest segment is just after the last removed segment */
+	oldestSeg++;
+
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	XLByteToSeg(restartLSN, restartSeg, wal_segment_size);
+
+	/* caller doesn't want distance; just check availability */
+	if (!distance)
+		return oldestSeg <= restartSeg;
+
+	/* This must perform the same thing as KeepLogSeg. */
+	Assert(wal_keep_segments >= 0);
+	Assert(max_slot_wal_keep_size_mb >= 0);
+	keepSegs = wal_keep_segments +
+		ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+	/*
+	 * calculate the oldest segment that will be kept by wal_keep_segments and
+	 * max_slot_wal_keep_size_mb
+	 */
+	if (currSeg < keepSegs)
+		tailSeg = 0;
+	else
+		tailSeg = currSeg - keepSegs;
+
+	if (restartSeg < oldestSeg)
+	{
+		/*
+		 * restartSeg has been removed. Calculate how many bytes after
+		 * restartLSN have been lost.
+		 */
+		restbytes = (restartSeg + 1) * wal_segment_size - restartLSN;
+		*distance =
+			(oldestSeg - (restartSeg + 1)) * wal_segment_size + restbytes;
+
+		return false;
+	}
+
+	if (tailSeg <= restartSeg)
+	{
+		/* Return how many bytes we can advance before the slot loses margin */
+		restbytes = wal_segment_size - (currpos % wal_segment_size);
+		*distance = (restartSeg - tailSeg) * wal_segment_size + restbytes;
+	}
+	else
+	{
+		/* the margin ran out */
+		*distance = 0;
+	}
+
+	return true;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9433,7 +9567,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
 	{
 		XLogSegNo	slotSegNo;
-		int			slotlimitsegs = ConvertToXSegs(max_slot_wal_keep_size_mb);
+		int			slotlimitsegs =
+			ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
 		XLByteToSeg(keep, slotSegNo, wal_segment_size);
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index dc40cde..c55c88b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -793,7 +793,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.live,
+            L.distance
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ab776e8..107da1a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -182,7 +182,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -304,6 +304,20 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		if (max_slot_wal_keep_size_mb > 0 && restart_lsn != InvalidXLogRecPtr)
+		{
+			uint64		distance;
+
+			values[i++] = BoolGetDatum(IsLsnStillAvailable(restart_lsn,
+														   &distance));
+			values[i++] = Int64GetDatum(distance);
+		}
+		else
+		{
+			values[i++] = BoolGetDatum(true);
+			nulls[i++] = true;
+		}
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index f0c0255..a7a1e4d 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern bool IsLsnStillAvailable(XLogRecPtr restartLSN, uint64 *distance);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 93c031a..0913e56 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5340,7 +5340,7 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220,16,20}" "{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,live,distance}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index f1c1b44..16a99d8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.live,
+    l.distance
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, live, distance)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.9.2
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers