On Fri, 21 Jun 2024 at 16:51, Hayato Kuroda (Fujitsu)
<[email protected]> wrote:
>
> Dear Hackers,
>
> This is a follow-up thread for pg_createsubscriber [1]. I started a new thread
> since there is no activity around here.
>
> ## Problem
>
> Assuming that there is a cascading replication like below:
>
> node A --(logical replication)--> node B --(streaming replication)--> node C
>
> In this case, subscriptions exist even on node C, but it does not try to
> connect
> to node A because the logical replication launcher/worker won't be launched.
> After the conversion, node C becomes a subscriber for node B, and the
> subscription
> toward node A remains. Therefore, another worker that tries to connect to
> node A
> will be launched, raising an ERROR [2]. This failure may occur even during the
> conversion.
>
> ## Solution
>
> The easiest solution is to drop pre-existing subscriptions from the converted
> node.
> To avoid establishing connections during the conversion, slot_name is set to
> NONE
> on the primary first, then drop on the standby. The setting will be restored
> on the
> primary node.
> Attached patch implements the idea. Test script is also included, but not
> sure it should
> be on the HEAD
Few comments:
1) Should we do this only for the enabled subscription, otherwise the
disabled subscriptions will be enabled after running
pg_createsubscriber:
+obtain_and_disable_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer query = createPQExpBuffer();
+
+ for (int i = 0; i < num_dbs; i++)
+ {
+ PGconn *conn;
+ PGresult *res;
+ int ntups;
+
+ /* Connect to publisher */
+ conn = connect_database(dbinfo[i].pubconninfo, true);
+
+ appendPQExpBuffer(query,
+ "SELECT s.subname,
s.subslotname FROM pg_catalog.pg_subscription s "
+ "INNER JOIN
pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+ "WHERE d.datname = '%s'",
+ dbinfo[i].dbname);
+
2) disconnect_database not called here, should the connection be disconnected:
+drop_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer query = createPQExpBuffer();
+
+ for (int i = 0; i < num_dbs; i++)
+ {
+ PGconn *conn;
+ struct LogicalRepInfo info = dbinfo[i];
+
+ /* Connect to subscriber */
+ conn = connect_database(info.subconninfo, false);
+
+ for (int j = 0; j < info.num_subscriptions; j++)
+ {
+ appendPQExpBuffer(query,
+ "DROP
SUBSCRIPTION %s;", info.pre_subnames[j]);
+ PQexec(conn, query->data);
+ resetPQExpBuffer(query);
+ }
+ }
3) Similarly here too:
+static void
+enable_subscirptions_on_publisher(struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer query = createPQExpBuffer();
+
+ for (int i = 0; i < num_dbs; i++)
+ {
+ PGconn *conn;
+ struct LogicalRepInfo info = dbinfo[i];
+
+ /* Connect to publisher */
+ conn = connect_database(info.pubconninfo, false);
4) them should be then here:
+ /* ...and them enable the subscription */
+ appendPQExpBuffer(query,
+ "ALTER
SUBSCRIPTION %s ENABLE",
+ info.pre_subnames[j]);
+ PQclear(PQexec(conn, query->data));
+ resetPQExpBuffer(query);
> BTW, I found that LogicalRepInfo.oid won't be used. If needed, I can create
> another patch to remove the attribute.
I was able to compile without this, I think this can be removed.
Regards,
Vignesh