Hi

Here's the next patch in the split-up series: drop db-specific
(logical) replication slots on DROP DATABASE.

Current behaviour is to ERROR out if logical slots exist on the DB,
whether in use or not.

With this patch we can DROP a database if it has logical slots, so
long as they are not active. I haven't added any sort of syntax for
this; it's just done unconditionally.
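
To illustrate the intended behaviour, something like this (a rough
sketch, not pasted session output; 'appdb' and 'appdb_slot' are
made-up names):

    CREATE DATABASE appdb;
    \c appdb
    SELECT pg_create_logical_replication_slot('appdb_slot', 'test_decoding');
    \c postgres
    -- previously this ERRORed because appdb still had a logical slot;
    -- now it succeeds and drops the inactive slot along with the DB
    DROP DATABASE appdb;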

I don't see any sensible way to stop a slot becoming active after we
check for active slots and begin the actual database DROP, since
ReplicationSlotAcquire will happily acquire a db-specific slot for a
different DB and the only lock it takes is a shared lock on
ReplicationSlotControlLock, which we presumably don't want to hold
throughout DROP DATABASE.

So this patch makes ReplicationSlotAcquire check that the slot's
database matches the current database, and refuse to acquire the slot
if it does not. The only sensible reason to acquire a slot from a
different DB is to drop it, and even then it's only a convenience at
best. Slot drop is the only user-visible behaviour change, since all
other activity on logical slots already required the backend to be
connected to the slot's DB. Appropriate docs changes have been made
and tests added.
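
So post-patch, attempting to drop a slot from the wrong database fails
like this (again a sketch with invented names, not pasted output):

    \c postgres
    SELECT pg_drop_replication_slot('appdb_slot');
    ERROR:  replication slot "appdb_slot" was not created in this database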

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From c126a10e40aba0c39a43a97da591492d6240659c Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 13:21:09 +0800
Subject: [PATCH] Make DROP DATABASE drop logical slots for the DB

Automatically drop all logical replication slots associated with a
database when the database is dropped.

As a side-effect, pg_drop_replication_slot(...) may now only drop
logical slots when connected to the slot's database.
---
 contrib/test_decoding/sql/slot.sql                 |  8 ++
 doc/src/sgml/func.sgml                             |  3 +-
 doc/src/sgml/protocol.sgml                         |  2 +
 src/backend/commands/dbcommands.c                  | 32 +++++--
 src/backend/replication/logical/logical.c          | 12 +--
 src/backend/replication/slot.c                     | 97 +++++++++++++++++++++-
 src/include/replication/slot.h                     |  1 +
 src/test/recovery/t/006_logical_decoding.pl        | 34 +++++++-
 .../recovery/t/010_logical_decoding_timelines.pl   | 30 ++++++-
 9 files changed, 194 insertions(+), 25 deletions(-)

diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 7ca83fe..22b22f3 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -48,3 +48,11 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_
 -- both should error as they should be dropped on error
 SELECT pg_drop_replication_slot('regression_slot1');
 SELECT pg_drop_replication_slot('regression_slot2');
+
+CREATE DATABASE testdb;
+\c testdb
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_otherdb', 'test_decoding');
+\c regression
+SELECT pg_drop_replication_slot('regression_slot_otherdb');
+\c testdb
+SELECT pg_drop_replication_slot('regression_slot_otherdb');
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index ba6f8dd..78508d7 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
        <entry>
         Drops the physical or logical replication slot
         named <parameter>slot_name</parameter>. Same as replication protocol
-        command <literal>DROP_REPLICATION_SLOT</>.
+        command <literal>DROP_REPLICATION_SLOT</>. For logical slots, this must
+        be called when connected to the same database the slot was created in.
        </entry>
       </row>
 
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b3a5026..5f97141 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are:
      <para>
       Drops a replication slot, freeing any reserved server-side resources. If
       the slot is currently in use by an active connection, this command fails.
+      If the slot is a logical slot that was created in a database other than
+      the database the walsender is connected to, this command fails.
      </para>
      <variablelist>
       <varlistentry>
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 5a63b1a..7fe2c2b 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
 				 errmsg("cannot drop the currently open database")));
 
 	/*
-	 * Check whether there are, possibly unconnected, logical slots that refer
-	 * to the to-be-dropped database. The database lock we are holding
-	 * prevents the creation of new slots using the database.
+	 * Check whether there are active logical slots that refer to the
+	 * to-be-dropped database. The database lock we are holding prevents
+	 * the creation of new slots using the database and prevents existing
+	 * slots from becoming active.
 	 */
-	if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
+	(void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
+	if (nslots_active)
+	{
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
-			  errmsg("database \"%s\" is used by a logical replication slot",
+			  errmsg("database \"%s\" is used by an active logical replication slot",
 					 dbname),
-				 errdetail_plural("There is %d slot, %d of them active.",
-								  "There are %d slots, %d of them active.",
-								  nslots,
-								  nslots, nslots_active)));
+				 errdetail_plural("There is %d active slot.",
+								  "There are %d active slots.",
+								  nslots_active, nslots_active)));
+	}
 
 	/*
 	 * Check for other backends in the target database.  (Because we hold the
@@ -899,6 +902,11 @@ dropdb(const char *dbname, bool missing_ok)
 	ReleaseSysCache(tup);
 
 	/*
+	 * Drop db-specific replication slots
+	 */
+	ReplicationSlotsDropDBSlots(db_id);
+
+	/*
 	 * Delete any comments or security labels associated with the database.
 	 */
 	DeleteSharedComments(db_id, DatabaseRelationId);
@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
 			 * InitPostgres() cannot fully re-execute concurrently. This
 			 * avoids backends re-connecting automatically to same database,
 			 * which can happen in some cases.
+			 *
+			 * This will lock out walsenders trying to connect to db-specific
+			 * slots for logical decoding too, so it's safe for us to drop slots.
 			 */
 			LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
 			ResolveRecoveryConflictWithDatabase(xlrec->db_id);
 		}
 
+		/* Drop any database-specific replication slots */
+		ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 		/* Drop pages for this database that are in the shared buffer cache */
 		DropDatabaseBuffers(xlrec->db_id);
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..86a8656 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -235,11 +235,7 @@ CreateInitDecodingContext(char *plugin,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 		errmsg("cannot use physical replication slot for logical decoding")));
 
-	if (slot->data.database != MyDatabaseId)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-		   errmsg("replication slot \"%s\" was not created in this database",
-				  NameStr(slot->data.name))));
+	Assert(slot->data.database == MyDatabaseId);
 
 	if (IsTransactionState() &&
 		GetTopTransactionIdIfAny() != InvalidTransactionId)
@@ -347,11 +343,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 (errmsg("cannot use physical replication slot for logical decoding"))));
 
-	if (slot->data.database != MyDatabaseId)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-		  (errmsg("replication slot \"%s\" was not created in this database",
-				  NameStr(slot->data.name)))));
+	Assert(slot->data.database == MyDatabaseId);
 
 	if (start_lsn == InvalidXLogRecPtr)
 	{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 5237a9f..5a4eb79 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -321,6 +321,9 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 /*
  * Find a previously created slot and mark it as used by this backend.
+ *
+ * If the slot is a logical slot, it must be associated with the same
+ * database as the calling backend.
  */
 void
 ReplicationSlotAcquire(const char *name)
@@ -328,6 +331,7 @@ ReplicationSlotAcquire(const char *name)
 	ReplicationSlot *slot = NULL;
 	int			i;
 	int			active_pid = 0; /* Keep compiler quiet */
+	Oid			database = InvalidOid;
 
 	Assert(MyReplicationSlot == NULL);
 
@@ -343,7 +347,9 @@ ReplicationSlotAcquire(const char *name)
 		{
 			SpinLockAcquire(&s->mutex);
 			active_pid = s->active_pid;
-			if (active_pid == 0)
+			database = s->data.database;
+			if (active_pid == 0 &&
+				(database == InvalidOid || database == MyDatabaseId))
 				active_pid = s->active_pid = MyProcPid;
 			SpinLockRelease(&s->mutex);
 			slot = s;
@@ -357,6 +363,11 @@ ReplicationSlotAcquire(const char *name)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
+	if (database != InvalidOid && database != MyDatabaseId)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("replication slot \"%s\" was not created in this database",
+						NameStr(slot->data.name))));
 	if (active_pid != MyProcPid)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
@@ -796,6 +807,90 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots belonging to
+ * the given database OID. The caller must hold an exclusive lock on the
+ * database's pg_database object to ensure that no replication slot on
+ * the database can become active while we work.
+ *
+ * This routine isn't as efficient as it could be, but we don't drop
+ * databases often, especially not databases with many slots.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData slotname;
+		int active_pid;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* in_use cannot change while ReplicationSlotControlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* only logical slots are database specific, skip */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != dboid)
+			continue;
+
+		/* Claim the slot, as if ReplicationSlotAcquire()ing. */
+		SpinLockAcquire(&s->mutex);
+		strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+		NameStr(slotname)[NAMEDATALEN-1] = '\0';
+		active_pid = s->active_pid;
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+		}
+		SpinLockRelease(&s->mutex);
+
+		/*
+		 * We might fail here if the slot was active. Even though we hold an
+		 * exclusive lock on the database object, a logical slot for that DB
+		 * can still be active if it's being dropped by a backend connected
+		 * to another DB.
+		 *
+		 * It's an unlikely race that can only arise from concurrent user
+		 * action, so we just bail out.
+		 */
+		if (active_pid)
+			elog(ERROR, "replication slot \"%s\" is in use by PID %d",
+				 NameStr(slotname), active_pid);
+
+		/*
+		 * To avoid largely duplicating ReplicationSlotDropAcquired() or
+		 * complicating it with already_locked flags for ProcArrayLock,
+		 * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
+		 * just release our ReplicationSlotControlLock to drop the slot.
+		 *
+		 * For safety we'll restart our scan from the beginning each
+		 * time we release the lock.
+		 */
+		LWLockRelease(ReplicationSlotControlLock);
+		ReplicationSlotDropAcquired();
+		goto restart;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* recompute limits once after all slots are dropped */
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+}
+
 
 /*
  * Check whether the server's configuration supports using replication
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 62cacdb..9a2dbd7 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 66d5e4a..510dc90 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 15;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -54,7 +54,7 @@ my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logi
 is($stdout_sql, $expected, 'got expected output from SQL decoding session');
 
 my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
-diag "waiting to replay $endpos";
+print "waiting to replay $endpos\n";
 
 my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
 chomp($stdout_recv);
@@ -64,5 +64,35 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo
 chomp($stdout_recv);
 is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
 
+$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
+	'replaying logical slot from another database fails');
+
+is($node_master->psql('otherdb', q[SELECT pg_drop_replication_slot('test_slot');]), 3,
+	'dropping logical slot from other DB fails');
+
+$node_master->safe_psql('otherdb', qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]);
+
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 0,
+	'dropping a DB with inactive logical slots succeeds');
+
+is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
+	'logical slot was actually dropped with DB');
+
+# Restarting a node with existing logical slots and wal_level lowered
+# to 'replica' must succeed, but decoding from those slots must fail.
+$node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica');
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'logical', 'wal_level is still logical before restart');
+$node_master->restart;
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'replica', 'wal_level is replica');
+isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
+	'restored slot catalog_xmin is nonzero');
+is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
+	'reading from slot with wal_level < logical fails');
+is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
+	'can drop logical slot while wal_level = replica');
+is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+
 # done with the node
 $node_master->stop;
diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl
index 4561a06..b618132 100644
--- a/src/test/recovery/t/010_logical_decoding_timelines.pl
+++ b/src/test/recovery/t/010_logical_decoding_timelines.pl
@@ -15,12 +15,15 @@
 # This module uses the first approach to show that timeline following
 # on a logical slot works.
 #
+# (For convenience, it also tests some recovery-related operations
+# on logical slots.)
+#
 use strict;
 use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 10;
+use Test::More tests => 13;
 use RecursiveCopy;
 use File::Copy;
 use IPC::Run ();
@@ -50,6 +53,16 @@ $node_master->safe_psql('postgres',
 $node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
 $node_master->safe_psql('postgres',
 	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+# We also want to verify that DROP DATABASE on a standby with a logical
+# slot works. This isn't strictly related to timeline following, but
+# the only way to get a logical slot on a standby right now is to use
+# the same physical copy trick, so:
+$node_master->safe_psql('postgres', 'CREATE DATABASE dropme;');
+$node_master->safe_psql('dropme',
+"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');"
+);
+
 $node_master->safe_psql('postgres', 'CHECKPOINT;');
 
 my $backup_name = 'b1';
@@ -68,6 +81,17 @@ $node_replica->append_conf(
 
 $node_replica->start;
 
+# If we drop 'dropme' on the master, the standby should drop the
+# db and associated slot.
+is($node_master->psql('postgres', 'DROP DATABASE dropme'), 0,
+	'dropped DB with logical slot OK on master');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert'));
+is($node_replica->safe_psql('postgres', q[SELECT 1 FROM pg_database WHERE datname = 'dropme']), '',
+	'dropped DB dropme on standby');
+is($node_master->slot('dropme_slot')->{'slot_name'}, undef,
+	'logical slot was actually dropped on standby');
+
+# Back to testing failover...
 $node_master->safe_psql('postgres',
 "SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
 );
@@ -99,10 +123,13 @@ isnt($phys_slot->{'catalog_xmin'}, '',
 cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
 	   'xmin on physical slot must not be lower than catalog_xmin');
 
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+
 # Boom, crash
 $node_master->stop('immediate');
 
 $node_replica->promote;
+print "waiting for replica to come up\n";
 $node_replica->poll_query_until('postgres',
 	"SELECT NOT pg_is_in_recovery();");
 
@@ -154,5 +181,4 @@ $stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
 chomp($stdout);
 is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
 
-# We don't need the standby anymore
 $node_replica->teardown_node();
-- 
2.5.5
