From 49c17070ce789f09105ef53f47626cc9c2c1be14 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 18 Aug 2023 11:57:37 +0000
Subject: [PATCH v24 3/3] pg_upgrade: Add check function for logical
 replication slots

To prevent data loss, pg_upgrade will fail if the old node has slots with the
status 'lost', or with unconsumed WAL records.

Author: Hayato Kuroda
Reviewed-by: Wang Wei, Vignesh C, Peter Smith, Hou Zhijie
---
 src/bin/pg_upgrade/check.c                    | 109 ++++++++++++++++++
 src/bin/pg_upgrade/controldata.c              |  37 ++++++
 src/bin/pg_upgrade/pg_upgrade.h               |   3 +
 .../t/003_logical_replication_slots.pl        |  88 ++++++++++++--
 4 files changed, 230 insertions(+), 7 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 61a81c5011..e9cdbcbebf 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -9,6 +9,7 @@
 
 #include "postgres_fe.h"
 
+#include "access/xlogdefs.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_collation.h"
 #include "fe_utils/string_utils.h"
@@ -31,6 +32,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_new_cluster_logical_replication_slots(void);
+static void check_for_confirmed_flush_lsn(ClusterInfo *cluster);
 
 
 /*
@@ -108,6 +110,21 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 
+	/* Logical replication slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+	{
+		check_for_lost_slots(&old_cluster);
+
+		/*
+		 * Do additional checks if a live check is not required. This requires
+		 * that confirmed_flush_lsn of all the slots is the same as the latest
+		 * checkpoint location, but it would be satisfied only when the server
+		 * has been shut down.
+		 */
+		if (!live_check)
+			check_for_confirmed_flush_lsn(&old_cluster);
+	}
+
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
 	 * on-disk format for existing data.
@@ -1467,3 +1484,95 @@ check_new_cluster_logical_replication_slots(void)
 
 	check_ok();
 }
+
+/*
+ * Verify that all logical replication slots are usable.
+ */
+void
+check_for_lost_slots(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	PGresult   *res;
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn;
+
+	/* Quick exit if the cluster does not have logical slots. */
+	if (count_logical_slots(cluster) == 0)
+		return;
+
+	conn = connectToServer(cluster, active_db->db_name);
+
+	prep_status("Checking wal_status for logical replication slots");
+
+	/* Check there are no logical replication slots with a 'lost' state. */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE wal_status = 'lost' AND "
+							"temporary IS FALSE;");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" is in 'lost' state.",
+			   PQgetvalue(res, i, i_slotname));
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (ntups)
+		pg_fatal("One or more logical replication slots with a state of 'lost' were detected.");
+
+	check_ok();
+}
+
+/*
+ * Verify that all logical replication slots consumed all WALs, except a
+ * CHECKPOINT_SHUTDOWN record.
+ */
+static void
+check_for_confirmed_flush_lsn(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	PGresult   *res;
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn;
+
+	/* Quick exit if the cluster does not have logical slots. */
+	if (count_logical_slots(cluster) == 0)
+		return;
+
+	conn = connectToServer(cluster, active_db->db_name);
+
+	prep_status("Checking confirmed_flush_lsn for logical replication slots");
+
+	/*
+	 * Check that all logical replication slots have reached the latest
+	 * checkpoint position (SHUTDOWN_CHECKPOINT record).
+	 */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE confirmed_flush_lsn != '%X/%X' AND temporary IS FALSE;",
+							LSN_FORMAT_ARGS(old_cluster.controldata.chkpnt_latest));
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+		pg_log(PG_WARNING,
+				"\nWARNING: logical replication slot \"%s\" has not consumed WALs yet",
+				PQgetvalue(res, i, i_slotname));
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (ntups)
+		pg_fatal("One or more logical replication slots still have unconsumed WAL records.");
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 4beb65ab22..51eb0df99c 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -169,6 +169,43 @@ get_control_data(ClusterInfo *cluster, bool live_check)
 				}
 				got_cluster_state = true;
 			}
+
+			else if ((p = strstr(bufin, "Latest checkpoint location:")) != NULL)
+			{
+				/*
+				 * Gather the latest checkpoint location if the cluster is PG17
+				 * or later. This is used for upgrading logical replication
+				 * slots.
+				 */
+				if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
+				{
+					char *slash = NULL;
+					uint32 upper_lsn, lower_lsn;
+
+					p = strchr(p, ':');
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					p++;			/* remove ':' char */
+
+					p = strpbrk(p, "01234567890ABCDEF");
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					/*
+					 * The upper and lower part of LSN must be read separately
+					 * because it is reported in %X/%X format.
+					 */
+					upper_lsn = strtoul(p, &slash, 16);
+					lower_lsn = strtoul(++slash, NULL, 16);
+
+					/* And combine them */
+					cluster->controldata.chkpnt_latest =
+										((uint64) upper_lsn << 32) | lower_lsn;
+				}
+			}
 		}
 
 		rc = pclose(output);
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 2dac266537..9e0069a31d 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -10,6 +10,7 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 
+#include "access/xlogdefs.h"
 #include "common/relpath.h"
 #include "libpq-fe.h"
 
@@ -242,6 +243,7 @@ typedef struct
 	bool		date_is_int;
 	bool		float8_pass_by_value;
 	uint32		data_checksum_version;
+	XLogRecPtr	chkpnt_latest;
 } ControlData;
 
 /*
@@ -366,6 +368,7 @@ void		output_completion_banner(char *deletion_script_file_name);
 void		check_cluster_versions(void);
 void		check_cluster_compatibility(bool live_check);
 void		create_script_for_old_cluster_deletion(char **deletion_script_file_name);
+void		check_for_lost_slots(ClusterInfo *cluster);
 
 
 /* controldata.c */
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index ae87c33708..812ed55adf 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -22,6 +22,10 @@ $old_publisher->init(allows_streaming => 'logical');
 my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
 $new_publisher->init(allows_streaming => 'replica');
 
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
+
 my $bindir = $new_publisher->config_data('--bindir');
 
 # ------------------------------
@@ -65,13 +69,19 @@ $old_publisher->start;
 $old_publisher->safe_psql('postgres',
 	"SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);"
 );
+
+# 2. Consume WAL records to avoid another type of upgrade failure. It will be
+#	 tested in subsequent cases.
+$old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL);"
+);
 $old_publisher->stop;
 
-# 2. max_replication_slots is set to smaller than the number of slots (2)
+# 3. max_replication_slots is set to smaller than the number of slots (2)
 #	 present on the old cluster
 $new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
 
-# 3. wal_level is set correctly on the new cluster
+# 4. wal_level is set correctly on the new cluster
 $new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
 
 # pg_upgrade will fail because the new cluster has insufficient max_replication_slots
@@ -95,7 +105,7 @@ ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # ------------------------------
-# TEST: Successful upgrade
+# TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
 
 # Preparations for the subsequent test:
 # 1. Remove the slot 'test_slot2', leaving only 1 slot remaining on the old
@@ -106,12 +116,60 @@ $old_publisher->safe_psql('postgres',
 	"SELECT * FROM pg_drop_replication_slot('test_slot2');"
 );
 
-# 2. Consume WAL records
+# 2. Generate extra WAL records. Because these WAL records do not get consumed
+#	 it will cause the upcoming pg_upgrade test to fail.
 $old_publisher->safe_psql('postgres',
-	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL)"
+	"CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;"
 );
 $old_publisher->stop;
 
+# pg_upgrade will fail because the slot still has unconsumed WAL records
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old cluster with idle replication slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test:
+# 1. Remove the remained slot
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT * FROM pg_drop_replication_slot('test_slot1');"
+);
+
+# 2. Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES;"
+);
+$subscriber->start;
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
+]);
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+# 3. Disable the subscription once
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+$old_publisher->stop;
+
 # Actual run, successful upgrade is expected
 command_ok(
 	[
@@ -133,7 +191,23 @@ ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
 $new_publisher->start;
 my $result = $new_publisher->safe_psql('postgres',
 	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(test_slot1|t), 'check the slot exists on new cluster');
-$new_publisher->stop;
+is($result, qq(sub|t), 'check the slot exists on new cluster');
+
+# Update the connection
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are shipped to the subscriber');
+
+# Clean up
+$subscriber->stop();
+$new_publisher->stop();
 
 done_testing();
-- 
2.27.0

