At Tue, 07 Apr 2020 12:09:05 +0900 (JST), Kyotaro Horiguchi <horikyota....@gmail.com> wrote in
> > it seems to me that it suffices to check restart_lsn for being invalid
> > in the couple of places where the slot's owner advances (which is the
> > two auxiliary functions for ProcessStandbyReplyMessage). I have done so
> > in the attached. There are other places where the restart_lsn is set,
> > but those seem to be used only when the slot is created. I don't think
> > we need to cover for those, but I'm not 100% sure about that.
>
> StartLogicalReplication does
> "XLogBeginRead(,MyReplicationSlot->data.restart_lsn)". If the
> restart_lsn is invalid, the following call to XLogReadRecord runs into
> an assertion failure. Walsender (or StartLogicalReplication) should
> correctly reject reconnection from the subscriber if restart_lsn is
> invalid.
>
> > However, the change in PhysicalConfirmReceivedLocation() breaks
> > the way slots work for pg_basebackup: apparently the slot is created
> > with a restart_lsn of Invalid and we only advance it the first time we
> > process a feedback message from pg_basebackup. I have a vague feeling
> > that that's bogus, but I'll have to look at the involved code a little
> > bit more closely to be sure about this.
>
> Mmm. Couldn't we have a new member 'invalidated' in ReplicationSlot?

I did that in the attached. The new "invalidated" flag is a
shared-but-not-saved member of a slot. It is initialized to false and
irreversibly set to true when the slot loses a required segment. The
new function CheckReplicationSlotInvalidated() checks it when a slot is
acquired and when a slot is updated by a standby reply message. This
change stops walsender without killing it explicitly, but I didn't
remove the killing code.

When a logical slot loses a segment, the publisher complains as:

[backend ] LOG: slot "s1" is invalidated at 0/370001C0 due to exceeding max_slot_wal_keep_size
[walsender] FATAL: terminating connection due to administrator command

The subscriber tries to reconnect and that fails as follows:

[19350] ERROR: replication slot "s1" is invalidated
[19352] ERROR: replication slot "s1" is invalidated
...

If the publisher restarts, that message is no longer seen and the
following appears instead:

[19372] ERROR: requested WAL segment 000000010000000000000037 has already been removed

Since the check is done in ReplicationSlotAcquire, some slot-related SQL
functions are affected as well.

=# select pg_replication_slot_advance('s1', '0/37000000');
ERROR: replication slot "s1" is invalidated

After restarting the publisher, the message changes in the same way as
for walsender.

=# select pg_replication_slot_advance('s1', '0/380001C0');
ERROR: requested WAL segment pg_wal/000000010000000000000037 has already been removed

Since I didn't touch restart_lsn at all, there is no fear of changing
other behavior inadvertently.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
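
The lifecycle of the flag described in the message can be illustrated with a small standalone model. This is only a sketch under assumptions, not the patch's code: the ToySlot type and the toy_* functions are hypothetical stand-ins, there is no locking, and the check returns a boolean where the real CheckReplicationSlotInvalidated() raises an ERROR. It shows the three properties the message relies on: invalidation leaves restart_lsn untouched, it is one-way while the server runs, and it is forgotten on restart because the flag lives only in shared memory.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr; not the PostgreSQL definition. */
typedef uint64_t ToyLSN;

/* Hypothetical, heavily reduced model of a replication slot. */
typedef struct ToySlot
{
	bool		in_use;
	bool		invalidated;	/* shared memory only, never saved to disk */
	ToyLSN		restart_lsn;	/* not modified by invalidation */
} ToySlot;

/*
 * Mark every in-use, not-yet-invalidated slot whose restart_lsn is older
 * than oldest_lsn.  Returns the number of slots newly invalidated.
 */
int
toy_invalidate_obsolete(ToySlot *slots, size_t nslots, ToyLSN oldest_lsn)
{
	int			newly_invalidated = 0;

	assert(slots != NULL || nslots == 0);

	for (size_t i = 0; i < nslots; i++)
	{
		ToySlot    *s = &slots[i];

		if (!s->in_use || s->invalidated)
			continue;

		if (s->restart_lsn < oldest_lsn)
		{
			s->invalidated = true;	/* one-way transition */
			newly_invalidated++;
		}
	}

	return newly_invalidated;
}

/* Plays the role of the acquire-time check, reporting via return value. */
bool
toy_slot_usable(const ToySlot *s)
{
	return s->in_use && !s->invalidated;
}

/* Model a server restart: the unsaved flag comes back as false. */
void
toy_restart(ToySlot *slots, size_t nslots)
{
	for (size_t i = 0; i < nslots; i++)
		slots[i].invalidated = false;
}
```

With one slot at 0/370001C0 and the oldest kept position at 0/38000000, toy_invalidate_obsolete() marks that slot, toy_slot_usable() then reports false for it, and after toy_restart() the slot looks usable again. That last step corresponds to the case above where the error changes to "requested WAL segment ... has already been removed".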

From 3f81c5740ea3554835bbe794820624b56c9c3ea8 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga....@gmail.com>
Date: Tue, 7 Apr 2020 11:09:54 +0900
Subject: [PATCH] further change type 2

---
 src/backend/access/transam/xlog.c   | 17 +++++----
 src/backend/replication/slot.c      | 59 ++++++++++++++++++++++++-----
 src/backend/replication/walsender.c |  2 +
 src/include/replication/slot.h      |  7 ++++
 4 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8f28ffaab9..c5b96126ee 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9559,20 +9559,21 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	XLByteToSeg(recptr, currSegNo, wal_segment_size);
 	segno = currSegNo;
 
+	/*
+	 * Calculate how many segments are kept by slots first, adjusting
+	 * for max_slot_wal_keep_size.
+	 */
 	keep = XLogGetReplicationSlotMinimumLSN();
-
-	/*
-	 * Calculate how many segments are kept by slots first.
-	 */
-	/* Cap keepSegs by max_slot_wal_keep_size */
 	if (keep != InvalidXLogRecPtr)
 	{
 		XLByteToSeg(keep, segno, wal_segment_size);
 
-		/* Reduce it if slots already reserves too many. */
+		/* Cap by max_slot_wal_keep_size ... */
 		if (max_slot_wal_keep_size_mb >= 0)
 		{
-			XLogRecPtr slot_keep_segs =
+			XLogRecPtr slot_keep_segs;
+
+			slot_keep_segs =
 				ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
 			if (currSegNo - segno > slot_keep_segs)
@@ -9580,7 +9581,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 		}
 	}
 
-	/* but, keep at least wal_keep_segments segments if any */
+	/* but, keep at least wal_keep_segments if that's set */
 	if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments)
 	{
 		/* avoid underflow, don't go below 1 */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 86ddff8b9d..0a28b27607 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -277,6 +277,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
 	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
 	slot->data.persistency = persistency;
+	slot->invalidated = false;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
@@ -323,6 +324,29 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	ConditionVariableBroadcast(&slot->active_cv);
 }
 
+
+/*
+ * Check if the slot is invalidated.
+ */
+void
+CheckReplicationSlotInvalidated(ReplicationSlot *slot)
+{
+	bool		invalidated;
+
+	/* Take lock to read slot name for error message. */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	invalidated = slot->invalidated;
+
+	/* If the slot is invalidated, error out. */
+	if (invalidated)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("replication slot \"%s\" is invalidated",
+						NameStr(slot->data.name))));
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Find a previously created slot and mark it as used by this backend.
  */
@@ -412,6 +436,9 @@ retry:
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
+
+	/* Finally, check if the slot is invalidated */
+	CheckReplicationSlotInvalidated(slot);
 }
 
 /*
@@ -1083,25 +1110,39 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		XLogRecPtr restart_lsn = InvalidXLogRecPtr;
 
-		if (!s->in_use || s->data.restart_lsn == InvalidXLogRecPtr)
+		if (!s->in_use)
+			continue;
+
+		if (s->invalidated)
 			continue;
 
 		if (s->data.restart_lsn < oldestLSN)
 		{
-			elog(LOG, "slot %s is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
-				 s->data.name.data,
-				 (uint32) (s->data.restart_lsn >> 32),
-				 (uint32) s->data.restart_lsn);
-			/* mark this slot as invalid */
 			SpinLockAcquire(&s->mutex);
-			s->data.restart_lsn = InvalidXLogRecPtr;
 
-			/* remember PID for killing, if active*/
+			/* mark this slot as invalid */
+			s->invalidated = true;
+
+			/* remember restart_lsn for logging */
+			restart_lsn = s->data.restart_lsn;
+
+			SpinLockRelease(&s->mutex);
+
+			/* remember PID for killing, if active */
 			if (s->active_pid != 0)
 				pids = lappend_int(pids, s->active_pid);
-			SpinLockRelease(&s->mutex);
 		}
+		SpinLockRelease(&s->mutex);
+
+		if (restart_lsn != InvalidXLogRecPtr)
+			ereport(LOG,
+					errmsg("slot \"%s\" is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
+						   NameStr(s->data.name),
+						   (uint32) (restart_lsn >> 32),
+						   (uint32) restart_lsn));
+
 	}
 
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 76ec3c7dd0..c582f34fcc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1897,6 +1897,8 @@ ProcessStandbyReplyMessage(void)
 	 */
 	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
 	{
+		CheckReplicationSlotInvalidated(MyReplicationSlot);
+
 		if (SlotIsLogical(MyReplicationSlot))
 			LogicalConfirmReceivedLocation(flushPtr);
 		else
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 6e469ea749..f7531ca495 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -98,6 +98,9 @@ typedef struct ReplicationSlotPersistentData
  * backend owning the slot does not need to take this lock when reading its
  * own fields, while concurrent backends not owning this slot should take the
  * lock when reading this slot's data.
+ * - The invalidated field is initially false then changed to true
+ *   irreversibly by other than the owner and read by the possible next owner
+ *   process after the termination of the current owner.
  */
 typedef struct ReplicationSlot
 {
@@ -131,6 +134,9 @@ typedef struct ReplicationSlot
 	/* data surviving shutdowns and crashes */
 	ReplicationSlotPersistentData data;
 
+	/* is invalidated ? */
+	bool		invalidated;
+
 	/* is somebody performing io on this slot? */
 	LWLock		io_in_progress_lock;
 
@@ -187,6 +193,7 @@ extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
+extern void CheckReplicationSlotInvalidated(ReplicationSlot *slot);
 extern void ReplicationSlotSave(void);
 extern void ReplicationSlotMarkDirty(void);
 
-- 
2.18.2
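
The LOG message in the patch prints restart_lsn as two 32-bit halves with %X/%X, and the sample output in the message pairs the position 0/370001C0 with the segment file 000000010000000000000037. The following standalone sketch shows how those two values relate. It assumes a 16 MB WAL segment size and timeline 1, neither of which is stated in the message (both are merely consistent with its sample output), and the toy_* function names are hypothetical helpers, not PostgreSQL functions.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Format a 64-bit WAL position as high/low 32-bit hex halves ("%X/%X"). */
void
toy_format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
	snprintf(buf, buflen, "%" PRIX32 "/%" PRIX32,
			 (uint32_t) (lsn >> 32), (uint32_t) lsn);
}

/* Number of the segment that contains the given WAL position. */
uint64_t
toy_lsn_to_segno(uint64_t lsn, uint64_t wal_segment_size)
{
	assert(wal_segment_size > 0);
	return lsn / wal_segment_size;
}

/*
 * Build the 24-hex-digit segment file name: timeline, then the segment
 * number split into a "high" and a "low" part, 8 hex digits each.
 */
void
toy_segment_file_name(uint32_t timeline, uint64_t segno,
					  uint64_t wal_segment_size, char *buf, size_t buflen)
{
	/* segments per 4 GB of WAL address space; 256 for 16 MB segments */
	uint64_t	segs_per_xlogid = UINT64_C(0x100000000) / wal_segment_size;

	snprintf(buf, buflen, "%08" PRIX32 "%08" PRIX32 "%08" PRIX32,
			 timeline,
			 (uint32_t) (segno / segs_per_xlogid),
			 (uint32_t) (segno % segs_per_xlogid));
}
```

Under those assumptions, position 0x370001C0 divided by the segment size 0x1000000 gives segment 0x37, which is why a slot invalidated at 0/370001C0 later fails with a complaint about segment ...0037 once the in-memory flag has been lost.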