From 4dd3cc2fcb51757b3e9a1e733e2efb334b3574e8 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 24 Apr 2023 11:03:28 +0000
Subject: [PATCH v9 4/4] Change the method used to check logical replication
 slots during the live check

When a live check is requested, there is a possibility of additional changes
occurring, which may cause the current WAL position to exceed the confirmed_flush_lsn
of the slot. As a result, during a live check we verify that each logical slot is
active instead of comparing confirmed_flush_lsn. This is sufficient as all the WAL
records will be sent during the publisher's shutdown.
---
 src/bin/pg_upgrade/check.c                    | 68 ++++++++++++++++++-
 .../t/003_logical_replication_slots.pl        | 50 ++++++++++++--
 2 files changed, 111 insertions(+), 7 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 294668e4dc..985614acab 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -35,6 +35,7 @@ static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_for_parameter_settings(ClusterInfo *new_cluster);
 static void check_for_confirmed_flush_lsn(ClusterInfo *cluster);
+static void check_are_logical_slots_active(ClusterInfo *cluster);
 
 /*
  * fix_path_separator
@@ -109,7 +110,19 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 	if (user_opts.include_logical_slots)
-		check_for_confirmed_flush_lsn(&old_cluster);
+	{
+		/*
+		 * The method used to check logical replication slots is dependent on
+		 * the value of the live_check parameter. This change was implemented
+		 * because, during a live check, it is possible for additional changes
+		 * to occur at the old node, which could cause the current WAL position
+		 * to exceed the confirmed_flush_lsn of the slot.
+		 */
+		if (live_check)
+			check_are_logical_slots_active(&old_cluster);
+		else
+			check_for_confirmed_flush_lsn(&old_cluster);
+	}
 
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the on-disk
@@ -1469,6 +1482,59 @@ check_for_parameter_settings(ClusterInfo *new_cluster)
 	check_ok();
 }
 
+/*
+ * Live-check counterpart of check_for_confirmed_flush_lsn(): warn about every
+ * non-temporary logical replication slot with wal_status 'reserved' or
+ * 'extended' that is not active, and fail if at least one was found.  Nothing
+ * is checked for clusters older than PG16.
+ */
+static void
+check_are_logical_slots_active(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	PGresult   *res;
+	PGconn	   *conn;
+
+	Assert(user_opts.include_logical_slots);
+
+	/* --include-logical-replication-slots can be used since PG16. */
+	if (GET_MAJOR_VERSION(cluster->major_version) < 1600)
+		return;
+
+	/*
+	 * Connect only after the version check, so that the early return above
+	 * never leaves an open connection behind.
+	 */
+	conn = connectToServer(cluster, cluster->dbarr.dbs[0].db_name);
+
+	prep_status("Checking for logical replication slots");
+
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE slot_type = 'logical' AND active IS FALSE "
+							"AND temporary = false AND wal_status IN ('reserved', 'extended');");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	/* Report every offending slot before failing, not just the first one. */
+	for (i = 0; i < ntups; i++)
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" is not active.",
+			   PQgetvalue(res, i, i_slotname));
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (ntups > 0)
+		pg_fatal("--include-logical-replication-slots with --check requires that "
+				 "all logical replication slots are active");
+
+	check_ok();
+}
+
 /*
  * Verify that all logical replication slots consumed all WALs, except a
  * CHECKPOINT_SHUTDOWN record.
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 5a49ac96bb..a3260e0c88 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -73,7 +73,6 @@ command_fails(
 		'-p',         $old_publisher->port,
 		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots',
-		'--check'
 	],
 	'run of pg_upgrade of old publisher with wrong max_replication_slots');
 ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
@@ -105,7 +104,6 @@ command_fails(
 		'-p',         $old_publisher->port,
 		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots',
-		'--check'
 	],
 	'run of pg_upgrade of old publisher with idle replication slots');
 ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
@@ -130,9 +128,49 @@ $subscriber->wait_for_subscription_sync($old_publisher, 'sub');
 my $result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
 is($result, qq(20), 'check initial rows on subscriber');
 
+# Start a background session and open a transaction (not committed yet)
+my $bsession = $old_publisher->background_psql('postgres');
+$bsession->query_safe(
+	q{
+BEGIN;
+INSERT INTO tbl VALUES (generate_series(21, 30))
+});
+
+$result = $old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots WHERE pg_current_wal_insert_lsn() > confirmed_flush_lsn"
+);
+is($result, qq(1),
+	'check the current WAL position exceeds confirmed_flush_lsn');
+
+
+# Run pg_upgrade --check. During a live check only the status of each logical
+# slot is verified, so the command is expected to succeed.
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,        '--include-logical-replication-slots',
+		'--check'
+	],
+	'run of pg_upgrade of old publisher');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# Cleanup
+$bsession->query_safe("COMMIT");
+$bsession->quit;
+$old_publisher->wait_for_catchup('sub');
+
 # Define another replication slot which allows to decode prepared transactions
 $old_publisher->safe_psql('postgres',
-	"SELECT pg_catalog.pg_create_logical_replication_slot('twophase_slot', 'pgoutput', false, true)");
+	"SELECT pg_catalog.pg_create_logical_replication_slot('twophase_slot', 'pgoutput', false, true)"
+);
 
 # Preparations for upgrading publisher
 $old_publisher->stop;
@@ -160,7 +198,7 @@ $new_publisher->start;
 $result =
   $new_publisher->safe_psql('postgres',
 	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(sub|f
+is( $result, qq(sub|f
 twophase_slot|t), 'check the replication slot is copied to new publisher');
 
 # Change connection string and enable logical replication
@@ -172,11 +210,11 @@ $subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
 
 # Check whether changes on the new publisher get replicated to the subscriber
 $new_publisher->safe_psql('postgres',
-	"INSERT INTO tbl VALUES (generate_series(21, 30))");
+	"INSERT INTO tbl VALUES (generate_series(31, 40))");
 
 $new_publisher->wait_for_catchup('sub');
 
 $result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
-is($result, qq(30), 'check changes are shipped to subscriber');
+is($result, qq(40), 'check changes are shipped to subscriber');
 
 done_testing();
-- 
2.27.0

