Here's a version that I feel is committable (0001).  There was an issue
when returning from the inner loop, if in a previous iteration we had
released the lock.  In that case we need to return with the lock not
held, so that the caller can acquire it again, but weren't.  This seems
pretty hard to hit in practice (I suppose somebody needs to destroy the
slot just as checkpointer killed the walsender, but before checkpointer
marks it as its own) ... but if it did happen, I think checkpointer
would block with no recourse.  Also added some comments and slightly
restructured the code.

There are plenty of conflicts in pg13, but it's all easy to handle.

I wrote a test (0002) to cover the case of signalling a walsender, which
is currently not covered (we only deal with the case of a standby that's
not running).  There are some sharp edges in this code -- I had to make
it use background_psql() to send a CHECKPOINT, which hangs, because I
previously send a SIGSTOP to the walreceiver.  Maybe there's a better
way to achieve a walreceiver that remains connected but doesn't consume
input from the primary, but I don't know what it is.  Anyway, the code
becomes covered with this.  I would like to at least see it in master,
to gather some reactions from buildfarm.

-- 
Álvaro Herrera       Valdivia, Chile
<Schwern> It does it in a really, really complicated way
<crab> why does it need to be complicated?
<Schwern> Because it's MakeMaker.
>From f296a5b880a5688656d308300d1015ba0562c1b7 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 10 Jun 2021 16:43:43 -0400
Subject: [PATCH 1/2] the code fix

---
 src/backend/replication/slot.c | 234 +++++++++++++++++++++------------
 1 file changed, 147 insertions(+), 87 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c88b803e5d..c36b16d070 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1160,6 +1160,150 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
+ * and mark it invalid, if necessary and possible.
+ *
+ * Returns whether ReplicationSlotControlLock was released (and in that case,
+ * we're not holding the lock at return; otherwise we are).
+ *
+ * This is inherently racy, because we want to release the
+ * LWLock for syscalls, so caller must restart if we return true.
+ */
+static bool
+InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+	int		last_signaled_pid = 0;
+	bool	released_lock = false;
+
+	for (;;)
+	{
+		XLogRecPtr	restart_lsn;
+		NameData	slotname;
+		int			active_pid = 0;
+
+		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+		if (!s->in_use)
+		{
+			if (released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+			break;
+		}
+
+		/*
+		 * Check if the slot needs to be invalidated. If it needs to be
+		 * invalidated, and is not currently acquired, acquire it and mark it
+		 * as having been invalidated.  We do this with the spinlock held to
+		 * avoid race conditions -- for example the restart_lsn could move
+		 * forward, or the slot could be dropped.
+		 */
+		SpinLockAcquire(&s->mutex);
+
+		restart_lsn = s->data.restart_lsn;
+
+		/*
+		 * If the slot is already invalid or is fresh enough, we don't need to
+		 * do anything.
+		 */
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+		{
+			SpinLockRelease(&s->mutex);
+			if (released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+			break;
+		}
+
+		slotname = s->data.name;
+		active_pid = s->active_pid;
+
+		/*
+		 * If the slot can be acquired, do so and mark it invalidated
+		 * immediately.  Otherwise we'll signal the owning process, below, and
+		 * retry.
+		 */
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+			s->data.invalidated_at = restart_lsn;
+			s->data.restart_lsn = InvalidXLogRecPtr;
+		}
+
+		SpinLockRelease(&s->mutex);
+
+		if (active_pid != 0)
+		{
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/*
+			 * Signal to terminate the process that owns the slot.
+			 *
+			 * There is the race condition that other process may own the slot
+			 * after its current owner process is terminated and before this
+			 * process owns it. To handle that, we signal only if the PID of
+			 * the owning process has changed from the previous time. (This
+			 * logic assumes that the same PID is not reused very quickly.)
+			 */
+			if (last_signaled_pid != active_pid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d to release replication slot \"%s\"",
+								active_pid, NameStr(slotname))));
+
+				(void) kill(active_pid, SIGTERM);
+				last_signaled_pid = active_pid;
+			}
+
+			/*
+			 * Wait until the slot is released.
+			 *
+			 * Will immediately return in the first iteration, so we can
+			 * recheck the condition before sleeping. That addresses the
+			 * otherwise possible race of the slot already having been
+			 * released.
+			 */
+			ConditionVariableSleep(&s->active_cv,
+								   WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+			/* re-acquire lock and try again */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+			continue;
+		}
+		else
+		{
+			/*
+			 * We hold the slot now and have already invalidated it; flush it
+			 * to ensure that state persists.
+			 *
+			 * Don't want to hold ReplicationSlotControlLock across file
+			 * system operations, so release it now but be sure to tell caller
+			 * to restart from scratch.
+			 */
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/* Make sure the invalidated state persists across server restart */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+			ReplicationSlotRelease();
+
+			ereport(LOG,
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+							NameStr(slotname),
+							LSN_FORMAT_ARGS(restart_lsn))));
+
+			/* done with this slot for now */
+			break;
+		}
+	}
+
+	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
+
+	return released_lock;
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1178,99 +1322,15 @@ restart:
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
-		NameData	slotname;
-		int			wspid;
-		int			last_signaled_pid = 0;
 
 		if (!s->in_use)
 			continue;
 
-		SpinLockAcquire(&s->mutex);
-		slotname = s->data.name;
-		restart_lsn = s->data.restart_lsn;
-		SpinLockRelease(&s->mutex);
-
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-			continue;
-		LWLockRelease(ReplicationSlotControlLock);
-		CHECK_FOR_INTERRUPTS();
-
-		/* Get ready to sleep on the slot in case it is active */
-		ConditionVariablePrepareToSleep(&s->active_cv);
-
-		for (;;)
+		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
 		{
-			/*
-			 * Try to mark this slot as used by this process.
-			 *
-			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire) should
-			 * not cancel the prepared condition variable if this slot is
-			 * active in other process. Because in this case we have to wait
-			 * on that CV for the process owning the slot to be terminated,
-			 * later.
-			 */
-			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
-
-			/*
-			 * Exit the loop if we successfully acquired the slot or the slot
-			 * was dropped during waiting for the owning process to be
-			 * terminated. For example, the latter case is likely to happen
-			 * when the slot is temporary because it's automatically dropped
-			 * by the termination of the owning process.
-			 */
-			if (wspid <= 0)
-				break;
-
-			/*
-			 * Signal to terminate the process that owns the slot.
-			 *
-			 * There is the race condition where other process may own the
-			 * slot after the process using it was terminated and before this
-			 * process owns it. To handle this case, we signal again if the
-			 * PID of the owning process is changed than the last.
-			 *
-			 * XXX This logic assumes that the same PID is not reused very
-			 * quickly.
-			 */
-			if (last_signaled_pid != wspid)
-			{
-				ereport(LOG,
-						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-								wspid, NameStr(slotname))));
-				(void) kill(wspid, SIGTERM);
-				last_signaled_pid = wspid;
-			}
-
-			ConditionVariableTimedSleep(&s->active_cv, 10,
-										WAIT_EVENT_REPLICATION_SLOT_DROP);
-		}
-		ConditionVariableCancelSleep();
-
-		/*
-		 * Do nothing here and start from scratch if the slot has already been
-		 * dropped.
-		 */
-		if (wspid == -1)
+			/* if the lock was released, start from scratch */
 			goto restart;
-
-		ereport(LOG,
-				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-						NameStr(slotname),
-						LSN_FORMAT_ARGS(restart_lsn))));
-
-		SpinLockAcquire(&s->mutex);
-		s->data.invalidated_at = s->data.restart_lsn;
-		s->data.restart_lsn = InvalidXLogRecPtr;
-		SpinLockRelease(&s->mutex);
-
-		/* Make sure the invalidated state persists across server restart */
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
-		ReplicationSlotRelease();
-
-		/* if we did anything, start from scratch */
-		goto restart;
+		}
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 }
-- 
2.20.1

>From 1dbc6cc64c2ade649032050afc4229c1dc1805e9 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 10 Jun 2021 16:44:03 -0400
Subject: [PATCH 2/2] the test

---
 src/test/recovery/t/019_replslot_limit.pl | 73 ++++++++++++++++++++++-
 1 file changed, 70 insertions(+), 3 deletions(-)

diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index 7094aa0704..fa47c915a5 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -11,7 +11,7 @@ use TestLib;
 use PostgresNode;
 
 use File::Path qw(rmtree);
-use Test::More tests => 14;
+use Test::More tests => 17;
 use Time::HiRes qw(usleep);
 
 $ENV{PGDATABASE} = 'postgres';
@@ -211,8 +211,8 @@ for (my $i = 0; $i < 10000; $i++)
 }
 ok($failed, 'check that replication has been broken');
 
-$node_primary->stop('immediate');
-$node_standby->stop('immediate');
+$node_primary->stop;
+$node_standby->stop;
 
 my $node_primary2 = get_new_node('primary2');
 $node_primary2->init(allows_streaming => 1);
@@ -253,6 +253,73 @@ my @result =
 		timeout => '60'));
 is($result[1], 'finished', 'check if checkpoint command is not blocked');
 
+$node_primary2->stop;
+$node_standby->stop;
+
+# The next test depends on Perl's `kill`, which apparently is not
+# portable to Windows.  (It would be nice to use Test::More's `subtest`,
+# but that's not in the ancient version we require.)
+done_testing() if $TestLib::windows_os;
+
+# Get a slot terminated while the walsender is active
+# We do this by sending SIGSTOP to the walreceiver.  Skip this on Windows.
+my $node_primary3 = get_new_node('primary3');
+$node_primary3->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_primary3->append_conf(
+	'postgresql.conf', qq(
+	min_wal_size = 2MB
+	max_wal_size = 2MB
+	log_checkpoints = yes
+	max_slot_wal_keep_size = 1MB
+	));
+$node_primary3->start;
+$node_primary3->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('rep3')");
+# Take backup
+$backup_name = 'my_backup';
+$node_primary3->backup($backup_name);
+# Create standby
+my $node_standby3 = get_new_node('standby_3');
+$node_standby3->init_from_backup($node_primary3, $backup_name,
+	has_streaming => 1);
+$node_standby3->append_conf('postgresql.conf', "primary_slot_name = 'rep3'");
+$node_standby3->start;
+$node_primary3->wait_for_catchup($node_standby3->name, 'replay');
+my $pid = $node_standby3->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_activity WHERE backend_type = 'walreceiver'");
+like($pid, qr/^[0-9]+$/, "have walreceiver pid $pid");
+
+# freeze walreceiver. Slot will still be active, but it won't advance
+kill 'STOP', $pid;
+$logstart = get_log_size($node_primary3);
+advance_wal($node_primary3, 2);
+
+my ($in, $out, $timer, $h);
+$timer = IPC::Run::timeout(180);
+$h = $node_primary3->background_psql('postgres', \$in, \$out, $timer);
+$in .= qq{
+checkpoint;
+};
+$h->pump_nb;
+ok( find_in_log($node_primary3,
+		"to release replication slot"),
+	"walreceiver termination logged");
+
+# Now let it continue to its demise
+kill 'CONT', $pid;
+
+$node_primary3->poll_query_until('postgres',
+	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep3'",
+	"lost") or die "timed out waiting for slot to be lost";
+
+ok( find_in_log($node_primary3,
+		'invalidating slot "rep3" because its restart_lsn'),
+	"slot invalidation logged");
+
+
+$node_primary3->stop;
+$node_standby3->stop;
+
 #####################################
 # Advance WAL of $node by $n segments
 sub advance_wal
-- 
2.20.1

Reply via email to