On 2020-Apr-06, Alvaro Herrera wrote:

> Lastly, I noticed that we're now changing the slot's restart_lsn to
> Invalid without being the slot's owner, which goes counter to what is
> said in slot.h:
>
>  * - Individual fields are protected by mutex where only the backend owning
>  *   the slot is authorized to update the fields from its own slot.  The
>  *   backend owning the slot does not need to take this lock when reading its
>  *   own fields, while concurrent backends not owning this slot should take the
>  *   lock when reading this slot's data.
>
> What this means is that if the slot owner walsender updates the
> restart_lsn to a newer value just as we (checkpointer) are trying to set
> it to Invalid, the owner's value might persist and our value would be
> lost.
>
> AFAICT if we were really stressed about getting this exactly correct,
> then we would have to kill the walsender, wait for it to die, then
> ReplicationSlotAcquire and *then* update
> MyReplicationSlot->data.restart_lsn.
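
In code, that last quoted paragraph boils down to roughly the sequence below.  This is only an illustration using the existing slot API (ReplicationSlotAcquire(name, nowait), MyReplicationSlot, the per-slot mutex); it is not code taken from the attached patch, and it omits dirtying/saving the slot and all error handling:

    #include "postgres.h"

    #include <signal.h>

    #include "replication/slot.h"
    #include "storage/spin.h"

    /*
     * Illustrative only: clear a slot's restart_lsn as its owner, following
     * the "kill, wait, acquire, then update" sequence described above.
     */
    static void
    invalidate_slot_as_owner(const char *slotname, pid_t active_pid)
    {
        /* ask the current owner (if any) to exit */
        if (active_pid != 0)
            (void) kill(active_pid, SIGTERM);

        /* nowait = false: block until the owner is gone and we own the slot */
        ReplicationSlotAcquire(slotname, false);

        /* now that we own it, updating its fields follows the slot.h rule */
        SpinLockAcquire(&MyReplicationSlot->mutex);
        MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
        SpinLockRelease(&MyReplicationSlot->mutex);

        ReplicationSlotRelease();
    }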
So I had cold feet about the whole business of trying to write to a
non-owned replication slot, so I tried to implement the "exactly
correct" idea above.  That's v25 here.

I think there's a race condition in this: if we kill a walsender and a
new one acquires the slot before we (the checkpointer) can, our blocking
ReplicationSlotAcquire will simply wait for the new walsender to
terminate on its own.  Fixing this requires changing the
ReplicationSlotAcquire API so that it can decline to wait without
raising an error, which would let us use a retry loop: acquire; if the
slot is busy, send the signal and try again.  (A rough sketch of that
loop is included below, ahead of the patch.)

I also include a separate diff for a change that might or might not be
necessary, where xmins reserved by slots with restart_lsn=invalid are
ignored.  I'm not yet sure that we should include this, but we should
keep an eye on it.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
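
To illustrate the API change I have in mind, the checkpointer-side loop could look like this.  ReplicationSlotAcquireIfFree is a made-up name for the hypothetical non-waiting, non-erroring variant (assumed here to return 0 on success, or the owner's PID when the slot is busy); nothing like it exists in the attached patch:

    #include "postgres.h"

    #include <signal.h>

    #include "miscadmin.h"
    #include "replication/slot.h"

    /*
     * Illustrative only: keep trying to acquire the slot, signalling
     * whichever walsender currently owns it, until we get it ourselves.
     */
    static void
    acquire_slot_terminating_owner(const char *slotname)
    {
        for (;;)
        {
            pid_t       active_pid;

            /* hypothetical API: never waits, never raises an error */
            active_pid = ReplicationSlotAcquireIfFree(slotname);
            if (active_pid == 0)
                break;          /* we own the slot now */

            /* slot is busy: ask its current owner to exit, then retry */
            (void) kill(active_pid, SIGTERM);

            CHECK_FOR_INTERRUPTS();
            pg_usleep(10000L);  /* 10ms, to avoid busy-waiting */
        }
    }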
>From 5d8636482558cc8cb831addf2fd7b1ff4de53634 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Wed, 19 Dec 2018 12:43:57 +0900 Subject: [PATCH v25] Add WAL relief vent for replication slots Replication slot is useful to maintain replication connection in the configurations where replication is so delayed that connection is broken. On the other hand so many WAL files can fill up disk that the master downs by a long delay. This feature, which is activated by a GUC "max_slot_wal_keep_size", protects master servers from suffering disk full by limiting the number of WAL files reserved by replication slots. --- doc/src/sgml/catalogs.sgml | 39 +++++ doc/src/sgml/config.sgml | 23 +++ doc/src/sgml/high-availability.sgml | 8 +- src/backend/access/transam/xlog.c | 146 +++++++++++++++--- src/backend/catalog/system_views.sql | 4 +- src/backend/replication/slot.c | 76 +++++++++ src/backend/replication/slotfuncs.c | 42 ++++- src/backend/utils/misc/guc.c | 13 ++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/access/xlog.h | 17 ++ src/include/catalog/pg_proc.dat | 6 +- src/include/replication/slot.h | 1 + src/test/regress/expected/rules.out | 6 +- 13 files changed, 349 insertions(+), 33 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 64614b569c..de8ca5ccca 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -9907,6 +9907,45 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx </entry> </row> + <row> + <entry><structfield>wal_status</structfield></entry> + <entry><type>text</type></entry> + <entry></entry> + + <entry>Availability of WAL files claimed by this slot. + Valid values are: + <simplelist> + <member> + <literal>normal</literal> means that the claimed files + are within <varname>max_wal_size</varname> + </member> + <member> + <literal>keeping</literal> means that <varname>max_wal_size</varname> + is exceeded but still held by replication slots or + <varname>wal_keep_segments</varname> + </member> + <member> + <literal>lost</literal> means that some of them are definitely lost + and the session using this slot cannot continue replication. This + state will be hardly seen because walsender that enters this state is + terminated immediately. + </member> + </simplelist> + The last two states are seen only when + <xref linkend="guc-max-slot-wal-keep-size"/> is + non-negative. If <structfield>restart_lsn</structfield> is NULL, this + field is null. + </entry> + </row> + + <row> + <entry><structfield>min_safe_lsn</structfield></entry> + <entry><type>pg_lsn</type></entry> + <entry></entry> + <entry>The minimum LSN currently available for walsenders. + </entry> + </row> + </tbody> </tgroup> </table> diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index f68c992213..095b3668b8 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3777,6 +3777,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows </listitem> </varlistentry> + <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size"> + <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Specify the maximum size of WAL files + that <link linkend="streaming-replication-slots">replication + slots</link> are allowed to retain in the <filename>pg_wal</filename> + directory at checkpoint time. 
+ If <varname>max_slot_wal_keep_size</varname> is -1 (the default), + replication slots retain unlimited amount of WAL files. If + restart_lsn of a replication slot gets behind more than that megabytes + from the current LSN, the standby using the slot may no longer be able + to continue replication due to removal of required WAL files. You + can see the WAL availability of replication slots + in <link linkend="view-pg-replication-slots">pg_replication_slots</link>. + </para> + </listitem> + </varlistentry> + <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout"> <term><varname>wal_sender_timeout</varname> (<type>integer</type>) <indexterm> diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index bb5d9962ed..4659b9ef5d 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' <xref linkend="guc-archive-command"/>. However, these methods often result in retaining more WAL segments than required, whereas replication slots retain only the number of segments - known to be needed. An advantage of these methods is that they bound - the space requirement for <literal>pg_wal</literal>; there is currently no way - to do this using replication slots. + known to be needed. On the other hand, replication slots can retain so + many WAL segments that they fill up the space allocated + for <literal>pg_wal</literal>; + <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files + retained by replication slots. </para> <para> Similarly, <xref linkend="guc-hot-standby-feedback"/> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index abf954ba39..bceddd6c05 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -107,6 +107,7 @@ int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; +int max_slot_wal_keep_size_mb = -1; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -758,7 +759,7 @@ static ControlFileData *ControlFile = NULL; */ #define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD) -/* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */ +/* Convert values of GUCs measured in megabytes to equiv. segment count */ #define ConvertToXSegs(x, segsize) \ (x / ((segsize) / (1024 * 1024))) @@ -3937,9 +3938,10 @@ XLogGetLastRemovedSegno(void) return lastRemovedSegNo; } + /* - * Update the last removed segno pointer in shared memory, to reflect - * that the given XLOG file has been removed. + * Update the last removed segno pointer in shared memory, to reflect that the + * given XLOG file has been removed. */ static void UpdateLastRemovedPtr(char *filename) @@ -9005,6 +9007,7 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); + InvalidateObsoleteReplicationSlots(_logSegNo); _logSegNo--; RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr); @@ -9339,6 +9342,7 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? 
replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); + InvalidateObsoleteReplicationSlots(_logSegNo); _logSegNo--; /* @@ -9407,48 +9411,144 @@ CreateRestartPoint(int flags) return true; } +/* + * Report availability of WAL for a replication slot + * restart_lsn and active_pid are straight from the slot info + * + * Returns one of the following enum values. + * + * WALAVAIL_NORMAL means targetLSN is available because it is in the range of + * max_wal_size. If max_slot_wal_keep_size is smaller than max_wal_size, this + * state is not returned. + * + * WALAVAIL_PRESERVED means it is still available by preserving extra segments + * beyond max_wal_size. + * + * WALAVAIL_REMOVED means it is definitely lost. The replication stream on the + * slot cannot continue. + * + * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL. + */ +WalAvailability +GetWalAvailability(XLogRecPtr restart_lsn, pid_t walsender_pid) +{ + XLogRecPtr currpos; + XLogSegNo currSeg; /* segid of currpos */ + XLogSegNo restartSeg; /* segid of restart_lsn */ + XLogSegNo oldestSeg; /* actual oldest segid */ + XLogSegNo oldestSegMaxWalSize; /* oldest segid kept by max_wal_size */ + XLogSegNo oldestSlotSeg = InvalidXLogRecPtr;/* oldest segid kept by slot */ + uint64 keepSegs; + + /* slot does not reserve WAL. Either deactivated, or has never been active */ + if (XLogRecPtrIsInvalid(restart_lsn)) + return WALAVAIL_INVALID_LSN; + + currpos = GetXLogWriteRecPtr(); + + /* calculate oldest segment currently needed by slots */ + XLByteToSeg(restart_lsn, restartSeg, wal_segment_size); + KeepLogSeg(currpos, &oldestSlotSeg); + + /* + * Find the oldest extant segment file. We get 1 until checkpoint removes + * the first WAL segment file since startup, which causes the status being + * wrong under certain abnormal conditions but that doesn't actually harm. + */ + oldestSeg = XLogGetLastRemovedSegno() + 1; + + /* calculate oldest segment by max_wal_size and wal_keep_segments */ + XLByteToSeg(currpos, currSeg, wal_segment_size); + keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments), + wal_segment_size) + 1; + + if (currSeg > keepSegs) + oldestSegMaxWalSize = currSeg - keepSegs; + else + oldestSegMaxWalSize = 1; + + /* + * If max_slot_wal_keep_size has changed after the last call, the segment + * that would been kept by the current setting might have been lost by the + * previous setting. No point in showing normal or keeping status values if + * the restartSeg is known to be lost. + */ + if (restartSeg >= oldestSeg) + { + /* + * show "normal" when restartSeg is within max_wal_size. If + * max_slot_wal_keep_size is smaller than max_wal_size, there's no + * point in showing the status. + */ + if ((max_slot_wal_keep_size_mb <= 0 || + max_slot_wal_keep_size_mb >= max_wal_size_mb) && + oldestSegMaxWalSize <= restartSeg) + return WALAVAIL_NORMAL; + + /* being retained by slots */ + if (oldestSlotSeg <= restartSeg) + return WALAVAIL_PRESERVED; + } + + /* definitely lost. the walsender can no longer restart */ + return WALAVAIL_REMOVED; +} + + /* * Retreat *logSegNo to the last segment that we need to retain because of * either wal_keep_segments or replication slots. * * This is calculated by subtracting wal_keep_segments from the given xlog * location, recptr and by making sure that that result is below the - * requirement of replication slots. + * requirement of replication slots. For the latter criterion we do consider + * the effects of max_slot_wal_keep_size: reserve at most that much space back + * from recptr. 
*/ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) { + XLogSegNo currSegNo; XLogSegNo segno; XLogRecPtr keep; - XLByteToSeg(recptr, segno, wal_segment_size); - keep = XLogGetReplicationSlotMinimumLSN(); + XLByteToSeg(recptr, currSegNo, wal_segment_size); + segno = currSegNo; - /* compute limit for wal_keep_segments first */ - if (wal_keep_segments > 0) + /* + * Calculate how many segments are kept by slots first, adjusting + * for max_slot_wal_keep_size. + */ + keep = XLogGetReplicationSlotMinimumLSN(); + if (keep != InvalidXLogRecPtr) { - /* avoid underflow, don't go below 1 */ - if (segno <= wal_keep_segments) - segno = 1; - else - segno = segno - wal_keep_segments; + XLByteToSeg(keep, segno, wal_segment_size); + + /* Cap by max_slot_wal_keep_size ... */ + if (max_slot_wal_keep_size_mb >= 0) + { + XLogRecPtr slot_keep_segs; + + slot_keep_segs = + ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size); + + if (currSegNo - segno > slot_keep_segs) + segno = currSegNo - slot_keep_segs; + } } - /* then check whether slots limit removal further */ - if (max_replication_slots > 0 && keep != InvalidXLogRecPtr) + /* but, keep at least wal_keep_segments if that's set */ + if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments) { - XLogSegNo slotSegNo; - - XLByteToSeg(keep, slotSegNo, wal_segment_size); - - if (slotSegNo <= 0) + /* avoid underflow, don't go below 1 */ + if (currSegNo <= wal_keep_segments) segno = 1; - else if (slotSegNo < segno) - segno = slotSegNo; + else + segno = currSegNo - wal_keep_segments; } /* don't delete WAL segments newer than the calculated segment */ - if (segno < *logSegNo) + if (XLogRecPtrIsInvalid(*logSegNo) || segno < *logSegNo) *logSegNo = segno; } diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 813ea8bfc3..d406ea8118 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -876,7 +876,9 @@ CREATE VIEW pg_replication_slots AS L.xmin, L.catalog_xmin, L.restart_lsn, - L.confirmed_flush_lsn + L.confirmed_flush_lsn, + L.wal_status, + L.min_safe_lsn FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 47851ec4c1..d3e623b845 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -743,6 +743,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) /* * Compute the oldest restart LSN across all slots and inform xlog module. + * + * Note: while max_slot_wal_keep_size is theoretically relevant for this + * purpose, we don't try to account for that, because this module doesn't + * know what to compare against. */ void ReplicationSlotsComputeRequiredLSN(void) @@ -818,6 +822,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void) restart_lsn = s->data.restart_lsn; SpinLockRelease(&s->mutex); + if (restart_lsn == InvalidXLogRecPtr) + continue; + if (result == InvalidXLogRecPtr || restart_lsn < result) result = restart_lsn; @@ -1064,6 +1071,75 @@ ReplicationSlotReserveWal(void) } } +/* + * Mark any slot that points to an LSN older than the given segment + * as invalid; it requires WAL that's about to be removed. + * + * NB - this runs as part of checkpoint, so avoid raising errors if possible. 
+ */ +void +InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) +{ + XLogRecPtr oldestLSN; + + XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); + +restart: + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + XLogRecPtr restart_lsn = InvalidXLogRecPtr; + char *slotname; + int wspid; + + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); + if (s->data.restart_lsn == InvalidXLogRecPtr || + s->data.restart_lsn >= oldestLSN) + { + SpinLockRelease(&s->mutex); + continue; + } + + slotname = pstrdup(NameStr(s->data.name)); + restart_lsn = s->data.restart_lsn; + wspid = s->active_pid; + + SpinLockRelease(&s->mutex); + LWLockRelease(ReplicationSlotControlLock); + + if (wspid != 0) + { + ereport(LOG, + (errmsg("terminating walsender %d because replication slot is too far behind", + wspid))); + (void) kill(wspid, SIGTERM); + } + + /* Here we wait until the walsender is gone */ + ReplicationSlotAcquire(slotname, false); + + ereport(LOG, + (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", + slotname, + (uint32) (restart_lsn >> 32), + (uint32) restart_lsn))); + + SpinLockAcquire(&s->mutex); + s->data.restart_lsn = InvalidXLogRecPtr; + SpinLockRelease(&s->mutex); + ReplicationSlotRelease(); + + /* if we did anything, start from scratch */ + CHECK_FOR_INTERRUPTS(); + goto restart; + } + LWLockRelease(ReplicationSlotControlLock); +} + /* * Flush all replication slots to disk. * diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ce0c9127bc..91a5d0f290 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -234,7 +234,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 11 +#define PG_GET_REPLICATION_SLOTS_COLS 13 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -288,6 +288,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) Oid database; NameData slot_name; NameData plugin; + WalAvailability walstate; + XLogSegNo last_removed_seg; int i; if (!slot->in_use) @@ -355,6 +357,40 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; + walstate = GetWalAvailability(restart_lsn, active_pid); + + switch (walstate) + { + case WALAVAIL_INVALID_LSN: + nulls[i++] = true; + break; + + case WALAVAIL_NORMAL: + values[i++] = CStringGetTextDatum("normal"); + break; + + case WALAVAIL_PRESERVED: + values[i++] = CStringGetTextDatum("keeping"); + break; + + case WALAVAIL_REMOVED: + values[i++] = CStringGetTextDatum("lost"); + break; + } + + if (max_slot_wal_keep_size_mb >= 0 && + (walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_PRESERVED) && + ((last_removed_seg = XLogGetLastRemovedSegno()) != 0)) + { + XLogRecPtr min_safe_lsn; + + XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0, + wal_segment_size, min_safe_lsn); + values[i++] = Int64GetDatum(min_safe_lsn); + } + else + nulls[i++] = true; + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } LWLockRelease(ReplicationSlotControlLock); @@ -377,6 +413,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; XLogRecPtr retlsn = startlsn; + Assert(moveto != InvalidXLogRecPtr); + if (startlsn < moveto) { SpinLockAcquire(&MyReplicationSlot->mutex); @@ -414,6 +452,8 @@ 
pg_logical_replication_slot_advance(XLogRecPtr moveto) ResourceOwner old_resowner = CurrentResourceOwner; XLogRecPtr retlsn; + Assert(moveto != InvalidXLogRecPtr); + PG_TRY(); { /* diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 03a22d71ac..5bdc02fce2 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2784,6 +2784,19 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Sets the maximum WAL size that can be reserved by replication slots."), + gettext_noop("Replication slots will be marked as failed, and segments released " + "for deletion or recycling, if this much space is occupied by WAL " + "on disk."), + GUC_UNIT_MB + }, + &max_slot_wal_keep_size_mb, + -1, -1, MAX_KILOBYTES, + NULL, NULL, NULL + }, + { {"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING, gettext_noop("Sets the maximum time to wait for WAL replication."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 1ae8b77306..995b6ca155 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -289,6 +289,7 @@ #max_wal_senders = 10 # max number of walsender processes # (change requires restart) #wal_keep_segments = 0 # in logfile segments; 0 disables +#max_slot_wal_keep_size = -1 # measured in bytes; -1 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables #max_replication_slots = 10 # max number of replication slots diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 7412caa5f2..1c3be04505 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -108,6 +108,7 @@ extern int wal_segment_size; extern int min_wal_size_mb; extern int max_wal_size_mb; extern int wal_keep_segments; +extern int max_slot_wal_keep_size_mb; extern int XLOGbuffers; extern int XLogArchiveTimeout; extern int wal_retrieve_retry_interval; @@ -255,6 +256,19 @@ typedef struct CheckpointStatsData extern CheckpointStatsData CheckpointStats; +/* + * WAL segment availability status + * + * This is used as the return value of GetWalAvailability. 
+ */ +typedef enum WalAvailability +{ + WALAVAIL_INVALID_LSN, /* parameter error */ + WALAVAIL_NORMAL, /* WAL segment is within max_wal_size */ + WALAVAIL_PRESERVED, /* WAL segment is preserved by repslots */ + WALAVAIL_REMOVED /* WAL segment has been removed */ +} WalAvailability; + struct XLogRecData; extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, @@ -305,6 +319,9 @@ extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); extern void CreateCheckPoint(int flags); extern bool CreateRestartPoint(int flags); +extern WalAvailability GetWalAvailability(XLogRecPtr restart_lsn, + pid_t walsender_pid); +extern XLogRecPtr CalculateMaxmumSafeLSN(void); extern void XLogPutNextOid(Oid nextOid); extern XLogRecPtr XLogRestorePoint(const char *rpName); extern void UpdateFullPageWrites(void); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index a649e44d08..ef808c5c43 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9986,9 +9986,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_safe_lsn}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 3e95b019b3..6e469ea749 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -198,6 +198,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); +extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6eec8ec568..ac31840739 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1462,8 +1462,10 @@ pg_replication_slots| SELECT l.slot_name, l.xmin, l.catalog_xmin, l.restart_lsn, - l.confirmed_flush_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) + l.confirmed_flush_lsn, + l.wal_status, + l.min_safe_lsn + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_safe_lsn) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.20.1
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index d3e623b845..31e12e4043 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -714,6 +714,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; TransactionId effective_xmin; TransactionId effective_catalog_xmin; + XLogRecPtr restart_lsn; if (!s->in_use) continue; @@ -721,8 +722,13 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) SpinLockAcquire(&s->mutex); effective_xmin = s->effective_xmin; effective_catalog_xmin = s->effective_catalog_xmin; + restart_lsn = s->data.restart_lsn; SpinLockRelease(&s->mutex); + /* is slot still relevant? */ + if (restart_lsn == InvalidXLogRecPtr) + continue; + /* check the data xmin */ if (TransactionIdIsValid(effective_xmin) && (!TransactionIdIsValid(agg_xmin) ||