On Wed, May 22, 2024, at 12:16 PM, Euler Taveira wrote:
> I'll summarize all issues as soon as I finish the review of sync slot 
> support. I
> think we should avoid new development if we judge that the item can be
> documented as a limitation for this version. Nevertheless, I will share 
> patches
> so you can give your opinion on whether it is an open item or new development.

Here is a patch series to fix the issues reported in recent discussions. Patches
0001 and 0003 aim to fix the buildfarm issues. Patch 0002 removes synchronized
failover slots on the subscriber since they have no use there. I also included
an optional patch 0004 that improves usability by checking both servers before
exiting, even if a subscriber check has already failed.

As I said in my previous email, I decided to remove the logic that reacts to an
issue on the primary. We can reintroduce such code later if/when we have a
better way to check recovery progress. The tool now relies only on
recovery_timeout, and the tests set recovery_timeout to PG_TEST_TIMEOUT_DEFAULT
to let the buildfarm animals control how long to wait for recovery.

Since some animals reported issues in the check_publisher routine that checks
whether primary_slot_name is in use on the primary, this logic was removed too
(patch 0003). We could introduce a way to keep retrying this test, but my
conclusion is that it is not worth it: if primary_slot_name does not exist (due
to a setup error), pg_createsubscriber will log an error message and continue.

Patch 0002 removes any failover slots that remain on the subscriber. Talking
about terminology, I noticed that slotsync.c uses both "logical failover slots"
and "failover logical slots"; I think the latter sounds better, but I'm not a
native speaker. I also don't know whether we should use a short terminology
like "failover slots", "failover replication slots", or "failover logical
replication slots". IMO we can omit "logical" because "failover" implies it is
a logical replication slot. I'm less sure about omitting "replication", since
"failover slots" alone is not descriptive enough. I prefer "failover
replication slots".

Before sending this email I realized that I did nothing about physical
replication slots on the standby. I think we should remove them too,
unconditionally.


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 5d8b4781e6e9bcb00564f45c25e575a8abab6ae8 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Tue, 21 May 2024 23:04:57 -0300
Subject: [PATCH 1/4] Only the recovery_timeout controls the end of recovery
 process

It used to check whether the target server is connected to the primary
server (i.e. receiving the required WAL) so it could react rapidly when
the process won't succeed. That check is not enough to guarantee that
the recovery process will complete: there is a window between the
walreceiver shutdown and pg_is_in_recovery() returning false during
which the check can reach NUM_CONN_ATTEMPTS attempts and fail.
Instead, rely only on the recovery_timeout option to give up after the
specified number of seconds.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  7 -----
 src/bin/pg_basebackup/pg_createsubscriber.c   | 29 ++-----------------
 .../t/040_pg_createsubscriber.pl              |  2 ++
 3 files changed, 5 insertions(+), 33 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index 142bffff02..a700697f88 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -325,13 +325,6 @@ PostgreSQL documentation
     connections to the target server should fail.
    </para>
 
-   <para>
-    During the recovery process, if the target server disconnects from the
-    source server, <application>pg_createsubscriber</application> will check a
-    few times if the connection has been reestablished to stream the required
-    WAL.  After a few attempts, it terminates with an error.
-   </para>
-
    <para>
     Since DDL commands are not replicated by logical replication, avoid
     executing DDL commands that change the database schema while running
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 90cc580811..f62f34b1a7 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -1360,6 +1360,9 @@ stop_standby_server(const char *datadir)
  *
  * If recovery_timeout option is set, terminate abnormally without finishing
  * the recovery process. By default, it waits forever.
+ *
+ * XXX Is the recovery process still in progress? When the recovery process
+ * has a better progress reporting mechanism, it should be used here.
  */
 static void
 wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
@@ -1367,9 +1370,6 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
 	PGconn	   *conn;
 	int			status = POSTMASTER_STILL_STARTING;
 	int			timer = 0;
-	int			count = 0;		/* number of consecutive connection attempts */
-
-#define NUM_CONN_ATTEMPTS	10
 
 	pg_log_info("waiting for the target server to reach the consistent state");
 
@@ -1377,7 +1377,6 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
 
 	for (;;)
 	{
-		PGresult   *res;
 		bool		in_recovery = server_is_in_recovery(conn);
 
 		/*
@@ -1391,28 +1390,6 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
 			break;
 		}
 
-		/*
-		 * If it is still in recovery, make sure the target server is
-		 * connected to the primary so it can receive the required WAL to
-		 * finish the recovery process. If it is disconnected try
-		 * NUM_CONN_ATTEMPTS in a row and bail out if not succeed.
-		 */
-		res = PQexec(conn,
-					 "SELECT 1 FROM pg_catalog.pg_stat_wal_receiver");
-		if (PQntuples(res) == 0)
-		{
-			if (++count > NUM_CONN_ATTEMPTS)
-			{
-				stop_standby_server(subscriber_dir);
-				pg_log_error("standby server disconnected from the primary");
-				break;
-			}
-		}
-		else
-			count = 0;			/* reset counter if it connects again */
-
-		PQclear(res);
-
 		/* Bail out after recovery_timeout seconds if this option is set */
 		if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
 		{
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 2b883e6910..a677fefa94 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -264,6 +264,7 @@ $node_p->restart;
 command_ok(
 	[
 		'pg_createsubscriber', '--verbose',
+		'--recovery-timeout', "$PostgreSQL::Test::Utils::timeout_default",
 		'--dry-run', '--pgdata',
 		$node_s->data_dir, '--publisher-server',
 		$node_p->connstr('pg1'), '--socket-directory',
@@ -301,6 +302,7 @@ command_ok(
 command_ok(
 	[
 		'pg_createsubscriber', '--verbose',
+		'--recovery-timeout', "$PostgreSQL::Test::Utils::timeout_default",
 		'--verbose', '--pgdata',
 		$node_s->data_dir, '--publisher-server',
 		$node_p->connstr('pg1'), '--socket-directory',
-- 
2.30.2

From a10edc03a6636d6fef9e013a507a8258096a6fd3 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Wed, 22 May 2024 12:43:13 -0300
Subject: [PATCH 2/4] Remove failover replication slots on subscriber

After running pg_createsubscriber, these replication slots have no use on
the subscriber.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  8 +++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 49 ++++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              | 20 +++++++-
 3 files changed, 75 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index a700697f88..ac56c0cce9 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -482,6 +482,14 @@ PostgreSQL documentation
      </para>
     </step>
 
+    <step>
+     <para>
+      If the standby server contains
+      <link linkend="logicaldecoding-replication-slots-synchronization">failover replication slots</link>,
+      they cannot be synchronized anymore so drop them.
+     </para>
+    </step>
+
     <step>
      <para>
       Update the system identifier on the target server. The
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index f62f34b1a7..b42160c6c6 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -85,6 +85,7 @@ static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *data
 						   const char *lsn);
 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
 										  const char *slotname);
+static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
 static char *create_logical_replication_slot(PGconn *conn,
 											 struct LogicalRepInfo *dbinfo);
 static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
@@ -1178,6 +1179,49 @@ drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotnam
 	}
 }
 
+/*
+ * Drop failover replication slots on the subscriber. After the
+ * transformation, they have no use.
+ *
+ * XXX we must not fail here. Instead, we provide a warning so the user
+ * can eventually drop these replication slots later.
+ */
+static void
+drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+
+	conn = connect_database(dbinfo[0].subconninfo, false);
+	if (conn != NULL)
+	{
+		/* Get failover replication slot names. */
+		res = PQexec(conn,
+					 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
+
+		if (PQresultStatus(res) == PGRES_TUPLES_OK)
+		{
+			/* Remove failover replication slots from subscriber. */
+			for (int i = 0; i < PQntuples(res); i++)
+				drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
+		}
+		else
+		{
+			pg_log_warning("could not obtain failover replication slot information: %s",
+						   PQresultErrorMessage(res));
+			pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
+		}
+
+		PQclear(res);
+		disconnect_database(conn, false);
+	}
+	else
+	{
+		pg_log_warning("could not drop failover replication slot");
+		pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
+	}
+}
+
 /*
  * Create a logical replication slot and returns a LSN.
  *
@@ -1309,7 +1353,7 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
 	int			rc;
 
-	appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s",
+	appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s -o \"-c sync_replication_slots=off\"",
 					  pg_ctl_path, subscriber_dir);
 	if (restricted_access)
 	{
@@ -2111,6 +2155,9 @@ main(int argc, char **argv)
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfo, primary_slot_name);
 
+	/* Remove failover replication slots if they exist on the subscriber */
+	drop_failover_replication_slots(dbinfo);
+
 	/* Stop the subscriber */
 	pg_log_info("stopping the subscriber");
 	stop_standby_server(subscriber_dir);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index a677fefa94..0516d4e17e 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -88,6 +88,7 @@ command_fails(
 
 # Set up node P as primary
 my $node_p = PostgreSQL::Test::Cluster->new('node_p');
+my $pconnstr = $node_p->connstr;
 $node_p->init(allows_streaming => 'logical');
 $node_p->start;
 
@@ -122,6 +123,8 @@ $node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1);
 $node_s->append_conf(
 	'postgresql.conf', qq[
 primary_slot_name = '$slotname'
+primary_conninfo = '$pconnstr dbname=postgres'
+hot_standby_feedback = on
 ]);
 $node_s->set_standby_mode();
 $node_s->start;
@@ -260,6 +263,16 @@ max_worker_processes = 8
 # Restore default settings on both servers
 $node_p->restart;
 
+# Create failover slot to test its removal
+my $fslotname = 'failover_slot';
+$node_p->safe_psql('pg1',
+	"SELECT pg_create_logical_replication_slot('$fslotname', 'pgoutput', false, false, true)");
+$node_s->start;
+$node_s->safe_psql('postgres', "SELECT pg_sync_replication_slots()");
+my $result = $node_s->safe_psql('postgres', "SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary");
+is($result, 'failover_slot', 'failover slot is synced');
+$node_s->stop;
+
 # dry run mode on node S
 command_ok(
 	[
@@ -318,7 +331,7 @@ command_ok(
 	'run pg_createsubscriber on node S');
 
 # Confirm the physical replication slot has been removed
-my $result = $node_p->safe_psql('pg1',
+$result = $node_p->safe_psql('pg1',
 	"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$slotname'"
 );
 is($result, qq(0),
@@ -343,6 +356,11 @@ my @subnames = split("\n", $result);
 $node_s->wait_for_subscription_sync($node_p, $subnames[0]);
 $node_s->wait_for_subscription_sync($node_p, $subnames[1]);
 
+# Confirm the failover slot has been removed
+$result = $node_s->safe_psql('pg1',
+	"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$fslotname'");
+is($result, qq(0), 'failover slot was removed');
+
 # Check result on database pg1
 $result = $node_s->safe_psql('pg1', 'SELECT * FROM tbl1');
 is( $result, qq(first row
-- 
2.30.2

From f085fbef4ecd31aed470d858e57e9ea3b8014493 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Fri, 24 May 2024 14:18:15 -0300
Subject: [PATCH 3/4] Remove replication slot check on primary

It used to check whether the replication slot exists and is active on
the primary. This check might fail on slow hosts because the replication
slot might not be active at the time of the check.

The current code obtains the replication slot name from
primary_slot_name on the standby and assumes the replication slot exists
and is active on the primary. If it doesn't exist, this tool will log an
error message and continue.
---
 src/bin/pg_basebackup/pg_createsubscriber.c | 53 ++-------------------
 1 file changed, 5 insertions(+), 48 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index b42160c6c6..6d5127c45e 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -880,47 +880,6 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 	pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
 	pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
 
-	/*
-	 * If standby sets primary_slot_name, check if this replication slot is in
-	 * use on primary for WAL retention purposes. This replication slot has no
-	 * use after the transformation, hence, it will be removed at the end of
-	 * this process.
-	 */
-	if (primary_slot_name)
-	{
-		PQExpBuffer str = createPQExpBuffer();
-		char	   *psn_esc = PQescapeLiteral(conn, primary_slot_name, strlen(primary_slot_name));
-
-		appendPQExpBuffer(str,
-						  "SELECT 1 FROM pg_catalog.pg_replication_slots "
-						  "WHERE active AND slot_name = %s",
-						  psn_esc);
-
-		pg_free(psn_esc);
-
-		pg_log_debug("command is: %s", str->data);
-
-		res = PQexec(conn, str->data);
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-		{
-			pg_log_error("could not obtain replication slot information: %s",
-						 PQresultErrorMessage(res));
-			disconnect_database(conn, true);
-		}
-
-		if (PQntuples(res) != 1)
-		{
-			pg_log_error("could not obtain replication slot information: got %d rows, expected %d row",
-						 PQntuples(res), 1);
-			disconnect_database(conn, true);
-		}
-		else
-			pg_log_info("primary has replication slot \"%s\"",
-						primary_slot_name);
-
-		PQclear(res);
-	}
-
 	disconnect_database(conn, false);
 
 	if (strcmp(wal_level, "logical") != 0)
@@ -2105,12 +2064,7 @@ main(int argc, char **argv)
 	/* Check if the standby server is ready for logical replication */
 	check_subscriber(dbinfo);
 
-	/*
-	 * Check if the primary server is ready for logical replication. This
-	 * routine checks if a replication slot is in use on primary so it relies
-	 * on check_subscriber() to obtain the primary_slot_name. That's why it is
-	 * called after it.
-	 */
+	/* Check if the primary server is ready for logical replication */
 	check_publisher(dbinfo);
 
 	/*
@@ -2152,7 +2106,10 @@ main(int argc, char **argv)
 	 */
 	setup_subscriber(dbinfo, consistent_lsn);
 
-	/* Remove primary_slot_name if it exists on primary */
+	/*
+	 * Remove primary_slot_name if it exists on primary.  This replication slot
+	 * has no use after the transformation.
+	 */
 	drop_primary_replication_slot(dbinfo, primary_slot_name);
 
 	/* Remove failover replication slots if they exist on the subscriber */
-- 
2.30.2

From a4f10c849f7584c4fb12005cec04b5d09701752f Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Fri, 24 May 2024 14:29:06 -0300
Subject: [PATCH 4/4] Check both servers before exiting

The current code reports multiple errors for each server, but if any
subscriber condition is not met, it terminates without checking the
publisher conditions. This change lets it check both servers before
terminating when any condition is not met. It saves at least one dry
run execution.
---
 src/bin/pg_basebackup/pg_createsubscriber.c | 24 ++++++++++++---------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 6d5127c45e..b0233786d4 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -76,9 +76,9 @@ static uint64 get_standby_sysid(const char *datadir);
 static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
 static bool server_is_in_recovery(PGconn *conn);
 static char *generate_object_name(PGconn *conn);
-static void check_publisher(const struct LogicalRepInfo *dbinfo);
+static bool check_publisher(const struct LogicalRepInfo *dbinfo);
 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
-static void check_subscriber(const struct LogicalRepInfo *dbinfo);
+static bool check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
 							 const char *consistent_lsn);
 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
@@ -803,7 +803,7 @@ server_is_in_recovery(PGconn *conn)
  *
  * XXX Does it not allow a synchronous replica?
  */
-static void
+static bool
 check_publisher(const struct LogicalRepInfo *dbinfo)
 {
 	PGconn	   *conn;
@@ -908,8 +908,7 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 
 	pg_free(wal_level);
 
-	if (failed)
-		exit(1);
+	return failed;
 }
 
 /*
@@ -923,7 +922,7 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
  * there is not a reliable way to provide a suitable error saying the server C
  * will be broken at the end of this process (due to pg_resetwal).
  */
-static void
+static bool
 check_subscriber(const struct LogicalRepInfo *dbinfo)
 {
 	PGconn	   *conn;
@@ -1014,8 +1013,7 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
 		failed = true;
 	}
 
-	if (failed)
-		exit(1);
+	return failed;
 }
 
 /*
@@ -1771,6 +1769,9 @@ main(int argc, char **argv)
 
 	char		pidfile[MAXPGPATH];
 
+	bool		pub_failed = false;
+	bool		sub_failed = false;
+
 	pg_logging_init(argv[0]);
 	pg_logging_set_level(PG_LOG_WARNING);
 	progname = get_progname(argv[0]);
@@ -2062,10 +2063,13 @@ main(int argc, char **argv)
 	start_standby_server(&opt, true);
 
 	/* Check if the standby server is ready for logical replication */
-	check_subscriber(dbinfo);
+	sub_failed = check_subscriber(dbinfo);
 
 	/* Check if the primary server is ready for logical replication */
-	check_publisher(dbinfo);
+	pub_failed = check_publisher(dbinfo);
+
+	if (pub_failed || sub_failed)
+		exit(1);
 
 	/*
 	 * Stop the target server. The recovery process requires that the server
-- 
2.30.2
