From 6c86780aadf28d67e2571095ffd9ef2b3e3370c5 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 24 Apr 2023 11:03:28 +0000
Subject: [PATCH v13 4/4] Change the method used to check logical replication
 slots during the live check

When a live check is requested, the old cluster is still running, so additional
changes may occur and cause the current WAL position to exceed the
confirmed_flush_lsn of a slot. Therefore, during a live check we verify that each
logical slot is active instead of checking its confirmed_flush_lsn. This is
sufficient as all the WAL records will be sent during the publisher's shutdown.
---
 src/bin/pg_upgrade/check.c                    | 68 ++++++++++++++++++-
 .../t/003_logical_replication_slots.pl        | 66 +++++++++++++++++-
 2 files changed, 130 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 61ca36a853..528576b00e 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -35,6 +35,7 @@ static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_for_parameter_settings(ClusterInfo *new_cluster);
 static void check_for_confirmed_flush_lsn(ClusterInfo *cluster);
+static void check_are_logical_slots_active(ClusterInfo *cluster);
 
 /*
  * fix_path_separator
@@ -112,7 +113,19 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 	if (user_opts.include_logical_slots)
-		check_for_confirmed_flush_lsn(&old_cluster);
+	{
+		/*
+		 * The method used to check logical replication slots depends on
+		 * live_check. During a live check the old node is still running, so
+		 * additional changes can occur there, which could cause the current
+		 * WAL position to exceed the confirmed_flush_lsn of a slot. In that
+		 * case, only check that the slots are active.
+		 */
+		if (live_check)
+			check_are_logical_slots_active(&old_cluster);
+		else
+			check_for_confirmed_flush_lsn(&old_cluster);
+	}
 
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the on-disk
@@ -1479,6 +1492,59 @@ check_for_parameter_settings(ClusterInfo *new_cluster)
 	check_ok();
 }
 
+/*
+ * Used for live checks. Emits a warning for every non-temporary logical
+ * replication slot with wal_status 'reserved' or 'extended' that is not
+ * active, and exits via pg_fatal() if any such slot exists. The
+ * pg_replication_slots view is queried from the first database only.
+ */
+static void
+check_are_logical_slots_active(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	PGresult   *res;
+	PGconn	   *conn;
+
+	Assert(user_opts.include_logical_slots);
+
+	/*
+	 * --include-logical-replication-slots can be used since PG16. Bail out
+	 * before connecting, so that no connection is leaked on this path.
+	 */
+	if (GET_MAJOR_VERSION(cluster->major_version) < 1600)
+		return;
+
+	prep_status("Checking for logical replication slots");
+
+	conn = connectToServer(cluster, cluster->dbarr.dbs[0].db_name);
+
+	/* Only logical slots are of interest here; physical ones are ignored */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE slot_type = 'logical' AND active IS FALSE "
+							"AND temporary = false AND wal_status IN ('reserved', 'extended');");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	/* Report every offending slot before failing, not just the first one */
+	for (i = 0; i < ntups; i++)
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" is not active",
+			   PQgetvalue(res, i, i_slotname));
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (ntups > 0)
+		pg_fatal("--include-logical-replication-slots with --check requires that "
+				 "all logical replication slots are active");
+
+	check_ok();
+}
+
 /*
  * Verify that all logical replication slots consumed all WALs, except a
  * CHECKPOINT_SHUTDOWN record.
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 21fefca084..d7fd864bd7 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -19,6 +19,10 @@ my $old_node = PostgreSQL::Test::Cluster->new('old_node');
 $old_node->init(allows_streaming => 'logical');
 $old_node->start;
 
+# Initialize subscriber, which is used only by the pg_upgrade --check test
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
+
 # Initialize new node
 my $new_node = PostgreSQL::Test::Cluster->new('new_node');
 $new_node->init(allows_streaming => 1);
@@ -76,15 +80,71 @@ rmtree($new_node->data_dir . "/pg_upgrade_output.d");
 # non-zero value to succeed the pg_upgrade
 $new_node->append_conf('postgresql.conf', "max_replication_slots = 10");
 
-# Create a slot on old node, and generate WALs
+# Set up logical replication: old_node publishes tbl, subscriber subscribes
 $old_node->start;
+$old_node->safe_psql('postgres',
+	"CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a");
+$old_node->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+my $old_connstr = $old_node->connstr . ' dbname=postgres';
+
+$subscriber->start;
+$subscriber->safe_psql('postgres', "CREATE TABLE tbl (a int)");
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (copy_data = true)"
+);
+
+# Wait for the initial table sync of subscription 'sub' to finish
+$subscriber->wait_for_subscription_sync($old_node, 'sub');
+
+my $result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(10), 'check initial rows on subscriber');
+
+# Start a background session and leave a transaction open (uncommitted) in it
+my $bsession = $old_node->background_psql('postgres');
+$bsession->query_safe(
+	q{
+BEGIN;
+INSERT INTO tbl VALUES (generate_series(11, 20))
+});
+
+$result = $old_node->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots WHERE pg_current_wal_insert_lsn() > confirmed_flush_lsn"
+);
+is($result, qq(1),
+	'check the current WAL position exceeds confirmed_flush_lsn');
+
+# Run pg_upgrade --check on the live old node; it only checks that the logical
+# slots are active. On success, the new node's pg_upgrade_output.d is removed.
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,        '--include-logical-replication-slots',
+		'--check'
+	],
+	'run of pg_upgrade of old node');
+ok( !-d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# Clean up: abort the open transaction, end the session, drop the subscription
+$bsession->query_safe("ABORT");
+$bsession->quit;
+$subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub");
+
+# Create a slot on old node, and generate WALs
 $old_node->safe_psql(
 	'postgres', qq[
 	SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, true);
-	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
+	INSERT INTO tbl VALUES (generate_series(11, 20));
 ]);
 
-my $result = $old_node->safe_psql('postgres',
+$result = $old_node->safe_psql('postgres',
 	"SELECT count(*) FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL)"
 );
 
-- 
2.27.0

