A number of pg_upgrade steps require connecting to each database and running a query. When there are many databases, these steps are particularly time-consuming, especially since they are run sequentially in a single process. At a quick glance, I see the following such steps:
* create_logical_replication_slots
* check_for_data_types_usage
* check_for_isn_and_int8_passing_mismatch
* check_for_user_defined_postfix_ops
* check_for_incompatible_polymorphics
* check_for_tables_with_oids
* check_for_user_defined_encoding_conversions
* check_old_cluster_subscription_state
* get_loadable_libraries
* get_db_rel_and_slot_infos
* old_9_6_invalidate_hash_indexes
* report_extension_updates

I set out to parallelize these kinds of steps via multiple threads or processes, but I ended up realizing that we could likely achieve much of the same gain with libpq's asynchronous APIs. Specifically, both establishing the connections and running the queries can be done without blocking, so we can just loop over a handful of slots and advance a simple state machine for each.

The attached is a proof-of-concept-grade patch for doing this for get_db_rel_and_slot_infos(), which yielded the following results on my laptop for "pg_upgrade --link --sync-method=syncfs --jobs 8" for a cluster with 10K empty databases.

total pg_upgrade time:
* HEAD: 14m 8s
* patch: 10m 58s

get_db_rel_and_slot_infos() on old cluster:
* HEAD: 2m 45s
* patch: 36s

get_db_rel_and_slot_infos() on new cluster:
* HEAD: 1m 46s
* patch: 29s

I am posting this early to get thoughts on the general approach. If we proceeded with this strategy, I'd probably create some generic tooling for which each relevant step would provide a set of callback functions.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From 05a9903295cb3b57ca9144217e89f0aac27277b5 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Wed, 15 May 2024 12:07:10 -0500 Subject: [PATCH v1 1/1] parallel get relinfos --- src/bin/pg_upgrade/info.c | 266 +++++++++++++++++++++++-------- src/tools/pgindent/typedefs.list | 1 + 2 files changed, 202 insertions(+), 65 deletions(-) diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 95c22a7200..bb28e262c7 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -11,6 +11,7 @@ #include "access/transam.h" #include "catalog/pg_class_d.h" +#include "fe_utils/string_utils.h" #include "pg_upgrade.h" static void create_rel_filename_map(const char *old_data, const char *new_data, @@ -22,13 +23,16 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db, static void free_db_and_rel_infos(DbInfoArr *db_arr); static void get_template0_info(ClusterInfo *cluster); static void get_db_infos(ClusterInfo *cluster); -static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo); +static void start_rel_infos_query(PGconn *conn); +static void get_rel_infos_result(PGconn *conn, DbInfo *dbinfo); static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check); -static void get_db_subscription_count(DbInfo *dbinfo); +static void start_old_cluster_logical_slot_infos_query(PGconn *conn, bool live_check); +static void get_old_cluster_logical_slot_infos_result(PGconn *conn, DbInfo *dbinfo); +static void start_db_sub_count_query(PGconn *conn, DbInfo *dbinfo); +static void get_db_sub_count_result(PGconn *conn, DbInfo *dbinfo); /* @@ -268,6 +272,16 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db) reloid, db->db_name, reldesc); } +typedef enum +{ + UNUSED, + 
CONN_STARTED, + CONNECTING, + STARTED_RELINFO_QUERY, + STARTED_LOGICAL_QUERY, + STARTED_SUBSCRIPTION_QUERY, +} InfoState; + /* * get_db_rel_and_slot_infos() * @@ -279,7 +293,12 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db) void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check) { - int dbnum; + int dbnum = 0; + int dbnum_proc = 0; + InfoState *states; + int *dbs; + PGconn **conns; + int jobs = (user_opts.jobs < 1) ? 1 : user_opts.jobs; if (cluster->dbarr.dbs != NULL) free_db_and_rel_infos(&cluster->dbarr); @@ -287,20 +306,103 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check) get_template0_info(cluster); get_db_infos(cluster); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum]; + states = (InfoState *) pg_malloc(sizeof(InfoState *) * jobs); + dbs = (int *) pg_malloc(sizeof(int) * jobs); + conns = (PGconn **) pg_malloc(sizeof(PGconn *) * jobs); - get_rel_infos(cluster, pDbInfo); + for (int i = 0; i < jobs; i++) + states[i] = UNUSED; - /* - * Retrieve the logical replication slots infos and the subscriptions - * count for the old cluster. 
- */ - if (cluster == &old_cluster) + while (dbnum < cluster->dbarr.ndbs) + { + for (int i = 0; i < jobs; i++) { - get_old_cluster_logical_slot_infos(pDbInfo, live_check); - get_db_subscription_count(pDbInfo); + switch (states[i]) + { + case UNUSED: + if (dbnum_proc < cluster->dbarr.ndbs) + { + PQExpBufferData conn_opts; + + dbs[i] = dbnum_proc++; + + /* Build connection string with proper quoting */ + initPQExpBuffer(&conn_opts); + appendPQExpBufferStr(&conn_opts, "dbname="); + appendConnStrVal(&conn_opts, cluster->dbarr.dbs[dbs[i]].db_name); + appendPQExpBufferStr(&conn_opts, " user="); + appendConnStrVal(&conn_opts, os_info.user); + appendPQExpBuffer(&conn_opts, " port=%d", cluster->port); + if (cluster->sockdir) + { + appendPQExpBufferStr(&conn_opts, " host="); + appendConnStrVal(&conn_opts, cluster->sockdir); + } + + conns[i] = PQconnectStart(conn_opts.data); + termPQExpBuffer(&conn_opts); + states[i] = CONNECTING; + } + break; + case CONNECTING: + if (PQconnectPoll(conns[i]) == PGRES_POLLING_FAILED) + { + pg_log(PG_REPORT, "%s", PQerrorMessage(conns[i])); + exit(1); + } + if (PQconnectPoll(conns[i]) == PGRES_POLLING_OK) + states[i] = CONN_STARTED; + break; + case CONN_STARTED: + if (PQstatus(conns[i]) == CONNECTION_OK) + { + start_rel_infos_query(conns[i]); + states[i] = STARTED_RELINFO_QUERY; + } + break; + case STARTED_RELINFO_QUERY: + if (PQisBusy(conns[i])) + PQconsumeInput(conns[i]); + else + { + get_rel_infos_result(conns[i], &cluster->dbarr.dbs[dbs[i]]); + + if (cluster == &old_cluster && + GET_MAJOR_VERSION(old_cluster.major_version) >= 1700) + { + start_old_cluster_logical_slot_infos_query(conns[i], live_check); + states[i] = STARTED_LOGICAL_QUERY; + } + else + { + dbnum++; + PQfinish(conns[i]); + states[i] = UNUSED; + } + } + break; + case STARTED_LOGICAL_QUERY: + if (PQisBusy(conns[i])) + PQconsumeInput(conns[i]); + else + { + get_old_cluster_logical_slot_infos_result(conns[i], &cluster->dbarr.dbs[dbs[i]]); + start_db_sub_count_query(conns[i], 
&cluster->dbarr.dbs[dbs[i]]); + states[i] = STARTED_SUBSCRIPTION_QUERY; + } + break; + case STARTED_SUBSCRIPTION_QUERY: + if (PQisBusy(conns[i])) + PQconsumeInput(conns[i]); + else + { + get_db_sub_count_result(conns[i], &cluster->dbarr.dbs[dbs[i]]); + dbnum++; + PQfinish(conns[i]); + states[i] = UNUSED; + } + break; + } } } @@ -450,29 +552,9 @@ get_db_infos(ClusterInfo *cluster) * This allows later processing to match up old and new databases efficiently. */ static void -get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) +start_rel_infos_query(PGconn *conn) { - PGconn *conn = connectToServer(cluster, - dbinfo->db_name); - PGresult *res; - RelInfo *relinfos; - int ntups; - int relnum; - int num_rels = 0; - char *nspname = NULL; - char *relname = NULL; - char *tablespace = NULL; - int i_spclocation, - i_nspname, - i_relname, - i_reloid, - i_indtable, - i_toastheap, - i_relfilenumber, - i_reltablespace; char query[QUERY_ALLOC]; - char *last_namespace = NULL, - *last_tablespace = NULL; query[0] = '\0'; /* initialize query string to empty */ @@ -552,7 +634,38 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) " ON c.reltablespace = t.oid " "ORDER BY 1;"); - res = executeQueryOrDie(conn, "%s", query); + if (PQsendQuery(conn, query) == 0) + { + /* TODO: fail */ + } +} + +static void +get_rel_infos_result(PGconn *conn, DbInfo *dbinfo) +{ + PGresult *res = PQgetResult(conn); + RelInfo *relinfos; + int ntups; + int relnum; + int num_rels = 0; + char *nspname = NULL; + char *relname = NULL; + char *tablespace = NULL; + int i_spclocation, + i_nspname, + i_relname, + i_reloid, + i_indtable, + i_toastheap, + i_relfilenumber, + i_reltablespace; + char *last_namespace = NULL, + *last_tablespace = NULL; + + if (PQgetResult(conn) != NULL) + { + /* TODO: fail */ + } ntups = PQntuples(res); @@ -622,8 +735,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) } PQclear(res); - PQfinish(conn); - dbinfo->rel_arr.rels = relinfos; dbinfo->rel_arr.nrels = num_rels; } @@ -645,19 
+756,14 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * are included. */ static void -get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) +start_old_cluster_logical_slot_infos_query(PGconn *conn, bool live_check) { - PGconn *conn; - PGresult *res; - LogicalSlotInfo *slotinfos = NULL; - int num_slots; + char query[QUERY_ALLOC]; /* Logical slots can be migrated since PG17. */ if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600) return; - conn = connectToServer(&old_cluster, dbinfo->db_name); - /* * Fetch the logical replication slot information. The check whether the * slot is considered caught up is done by an upgrade function. This @@ -675,16 +781,34 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) * started and stopped several times causing any temporary slots to be * removed. */ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " - "%s as caught_up, invalidation_reason IS NOT NULL as invalid " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "database = current_database() AND " - "temporary IS FALSE;", - live_check ? "FALSE" : - "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " - "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " - "END)"); + snprintf(query, sizeof(query), "SELECT slot_name, plugin, two_phase, failover, " + "%s as caught_up, invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE;", + live_check ? 
"FALSE" : + "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " + "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " + "END)"); + + if (PQsendQuery(conn, query) == 0) + { + /* TODO: fail */ + } +} + +static void +get_old_cluster_logical_slot_infos_result(PGconn *conn, DbInfo *dbinfo) +{ + PGresult *res = PQgetResult(conn); + LogicalSlotInfo *slotinfos = NULL; + int num_slots; + + if (PQgetResult(conn) != NULL) + { + /* TODO: fail */ + } num_slots = PQntuples(res); @@ -720,7 +844,6 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) } PQclear(res); - PQfinish(conn); dbinfo->slot_arr.slots = slotinfos; dbinfo->slot_arr.nslots = num_slots; @@ -757,23 +880,36 @@ count_old_cluster_logical_slots(void) * not be able to upgrade the logical replication clusters completely. */ static void -get_db_subscription_count(DbInfo *dbinfo) +start_db_sub_count_query(PGconn *conn, DbInfo *dbinfo) { - PGconn *conn; - PGresult *res; + char query[QUERY_ALLOC]; /* Subscriptions can be migrated since PG17. 
*/ if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700) return; - conn = connectToServer(&old_cluster, dbinfo->db_name); - res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_subscription WHERE subdbid = %u", - dbinfo->db_oid); + snprintf(query, sizeof(query), "SELECT count(*) " + "FROM pg_catalog.pg_subscription WHERE subdbid = %u", + dbinfo->db_oid); + if (PQsendQuery(conn, query) == 0) + { + /* TODO: fail */ + } +} + +static void +get_db_sub_count_result(PGconn *conn, DbInfo *dbinfo) +{ + PGresult *res = PQgetResult(conn); + + if (PQgetResult(conn) != NULL) + { + /* TODO: fail */ + } + dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0)); PQclear(res); - PQfinish(conn); } /* diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 2b83c340fb..015019b18d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1225,6 +1225,7 @@ IndxInfo InferClause InferenceElem InfoItem +InfoState InhInfo InheritableSocket InitSampleScan_function -- 2.25.1