On Fri, 21 Jun 2024 at 16:51, Hayato Kuroda (Fujitsu)
<kuroda.hay...@fujitsu.com> wrote:
>
> Dear Hackers,
>
> This is a follow-up thread for pg_createsubscriber [1]. I started a new thread
> since there is no activity around here.
>
> ## Problem
>
> Assuming that there is a cascading replication like below:
>
> node A --(logical replication)--> node B --(streaming replication)--> node C
>
> In this case, subscriptions exist even on node C, but it does not try to connect
> to node A because the logical replication launcher/worker won't be launched.
> After the conversion, node C becomes a subscriber for node B, and the subscription
> toward node A remains. Therefore, another worker that tries to connect to node A
> will be launched, raising an ERROR [2]. This failure may occur even during the
> conversion.
>
> ## Solution
>
> The easiest solution is to drop pre-existing subscriptions from the converted node.
> To avoid establishing connections during the conversion, slot_name is set to NONE
> on the primary first, then drop on the standby. The setting will be restored on the
> primary node.
> Attached patch implements the idea. Test script is also included, but not sure it should
> be on the HEAD

Few comments:

1) Should we do this only for enabled subscriptions? Otherwise, subscriptions
that were disabled will end up enabled after running pg_createsubscriber:
+obtain_and_disable_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer query = createPQExpBuffer();
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		PGresult   *res;
+		int			ntups;
+
+		/* Connect to publisher */
+		conn = connect_database(dbinfo[i].pubconninfo, true);
+
+		appendPQExpBuffer(query,
+						  "SELECT s.subname, s.subslotname FROM pg_catalog.pg_subscription s "
+						  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+						  "WHERE d.datname = '%s'",
+						  dbinfo[i].dbname);
+

2) disconnect_database is not called here. Should the connection be
disconnected?
+drop_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer query = createPQExpBuffer();
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		struct LogicalRepInfo info = dbinfo[i];
+
+		/* Connect to subscriber */
+		conn = connect_database(info.subconninfo, false);
+
+		for (int j = 0; j < info.num_subscriptions; j++)
+		{
+			appendPQExpBuffer(query,
+							  "DROP SUBSCRIPTION %s;", info.pre_subnames[j]);
+			PQexec(conn, query->data);
+			resetPQExpBuffer(query);
+		}
+	}

3) Similarly here too:
+static void
+enable_subscirptions_on_publisher(struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer query = createPQExpBuffer();
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		struct LogicalRepInfo info = dbinfo[i];
+
+		/* Connect to publisher */
+		conn = connect_database(info.pubconninfo, false);

4) "them" should be "then" here:
+			/* ...and them enable the subscription */
+			appendPQExpBuffer(query,
+							  "ALTER SUBSCRIPTION %s ENABLE",
+							  info.pre_subnames[j]);
+			PQclear(PQexec(conn, query->data));
+			resetPQExpBuffer(query);

> BTW, I found that LogicalRepInfo.oid won't be used. If needed, I can create
> another patch to remove the attribute.

I was able to compile without it, so I think it can be removed.

Regards,
Vignesh
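
P.S. On comment 1, here is a minimal standalone sketch of the kind of filter
I have in mind. It is not taken from the patch: the helper name
build_enabled_subscriptions_query is hypothetical, it uses plain snprintf
instead of PQExpBuffer so that it compiles without libpq, and it assumes the
caller has already escaped dbname. The only catalog detail it relies on is the
existing pg_subscription.subenabled column.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper, for illustration only (not part of the patch).
 *
 * Builds the catalog query so that only subscriptions that are currently
 * enabled are returned; disabled ones are then never touched and so cannot
 * be re-enabled by mistake later on.
 *
 * Assumption: dbname has already been escaped by the caller.
 *
 * Returns 0 on success, -1 if the query did not fit into buf.
 */
static int
build_enabled_subscriptions_query(char *buf, size_t buflen, const char *dbname)
{
	int			n;

	assert(buf != NULL && dbname != NULL);

	n = snprintf(buf, buflen,
				 "SELECT s.subname, s.subslotname FROM pg_catalog.pg_subscription s "
				 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
				 "WHERE d.datname = '%s' AND s.subenabled",
				 dbname);

	/* snprintf reports the length it wanted; treat truncation as failure */
	return (n >= 0 && (size_t) n < buflen) ? 0 : -1;
}
```

In the patch itself, the same "AND s.subenabled" condition could presumably be
added to the existing appendPQExpBuffer() call instead of introducing a helper.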