From c022357ec0b3bae5fc9bd3d2b18262eeba39c2a9 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 18 Aug 2023 11:57:37 +0000
Subject: [PATCH v22 3/3] pg_upgrade: Add check function for logical
 replication slots

In order to prevent data loss, pg_upgrade will fail if the old node has slots with
the status 'lost', or with unconsumed WAL records.

Author: Hayato Kuroda
Reviewed-by: Wang Wei, Vignesh C, Peter Smith
---
 src/bin/pg_upgrade/check.c                    | 93 +++++++++++++++++++
 src/bin/pg_upgrade/controldata.c              | 34 +++++++
 src/bin/pg_upgrade/info.c                     | 14 +++
 src/bin/pg_upgrade/pg_upgrade.h               |  5 +
 .../t/003_logical_replication_slots.pl        | 69 ++++++++++++--
 5 files changed, 209 insertions(+), 6 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index ed5b07fbb7..859fbb7cdc 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -9,6 +9,7 @@
 
 #include "postgres_fe.h"
 
+#include "access/xlogdefs.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_collation.h"
 #include "fe_utils/string_utils.h"
@@ -1468,3 +1469,95 @@ check_for_logical_replication_slots(ClusterInfo *new_cluster)
 
 	check_ok();
 }
+
+/*
+ * Verify that all logical replication slots are usable.
+ */
+void
+check_for_lost_slots(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	PGresult   *res;
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+	/* logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1600)
+		return;
+
+	prep_status("Checking wal_status for logical replication slots");
+
+	/* Check there are no logical replication slots with a 'lost' state. */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE wal_status = 'lost' AND "
+							"temporary IS FALSE;");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" is in 'lost' state.",
+			   PQgetvalue(res, i, i_slotname));
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (ntups)
+		pg_fatal("One or more logical replication slots with a state of 'lost' were detected.");
+
+	check_ok();
+}
+
+/*
+ * Verify that all logical replication slots consumed all WALs, except a
+ * CHECKPOINT_SHUTDOWN record.
+ */
+void
+check_for_confirmed_flush_lsn(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	PGresult   *res;
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+	/* logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1600)
+		return;
+
+	prep_status("Checking confirmed_flush_lsn for logical replication slots");
+
+	/*
+	 * Check that all logical replication slots have reached the latest
+	 * checkpoint position (SHUTDOWN_CHECKPOINT record).
+	 */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE confirmed_flush_lsn != '%X/%X' AND temporary IS FALSE;",
+							LSN_FORMAT_ARGS(old_cluster.controldata.chkpnt_latest));
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		pg_log(PG_WARNING,
+				"\nWARNING: logical replication slot \"%s\" has not consumed WALs yet",
+				PQgetvalue(res, i, i_slotname));
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (ntups)
+		pg_fatal("One or more logical replication slots still have unconsumed WAL records.");
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 4beb65ab22..30ad6ba1e0 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -169,6 +169,40 @@ get_control_data(ClusterInfo *cluster, bool live_check)
 				}
 				got_cluster_state = true;
 			}
+
+			else if ((p = strstr(bufin, "Latest checkpoint location:")) != NULL)
+			{
+				/*
+				 * Gather latest checkpoint location if the cluster is newer or
+				 * equal to 17. This is used for upgrading logical replication
+				 * slots.
+				 */
+				if (GET_MAJOR_VERSION(cluster->major_version) >= 17)
+				{
+					char *slash = NULL;
+					uint64 upper_lsn, lower_lsn;
+
+					p = strchr(p, ':');
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					p++;			/* remove ':' char */
+
+					p = strpbrk(p, "01234567890ABCDEF");
+
+					/*
+					 * Upper and lower part of LSN must be read separately
+					 * because it is reported as %X/%X format.
+					 */
+					upper_lsn = strtoul(p, &slash, 16);
+					lower_lsn = strtoul(++slash, NULL, 16);
+
+					/* And combine them */
+					cluster->controldata.chkpnt_latest =
+										(upper_lsn << 32) | lower_lsn;
+				}
+			}
 		}
 
 		rc = pclose(output);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index b8505ae65b..08e002db28 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -684,6 +684,20 @@ get_logical_slot_infos(ClusterInfo *cluster, bool live_check)
 			print_slot_infos(&pDbInfo->slot_arr);
 		}
 	}
+
+	/*
+	 * Do additional checks if slots are found on the old node. If something is
+	 * found on the new node, a subsequent function
+	 * check_new_cluster_is_empty() would report the name of slots and raise a
+	 * fatal error.
+	 */
+	if (cluster == &old_cluster && slot_count)
+	{
+		check_for_lost_slots(cluster);
+
+		if (!live_check)
+			check_for_confirmed_flush_lsn(cluster);
+	}
 }
 
 /*
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 6ba2efe1b3..6df948ac73 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -10,6 +10,7 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 
+#include "access/xlogdefs.h"
 #include "common/relpath.h"
 #include "libpq-fe.h"
 
@@ -242,6 +243,8 @@ typedef struct
 	bool		date_is_int;
 	bool		float8_pass_by_value;
 	uint32		data_checksum_version;
+
+	XLogRecPtr	chkpnt_latest;
 } ControlData;
 
 /*
@@ -366,6 +369,8 @@ void		output_completion_banner(char *deletion_script_file_name);
 void		check_cluster_versions(void);
 void		check_cluster_compatibility(bool live_check);
 void		create_script_for_old_cluster_deletion(char **deletion_script_file_name);
+void		check_for_lost_slots(ClusterInfo *cluster);
+void		check_for_confirmed_flush_lsn(ClusterInfo *cluster);
 
 
 /* controldata.c */
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 3df2d3c284..3828307577 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -17,15 +17,16 @@ my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
 # Initialize old node
 my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
 $old_publisher->init(allows_streaming => 'logical');
-$old_publisher->start;
 
 # Initialize new node
 my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
 $new_publisher->init(allows_streaming => 'replica');
 
-my $bindir = $new_publisher->config_data('--bindir');
+# Initialize subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
 
-$old_publisher->stop;
+my $bindir = $new_publisher->config_data('--bindir');
 
 # Create a slot on old node
 $old_publisher->start;
@@ -59,6 +60,7 @@ $old_publisher->start;
 $old_publisher->safe_psql(
 	'postgres', qq[
 	SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);
+	SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL);
 ]);
 
 $old_publisher->stop;
@@ -89,15 +91,57 @@ ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 # Clean up
 rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
-# Remove an unnecessary slot and consume WAL records
+# Remove an unnecessary slot and generate WALs. These records would not be
+# consumed before doing pg_upgrade, so that the upcoming test would fail.
 $old_publisher->start;
 $old_publisher->safe_psql(
 	'postgres', qq[
 	SELECT pg_drop_replication_slot('test_slot2');
-	SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL)
+	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
 ]);
 $old_publisher->stop;
 
+# Cause a failure at the start of pg_upgrade because the slot still have
+# unconsumed WAL records
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old node with idle replication slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->start();
+$old_publisher->safe_psql('postgres', qq[
+	SELECT pg_drop_replication_slot('test_slot1');
+	CREATE PUBLICATION pub FOR ALL TABLES;
+]);
+$subscriber->start;
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
+]);
+
+# Wait for initial table sync to finish
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+$old_publisher->stop;
+
 # Actual run, pg_upgrade_output.d is removed at the end
 command_ok(
 	[
@@ -118,6 +162,19 @@ ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
 $new_publisher->start;
 my $result = $new_publisher->safe_psql('postgres',
 	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(test_slot1|t), 'check the slot exists on new node');
+is($result, qq(sub|t), 'check the slot exists on new node');
+
+# Change the connection
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are shipped to subscriber');
 
 done_testing();
-- 
2.27.0

