At Tue, 07 Apr 2020 12:09:05 +0900 (JST), Kyotaro Horiguchi 
<horikyota....@gmail.com> wrote in 
> > it seems to me that it suffices to check restart_lsn for being invalid
> > in the couple of places where the slot's owner advances (which are the
> > two auxiliary functions for ProcessStandbyReplyMessage).  I have done so
> > in the attached.  There are other places where the restart_lsn is set,
> > but those seem to be used only when the slot is created.  I don't think
> > we need to cover for those, but I'm not 100% sure about that.
> 
> StartLogicalReplication does
> "XLogBeginRead(,MyReplicationSlot->data.restart_lsn)". If the
> restart_lsn is invalid, the following call to XLogReadRecord runs into
> an assertion failure.  Walsender (or StartLogicalReplication) should
> correctly reject reconnection from the subscriber if restart_lsn is
> invalid.
> 
> > However, the change in PhysicalConfirmReceivedLocation() breaks
> > the way slots work for pg_basebackup: apparently the slot is created
> > with a restart_lsn of Invalid and we only advance it the first time we
> > process a feedback message from pg_basebackup.  I have a vague feeling
> > that that's bogus, but I'll have to look at the involved code a little
> > bit more closely to be sure about this.
> 
> Mmm. Couldn't we have a new member 'invalidated' in ReplicationSlot?

I did that in the attached patch. 'invalidated' is a shared-but-not-saved
member of a slot; it is initialized to false and irreversibly changed to
true when the slot loses a required segment.
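
A minimal model of the flag's lifecycle, assuming (as the patch does) that
the flag sits in shared memory next to the persistent data but is never
written to disk. Slot, SlotPersistentData, mark_slot_invalidated() and
restart_slot() are hypothetical names for illustration, not PostgreSQL
APIs; the restart step assumes shared memory comes back zeroed, so the
flag reads false again while restart_lsn is reloaded from disk.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint64_t XLogRecPtr;

/* Hypothetical stand-in for ReplicationSlotPersistentData (saved to disk). */
typedef struct SlotPersistentData
{
    XLogRecPtr  restart_lsn;
} SlotPersistentData;

/* Hypothetical stand-in for ReplicationSlot: saved part plus shared-only part. */
typedef struct Slot
{
    SlotPersistentData data;    /* survives shutdowns and crashes */
    bool        invalidated;    /* lives only in shared memory, never saved */
} Slot;

/* The only transition this model allows: false -> true, never back. */
void
mark_slot_invalidated(Slot *slot)
{
    assert(slot != NULL);
    slot->invalidated = true;
}

/*
 * Model a server restart: the persistent part is reloaded from disk, while
 * the shared-memory-only flag starts out false again (assumed zeroed memory).
 */
Slot
restart_slot(const Slot *before)
{
    Slot        after;

    memset(&after, 0, sizeof(after));   /* fresh shared memory */
    after.data = before->data;          /* reloaded from the on-disk state */
    return after;
}
```

Because restart_lsn survives a restart while the flag does not, a restarted
server no longer reports the slot as invalidated; it fails later instead,
when the WAL at restart_lsn turns out to be gone.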

It is checked by the new function CheckReplicationSlotInvalidated() when
acquiring a slot and when updating the slot on a standby reply message.
This change stops the walsender without explicitly killing it, but I
didn't remove that code.
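
A simplified sketch of where the new check sits, assuming single-threaded
callers, omitting all locking, and using return codes in place of
ereport(ERROR). Slot, check_slot_invalidated(), acquire_slot() and
process_reply() are hypothetical names that only mirror the roles of
CheckReplicationSlotInvalidated(), ReplicationSlotAcquire() and
ProcessStandbyReplyMessage().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* Hypothetical result codes standing in for ereport(ERROR, ...). */
typedef enum SlotResult
{
    SLOT_OK,
    SLOT_ERR_INVALIDATED
} SlotResult;

typedef struct Slot
{
    XLogRecPtr  confirmed_flush;    /* last position the client confirmed */
    bool        invalidated;        /* set once by another process */
} Slot;

/* Analogue of CheckReplicationSlotInvalidated(): refuse an invalidated slot. */
SlotResult
check_slot_invalidated(const Slot *slot)
{
    return slot->invalidated ? SLOT_ERR_INVALIDATED : SLOT_OK;
}

/* Analogue of ReplicationSlotAcquire(): as in the patch, check after taking it. */
SlotResult
acquire_slot(Slot *slot, Slot **my_slot)
{
    *my_slot = slot;
    return check_slot_invalidated(slot);
}

/* Analogue of ProcessStandbyReplyMessage(): check before advancing the slot. */
SlotResult
process_reply(Slot *my_slot, XLogRecPtr flush_ptr)
{
    SlotResult  res;

    if (my_slot == NULL || flush_ptr == 0)
        return SLOT_OK;             /* nothing to update, no check performed */

    res = check_slot_invalidated(my_slot);
    if (res != SLOT_OK)
        return res;                 /* the real code errors out here */

    my_slot->confirmed_flush = flush_ptr;
    return SLOT_OK;
}
```

A reply carrying no flush position never reaches the check, matching the
`flushPtr != InvalidXLogRecPtr` condition in the walsender hunk.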

When a logical slot loses a required segment, the publisher complains as follows:


[backend  ] LOG:  slot "s1" is invalidated at 0/370001C0 due to exceeding max_slot_wal_keep_size
[walsender] FATAL:  terminating connection due to administrator command

The subscriber tries to reconnect and that fails as follows:

[19350] ERROR:  replication slot "s1" is invalidated
[19352] ERROR:  replication slot "s1" is invalidated
...

If the publisher is restarted, that message no longer appears and the
following is seen instead.

[19372] ERROR:  requested WAL segment 000000010000000000000037 has already been removed
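
Assuming the default 16 MB wal_segment_size and timeline 1, the segment
named here is exactly the one containing the slot's restart_lsn 0/370001C0
from the LOG line above. A small sketch of that arithmetic;
lsn_to_segno() and wal_file_name() are hypothetical helpers that mirror
what XLByteToSeg and XLogFileName compute, not those macros themselves.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;
typedef uint32_t TimeLineID;

/* Segment number containing the given LSN (what XLByteToSeg computes). */
XLogSegNo
lsn_to_segno(XLogRecPtr lsn, uint64_t seg_size)
{
    return lsn / seg_size;
}

/*
 * Format a WAL file name as timeline, then the segment number split into
 * "xlogid" and segment-within-xlogid, each as 8 hex digits.
 * buf must hold at least 25 bytes (24 hex digits + NUL).
 */
void
wal_file_name(char *buf, size_t len, TimeLineID tli, XLogSegNo segno,
              uint64_t seg_size)
{
    uint64_t    segs_per_xlogid = UINT64_C(0x100000000) / seg_size;

    snprintf(buf, len, "%08X%08X%08X",
             (unsigned) tli,
             (unsigned) (segno / segs_per_xlogid),
             (unsigned) (segno % segs_per_xlogid));
}
```

With 16 MB segments there are 256 segments per xlogid, so 0/370001C0
falls in segment 0x37 and the name ends in 00000037.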

Since the check is done in ReplicationSlotAcquire, some slot-related SQL
functions are affected as well.

=# select pg_replication_slot_advance('s1', '0/37000000');
ERROR:  replication slot "s1" is invalidated

After the publisher restarts, the message becomes the same as the
walsender's.

=# select pg_replication_slot_advance('s1', '0/380001C0');
ERROR:  requested WAL segment pg_wal/000000010000000000000037 has already been removed

Since I didn't touch restart_lsn at all, there's no fear of inadvertently
changing other behavior.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 3f81c5740ea3554835bbe794820624b56c9c3ea8 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga....@gmail.com>
Date: Tue, 7 Apr 2020 11:09:54 +0900
Subject: [PATCH] further change type 2

---
 src/backend/access/transam/xlog.c   | 17 +++++----
 src/backend/replication/slot.c      | 59 ++++++++++++++++++++++++-----
 src/backend/replication/walsender.c |  2 +
 src/include/replication/slot.h      |  7 ++++
 4 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8f28ffaab9..c5b96126ee 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9559,20 +9559,21 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	XLByteToSeg(recptr, currSegNo, wal_segment_size);
 	segno = currSegNo;
 
+	/*
+	 * Calculate how many segments are kept by slots first, adjusting
+	 * for max_slot_wal_keep_size.
+	 */
 	keep = XLogGetReplicationSlotMinimumLSN();
-
-	/*
-	 * Calculate how many segments are kept by slots first.
-	 */
-	/* Cap keepSegs by max_slot_wal_keep_size */
 	if (keep != InvalidXLogRecPtr)
 	{
 		XLByteToSeg(keep, segno, wal_segment_size);
 
-		/* Reduce it if slots already reserves too many. */
+		/* Cap by max_slot_wal_keep_size ... */
 		if (max_slot_wal_keep_size_mb >= 0)
 		{
-			XLogRecPtr slot_keep_segs =
+			XLogRecPtr	slot_keep_segs;
+
+			slot_keep_segs =
 				ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
 			if (currSegNo - segno > slot_keep_segs)
@@ -9580,7 +9581,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 		}
 	}
 
-	/* but, keep at least wal_keep_segments segments if any */
+	/* but, keep at least wal_keep_segments if that's set */
 	if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments)
 	{
 		/* avoid underflow, don't go below 1 */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 86ddff8b9d..0a28b27607 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -277,6 +277,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
 	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
 	slot->data.persistency = persistency;
+	slot->invalidated = false;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
@@ -323,6 +324,29 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	ConditionVariableBroadcast(&slot->active_cv);
 }
 
+
+/*
+ * Check if the slot is invalidated.
+ */
+void
+CheckReplicationSlotInvalidated(ReplicationSlot *slot)
+{
+	bool invalidated;
+
+	/* Take lock to read slot name for error message. */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	invalidated = slot->invalidated;
+
+	/* If the slot is invalidated, error out. */
+	if (invalidated)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("replication slot \"%s\" is invalidated",
+						NameStr(slot->data.name))));
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Find a previously created slot and mark it as used by this backend.
  */
@@ -412,6 +436,9 @@ retry:
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
+
+	/* Finally, check if the slot is invalidated */
+	CheckReplicationSlotInvalidated(slot);
 }
 
 /*
@@ -1083,25 +1110,39 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		XLogRecPtr restart_lsn = InvalidXLogRecPtr;
 
-		if (!s->in_use || s->data.restart_lsn == InvalidXLogRecPtr)
+		if (!s->in_use)
+			continue;
+
+		if (s->invalidated)
 			continue;
 
 		if (s->data.restart_lsn < oldestLSN)
 		{
-			elog(LOG, "slot %s is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
-				 s->data.name.data,
-				 (uint32) (s->data.restart_lsn >> 32),
-				 (uint32) s->data.restart_lsn);
-			/* mark this slot as invalid */
 			SpinLockAcquire(&s->mutex);
-			s->data.restart_lsn = InvalidXLogRecPtr;
 
-			/* remember PID for killing, if active*/
+			/* mark this slot as invalid */
+			s->invalidated = true;
+
+			/* remember restart_lsn for logging */
+			restart_lsn = s->data.restart_lsn;
+
+			/* remember PID for killing, if active */
 			if (s->active_pid != 0)
 				pids = lappend_int(pids, s->active_pid);
-			SpinLockRelease(&s->mutex);
+
+			/* the spinlock is held only within this branch */
+			SpinLockRelease(&s->mutex);
 		}
+
+		if (restart_lsn != InvalidXLogRecPtr)
+			ereport(LOG,
+					errmsg("slot \"%s\" is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
+						   NameStr(s->data.name),
+						   (uint32) (restart_lsn >> 32),
+						   (uint32) restart_lsn));
+
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 76ec3c7dd0..c582f34fcc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1897,6 +1897,8 @@ ProcessStandbyReplyMessage(void)
 	 */
 	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
 	{
+		CheckReplicationSlotInvalidated(MyReplicationSlot);
+
 		if (SlotIsLogical(MyReplicationSlot))
 			LogicalConfirmReceivedLocation(flushPtr);
 		else
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 6e469ea749..f7531ca495 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -98,6 +98,9 @@ typedef struct ReplicationSlotPersistentData
  * backend owning the slot does not need to take this lock when reading its
  * own fields, while concurrent backends not owning this slot should take the
  * lock when reading this slot's data.
+ * - The invalidated field is initially false and is irreversibly set to
+ * true by a process other than the owner; it is read by the next owner
+ * process, if any, after the current owner has terminated.
  */
 typedef struct ReplicationSlot
 {
@@ -131,6 +134,9 @@ typedef struct ReplicationSlot
 	/* data surviving shutdowns and crashes */
 	ReplicationSlotPersistentData data;
 
+	/* has this slot been invalidated? */
+	bool		invalidated;
+
 	/* is somebody performing io on this slot? */
 	LWLock		io_in_progress_lock;
 
@@ -187,6 +193,7 @@ extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
+extern void CheckReplicationSlotInvalidated(ReplicationSlot *slot);
 extern void ReplicationSlotSave(void);
 extern void ReplicationSlotMarkDirty(void);
 
-- 
2.18.2
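
The KeepLogSeg() hunk in the patch can be modeled as plain arithmetic on
segment numbers. This is a sketch under stated assumptions: keep_log_seg()
and mb_to_segs() are hypothetical; the body of
`if (currSegNo - segno > slot_keep_segs)` and the rest of the
wal_keep_segments branch are not visible in the diff and are assumed to
clamp segno to `currSegNo - slot_keep_segs` and
`currSegNo - wal_keep_segments` (never below 1) respectively; and
ConvertToXSegs() is assumed to divide megabytes by the segment size in
megabytes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogSegNo;

/* Hypothetical analogue of ConvertToXSegs(): megabytes -> whole segments. */
XLogSegNo
mb_to_segs(int mb, int seg_size_mb)
{
    return (XLogSegNo) (mb / seg_size_mb);
}

/*
 * Oldest segment that must be kept.  Assumes slot_min_segno <= curr_segno.
 *   have_slot         false when XLogGetReplicationSlotMinimumLSN() is invalid
 *   max_slot_keep_mb  max_slot_wal_keep_size in MB; -1 means unlimited
 *   keep_segments     wal_keep_segments; 0 means unset
 */
XLogSegNo
keep_log_seg(XLogSegNo curr_segno, bool have_slot, XLogSegNo slot_min_segno,
             int max_slot_keep_mb, int seg_size_mb, int keep_segments)
{
    XLogSegNo   segno = curr_segno;

    if (have_slot)
    {
        segno = slot_min_segno;

        /* Cap by max_slot_wal_keep_size ... */
        if (max_slot_keep_mb >= 0)
        {
            XLogSegNo   slot_keep_segs = mb_to_segs(max_slot_keep_mb, seg_size_mb);

            if (curr_segno - segno > slot_keep_segs)
                segno = curr_segno - slot_keep_segs;    /* assumed body */
        }
    }

    /* but, keep at least wal_keep_segments if that's set */
    if (keep_segments > 0 && curr_segno - segno < (XLogSegNo) keep_segments)
    {
        /* avoid underflow, don't go below 1 */
        if (curr_segno <= (XLogSegNo) keep_segments)
            segno = 1;
        else
            segno = curr_segno - (XLogSegNo) keep_segments;
    }

    return segno;
}
```

For example, with the current segment at 100, a lagging slot at segment
10, 16 MB segments and max_slot_wal_keep_size = 320 MB (20 segments), the
slot's claim is capped and segments from 80 onward are kept; setting
wal_keep_segments = 30 then pulls that back to 70.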
