On Sat, Aug 31, 2024 at 01:18:10AM +0100, Ilya Gladyshev wrote:
> LGTM in general, but here are some final nitpicks:
Thanks for reviewing.
> + if (maxFd != 0)
> + (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL);
>
> It's a good idea to check for the return value of select, in case it
> returns any errors.
Done.
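
For readers following along, checking select() here roughly means retrying
when the call is interrupted and treating any other negative return as an
error. The following is only a minimal POSIX-only sketch of that idea, not
code from the patch; wait_readable() and demo_wait_readable() are
hypothetical names chosen for illustration.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Hypothetical helper, not from the patch: wait until fd is readable.
 * Returns select()'s result: -1 on error, 0 on timeout, 1 when readable.
 * A NULL timeout blocks indefinitely.
 */
int
wait_readable(int fd, struct timeval *timeout)
{
	/* FD_SET() is undefined for descriptors outside this range */
	assert(fd >= 0 && fd < FD_SETSIZE);

	for (;;)
	{
		fd_set		input;
		int			rc;

		/* select() modifies the set, so rebuild it on every attempt */
		FD_ZERO(&input);
		FD_SET(fd, &input);

		rc = select(fd + 1, &input, NULL, NULL, timeout);

		/* an interrupted call is not a failure; just try again */
		if (rc < 0 && errno == EINTR)
			continue;

		return rc;
	}
}

/*
 * Exercises wait_readable() on a pipe.  Returns 0 if an empty pipe reports
 * "not ready" and a pipe with pending data reports "ready", else -1.
 */
int
demo_wait_readable(void)
{
	int			p[2];
	int			before;
	int			after;
	struct timeval nowait = {0, 0};

	if (pipe(p) != 0)
		return -1;

	before = wait_readable(p[0], &nowait);	/* nothing written yet */

	if (write(p[1], "x", 1) != 1)
		return -1;

	nowait.tv_sec = 0;
	nowait.tv_usec = 0;
	after = wait_readable(p[0], &nowait);	/* one byte is pending */

	close(p[0]);
	close(p[1]);

	return (before == 0 && after == 1) ? 0 : -1;
}
```

The caller still has to decide what to do with a -1 result; the patch
reports it with pg_fatal().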
> + dbs_complete++;
> + (void) PQgetResult(slot->conn);
> + PQfinish(slot->conn);
>
> Perhaps it's rather for me to understand, what does PQgetResult call do
> here?
I believe I was trying to follow the guidance that you should always call
PQgetResult() until it returns NULL, but looking again, I don't see any
need to call it since we free the connection immediately afterwards.
> + /* Check for connection failure. */
> + if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED)
> + pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
> +
> + /* Check whether the connection is still establishing. */
> + if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK)
> + return;
>
> Are the two consecutive calls of PQconnectPoll intended here? Seems a bit
> wasteful, but maybe that's ok.
I think we can actually just use PQstatus() here. But furthermore, I think
the way I was initiating connections was completely bogus. IIUC before
calling PQconnectPoll() the first time, we should wait for a write
indicator from select(), and then we should only call PQconnectPoll() after
subsequent indicators from select(). After spending quite a bit of time
staring at the PQconnectPoll() code, I'm quite surprised I haven't seen any
problems thus far. If I had to guess, I'd say that either PQconnectPoll()
is more resilient than I think it is, or I've just gotten lucky because
pg_upgrade establishes connections quickly.
Anyway, to fix this, I've added some more fields to the slot struct to
keep track of the information we need to properly establish connections,
and we now pay careful attention to the return value of select() so that we
know which slots are ready for processing. This seemed like a nice little
optimization independent of fixing connection establishment. I was worried
this was going to require a lot more code, but I think it only added ~50
lines or so.
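
To illustrate the "which slots are ready" bookkeeping, here is a small
sketch that leaves libpq out entirely and only shows the select() side:
each slot is added to either the read set or the write set, and its ready
flag is taken from the set it was added to. DemoSlot, mark_ready_slots()
and demo_mark_ready_slots() are hypothetical names, and pipes stand in for
connection sockets, so treat this as an assumption-laden model rather than
the code in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/* Hypothetical, simplified stand-in for the patch's UpgradeTaskSlot. */
typedef struct
{
	int			sock;			/* descriptor to watch */
	bool		wait_for_read;	/* true -> read set, false -> write set */
	bool		ready;			/* set when select() reports the descriptor */
} DemoSlot;

/*
 * Polls all slots once without blocking and records which ones are ready.
 * Returns -1 if select() fails, else the number of ready slots.
 */
int
mark_ready_slots(DemoSlot *slots, int nslots)
{
	fd_set		input;
	fd_set		output;
	struct timeval nowait = {0, 0};
	int			maxfd = -1;
	int			nready = 0;

	FD_ZERO(&input);
	FD_ZERO(&output);

	for (int i = 0; i < nslots; i++)
	{
		/* FD_SET() is undefined for descriptors outside this range */
		assert(slots[i].sock >= 0 && slots[i].sock < FD_SETSIZE);

		slots[i].ready = false;
		FD_SET(slots[i].sock, slots[i].wait_for_read ? &input : &output);
		if (slots[i].sock > maxfd)
			maxfd = slots[i].sock;
	}

	/* nothing to wait on */
	if (maxfd < 0)
		return 0;

	if (select(maxfd + 1, &input, &output, NULL, &nowait) < 0)
		return -1;

	for (int i = 0; i < nslots; i++)
	{
		/* only consult the set this slot was actually added to */
		slots[i].ready = FD_ISSET(slots[i].sock,
								  slots[i].wait_for_read ? &input : &output);
		if (slots[i].ready)
			nready++;
	}

	return nready;
}

/*
 * Exercises mark_ready_slots() with pipes.  Returns 0 if the readiness of
 * each slot matches what the pipes should report, else -1.
 */
int
demo_mark_ready_slots(void)
{
	int			p1[2];
	int			p2[2];
	int			nready;
	DemoSlot	slots[3];

	if (pipe(p1) != 0 || pipe(p2) != 0)
		return -1;
	if (write(p1[1], "x", 1) != 1)
		return -1;

	slots[0] = (DemoSlot) {p1[0], true, false};	/* data pending */
	slots[1] = (DemoSlot) {p2[0], true, false};	/* empty pipe */
	slots[2] = (DemoSlot) {p2[1], false, false};	/* writable end */

	nready = mark_ready_slots(slots, 3);

	close(p1[0]);
	close(p1[1]);
	close(p2[0]);
	close(p2[1]);

	return (nready == 2 && slots[0].ready &&
			!slots[1].ready && slots[2].ready) ? 0 : -1;
}
```

In the patch itself the read/write choice for a connecting slot comes from
the last PQconnectPoll() result, and a slot is only polled again once its
ready flag has been set.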
> We should probably mention this change in the docs as well, I found these
> two places that I think can be improved:
I've adjusted the documentation in v11.
--
nathan
From 02a9eb5bb035f5fdaf1c165161d96695dec06d45 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Wed, 28 Aug 2024 10:45:59 -0500
Subject: [PATCH v11 01/11] Introduce framework for parallelizing various
pg_upgrade tasks.
A number of pg_upgrade steps require connecting to every database
in the cluster and running the same query in each one. When there
are many databases, these steps are particularly time-consuming,
especially since these steps are performed sequentially in a single
process.
This commit introduces a new framework that makes it easy to
parallelize most of these once-in-each-database tasks.
Specifically, it manages a simple state machine of slots and uses
libpq's asynchronous APIs to establish the connections and run the
queries. The --jobs option is used to determine the number of
slots to use. To use this new task framework, callers simply need
to provide the query and a callback function to process its
results, and the framework takes care of the rest. A more complete
description is provided at the top of the new task.c file.
None of the eligible once-in-each-database tasks are converted to
use this new framework in this commit. That will be done via
several follow-up commits.
Reviewed-by: Jeff Davis, Robert Haas, Daniel Gustafsson, Ilya Gladyshev, Corey
Huinker
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
doc/src/sgml/ref/pgupgrade.sgml | 6 +-
src/bin/pg_upgrade/Makefile | 1 +
src/bin/pg_upgrade/meson.build | 1 +
src/bin/pg_upgrade/pg_upgrade.h | 21 ++
src/bin/pg_upgrade/task.c | 491 +++++++++++++++++++++++++++++++
src/tools/pgindent/typedefs.list | 5 +
6 files changed, 522 insertions(+), 3 deletions(-)
create mode 100644 src/bin/pg_upgrade/task.c
diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 9877f2f01c..fc2d0ff845 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -118,7 +118,7 @@ PostgreSQL documentation
<varlistentry>
<term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
<term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
- <listitem><para>number of simultaneous processes or threads to use
+ <listitem><para>number of simultaneous connections and processes/threads to use
</para></listitem>
</varlistentry>
@@ -587,8 +587,8 @@ NET STOP postgresql-&majorversion;
<para>
The <option>--jobs</option> option allows multiple CPU cores to be used
- for copying/linking of files and to dump and restore database schemas
- in parallel; a good place to start is the maximum of the number of
+ for copying/linking of files, dumping and restoring database schemas
+ in parallel, etc.; a good place to start is the maximum of the number of
CPU cores and tablespaces. This option can dramatically reduce the
time to upgrade a multi-database server running on a multiprocessor
machine.
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index bde91e2beb..f83d2b5d30 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -25,6 +25,7 @@ OBJS = \
relfilenumber.o \
server.o \
tablespace.o \
+ task.o \
util.o \
version.o
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 9825fa3305..3d88419674 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -14,6 +14,7 @@ pg_upgrade_sources = files(
'relfilenumber.c',
'server.c',
'tablespace.c',
+ 'task.c',
'util.c',
'version.c',
)
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index cdb6e2b759..53f693c2d4 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -494,3 +494,24 @@ void parallel_transfer_all_new_dbs(DbInfoArr
*old_db_arr, DbInfoArr *new_db_arr
char *old_pgdata, char *new_pgdata,
char *old_tablespace);
bool reap_child(bool wait_for_child);
+
+/* task.c */
+
+typedef void (*UpgradeTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg);
+
+/* struct definition is private to task.c */
+typedef struct UpgradeTask UpgradeTask;
+
+UpgradeTask *upgrade_task_create(void);
+void upgrade_task_add_step(UpgradeTask *task, const char *query,
+ UpgradeTaskProcessCB process_cb, bool free_result,
+ void *arg);
+void upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster);
+void upgrade_task_free(UpgradeTask *task);
+
+/* convenient type for common private data needed by several tasks */
+typedef struct
+{
+ FILE *file;
+ char path[MAXPGPATH];
+} UpgradeTaskReport;
diff --git a/src/bin/pg_upgrade/task.c b/src/bin/pg_upgrade/task.c
new file mode 100644
index 0000000000..99d9b9d118
--- /dev/null
+++ b/src/bin/pg_upgrade/task.c
@@ -0,0 +1,491 @@
+/*
+ * task.c
+ * framework for parallelizing pg_upgrade's once-in-each-database tasks
+ *
+ * This framework provides an efficient way of running the various
+ * once-in-each-database tasks required by pg_upgrade. Specifically, it
+ * parallelizes these tasks by managing a simple state machine of
+ * user_opts.jobs slots and using libpq's asynchronous APIs to establish the
+ * connections and run the queries. Callers simply need to create a callback
+ * function and build/execute an UpgradeTask. A simple example follows:
+ *
+ * static void
+ * my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
+ * {
+ * for (int i = 0; i < PQntuples(res); i++)
+ * {
+ * ... process results ...
+ * }
+ * }
+ *
+ * void
+ * my_task(ClusterInfo *cluster)
+ * {
+ * UpgradeTask *task = upgrade_task_create();
+ *
+ * upgrade_task_add_step(task,
+ * "... query text ...",
+ * my_process_cb,
+ * true, // let the task free the PGresult
+ * NULL); // "arg" pointer for callback
+ * upgrade_task_run(task, cluster);
+ * upgrade_task_free(task);
+ * }
+ *
+ * Note that multiple steps can be added to a given task. When there are
+ * multiple steps, the task will run all of the steps consecutively in the same
+ * database connection before freeing the connection and moving on. In other
+ * words, it only ever initiates one connection to each database in the
+ * cluster for a given run.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ * src/bin/pg_upgrade/task.c
+ */
+
+#include "postgres_fe.h"
+
+#include "common/connect.h"
+#include "fe_utils/string_utils.h"
+#include "pg_upgrade.h"
+
+/*
+ * dbs_complete stores the number of databases that we have completed
+ * processing. When this value equals the number of databases in the cluster,
+ * the task is finished.
+ */
+static int dbs_complete;
+
+/*
+ * dbs_processing stores the index of the next database in the cluster's array
+ * of databases that will be picked up for processing. It will always be
+ * greater than or equal to dbs_complete.
+ */
+static int dbs_processing;
+
+/*
+ * This struct stores all the information for a single step of a task. All
+ * steps in a task are run in a single connection before moving on to the next
+ * database (which requires a new connection).
+ */
+typedef struct UpgradeTaskStep
+{
+ UpgradeTaskProcessCB process_cb; /* processes the results of the query */
+ const char *query; /* query text */
+ bool free_result; /* should we free the result? */
+ void *arg; /* pointer passed to process_cb */
+} UpgradeTaskStep;
+
+/*
+ * This struct is a thin wrapper around an array of steps, i.e.,
+ * UpgradeTaskStep.
+ */
+typedef struct UpgradeTask
+{
+ UpgradeTaskStep *steps;
+ int num_steps;
+} UpgradeTask;
+
+/*
+ * The different states for a parallel slot.
+ */
+typedef enum
+{
+ FREE, /* slot available for use in a new database */
+ CONNECTING, /* waiting for connection to be established */
+ RUNNING_QUERIES, /* running/processing queries in the task */
+} UpgradeTaskSlotState;
+
+/*
+ * We maintain an array of user_opts.jobs slots to execute the task.
+ */
+typedef struct
+{
+ UpgradeTaskSlotState state; /* state of the slot */
+ int db_idx; /* index of the database assigned to slot */
+ int step_idx; /* index of the current step of task */
+ PGconn *conn; /* current connection managed by slot */
+ bool ready; /* slot is ready for processing */
+ bool select_mode; /* select() mode: true->read, false->write */
+ int sock; /* file descriptor for connection's socket */
+} UpgradeTaskSlot;
+
+/*
+ * Initializes an UpgradeTask.
+ */
+UpgradeTask *
+upgrade_task_create(void)
+{
+ UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask));
+
+ /* All tasks must first set a secure search_path. */
+ upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
+ return task;
+}
+
+/*
+ * Frees all storage associated with an UpgradeTask.
+ */
+void
+upgrade_task_free(UpgradeTask *task)
+{
+ if (task->steps)
+ pg_free(task->steps);
+
+ pg_free(task);
+}
+
+/*
+ * Adds a step to an UpgradeTask. The steps will be executed in each database
+ * in the order in which they are added.
+ *
+ * task: task object that must have been initialized via upgrade_task_create()
+ * query: the query text
+ * process_cb: function that processes the results of the query
+ * free_result: should we free the PGresult, or leave it to the caller?
+ * arg: pointer to task-specific data that is passed to each callback
+ */
+void
+upgrade_task_add_step(UpgradeTask *task, const char *query,
+ UpgradeTaskProcessCB process_cb, bool free_result,
+ void *arg)
+{
+ UpgradeTaskStep *new_step;
+
+ task->steps = pg_realloc(task->steps,
+ ++task->num_steps * sizeof(UpgradeTaskStep));
+
+ new_step = &task->steps[task->num_steps - 1];
+ new_step->process_cb = process_cb;
+ new_step->query = query;
+ new_step->free_result = free_result;
+ new_step->arg = arg;
+}
+
+/*
+ * Build a connection string for the slot's current database and asynchronously
+ * start a new connection, but do not wait for the connection to be
+ * established.
+ */
+static void
+start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
+{
+ PQExpBufferData conn_opts;
+ DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
+
+ /* Build connection string with proper quoting */
+ initPQExpBuffer(&conn_opts);
+ appendPQExpBufferStr(&conn_opts, "dbname=");
+ appendConnStrVal(&conn_opts, dbinfo->db_name);
+ appendPQExpBufferStr(&conn_opts, " user=");
+ appendConnStrVal(&conn_opts, os_info.user);
+ appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
+ if (cluster->sockdir)
+ {
+ appendPQExpBufferStr(&conn_opts, " host=");
+ appendConnStrVal(&conn_opts, cluster->sockdir);
+ }
+
+ slot->conn = PQconnectStart(conn_opts.data);
+
+ if (!slot->conn)
+ pg_fatal("failed to create connection with connection string: \"%s\"",
+ conn_opts.data);
+
+ termPQExpBuffer(&conn_opts);
+}
+
+/*
+ * Run the process_cb callback function to process the result of a query, and
+ * free the result if the caller indicated we should do so.
+ */
+static void
+process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
+ const UpgradeTask *task)
+{
+ UpgradeTaskStep *steps = &task->steps[slot->step_idx];
+ UpgradeTaskProcessCB process_cb = steps->process_cb;
+ DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
+ PGresult *res = PQgetResult(slot->conn);
+
+ if (PQstatus(slot->conn) == CONNECTION_BAD ||
+ (PQresultStatus(res) != PGRES_TUPLES_OK &&
+ PQresultStatus(res) != PGRES_COMMAND_OK))
+ pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
+
+ /*
+ * We assume that a NULL process_cb callback function means there's
+ * nothing to process. This is primarily intended for the initial step in
+ * every task that sets a safe search_path.
+ */
+ if (process_cb)
+ (*process_cb) (dbinfo, res, steps->arg);
+
+ if (steps->free_result)
+ PQclear(res);
+}
+
+/*
+ * Advances the state machine for a given slot as necessary.
+ */
+static void
+process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
+{
+ if (!slot->ready)
+ return;
+
+ switch (slot->state)
+ {
+ case FREE:
+
+ /*
+ * If all of the databases in the cluster have been processed or
+ * are currently being processed by other slots, we are done.
+ */
+ if (dbs_processing >= cluster->dbarr.ndbs)
+ return;
+
+ /*
+ * Claim the next database in the cluster's array and initiate a
+ * new connection.
+ */
+ slot->db_idx = dbs_processing++;
+ slot->state = CONNECTING;
+ start_conn(cluster, slot);
+
+ return;
+
+ case CONNECTING:
+
+ {
+ ConnStatusType status = PQstatus(slot->conn);
+
+ /* Check for connection failure. */
+ if (status == CONNECTION_BAD)
+ pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
+
+ /* Check whether the connection is still establishing. */
+ if (status != CONNECTION_OK)
+ return;
+ }
+
+ /*
+ * Move on to running/processing the queries in the task. We
+ * combine all the queries and send them to the server together.
+ */
+ slot->state = RUNNING_QUERIES;
+
+ {
+ PQExpBuffer queries = createPQExpBuffer();
+
+ for (int i = 0; i < task->num_steps; i++)
+ appendPQExpBuffer(queries, "%s;", task->steps[i].query);
+ if (!PQsendQuery(slot->conn, queries->data))
+ pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
+ destroyPQExpBuffer(queries);
+ }
+
+ return;
+
+ case RUNNING_QUERIES:
+
+ /*
+ * Consume any available data and clear the read-ready indicator
+ * for the connection.
+ */
+ if (!PQconsumeInput(slot->conn))
+ pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
+
+ /*
+ * Process any results that are ready so that we can free up this
+ * slot for another database as soon as possible.
+ */
+ for (; slot->step_idx < task->num_steps; slot->step_idx++)
+ {
+ /* If no more results are available yet, move on. */
+ if (PQisBusy(slot->conn))
+ return;
+
+ process_query_result(cluster, slot, task);
+ }
+
+ /*
+ * If we just finished processing the result of the last step in
+ * the task, free the slot. We recursively call this function on
+ * the newly-freed slot so that we can start initiating the next
+ * connection immediately instead of waiting for the next loop
+ * through the slots.
+ */
+ dbs_complete++;
+ PQfinish(slot->conn);
+ memset(slot, 0, sizeof(UpgradeTaskSlot));
+ slot->ready = true;
+
+ process_slot(cluster, slot, task);
+
+ return;
+ }
+}
+
+/*
+ * Returns -1 on error, else the number of ready descriptors.
+ */
+static int
+select_loop(int maxFd, fd_set *input, fd_set *output, bool nowait)
+{
+ fd_set save_input = *input;
+ fd_set save_output = *output;
+ struct timeval timeout = {0, 0};
+
+ if (maxFd == 0)
+ return 0;
+
+ for (;;)
+ {
+ int i;
+
+ *input = save_input;
+ *output = save_output;
+
+ i = select(maxFd + 1, input, output, NULL, nowait ? &timeout : NULL);
+
+#ifndef WIN32
+ if (i < 0 && errno == EINTR)
+ continue;
+#else
+ if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+ continue;
+#endif
+ return i;
+ }
+}
+
+/*
+ * Wait on the slots to either finish connecting or to receive query results if
+ * possible. This avoids a tight loop in upgrade_task_run().
+ */
+static void
+wait_on_slots(UpgradeTaskSlot *slots, int numslots)
+{
+ fd_set input;
+ fd_set output;
+ int maxFd = 0;
+ bool skip_wait = false;
+
+ FD_ZERO(&input);
+ FD_ZERO(&output);
+
+ for (int i = 0; i < numslots; i++)
+ {
+ switch (slots[i].state)
+ {
+ case FREE:
+
+ /*
+ * This function should only ever see free slots as we are
+ * finishing processing the last few databases, at which point
+ * we don't have any databases left for them to process. We'll
+ * never use these slots again, so we can safely ignore them.
+ */
+ slots[i].ready = false;
+ continue;
+
+ case CONNECTING:
+
+ /*
+ * Don't call PQconnectPoll() again for this slot until
+ * select() tells us something is ready. Be sure to use the
+ * previous poll mode in this case.
+ */
+ if (!slots[i].ready)
+ break;
+
+ /*
+ * If we are waiting for the connection to establish, choose
+ * whether to wait for reading or for writing on the socket as
+ * appropriate. If neither applies, mark the slot as ready and
+ * skip waiting so that it is handled ASAP (we assume this
+ * means the connection is either bad or fully ready).
+ {
+ PostgresPollingStatusType status;
+
+ status = PQconnectPoll(slots[i].conn);
+ if (status == PGRES_POLLING_READING)
+ slots[i].select_mode = true;
+ else if (status == PGRES_POLLING_WRITING)
+ slots[i].select_mode = false;
+ else
+ {
+ slots[i].ready = true;
+ skip_wait = true;
+ continue;
+ }
+ }
+
+ break;
+
+ case RUNNING_QUERIES:
+
+ /*
+ * Once we've sent the queries, we must wait for the socket to
+ * be read-ready. Note that process_slot() handles calling
+ * PQconsumeInput() as required.
+ */
+ slots[i].select_mode = true;
+ break;
+ }
+
+ /*
+ * Add the socket to the set.
+ */
+ slots[i].ready = false;
+ slots[i].sock = PQsocket(slots[i].conn);
+ if (slots[i].sock < 0)
+ pg_fatal("invalid socket");
+ FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
+ maxFd = Max(maxFd, slots[i].sock);
+ }
+
+ /*
+ * If we found socket(s) to wait on, wait.
+ */
+ if (select_loop(maxFd, &input, &output, skip_wait) == -1)
+ pg_fatal("select() failed: %m");
+
+ /*
+ * Mark which sockets appear to be ready.
+ */
+ for (int i = 0; i < numslots; i++)
+ slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
+ FD_ISSET(slots[i].sock, &output));
+}
+
+/*
+ * Runs all the steps of the task in every database in the cluster using
+ * user_opts.jobs parallel slots.
+ */
+void
+upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
+{
+ int jobs = Max(1, user_opts.jobs);
+ UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs);
+
+ dbs_complete = 0;
+ dbs_processing = 0;
+
+ /*
+ * Process every slot the first time round.
+ */
+ for (int i = 0; i < jobs; i++)
+ slots[i].ready = true;
+
+ while (dbs_complete < cluster->dbarr.ndbs)
+ {
+ for (int i = 0; i < jobs; i++)
+ process_slot(cluster, &slots[i], task);
+
+ wait_on_slots(slots, jobs);
+ }
+
+ pg_free(slots);
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9e951a9e6f..43c0f9f85b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3039,6 +3039,11 @@ UnresolvedTup
UnresolvedTupData
UpdateContext
UpdateStmt
+UpgradeTask
+UpgradeTaskReport
+UpgradeTaskSlot
+UpgradeTaskSlotState
+UpgradeTaskStep
UploadManifestCmd
UpperRelationKind
UpperUniquePath
--
2.39.3 (Apple Git-146)
From 07aca3723cbc027690ef383a25fd2e3f2020b934 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Fri, 28 Jun 2024 17:21:19 -0500
Subject: [PATCH v11 02/11] Use pg_upgrade's new parallel framework for
subscription checks.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/check.c | 206 ++++++++++++++++++++-----------------
1 file changed, 111 insertions(+), 95 deletions(-)
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 96adea41e9..f8160e0140 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1905,6 +1905,38 @@ check_old_cluster_for_valid_slots(void)
check_ok();
}
+/*
+ * Callback function for processing results of query for
+ * check_old_cluster_subscription_state()'s UpgradeTask. If the query returned
+ * any rows (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_old_sub_state_check(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+ int ntup = PQntuples(res);
+ int i_srsubstate = PQfnumber(res, "srsubstate");
+ int i_subname = PQfnumber(res, "subname");
+ int i_nspname = PQfnumber(res, "nspname");
+ int i_relname = PQfnumber(res, "relname");
+
+ AssertVariableIsOfType(&process_old_sub_state_check, UpgradeTaskProcessCB);
+
+ for (int i = 0; i < ntup; i++)
+ {
+ if (report->file == NULL &&
+ (report->file = fopen_priv(report->path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", report->path);
+
+ fprintf(report->file, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
+ PQgetvalue(res, i, i_srsubstate),
+ dbinfo->db_name,
+ PQgetvalue(res, i, i_subname),
+ PQgetvalue(res, i, i_nspname),
+ PQgetvalue(res, i, i_relname));
+ }
+}
+
/*
* check_old_cluster_subscription_state()
*
@@ -1915,115 +1947,99 @@ check_old_cluster_for_valid_slots(void)
static void
check_old_cluster_subscription_state(void)
{
- FILE *script = NULL;
- char output_path[MAXPGPATH];
+ UpgradeTask *task = upgrade_task_create();
+ UpgradeTaskReport report;
+ const char *query;
+ PGresult *res;
+ PGconn *conn;
int ntup;
prep_status("Checking for subscription state");
- snprintf(output_path, sizeof(output_path), "%s/%s",
+ report.file = NULL;
+ snprintf(report.path, sizeof(report.path), "%s/%s",
log_opts.basedir,
"subs_invalid.txt");
- for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
- {
- PGresult *res;
- DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(&old_cluster,
active_db->db_name);
-
- /* We need to check for pg_replication_origin only once. */
- if (dbnum == 0)
- {
- /*
- * Check that all the subscriptions have their
respective
- * replication origin.
- */
- res = executeQueryOrDie(conn,
- "SELECT
d.datname, s.subname "
- "FROM
pg_catalog.pg_subscription s "
- "LEFT
OUTER JOIN pg_catalog.pg_replication_origin o "
- "
ON o.roname = 'pg_' || s.oid "
- "INNER
JOIN pg_catalog.pg_database d "
- "
ON d.oid = s.subdbid "
- "WHERE
o.roname IS NULL;");
-
- ntup = PQntuples(res);
- for (int i = 0; i < ntup; i++)
- {
- if (script == NULL && (script =
fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\":
%m", output_path);
- fprintf(script, "The replication origin is
missing for database:\"%s\" subscription:\"%s\"\n",
- PQgetvalue(res, i, 0),
- PQgetvalue(res, i, 1));
- }
- PQclear(res);
- }
-
- /*
- * We don't allow upgrade if there is a risk of dangling slot or
- * origin corresponding to initial sync after upgrade.
- *
- * A slot/origin not created yet refers to the 'i' (initialize)
state,
- * while 'r' (ready) state refers to a slot/origin created
previously
- * but already dropped. These states are supported for
pg_upgrade. The
- * other states listed below are not supported:
- *
- * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this
state
- * would retain a replication slot, which could not be dropped
by the
- * sync worker spawned after the upgrade because the
subscription ID
- * used for the slot name won't match anymore.
- *
- * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this
state
- * would retain the replication origin when there is a failure
in
- * tablesync worker immediately after dropping the replication
slot in
- * the publisher.
- *
- * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to
work on
- * a relation upgraded while in this state would expect an
origin ID
- * with the OID of the subscription used before the upgrade,
causing
- * it to fail.
- *
- * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
- * SUBREL_STATE_UNKNOWN: These states are not stored in the
catalog,
- * so we need not allow these states.
- */
- res = executeQueryOrDie(conn,
- "SELECT
r.srsubstate, s.subname, n.nspname, c.relname "
- "FROM
pg_catalog.pg_subscription_rel r "
- "LEFT JOIN
pg_catalog.pg_subscription s"
- " ON
r.srsubid = s.oid "
- "LEFT JOIN
pg_catalog.pg_class c"
- " ON
r.srrelid = c.oid "
- "LEFT JOIN
pg_catalog.pg_namespace n"
- " ON
c.relnamespace = n.oid "
- "WHERE
r.srsubstate NOT IN ('i', 'r') "
- "ORDER BY
s.subname");
-
- ntup = PQntuples(res);
- for (int i = 0; i < ntup; i++)
- {
- if (script == NULL && (script = fopen_priv(output_path,
"w")) == NULL)
- pg_fatal("could not open file \"%s\": %m",
output_path);
-
- fprintf(script, "The table sync state \"%s\" is not
allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\"
relation:\"%s\"\n",
- PQgetvalue(res, i, 0),
- active_db->db_name,
- PQgetvalue(res, i, 1),
- PQgetvalue(res, i, 2),
- PQgetvalue(res, i, 3));
- }
- PQclear(res);
- PQfinish(conn);
+ /*
+ * Check that all the subscriptions have their respective replication
+ * origin. This check only needs to run once.
+ */
+ conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
+ res = executeQueryOrDie(conn,
+ "SELECT d.datname,
s.subname "
+ "FROM
pg_catalog.pg_subscription s "
+ "LEFT OUTER JOIN
pg_catalog.pg_replication_origin o "
+ " ON o.roname =
'pg_' || s.oid "
+ "INNER JOIN
pg_catalog.pg_database d "
+ " ON d.oid =
s.subdbid "
+ "WHERE o.roname IS
NULL;");
+ ntup = PQntuples(res);
+ for (int i = 0; i < ntup; i++)
+ {
+ if (report.file == NULL &&
+ (report.file = fopen_priv(report.path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", report.path);
+ fprintf(report.file, "The replication origin is missing for
database:\"%s\" subscription:\"%s\"\n",
+ PQgetvalue(res, i, 0),
+ PQgetvalue(res, i, 1));
}
+ PQclear(res);
+ PQfinish(conn);
- if (script)
+ /*
+ * We don't allow upgrade if there is a risk of dangling slot or origin
+ * corresponding to initial sync after upgrade.
+ *
+ * A slot/origin not created yet refers to the 'i' (initialize) state,
+ * while 'r' (ready) state refers to a slot/origin created previously
but
+ * already dropped. These states are supported for pg_upgrade. The other
+ * states listed below are not supported:
+ *
+ * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
would
+ * retain a replication slot, which could not be dropped by the sync
+ * worker spawned after the upgrade because the subscription ID used for
+ * the slot name won't match anymore.
+ *
+ * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
would
+ * retain the replication origin when there is a failure in tablesync
+ * worker immediately after dropping the replication slot in the
+ * publisher.
+ *
+ * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
+ * relation upgraded while in this state would expect an origin ID with
+ * the OID of the subscription used before the upgrade, causing it to
+ * fail.
+ *
+ * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
+ * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, so
we
+ * need not allow these states.
+ */
+ query = "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
+ "FROM pg_catalog.pg_subscription_rel r "
+ "LEFT JOIN pg_catalog.pg_subscription s"
+ " ON r.srsubid = s.oid "
+ "LEFT JOIN pg_catalog.pg_class c"
+ " ON r.srrelid = c.oid "
+ "LEFT JOIN pg_catalog.pg_namespace n"
+ " ON c.relnamespace = n.oid "
+ "WHERE r.srsubstate NOT IN ('i', 'r') "
+ "ORDER BY s.subname";
+
+ upgrade_task_add_step(task, query, process_old_sub_state_check,
+ true, &report);
+
+ upgrade_task_run(task, &old_cluster);
+ upgrade_task_free(task);
+
+ if (report.file)
{
- fclose(script);
+ fclose(report.file);
pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains subscriptions without
origin or having relations not in i (initialize) or r (ready) state.\n"
"You can allow the initial sync to finish for
all relations and then restart the upgrade.\n"
"A list of the problematic subscriptions is in
the file:\n"
- " %s", output_path);
+ " %s", report.path);
}
else
check_ok();
--
2.39.3 (Apple Git-146)
From 550115d651038aad3ef7c2fb277f3c5bbb071a82 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Fri, 28 Jun 2024 21:09:33 -0500
Subject: [PATCH v11 03/11] Use pg_upgrade's new parallel framework to get
relation info.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/info.c | 296 ++++++++++++++++++++------------------
1 file changed, 154 insertions(+), 142 deletions(-)
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index d3c1e8918d..2bfc8dcfba 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -11,6 +11,7 @@
#include "access/transam.h"
#include "catalog/pg_class_d.h"
+#include "pqexpbuffer.h"
#include "pg_upgrade.h"
static void create_rel_filename_map(const char *old_data, const char *new_data,
@@ -22,12 +23,14 @@ static void report_unmatched_relation(const RelInfo *rel,
const DbInfo *db,
static void free_db_and_rel_infos(DbInfoArr *db_arr);
static void get_template0_info(ClusterInfo *cluster);
static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static char *get_rel_infos_query(void);
+static void process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg);
static void free_rel_infos(RelInfoArr *rel_arr);
static void print_db_infos(DbInfoArr *db_arr);
static void print_rel_infos(RelInfoArr *rel_arr);
static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
+static char *get_old_cluster_logical_slot_infos_query(void);
+static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult
*res, void *arg);
/*
@@ -276,7 +279,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo
*db, bool is_new_db)
void
get_db_rel_and_slot_infos(ClusterInfo *cluster)
{
- int dbnum;
+ UpgradeTask *task = upgrade_task_create();
+ char *rel_infos_query = NULL;
+ char *logical_slot_infos_query = NULL;
if (cluster->dbarr.dbs != NULL)
free_db_and_rel_infos(&cluster->dbarr);
@@ -284,15 +289,37 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
get_template0_info(cluster);
get_db_infos(cluster);
- for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+ rel_infos_query = get_rel_infos_query();
+ upgrade_task_add_step(task,
+ rel_infos_query,
+ process_rel_infos,
+ true, NULL);
+
+ /*
+ * Logical slots are only carried over to the new cluster when the old
+ * cluster is on PG17 or newer. This is because before that the logical
+ * slots are not saved at shutdown, so there is no guarantee that the
+ * latest confirmed_flush_lsn is saved to disk which can lead to data
+ * loss. It is still not guaranteed for manually created slots in PG17, so
+ * subsequent checks done in check_old_cluster_for_valid_slots() would
+ * raise a FATAL error if such slots are included.
+ */
+ if (cluster == &old_cluster &&
+ GET_MAJOR_VERSION(cluster->major_version) > 1600)
{
- DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum];
+ logical_slot_infos_query = get_old_cluster_logical_slot_infos_query();
+ upgrade_task_add_step(task,
+ logical_slot_infos_query,
+ process_old_cluster_logical_slot_infos,
+ true, NULL);
+ }
- get_rel_infos(cluster, pDbInfo);
+ upgrade_task_run(task, cluster);
+ upgrade_task_free(task);
- if (cluster == &old_cluster)
- get_old_cluster_logical_slot_infos(pDbInfo);
- }
+ pg_free(rel_infos_query);
+ if (logical_slot_infos_query)
+ pg_free(logical_slot_infos_query);
if (cluster == &old_cluster)
pg_log(PG_VERBOSE, "\nsource databases:");
@@ -431,40 +458,21 @@ get_db_infos(ClusterInfo *cluster)
/*
- * get_rel_infos()
+ * get_rel_infos_query()
*
- * gets the relinfos for all the user tables and indexes of the database
- * referred to by "dbinfo".
+ * Returns the query for retrieving the relation information for all the user
+ * tables and indexes in the database, for use by get_db_rel_and_slot_infos()'s
+ * UpgradeTask.
*
- * Note: the resulting RelInfo array is assumed to be sorted by OID.
- * This allows later processing to match up old and new databases efficiently.
+ * Note: the result is assumed to be sorted by OID. This allows later
+ * processing to match up old and new databases efficiently.
*/
-static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+static char *
+get_rel_infos_query(void)
{
- PGconn *conn = connectToServer(cluster, dbinfo->db_name);
- PGresult *res;
- RelInfo *relinfos;
- int ntups;
- int relnum;
- int num_rels = 0;
- char *nspname = NULL;
- char *relname = NULL;
- char *tablespace = NULL;
- int i_spclocation,
- i_nspname,
- i_relname,
- i_reloid,
- i_indtable,
- i_toastheap,
- i_relfilenumber,
- i_reltablespace;
- char query[QUERY_ALLOC];
- char *last_namespace = NULL,
- *last_tablespace = NULL;
+ PQExpBufferData query;
- query[0] = '\0'; /* initialize query string to empty */
+ initPQExpBuffer(&query);
/*
* Create a CTE that collects OIDs of regular user tables and matviews,
@@ -476,34 +484,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
* output, so we have to copy that system table. It's easiest to do that
* by treating it as a user table.
*/
- snprintf(query + strlen(query), sizeof(query) - strlen(query),
- "WITH regular_heap (reloid, indtable, toastheap) AS ( "
- " SELECT c.oid, 0::oid, 0::oid "
- " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
- " ON c.relnamespace = n.oid "
- " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", "
- CppAsString2(RELKIND_MATVIEW) ") AND "
+ appendPQExpBuffer(&query,
+ "WITH regular_heap (reloid, indtable, toastheap) AS ( "
+ " SELECT c.oid, 0::oid, 0::oid "
+ " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
+ " ON c.relnamespace = n.oid "
+ " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", "
+ CppAsString2(RELKIND_MATVIEW) ") AND "
/* exclude possible orphaned temp tables */
- " ((n.nspname !~ '^pg_temp_' AND "
- " n.nspname !~ '^pg_toast_temp_' AND "
- " n.nspname NOT IN ('pg_catalog', 'information_schema', "
- " 'binary_upgrade', 'pg_toast') AND "
- " c.oid >= %u::pg_catalog.oid) OR "
- " (n.nspname = 'pg_catalog' AND "
- " relname IN ('pg_largeobject') ))), ",
- FirstNormalObjectId);
+ " ((n.nspname !~ '^pg_temp_' AND "
+ " n.nspname !~ '^pg_toast_temp_' AND "
+ " n.nspname NOT IN ('pg_catalog', 'information_schema', "
+ " 'binary_upgrade', 'pg_toast') AND "
+ " c.oid >= %u::pg_catalog.oid) OR "
+ " (n.nspname = 'pg_catalog' AND "
+ " relname IN ('pg_largeobject') ))), ",
+ FirstNormalObjectId);
/*
* Add a CTE that collects OIDs of toast tables belonging to the tables
* selected by the regular_heap CTE. (We have to do this separately
* because the namespace-name rules above don't work for toast tables.)
*/
- snprintf(query + strlen(query), sizeof(query) - strlen(query),
- " toast_heap (reloid, indtable, toastheap) AS ( "
- " SELECT c.reltoastrelid, 0::oid, c.oid "
- " FROM regular_heap JOIN pg_catalog.pg_class c "
- " ON regular_heap.reloid = c.oid "
- " WHERE c.reltoastrelid != 0), ");
+ appendPQExpBufferStr(&query,
+ " toast_heap (reloid, indtable, toastheap) AS ( "
+ " SELECT c.reltoastrelid, 0::oid, c.oid "
+ " FROM regular_heap JOIN pg_catalog.pg_class c "
+ " ON regular_heap.reloid = c.oid "
+ " WHERE c.reltoastrelid != 0), ");
/*
* Add a CTE that collects OIDs of all valid indexes on the previously
@@ -511,53 +519,68 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
* Testing indisready is necessary in 9.2, and harmless in earlier/later
* versions.
*/
- snprintf(query + strlen(query), sizeof(query) - strlen(query),
- " all_index (reloid, indtable, toastheap) AS ( "
- " SELECT indexrelid, indrelid, 0::oid "
- " FROM pg_catalog.pg_index "
- " WHERE indisvalid AND indisready "
- " AND indrelid IN "
- " (SELECT reloid FROM regular_heap "
- " UNION ALL "
- " SELECT reloid FROM toast_heap)) ");
+ appendPQExpBufferStr(&query,
+ " all_index (reloid, indtable, toastheap) AS ( "
+ " SELECT indexrelid, indrelid, 0::oid "
+ " FROM pg_catalog.pg_index "
+ " WHERE indisvalid AND indisready "
+ " AND indrelid IN "
+ " (SELECT reloid FROM regular_heap "
+ " UNION ALL "
+ " SELECT reloid FROM toast_heap)) ");
/*
* And now we can write the query that retrieves the data we want for each
* heap and index relation. Make sure result is sorted by OID.
*/
- snprintf(query + strlen(query), sizeof(query) - strlen(query),
- "SELECT all_rels.*, n.nspname, c.relname, "
- " c.relfilenode, c.reltablespace, "
- " pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
- "FROM (SELECT * FROM regular_heap "
- " UNION ALL "
- " SELECT * FROM toast_heap "
- " UNION ALL "
- " SELECT * FROM all_index) all_rels "
- " JOIN pg_catalog.pg_class c "
- " ON all_rels.reloid = c.oid "
- " JOIN pg_catalog.pg_namespace n "
- " ON c.relnamespace = n.oid "
- " LEFT OUTER JOIN pg_catalog.pg_tablespace t "
- " ON c.reltablespace = t.oid "
- "ORDER BY 1;");
-
- res = executeQueryOrDie(conn, "%s", query);
-
- ntups = PQntuples(res);
+ appendPQExpBufferStr(&query,
+ "SELECT all_rels.*, n.nspname, c.relname, "
+ " c.relfilenode, c.reltablespace, "
+ " pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
+ "FROM (SELECT * FROM regular_heap "
+ " UNION ALL "
+ " SELECT * FROM toast_heap "
+ " UNION ALL "
+ " SELECT * FROM all_index) all_rels "
+ " JOIN pg_catalog.pg_class c "
+ " ON all_rels.reloid = c.oid "
+ " JOIN pg_catalog.pg_namespace n "
+ " ON c.relnamespace = n.oid "
+ " LEFT OUTER JOIN pg_catalog.pg_tablespace t "
+ " ON c.reltablespace = t.oid "
+ "ORDER BY 1;");
+
+ return query.data;
+}
- relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+/*
+ * Callback function for processing results of the query returned by
+ * get_rel_infos_query(), which is used for get_db_rel_and_slot_infos()'s
+ * UpgradeTask. This function stores the relation information for later use.
+ */
+static void
+process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ int ntups = PQntuples(res);
+ RelInfo *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+ int i_reloid = PQfnumber(res, "reloid");
+ int i_indtable = PQfnumber(res, "indtable");
+ int i_toastheap = PQfnumber(res, "toastheap");
+ int i_nspname = PQfnumber(res, "nspname");
+ int i_relname = PQfnumber(res, "relname");
+ int i_relfilenumber = PQfnumber(res, "relfilenode");
+ int i_reltablespace = PQfnumber(res, "reltablespace");
+ int i_spclocation = PQfnumber(res, "spclocation");
+ int num_rels = 0;
+ char *nspname = NULL;
+ char *relname = NULL;
+ char *tablespace = NULL;
+ char *last_namespace = NULL;
+ char *last_tablespace = NULL;
- i_reloid = PQfnumber(res, "reloid");
- i_indtable = PQfnumber(res, "indtable");
- i_toastheap = PQfnumber(res, "toastheap");
- i_nspname = PQfnumber(res, "nspname");
- i_relname = PQfnumber(res, "relname");
- i_relfilenumber = PQfnumber(res, "relfilenode");
- i_reltablespace = PQfnumber(res, "reltablespace");
- i_spclocation = PQfnumber(res, "spclocation");
+ AssertVariableIsOfType(&process_rel_infos, UpgradeTaskProcessCB);
- for (relnum = 0; relnum < ntups; relnum++)
+ for (int relnum = 0; relnum < ntups; relnum++)
{
RelInfo *curr = &relinfos[num_rels++];
@@ -610,44 +633,22 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
/* A zero reltablespace oid indicates the database tablespace. */
curr->tablespace = dbinfo->db_tablespace;
}
- PQclear(res);
-
- PQfinish(conn);
dbinfo->rel_arr.rels = relinfos;
dbinfo->rel_arr.nrels = num_rels;
}
/*
- * get_old_cluster_logical_slot_infos()
- *
- * Gets the LogicalSlotInfos for all the logical replication slots of the
- * database referred to by "dbinfo". The status of each logical slot is gotten
- * here, but they are used at the checking phase. See
- * check_old_cluster_for_valid_slots().
+ * get_old_cluster_logical_slot_infos_query()
*
- * Note: This function will not do anything if the old cluster is pre-PG17.
- * This is because before that the logical slots are not saved at shutdown, so
- * there is no guarantee that the latest confirmed_flush_lsn is saved to disk
- * which can lead to data loss. It is still not guaranteed for manually created
- * slots in PG17, so subsequent checks done in
- * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
- * are included.
+ * Returns the query for retrieving the logical slot information for all the
+ * logical replication slots in the database, for use by
+ * get_db_rel_and_slot_infos()'s UpgradeTask. The status of each logical slot
+ * is checked in check_old_cluster_for_valid_slots().
*/
-static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+static char *
+get_old_cluster_logical_slot_infos_query(void)
{
- PGconn *conn;
- PGresult *res;
- LogicalSlotInfo *slotinfos = NULL;
- int num_slots;
-
- /* Logical slots can be migrated since PG17. */
- if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
- return;
-
- conn = connectToServer(&old_cluster, dbinfo->db_name);
-
/*
* Fetch the logical replication slot information. The check whether the
* slot is considered caught up is done by an upgrade function. This
@@ -665,18 +666,32 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
* started and stopped several times causing any temporary slots to be
* removed.
*/
- res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
- "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
- "FROM pg_catalog.pg_replication_slots "
- "WHERE slot_type = 'logical' AND "
- "database = current_database() AND "
- "temporary IS FALSE;",
- user_opts.live_check ? "FALSE" :
- "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
- "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
- "END)");
-
- num_slots = PQntuples(res);
+ return psprintf("SELECT slot_name, plugin, two_phase, failover, "
+ "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+ "FROM pg_catalog.pg_replication_slots "
+ "WHERE slot_type = 'logical' AND "
+ "database = current_database() AND "
+ "temporary IS FALSE;",
+ user_opts.live_check ? "FALSE" :
+ "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+ "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+ "END)");
+}
+
+/*
+ * Callback function for processing results of the query returned by
+ * get_old_cluster_logical_slot_infos_query(), which is used for
+ * get_db_rel_and_slot_infos()'s UpgradeTask. This function stores the logical
+ * slot information for later use.
+ */
+static void
+process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ LogicalSlotInfo *slotinfos = NULL;
+ int num_slots = PQntuples(res);
+
+ AssertVariableIsOfType(&process_old_cluster_logical_slot_infos,
+ UpgradeTaskProcessCB);
if (num_slots)
{
@@ -709,9 +724,6 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
}
}
- PQclear(res);
- PQfinish(conn);
-
dbinfo->slot_arr.slots = slotinfos;
dbinfo->slot_arr.nslots = num_slots;
}
--
2.39.3 (Apple Git-146)
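
The refactoring in the patch above splits each per-database routine into a function that builds the query and a callback that consumes the result. The following is only a minimal, sequential sketch of that shape; it does not reproduce the asynchronous connection handling of the real framework. Every `Demo*`/`demo_*` name is hypothetical, an `int` row count stands in for `PGresult`, and the boolean argument that `upgrade_task_add_step()` takes in the patch is omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for DbInfo. */
typedef struct DemoDb
{
	const char *name;
	int			nrels;			/* filled in by the callback */
} DemoDb;

/* Hypothetical stand-in for UpgradeTaskProcessCB; an int replaces PGresult. */
typedef void (*DemoProcessCB) (DemoDb *db, int fake_ntuples, void *arg);

#define DEMO_MAX_STEPS 4

typedef struct DemoTask
{
	const char *queries[DEMO_MAX_STEPS];
	DemoProcessCB callbacks[DEMO_MAX_STEPS];
	void	   *args[DEMO_MAX_STEPS];
	int			nsteps;
} DemoTask;

/* Register one query together with the callback that handles its result. */
static void
demo_task_add_step(DemoTask *task, const char *query,
				   DemoProcessCB cb, void *arg)
{
	assert(task->nsteps < DEMO_MAX_STEPS);
	task->queries[task->nsteps] = query;
	task->callbacks[task->nsteps] = cb;
	task->args[task->nsteps] = arg;
	task->nsteps++;
}

/* Run every step against every database, one after another. */
static void
demo_task_run(DemoTask *task, DemoDb *dbs, int ndbs)
{
	for (int d = 0; d < ndbs; d++)
	{
		for (int s = 0; s < task->nsteps; s++)
		{
			/* Pretend the query returned strlen(db name) rows. */
			int			fake_ntuples = (int) strlen(dbs[d].name);

			task->callbacks[s] (&dbs[d], fake_ntuples, task->args[s]);
		}
	}
}

/* Example callback: store the row count per database and keep a total. */
static void
demo_store_rel_count(DemoDb *db, int fake_ntuples, void *arg)
{
	int		   *total = (int *) arg;

	db->nrels = fake_ntuples;
	*total += fake_ntuples;
}
```

A caller would zero-initialize a `DemoTask`, add steps with `demo_task_add_step()`, and then call `demo_task_run()` once. The point of the split is that the callback never opens or closes a connection itself, so whatever drives the task is free to schedule connections as it sees fit.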
From 547090052b0783c81ad0d2c6c694ce40a387740c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Wed, 28 Aug 2024 13:32:33 -0500
Subject: [PATCH v11 04/11] Use pg_upgrade's new parallel framework to get loadable libraries.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/function.c | 71 ++++++++++++++++++++++-------------
1 file changed, 45 insertions(+), 26 deletions(-)
diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index 7e3abed098..0588347b49 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -42,6 +42,30 @@ library_name_compare(const void *p1, const void *p2)
((const LibraryInfo *) p2)->dbnum);
}
+/*
+ * Private state for get_loadable_libraries()'s UpgradeTask.
+ */
+struct loadable_libraries_state
+{
+ PGresult **ress; /* results for each database */
+ int totaltups; /* number of tuples in all results */
+};
+
+/*
+ * Callback function for processing results of query for
+ * get_loadable_libraries()'s UpgradeTask. This function stores the results
+ * for later use within get_loadable_libraries().
+ */
+static void
+process_loadable_libraries(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg;
+
+ AssertVariableIsOfType(&process_loadable_libraries, UpgradeTaskProcessCB);
+
+ state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
+ state->totaltups += PQntuples(res);
+}
/*
* get_loadable_libraries()
@@ -54,47 +78,41 @@ library_name_compare(const void *p1, const void *p2)
void
get_loadable_libraries(void)
{
- PGresult **ress;
int totaltups;
int dbnum;
int n_libinfos;
+ UpgradeTask *task = upgrade_task_create();
+ struct loadable_libraries_state state;
+ char *query;
- ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
- totaltups = 0;
+ state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
+ state.totaltups = 0;
- /* Fetch all library names, removing duplicates within each DB */
- for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
- {
- DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(&old_cluster, active_db->db_name);
+ query = psprintf("SELECT DISTINCT probin "
+ "FROM pg_catalog.pg_proc "
+ "WHERE prolang = %u AND "
+ "probin IS NOT NULL AND "
+ "oid >= %u;",
+ ClanguageId,
+ FirstNormalObjectId);
- /*
- * Fetch all libraries containing non-built-in C functions in this DB.
- */
- ress[dbnum] = executeQueryOrDie(conn,
- "SELECT DISTINCT probin "
- "FROM pg_catalog.pg_proc "
- "WHERE prolang = %u AND "
- "probin IS NOT NULL AND "
- "oid >= %u;",
- ClanguageId,
- FirstNormalObjectId);
- totaltups += PQntuples(ress[dbnum]);
-
- PQfinish(conn);
- }
+ upgrade_task_add_step(task, query, process_loadable_libraries,
+ false, &state);
+
+ upgrade_task_run(task, &old_cluster);
+ upgrade_task_free(task);
/*
* Allocate memory for required libraries and logical replication output
* plugins.
*/
- n_libinfos = totaltups + count_old_cluster_logical_slots();
+ n_libinfos = state.totaltups + count_old_cluster_logical_slots();
os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
totaltups = 0;
for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
{
- PGresult *res = ress[dbnum];
+ PGresult *res = state.ress[dbnum];
int ntups;
int rowno;
LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
@@ -129,7 +147,8 @@ get_loadable_libraries(void)
}
}
- pg_free(ress);
+ pg_free(state.ress);
+ pg_free(query);
os_info.num_libraries = totaltups;
}
--
2.39.3 (Apple Git-146)
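
In the patch above, process_loadable_libraries() recovers the database number from the element pointer with `dbinfo - old_cluster.dbarr.dbs`. A small sketch of that idiom follows; `DemoDb` and `demo_index_of()` are hypothetical names, and the bounds assertion is an addition for illustration that the patch itself does not contain.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for DbInfo. */
typedef struct DemoDb
{
	const char *name;
} DemoDb;

/*
 * Return the index of "elem" within the array starting at "base".
 * Pointer subtraction is only defined when both pointers refer to the same
 * array, so the assertion is a sanity check, not a full safety net.
 */
static ptrdiff_t
demo_index_of(const DemoDb *base, size_t n, const DemoDb *elem)
{
	ptrdiff_t	idx = elem - base;

	assert(idx >= 0 && (size_t) idx < n);
	return idx;
}
```

Because the callback only receives a pointer to the database entry, this lets each result land in the slot for its own database without passing the index around separately.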
From 7ea2132ea86eedaea916e4356d6448af3f769aab Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Wed, 28 Aug 2024 14:18:39 -0500
Subject: [PATCH v11 05/11] Use pg_upgrade's new parallel framework for extension updates.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/version.c | 94 +++++++++++++++++++-----------------
1 file changed, 49 insertions(+), 45 deletions(-)
diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c
index 2de6dffccd..5084b08805 100644
--- a/src/bin/pg_upgrade/version.c
+++ b/src/bin/pg_upgrade/version.c
@@ -139,6 +139,41 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
check_ok();
}
+/*
+ * Callback function for processing results of query for
+ * report_extension_updates()'s UpgradeTask. If the query returned any rows,
+ * write the details to the report file.
+ */
+static void
+process_extension_updates(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ bool db_used = false;
+ int ntups = PQntuples(res);
+ int i_name = PQfnumber(res, "name");
+ UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+
+ AssertVariableIsOfType(&process_extension_updates, UpgradeTaskProcessCB);
+
+ for (int rowno = 0; rowno < ntups; rowno++)
+ {
+ if (report->file == NULL &&
+ (report->file = fopen_priv(report->path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", report->path);
+ if (!db_used)
+ {
+ PQExpBufferData connectbuf;
+
+ initPQExpBuffer(&connectbuf);
+ appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
+ fputs(connectbuf.data, report->file);
+ termPQExpBuffer(&connectbuf);
+ db_used = true;
+ }
+ fprintf(report->file, "ALTER EXTENSION %s UPDATE;\n",
+ quote_identifier(PQgetvalue(res, rowno, i_name)));
+ }
+}
+
/*
* report_extension_updates()
* Report extensions that should be updated.
@@ -146,57 +181,26 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
void
report_extension_updates(ClusterInfo *cluster)
{
- int dbnum;
- FILE *script = NULL;
- char *output_path = "update_extensions.sql";
+ UpgradeTaskReport report;
+ UpgradeTask *task = upgrade_task_create();
+ const char *query = "SELECT name "
+ "FROM pg_available_extensions "
+ "WHERE installed_version != default_version";
prep_status("Checking for extension updates");
- for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
- {
- PGresult *res;
- bool db_used = false;
- int ntups;
- int rowno;
- int i_name;
- DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(cluster, active_db->db_name);
-
- /* find extensions needing updates */
- res = executeQueryOrDie(conn,
- "SELECT name "
- "FROM pg_available_extensions "
- "WHERE installed_version != default_version"
- );
+ report.file = NULL;
+ strcpy(report.path, "update_extensions.sql");
- ntups = PQntuples(res);
- i_name = PQfnumber(res, "name");
- for (rowno = 0; rowno < ntups; rowno++)
- {
- if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
- if (!db_used)
- {
- PQExpBufferData connectbuf;
+ upgrade_task_add_step(task, query, process_extension_updates,
+ true, &report);
- initPQExpBuffer(&connectbuf);
- appendPsqlMetaConnect(&connectbuf, active_db->db_name);
- fputs(connectbuf.data, script);
- termPQExpBuffer(&connectbuf);
- db_used = true;
- }
- fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
- quote_identifier(PQgetvalue(res, rowno, i_name)));
- }
+ upgrade_task_run(task, cluster);
+ upgrade_task_free(task);
- PQclear(res);
-
- PQfinish(conn);
- }
-
- if (script)
+ if (report.file)
{
- fclose(script);
+ fclose(report.file);
report_status(PG_REPORT, "notice");
pg_log(PG_REPORT, "\n"
"Your installation contains extensions that should be updated\n"
@@ -204,7 +208,7 @@ report_extension_updates(ClusterInfo *cluster)
" %s\n"
"when executed by psql by the database superuser will update\n"
"these extensions.",
- output_path);
+ report.path);
}
else
check_ok();
--
2.39.3 (Apple Git-146)
From 3b0c42e31db602d76a400ce460b9d76ebc354991 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Sat, 6 Jul 2024 21:06:31 -0500
Subject: [PATCH v11 06/11] Use pg_upgrade's new parallel framework for data type checks.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/check.c | 351 ++++++++++++++++++++-----------------
1 file changed, 191 insertions(+), 160 deletions(-)
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index f8160e0140..f935b53e1f 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -314,6 +314,147 @@ static DataTypesUsageChecks data_types_usage_checks[] =
}
};
+/*
+ * Private state for check_for_data_types_usage()'s UpgradeTask.
+ */
+struct data_type_check_state
+{
+ DataTypesUsageChecks *check; /* the check for this step */
+ bool *result; /* true if check failed for any database */
+ PQExpBuffer *report; /* buffer for report on failed checks */
+};
+
+/*
+ * Returns a palloc'd query string for the data type check, for use by
+ * check_for_data_types_usage()'s UpgradeTask.
+ */
+static char *
+data_type_check_query(int checknum)
+{
+ DataTypesUsageChecks *check = &data_types_usage_checks[checknum];
+
+ return psprintf("WITH RECURSIVE oids AS ( "
+ /* start with the type(s) returned by base_query */
+ " %s "
+ " UNION ALL "
+ " SELECT * FROM ( "
+ /* inner WITH because we can only reference the CTE once */
+ " WITH x AS (SELECT oid FROM oids) "
+ /* domains on any type selected so far */
+ " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+ " UNION ALL "
+ /* arrays over any type selected so far */
+ " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
+ " UNION ALL "
+ /* composite types containing any type selected so far */
+ " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+ " WHERE t.typtype = 'c' AND "
+ " t.oid = c.reltype AND "
+ " c.oid = a.attrelid AND "
+ " NOT a.attisdropped AND "
+ " a.atttypid = x.oid "
+ " UNION ALL "
+ /* ranges containing any type selected so far */
+ " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
+ " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
+ " ) foo "
+ ") "
+ /* now look for stored columns of any such type */
+ "SELECT n.nspname, c.relname, a.attname "
+ "FROM pg_catalog.pg_class c, "
+ " pg_catalog.pg_namespace n, "
+ " pg_catalog.pg_attribute a "
+ "WHERE c.oid = a.attrelid AND "
+ " NOT a.attisdropped AND "
+ " a.atttypid IN (SELECT oid FROM oids) AND "
+ " c.relkind IN ("
+ CppAsString2(RELKIND_RELATION) ", "
+ CppAsString2(RELKIND_MATVIEW) ", "
+ CppAsString2(RELKIND_INDEX) ") AND "
+ " c.relnamespace = n.oid AND "
+ /* exclude possible orphaned temp tables */
+ " n.nspname !~ '^pg_temp_' AND "
+ " n.nspname !~ '^pg_toast_temp_' AND "
+ /* exclude system catalogs, too */
+ " n.nspname NOT IN ('pg_catalog', 'information_schema')",
+ check->base_query);
+}
+
+/*
+ * Callback function for processing results of queries for
+ * check_for_data_types_usage()'s UpgradeTask. If the query returned any rows
+ * (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_data_type_check(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ struct data_type_check_state *state = (struct data_type_check_state *) arg;
+ int ntups = PQntuples(res);
+
+ AssertVariableIsOfType(&process_data_type_check, UpgradeTaskProcessCB);
+
+ if (ntups)
+ {
+ char output_path[MAXPGPATH];
+ int i_nspname;
+ int i_relname;
+ int i_attname;
+ FILE *script = NULL;
+ bool db_used = false;
+
+ snprintf(output_path, sizeof(output_path), "%s/%s",
+ log_opts.basedir,
+ state->check->report_filename);
+
+ /*
+ * Make sure we have a buffer to save reports to now that we found a
+ * first failing check.
+ */
+ if (*state->report == NULL)
+ *state->report = createPQExpBuffer();
+
+ /*
+ * If this is the first time we see an error for the check in question
+ * then print a status message of the failure.
+ */
+ if (!(*state->result))
+ {
+ pg_log(PG_REPORT, " failed check: %s", _(state->check->status));
+ appendPQExpBuffer(*state->report, "\n%s\n%s %s\n",
+ _(state->check->report_text),
+ _("A list of the problem columns is in the file:"),
+ output_path);
+ }
+ *state->result = true;
+
+ i_nspname = PQfnumber(res, "nspname");
+ i_relname = PQfnumber(res, "relname");
+ i_attname = PQfnumber(res, "attname");
+
+ for (int rowno = 0; rowno < ntups; rowno++)
+ {
+ if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", output_path);
+
+ if (!db_used)
+ {
+ fprintf(script, "In database: %s\n", dbinfo->db_name);
+ db_used = true;
+ }
+ fprintf(script, " %s.%s.%s\n",
+ PQgetvalue(res, rowno, i_nspname),
+ PQgetvalue(res, rowno, i_relname),
+ PQgetvalue(res, rowno, i_attname));
+ }
+
+ if (script)
+ {
+ fclose(script);
+ script = NULL;
+ }
+ }
+}
+
/*
* check_for_data_types_usage()
* Detect whether there are any stored columns depending on given type(s)
@@ -334,13 +475,15 @@ static DataTypesUsageChecks data_types_usage_checks[] =
* there's no storage involved in a view.
*/
static void
-check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
+check_for_data_types_usage(ClusterInfo *cluster)
{
- bool found = false;
bool *results;
- PQExpBufferData report;
- DataTypesUsageChecks *tmp = checks;
+ PQExpBuffer report = NULL;
+ DataTypesUsageChecks *tmp = data_types_usage_checks;
int n_data_types_usage_checks = 0;
+ UpgradeTask *task = upgrade_task_create();
+ char **queries = NULL;
+ struct data_type_check_state *states;
prep_status("Checking data type usage");
@@ -353,175 +496,63 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
/* Prepare an array to store the results of checks in */
results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks);
+ queries = pg_malloc0(sizeof(char *) * n_data_types_usage_checks);
+ states = pg_malloc0(sizeof(struct data_type_check_state) * n_data_types_usage_checks);
- /*
- * Connect to each database in the cluster and run all defined checks
- * against that database before trying the next one.
- */
- for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+ for (int i = 0; i < n_data_types_usage_checks; i++)
{
- DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(cluster, active_db->db_name);
+ DataTypesUsageChecks *check = &data_types_usage_checks[i];
- for (int checknum = 0; checknum < n_data_types_usage_checks; checknum++)
+ if (check->threshold_version == MANUAL_CHECK)
{
- PGresult *res;
- int ntups;
- int i_nspname;
- int i_relname;
- int i_attname;
- FILE *script = NULL;
- bool db_used = false;
- char output_path[MAXPGPATH];
- DataTypesUsageChecks *cur_check = &checks[checknum];
-
- if (cur_check->threshold_version == MANUAL_CHECK)
- {
- Assert(cur_check->version_hook);
-
- /*
- * Make sure that the check applies to the current cluster
- * version and skip if not. If no check hook has been defined
- * we run the check for all versions.
- */
- if (!cur_check->version_hook(cluster))
- continue;
- }
- else if (cur_check->threshold_version != ALL_VERSIONS)
- {
- if (GET_MAJOR_VERSION(cluster->major_version) > cur_check->threshold_version)
- continue;
- }
- else
- Assert(cur_check->threshold_version == ALL_VERSIONS);
-
- snprintf(output_path, sizeof(output_path), "%s/%s",
- log_opts.basedir,
- cur_check->report_filename);
+ Assert(check->version_hook);
/*
- * The type(s) of interest might be wrapped in a domain, array,
- * composite, or range, and these container types can be nested
- * (to varying extents depending on server version, but that's not
- * of concern here). To handle all these cases we need a
- * recursive CTE.
+ * Make sure that the check applies to the current cluster version
+ * and skip it if not.
*/
- res = executeQueryOrDie(conn,
- "WITH RECURSIVE oids AS ( "
- /* start with the type(s) returned by base_query */
- " %s "
- " UNION ALL "
- " SELECT * FROM ( "
- /* inner WITH because we can only reference the CTE once */
- " WITH x AS (SELECT oid FROM oids) "
- /* domains on any type selected so far */
- " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
- " UNION ALL "
- /* arrays over any type selected so far */
- " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
- " UNION ALL "
- /* composite types containing any type selected so far */
- " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
- " WHERE t.typtype = 'c' AND "
- " t.oid = c.reltype AND "
- " c.oid = a.attrelid AND "
- " NOT a.attisdropped AND "
- " a.atttypid = x.oid "
- " UNION ALL "
- /* ranges containing any type selected so far */
- " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
- " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
- " ) foo "
- ") "
- /* now look for stored columns of any such type */
- "SELECT n.nspname, c.relname, a.attname "
- "FROM pg_catalog.pg_class c, "
- " pg_catalog.pg_namespace n, "
- " pg_catalog.pg_attribute a "
- "WHERE c.oid = a.attrelid AND "
- " NOT a.attisdropped AND "
- " a.atttypid IN (SELECT oid FROM oids) AND "
- " c.relkind IN ("
- CppAsString2(RELKIND_RELATION) ", "
- CppAsString2(RELKIND_MATVIEW) ", "
- CppAsString2(RELKIND_INDEX) ") AND "
- " c.relnamespace = n.oid AND "
- /* exclude possible orphaned temp tables */
- " n.nspname !~ '^pg_temp_' AND "
- " n.nspname !~ '^pg_toast_temp_' AND "
- /* exclude system catalogs, too */
- " n.nspname NOT IN ('pg_catalog', 'information_schema')",
- cur_check->base_query);
-
- ntups = PQntuples(res);
+ if (!check->version_hook(cluster))
+ continue;
+ }
+ else if (check->threshold_version != ALL_VERSIONS)
+ {
+ if (GET_MAJOR_VERSION(cluster->major_version) > check->threshold_version)
+ continue;
+ }
+ else
+ Assert(check->threshold_version == ALL_VERSIONS);
- /*
- * The datatype was found, so extract the data and log to the
- * requested filename. We need to open the file for appending
- * since the check might have already found the type in another
- * database earlier in the loop.
- */
- if (ntups)
- {
- /*
- * Make sure we have a buffer to save reports to now that we
- * found a first failing check.
- */
- if (!found)
- initPQExpBuffer(&report);
- found = true;
-
- /*
- * If this is the first time we see an error for the check in
- * question then print a status message of the failure.
- */
- if (!results[checknum])
- {
- pg_log(PG_REPORT, " failed check: %s", _(cur_check->status));
- appendPQExpBuffer(&report, "\n%s\n%s %s\n",
- _(cur_check->report_text),
- _("A list of the problem columns is in the file:"),
- output_path);
- }
- results[checknum] = true;
-
- i_nspname = PQfnumber(res, "nspname");
- i_relname = PQfnumber(res, "relname");
- i_attname = PQfnumber(res, "attname");
-
- for (int rowno = 0; rowno < ntups; rowno++)
- {
- if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
-
- if (!db_used)
- {
- fprintf(script, "In database: %s\n", active_db->db_name);
- db_used = true;
- }
- fprintf(script, " %s.%s.%s\n",
- PQgetvalue(res, rowno, i_nspname),
- PQgetvalue(res, rowno, i_relname),
- PQgetvalue(res, rowno, i_attname));
- }
-
- if (script)
- {
- fclose(script);
- script = NULL;
- }
- }
+ queries[i] = data_type_check_query(i);
- PQclear(res);
- }
+ states[i].check = check;
+ states[i].result = &results[i];
+ states[i].report = &report;
- PQfinish(conn);
+ upgrade_task_add_step(task, queries[i], process_data_type_check,
+ true, &states[i]);
}
- if (found)
- pg_fatal("Data type checks failed: %s", report.data);
+ /*
+ * Connect to each database in the cluster and run all defined checks
+ * against that database before trying the next one.
+ */
+ upgrade_task_run(task, cluster);
+ upgrade_task_free(task);
+
+ if (report)
+ {
+ pg_fatal("Data type checks failed: %s", report->data);
+ destroyPQExpBuffer(report);
+ }
pg_free(results);
+ for (int i = 0; i < n_data_types_usage_checks; i++)
+ {
+ if (queries[i])
+ pg_free(queries[i]);
+ }
+ pg_free(queries);
+ pg_free(states);
check_ok();
}
@@ -616,7 +647,7 @@ check_and_dump_old_cluster(void)
check_old_cluster_subscription_state();
}
- check_for_data_types_usage(&old_cluster, data_types_usage_checks);
+ check_for_data_types_usage(&old_cluster);
/*
* PG 14 changed the function signature of encoding conversion functions.
--
2.39.3 (Apple Git-146)
From a3835a8624c380cd822b73d5b6d74860ec1fcd29 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Mon, 8 Jul 2024 21:00:20 -0500
Subject: [PATCH v11 07/11] Use pg_upgrade's new parallel framework for isn and
int8 check.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/check.c | 97 ++++++++++++++++++++------------------
1 file changed, 50 insertions(+), 47 deletions(-)
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index f935b53e1f..b8af7e541b 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1225,6 +1225,39 @@ check_for_prepared_transactions(ClusterInfo *cluster)
check_ok();
}
+/*
+ * Callback function for processing result of query for
+ * check_for_isn_and_int8_passing_mismatch()'s UpgradeTask. If the query
+ * returned any rows (i.e., the check failed), write the details to the report
+ * file.
+ */
+static void
+process_isn_and_int8_passing_mismatch(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ bool db_used = false;
+ int ntups = PQntuples(res);
+ int i_nspname = PQfnumber(res, "nspname");
+ int i_proname = PQfnumber(res, "proname");
+ UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+
+ AssertVariableIsOfType(&process_isn_and_int8_passing_mismatch,
+ UpgradeTaskProcessCB);
+
+ for (int rowno = 0; rowno < ntups; rowno++)
+ {
+ if (report->file == NULL &&
+ (report->file = fopen_priv(report->path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", report->path);
+ if (!db_used)
+ {
+ fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+ db_used = true;
+ }
+ fprintf(report->file, " %s.%s\n",
+ PQgetvalue(res, rowno, i_nspname),
+ PQgetvalue(res, rowno, i_proname));
+ }
+}
/*
* check_for_isn_and_int8_passing_mismatch()
@@ -1236,9 +1269,13 @@ check_for_prepared_transactions(ClusterInfo *cluster)
static void
check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
{
- int dbnum;
- FILE *script = NULL;
- char output_path[MAXPGPATH];
+ UpgradeTask *task;
+ UpgradeTaskReport report;
+ const char *query = "SELECT n.nspname, p.proname "
+ "FROM pg_catalog.pg_proc p, "
+ " pg_catalog.pg_namespace n "
+ "WHERE p.pronamespace = n.oid AND "
+ " p.probin = '$libdir/isn'";
prep_status("Checking for contrib/isn with bigint-passing mismatch");
@@ -1250,54 +1287,20 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
return;
}
- snprintf(output_path, sizeof(output_path), "%s/%s",
+ report.file = NULL;
+ snprintf(report.path, sizeof(report.path), "%s/%s",
log_opts.basedir,
"contrib_isn_and_int8_pass_by_value.txt");
- for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
- {
- PGresult *res;
- bool db_used = false;
- int ntups;
- int rowno;
- int i_nspname,
- i_proname;
- DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(cluster, active_db->db_name);
-
- /* Find any functions coming from contrib/isn */
- res = executeQueryOrDie(conn,
- "SELECT n.nspname, p.proname "
- "FROM pg_catalog.pg_proc p, "
- " pg_catalog.pg_namespace n "
- "WHERE p.pronamespace = n.oid AND "
- " p.probin = '$libdir/isn'");
-
- ntups = PQntuples(res);
- i_nspname = PQfnumber(res, "nspname");
- i_proname = PQfnumber(res, "proname");
- for (rowno = 0; rowno < ntups; rowno++)
- {
- if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
- if (!db_used)
- {
- fprintf(script, "In database: %s\n", active_db->db_name);
- db_used = true;
- }
- fprintf(script, " %s.%s\n",
- PQgetvalue(res, rowno, i_nspname),
- PQgetvalue(res, rowno, i_proname));
- }
-
- PQclear(res);
-
- PQfinish(conn);
- }
+ task = upgrade_task_create();
+ upgrade_task_add_step(task, query, process_isn_and_int8_passing_mismatch,
+ true, &report);
+ upgrade_task_run(task, cluster);
+ upgrade_task_free(task);
- if (script)
+ if (report.file)
{
- fclose(script);
+ fclose(report.file);
pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains \"contrib/isn\" functions which rely on the\n"
"bigint data type. Your old and new clusters pass bigint values\n"
@@ -1305,7 +1308,7 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
"manually dump databases in the old cluster that use \"contrib/isn\"\n"
"facilities, drop them, perform the upgrade, and then restore them. A\n"
"list of the problem functions is in the file:\n"
- " %s", output_path);
+ " %s", report.path);
}
else
check_ok();
--
2.39.3 (Apple Git-146)
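
For readers following the series, the callback shape introduced in the patch above (open the report file lazily on the first offending row, print the "In database:" header once per database) can be tried outside pg_upgrade with a small standalone sketch. Everything named Sketch* or sketch_* below is hypothetical and only stands in for UpgradeTaskReport, PGresult, and the process_*() callbacks; the mock visits databases serially, which is an assumption of the sketch and not a statement about how the real framework schedules callbacks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the patch's UpgradeTaskReport. */
typedef struct SketchReport
{
    FILE       *file;           /* stays NULL until some database has a hit */
    char        path[1024];
} SketchReport;

/* Hypothetical stand-in for a PGresult: just an array of row strings. */
typedef struct SketchResult
{
    int         ntups;
    const char **rows;
} SketchResult;

/* Called once per database, mirroring the shape of the process_*() callbacks. */
static void
sketch_process(const char *db_name, const SketchResult *res, void *arg)
{
    SketchReport *report = (SketchReport *) arg;
    bool        db_used = false;

    for (int rowno = 0; rowno < res->ntups; rowno++)
    {
        /* Open the report lazily, on the first offending row anywhere. */
        if (report->file == NULL &&
            (report->file = fopen(report->path, "w")) == NULL)
        {
            perror(report->path);
            exit(1);
        }

        /* Emit the database header only once per database. */
        if (!db_used)
        {
            fprintf(report->file, "In database: %s\n", db_name);
            db_used = true;
        }
        fprintf(report->file, "  %s\n", res->rows[rowno]);
    }
}

/* Runs the callback over three mock databases; true means the check failed. */
static bool
sketch_run_check(const char *path)
{
    static const char *rows_a[] = {"public.isn_f1", "public.isn_f2"};
    static const char *rows_c[] = {"app.isn_g"};
    const char *dbs[] = {"postgres", "template1", "appdb"};
    SketchResult results[] = {{2, rows_a}, {0, NULL}, {1, rows_c}};
    SketchReport report;

    assert(strlen(path) < sizeof(report.path));
    report.file = NULL;
    snprintf(report.path, sizeof(report.path), "%s", path);

    for (int i = 0; i < 3; i++)
        sketch_process(dbs[i], &results[i], &report);

    /* A non-NULL file means at least one database produced rows. */
    if (report.file == NULL)
        return false;
    fclose(report.file);
    return true;
}
```

Calling sketch_run_check("some_path.txt") from a main() leaves a report with two "In database:" sections, since the middle mock database returns no rows and therefore never touches the file. The caller only needs to test report.file afterwards, which is the same test the converted check functions use.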
From a1803394e21534156306144deeef1065520981b9 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Wed, 28 Aug 2024 15:10:19 -0500
Subject: [PATCH v11 08/11] Use pg_upgrade's new parallel framework for postfix
operator check.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/check.c | 146 +++++++++++++++++++------------------
1 file changed, 75 insertions(+), 71 deletions(-)
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index b8af7e541b..28c4ddbca3 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1315,95 +1315,99 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
}
/*
- * Verify that no user defined postfix operators exist.
+ * Callback function for processing result of query for
+ * check_for_user_defined_postfix_ops()'s UpgradeTask. If the query returned
+ * any rows (i.e., the check failed), write the details to the report file.
*/
static void
-check_for_user_defined_postfix_ops(ClusterInfo *cluster)
+process_user_defined_postfix_ops(DbInfo *dbinfo, PGresult *res, void *arg)
{
- int dbnum;
- FILE *script = NULL;
- char output_path[MAXPGPATH];
+ UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+ int ntups = PQntuples(res);
+ bool db_used = false;
+ int i_oproid = PQfnumber(res, "oproid");
+ int i_oprnsp = PQfnumber(res, "oprnsp");
+ int i_oprname = PQfnumber(res, "oprname");
+ int i_typnsp = PQfnumber(res, "typnsp");
+ int i_typname = PQfnumber(res, "typname");
- prep_status("Checking for user-defined postfix operators");
+ AssertVariableIsOfType(&process_user_defined_postfix_ops,
+ UpgradeTaskProcessCB);
- snprintf(output_path, sizeof(output_path), "%s/%s",
- log_opts.basedir,
- "postfix_ops.txt");
+ if (!ntups)
+ return;
- /* Find any user defined postfix operators */
- for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+ for (int rowno = 0; rowno < ntups; rowno++)
{
- PGresult *res;
- bool db_used = false;
- int ntups;
- int rowno;
- int i_oproid,
- i_oprnsp,
- i_oprname,
- i_typnsp,
- i_typname;
- DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(cluster, active_db->db_name);
-
- /*
- * The query below hardcodes FirstNormalObjectId as 16384 rather than
- * interpolating that C #define into the query because, if that
- * #define is ever changed, the cutoff we want to use is the value
- * used by pre-version 14 servers, not that of some future version.
- */
- res = executeQueryOrDie(conn,
- "SELECT o.oid AS oproid, "
- " n.nspname AS oprnsp, "
- " o.oprname, "
- " tn.nspname AS typnsp, "
- " t.typname "
- "FROM pg_catalog.pg_operator o, "
- " pg_catalog.pg_namespace n, "
- " pg_catalog.pg_type t, "
- " pg_catalog.pg_namespace tn "
- "WHERE o.oprnamespace = n.oid AND "
- " o.oprleft = t.oid AND "
- " t.typnamespace = tn.oid AND "
- " o.oprright = 0 AND "
- " o.oid >= 16384");
- ntups = PQntuples(res);
- i_oproid = PQfnumber(res, "oproid");
- i_oprnsp = PQfnumber(res, "oprnsp");
- i_oprname = PQfnumber(res, "oprname");
- i_typnsp = PQfnumber(res, "typnsp");
- i_typname = PQfnumber(res, "typname");
- for (rowno = 0; rowno < ntups; rowno++)
+ if (report->file == NULL &&
+ (report->file = fopen_priv(report->path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", report->path);
+ if (!db_used)
{
- if (script == NULL &&
- (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
- if (!db_used)
- {
- fprintf(script, "In database: %s\n", active_db->db_name);
- db_used = true;
- }
- fprintf(script, " (oid=%s) %s.%s (%s.%s, NONE)\n",
- PQgetvalue(res, rowno, i_oproid),
- PQgetvalue(res, rowno, i_oprnsp),
- PQgetvalue(res, rowno, i_oprname),
- PQgetvalue(res, rowno, i_typnsp),
- PQgetvalue(res, rowno, i_typname));
+ fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+ db_used = true;
}
+ fprintf(report->file, " (oid=%s) %s.%s (%s.%s, NONE)\n",
+ PQgetvalue(res, rowno, i_oproid),
+ PQgetvalue(res, rowno, i_oprnsp),
+ PQgetvalue(res, rowno, i_oprname),
+ PQgetvalue(res, rowno, i_typnsp),
+ PQgetvalue(res, rowno, i_typname));
+ }
+}
- PQclear(res);
+/*
+ * Verify that no user defined postfix operators exist.
+ */
+static void
+check_for_user_defined_postfix_ops(ClusterInfo *cluster)
+{
+ UpgradeTaskReport report;
+ UpgradeTask *task = upgrade_task_create();
+ const char *query;
- PQfinish(conn);
- }
+ /*
+ * The query below hardcodes FirstNormalObjectId as 16384 rather than
+ * interpolating that C #define into the query because, if that #define is
+ * ever changed, the cutoff we want to use is the value used by
+ * pre-version 14 servers, not that of some future version.
+ */
+ query = "SELECT o.oid AS oproid, "
+ " n.nspname AS oprnsp, "
+ " o.oprname, "
+ " tn.nspname AS typnsp, "
+ " t.typname "
+ "FROM pg_catalog.pg_operator o, "
+ " pg_catalog.pg_namespace n, "
+ " pg_catalog.pg_type t, "
+ " pg_catalog.pg_namespace tn "
+ "WHERE o.oprnamespace = n.oid AND "
+ " o.oprleft = t.oid AND "
+ " t.typnamespace = tn.oid AND "
+ " o.oprright = 0 AND "
+ " o.oid >= 16384";
- if (script)
+ prep_status("Checking for user-defined postfix operators");
+
+ report.file = NULL;
+ snprintf(report.path, sizeof(report.path), "%s/%s",
+ log_opts.basedir,
+ "postfix_ops.txt");
+
+ upgrade_task_add_step(task, query, process_user_defined_postfix_ops,
+ true, &report);
+ upgrade_task_run(task, cluster);
+ upgrade_task_free(task);
+
+ if (report.file)
{
- fclose(script);
+ fclose(report.file);
pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains user-defined postfix operators, which are not\n"
"supported anymore. Consider dropping the postfix operators and replacing\n"
"them with prefix operators or function calls.\n"
"A list of user-defined postfix operators is in the file:\n"
- " %s", output_path);
+ " %s", report.path);
}
else
check_ok();
--
2.39.3 (Apple Git-146)
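
Each converted check in this series follows the same four calls: upgrade_task_create(), upgrade_task_add_step(), upgrade_task_run(), upgrade_task_free(). The sketch below is a rough, serial mock of that calling pattern, written only to show how a step pairs a query with a callback and a private void * argument. All mini_* and Mini* names are hypothetical, the callback signature is simplified (no PGresult, and the boolean argument the patch passes to upgrade_task_add_step() is left out), and no claim is made that the real framework is implemented this way; the real one runs over asynchronous connections.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical callback type; the real one also receives a PGresult. */
typedef void (*MiniProcessCB) (const char *db_name, const char *query, void *arg);

typedef struct MiniStep
{
    const char *query;
    MiniProcessCB cb;
    void       *arg;            /* per-step private state */
} MiniStep;

typedef struct MiniTask
{
    MiniStep   *steps;
    int         nsteps;
} MiniTask;

static MiniTask *
mini_task_create(void)
{
    MiniTask   *task = calloc(1, sizeof(MiniTask));

    if (task == NULL)
        abort();
    return task;
}

static void
mini_task_add_step(MiniTask *task, const char *query, MiniProcessCB cb, void *arg)
{
    MiniStep   *steps;

    assert(cb != NULL);
    steps = realloc(task->steps, (task->nsteps + 1) * sizeof(MiniStep));
    if (steps == NULL)
        abort();
    task->steps = steps;
    task->steps[task->nsteps].query = query;
    task->steps[task->nsteps].cb = cb;
    task->steps[task->nsteps].arg = arg;
    task->nsteps++;
}

/* Serial mock: every step runs against every database, one after another. */
static void
mini_task_run(const MiniTask *task, const char **dbs, int ndbs)
{
    for (int d = 0; d < ndbs; d++)
        for (int s = 0; s < task->nsteps; s++)
            task->steps[s].cb(dbs[d], task->steps[s].query, task->steps[s].arg);
}

static void
mini_task_free(MiniTask *task)
{
    free(task->steps);
    free(task);
}

/* Example per-step state: counts calls and remembers the last database. */
typedef struct CallLog
{
    int         calls;
    char        last_db[64];
} CallLog;

static void
count_calls(const char *db_name, const char *query, void *arg)
{
    CallLog    *log = (CallLog *) arg;

    (void) query;               /* the mock never executes the query */
    log->calls++;
    strncpy(log->last_db, db_name, sizeof(log->last_db) - 1);
    log->last_db[sizeof(log->last_db) - 1] = '\0';
}
```

With two steps registered and three databases, each CallLog ends up with calls == 3. Keeping the state behind the void * argument is what lets a single task carry several independent checks, as the data type checks do with their states[] array.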
From 28f8eefff5c173d6a79c49e9fa9754a4603cfca2 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Wed, 28 Aug 2024 15:21:29 -0500
Subject: [PATCH v11 09/11] Use pg_upgrade's new parallel framework for
polymorphics check.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/check.c | 159 +++++++++++++++++++------------------
1 file changed, 83 insertions(+), 76 deletions(-)
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 28c4ddbca3..92a3aa6a77 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1413,6 +1413,40 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
check_ok();
}
+/*
+ * Callback function for processing results of query for
+ * check_for_incompatible_polymorphics()'s UpgradeTask. If the query returned
+ * any rows (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_incompat_polymorphics(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+ bool db_used = false;
+ int ntups = PQntuples(res);
+ int i_objkind = PQfnumber(res, "objkind");
+ int i_objname = PQfnumber(res, "objname");
+
+ AssertVariableIsOfType(&process_incompat_polymorphics,
+ UpgradeTaskProcessCB);
+
+ for (int rowno = 0; rowno < ntups; rowno++)
+ {
+ if (report->file == NULL &&
+ (report->file = fopen_priv(report->path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", report->path);
+ if (!db_used)
+ {
+ fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+ db_used = true;
+ }
+
+ fprintf(report->file, " %s: %s\n",
+ PQgetvalue(res, rowno, i_objkind),
+ PQgetvalue(res, rowno, i_objname));
+ }
+}
+
/*
* check_for_incompatible_polymorphics()
*
@@ -1422,14 +1456,15 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
static void
check_for_incompatible_polymorphics(ClusterInfo *cluster)
{
- PGresult *res;
- FILE *script = NULL;
- char output_path[MAXPGPATH];
PQExpBufferData old_polymorphics;
+ UpgradeTask *task = upgrade_task_create();
+ UpgradeTaskReport report;
+ char *query;
prep_status("Checking for incompatible polymorphic functions");
- snprintf(output_path, sizeof(output_path), "%s/%s",
+ report.file = NULL;
+ snprintf(report.path, sizeof(report.path), "%s/%s",
log_opts.basedir,
"incompatible_polymorphics.txt");
@@ -1453,80 +1488,51 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
", 'array_positions(anyarray,anyelement)'"
", 'width_bucket(anyelement,anyarray)'");
- for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
- {
- bool db_used = false;
- DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(cluster, active_db->db_name);
- int ntups;
- int i_objkind,
- i_objname;
-
- /*
- * The query below hardcodes FirstNormalObjectId as 16384 rather than
- * interpolating that C #define into the query because, if that
- * #define is ever changed, the cutoff we want to use is the value
- * used by pre-version 14 servers, not that of some future version.
- */
- res = executeQueryOrDie(conn,
- /* Aggregate transition functions */
- "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
- "FROM pg_proc AS p "
- "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
- "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
- "WHERE p.oid >= 16384 "
- "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
- "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
- /* Aggregate final functions */
- "UNION ALL "
- "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
- "FROM pg_proc AS p "
- "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
- "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
- "WHERE p.oid >= 16384 "
- "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
- "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
- /* Operators */
- "UNION ALL "
- "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
- "FROM pg_operator AS op "
- "WHERE op.oid >= 16384 "
- "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
- "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
- old_polymorphics.data,
- old_polymorphics.data,
- old_polymorphics.data);
-
- ntups = PQntuples(res);
-
- i_objkind = PQfnumber(res, "objkind");
- i_objname = PQfnumber(res, "objname");
-
- for (int rowno = 0; rowno < ntups; rowno++)
- {
- if (script == NULL &&
- (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
- if (!db_used)
- {
- fprintf(script, "In database: %s\n", active_db->db_name);
- db_used = true;
- }
-
- fprintf(script, " %s: %s\n",
- PQgetvalue(res, rowno, i_objkind),
- PQgetvalue(res, rowno, i_objname));
- }
+ /*
+ * The query below hardcodes FirstNormalObjectId as 16384 rather than
+ * interpolating that C #define into the query because, if that #define is
+ * ever changed, the cutoff we want to use is the value used by
+ * pre-version 14 servers, not that of some future version.
+ */
- PQclear(res);
- PQfinish(conn);
- }
+ /* Aggregate transition functions */
+ query = psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+ "FROM pg_proc AS p "
+ "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+ "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
+ "WHERE p.oid >= 16384 "
+ "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
+ "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+ /* Aggregate final functions */
+ "UNION ALL "
+ "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+ "FROM pg_proc AS p "
+ "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+ "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
+ "WHERE p.oid >= 16384 "
+ "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
+ "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+ /* Operators */
+ "UNION ALL "
+ "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
+ "FROM pg_operator AS op "
+ "WHERE op.oid >= 16384 "
+ "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
+ "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
+ old_polymorphics.data,
+ old_polymorphics.data,
+ old_polymorphics.data);
+
+ upgrade_task_add_step(task, query, process_incompat_polymorphics,
+ true, &report);
+ upgrade_task_run(task, cluster);
+ upgrade_task_free(task);
- if (script)
+ if (report.file)
{
- fclose(script);
+ fclose(report.file);
pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains user-defined objects that refer to internal\n"
"polymorphic functions with arguments of type \"anyarray\" or \"anyelement\".\n"
@@ -1534,12 +1540,13 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
"afterwards, changing them to refer to the new corresponding functions with\n"
"arguments of type \"anycompatiblearray\" and \"anycompatible\".\n"
"A list of the problematic objects is in the file:\n"
- " %s", output_path);
+ " %s", report.path);
}
else
check_ok();
termPQExpBuffer(&old_polymorphics);
+ pg_free(query);
}
/*
--
2.39.3 (Apple Git-146)
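
One difference in the polymorphics patch above is worth spelling out: its query is built with psprintf(), so it is heap-allocated and is released with pg_free(query) at the end of the function, whereas the other converted checks pass string literals. The standalone sketch below illustrates that ownership pattern with a hypothetical sketch_psprintf() built on vsnprintf(); it is not PostgreSQL's psprintf(), and the shortened query text is made up for the example. It also shows the same list being interpolated more than once, as old_polymorphics.data is in the patch.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical psprintf() look-alike: returns a malloc'd formatted string. */
static char *
sketch_psprintf(const char *fmt, ...)
{
    va_list     args;
    int         len;
    char       *buf;

    /* First pass: measure the formatted length without writing anything. */
    va_start(args, fmt);
    len = vsnprintf(NULL, 0, fmt, args);
    va_end(args);
    assert(len >= 0);

    buf = malloc((size_t) len + 1);
    if (buf == NULL)
        abort();

    /* Second pass: format into the exactly-sized buffer. */
    va_start(args, fmt);
    vsnprintf(buf, (size_t) len + 1, fmt, args);
    va_end(args);
    return buf;
}

/* Builds a two-branch query that interpolates the same list twice. */
static char *
build_query(const char *func_list)
{
    return sketch_psprintf("SELECT 'aggregate' AS objkind "
                           "WHERE fn = ANY(ARRAY[%s]) "
                           "UNION ALL "
                           "SELECT 'operator' AS objkind "
                           "WHERE fn = ANY(ARRAY[%s])",
                           func_list, func_list);
}
```

The caller owns the returned string and frees it once the query is no longer needed. In the patch the pg_free(query) call sits after upgrade_task_run() and upgrade_task_free(), which presumably matters because the task only stores the pointer passed to upgrade_task_add_step(); that last point is an assumption, not something the diff shows.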
From 96f83a360944941cbf2963a7795e28ad4f3537c1 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Wed, 28 Aug 2024 15:27:37 -0500
Subject: [PATCH v11 10/11] Use pg_upgrade's new parallel framework for WITH
OIDS check.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/check.c | 100 +++++++++++++++++++------------------
1 file changed, 52 insertions(+), 48 deletions(-)
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 92a3aa6a77..dff440b29a 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1550,72 +1550,76 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
}
/*
- * Verify that no tables are declared WITH OIDS.
+ * Callback function for processing results of query for
+ * check_for_tables_with_oids()'s UpgradeTask. If the query returned any rows
+ * (i.e., the check failed), write the details to the report file.
*/
static void
-check_for_tables_with_oids(ClusterInfo *cluster)
+process_with_oids_check(DbInfo *dbinfo, PGresult *res, void *arg)
{
- int dbnum;
- FILE *script = NULL;
- char output_path[MAXPGPATH];
+ UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+ bool db_used = false;
+ int ntups = PQntuples(res);
+ int i_nspname = PQfnumber(res, "nspname");
+ int i_relname = PQfnumber(res, "relname");
- prep_status("Checking for tables WITH OIDS");
+ AssertVariableIsOfType(&process_with_oids_check, UpgradeTaskProcessCB);
- snprintf(output_path, sizeof(output_path), "%s/%s",
- log_opts.basedir,
- "tables_with_oids.txt");
+ if (!ntups)
+ return;
- /* Find any tables declared WITH OIDS */
- for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+ for (int rowno = 0; rowno < ntups; rowno++)
{
- PGresult *res;
- bool db_used = false;
- int ntups;
- int rowno;
- int i_nspname,
- i_relname;
- DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(cluster, active_db->db_name);
-
- res = executeQueryOrDie(conn,
- "SELECT n.nspname, c.relname "
- "FROM pg_catalog.pg_class c, "
- " pg_catalog.pg_namespace n "
- "WHERE c.relnamespace = n.oid AND "
- " c.relhasoids AND"
- " n.nspname NOT IN ('pg_catalog')");
-
- ntups = PQntuples(res);
- i_nspname = PQfnumber(res, "nspname");
- i_relname = PQfnumber(res, "relname");
- for (rowno = 0; rowno < ntups; rowno++)
+ if (report->file == NULL &&
+ (report->file = fopen_priv(report->path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", report->path);
+ if (!db_used)
{
- if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
- if (!db_used)
- {
- fprintf(script, "In database: %s\n", active_db->db_name);
- db_used = true;
- }
- fprintf(script, " %s.%s\n",
- PQgetvalue(res, rowno, i_nspname),
- PQgetvalue(res, rowno, i_relname));
+ fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+ db_used = true;
}
+ fprintf(report->file, " %s.%s\n",
+ PQgetvalue(res, rowno, i_nspname),
+ PQgetvalue(res, rowno, i_relname));
+ }
+}
- PQclear(res);
+/*
+ * Verify that no tables are declared WITH OIDS.
+ */
+static void
+check_for_tables_with_oids(ClusterInfo *cluster)
+{
+ UpgradeTaskReport report;
+ UpgradeTask *task = upgrade_task_create();
+ const char *query = "SELECT n.nspname, c.relname "
+ "FROM pg_catalog.pg_class c, "
+ " pg_catalog.pg_namespace n "
+ "WHERE c.relnamespace = n.oid AND "
+ " c.relhasoids AND"
+ " n.nspname NOT IN ('pg_catalog')";
- PQfinish(conn);
- }
+ prep_status("Checking for tables WITH OIDS");
- if (script)
+ report.file = NULL;
+ snprintf(report.path, sizeof(report.path), "%s/%s",
+ log_opts.basedir,
+ "tables_with_oids.txt");
+
+ upgrade_task_add_step(task, query, process_with_oids_check,
+ true, &report);
+ upgrade_task_run(task, cluster);
+ upgrade_task_free(task);
+
+ if (report.file)
{
- fclose(script);
+ fclose(report.file);
pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains tables declared WITH OIDS, which is not\n"
"supported anymore. Consider removing the oid column using\n"
" ALTER TABLE ... SET WITHOUT OIDS;\n"
"A list of tables with the problem is in the file:\n"
- " %s", output_path);
+ " %s", report.path);
}
else
check_ok();
--
2.39.3 (Apple Git-146)
From 5fa06c396ec894ba5625ef00cbc621798547c2a7 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Wed, 28 Aug 2024 15:35:31 -0500
Subject: [PATCH v11 11/11] Use pg_upgrade's parallel framework for encoding
conversion check.
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
src/bin/pg_upgrade/check.c | 120 ++++++++++++++++++++-----------------
1 file changed, 64 insertions(+), 56 deletions(-)
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index dff440b29a..01ab3d0694 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1684,81 +1684,89 @@ check_for_pg_role_prefix(ClusterInfo *cluster)
}
/*
- * Verify that no user-defined encoding conversions exist.
+ * Callback function for processing results of query for
+ * check_for_user_defined_encoding_conversions()'s UpgradeTask. If the query
+ * returned any rows (i.e., the check failed), write the details to the report
+ * file.
*/
static void
-check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
+process_user_defined_encoding_conversions(DbInfo *dbinfo, PGresult *res, void *arg)
{
- int dbnum;
- FILE *script = NULL;
- char output_path[MAXPGPATH];
+ UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+ bool db_used = false;
+ int ntups = PQntuples(res);
+ int i_conoid = PQfnumber(res, "conoid");
+ int i_conname = PQfnumber(res, "conname");
+ int i_nspname = PQfnumber(res, "nspname");
- prep_status("Checking for user-defined encoding conversions");
+ AssertVariableIsOfType(&process_user_defined_encoding_conversions,
+ UpgradeTaskProcessCB);
- snprintf(output_path, sizeof(output_path), "%s/%s",
- log_opts.basedir,
- "encoding_conversions.txt");
+ if (!ntups)
+ return;
- /* Find any user defined encoding conversions */
- for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+ for (int rowno = 0; rowno < ntups; rowno++)
{
- PGresult *res;
- bool db_used = false;
- int ntups;
- int rowno;
- int i_conoid,
- i_conname,
- i_nspname;
- DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(cluster, active_db->db_name);
-
- /*
- * The query below hardcodes FirstNormalObjectId as 16384 rather than
- * interpolating that C #define into the query because, if that
- * #define is ever changed, the cutoff we want to use is the value
- * used by pre-version 14 servers, not that of some future version.
- */
- res = executeQueryOrDie(conn,
- "SELECT c.oid as conoid, c.conname, n.nspname "
- "FROM pg_catalog.pg_conversion c, "
- " pg_catalog.pg_namespace n "
- "WHERE c.connamespace = n.oid AND "
- " c.oid >= 16384");
- ntups = PQntuples(res);
- i_conoid = PQfnumber(res, "conoid");
- i_conname = PQfnumber(res, "conname");
- i_nspname = PQfnumber(res, "nspname");
- for (rowno = 0; rowno < ntups; rowno++)
+ if (report->file == NULL &&
+ (report->file = fopen_priv(report->path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", report->path);
+ if (!db_used)
{
- if (script == NULL &&
- (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
- if (!db_used)
- {
- fprintf(script, "In database: %s\n", active_db->db_name);
- db_used = true;
- }
- fprintf(script, " (oid=%s) %s.%s\n",
- PQgetvalue(res, rowno, i_conoid),
- PQgetvalue(res, rowno, i_nspname),
- PQgetvalue(res, rowno, i_conname));
+ fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+ db_used = true;
}
+ fprintf(report->file, " (oid=%s) %s.%s\n",
+ PQgetvalue(res, rowno, i_conoid),
+ PQgetvalue(res, rowno, i_nspname),
+ PQgetvalue(res, rowno, i_conname));
+ }
+}
- PQclear(res);
+/*
+ * Verify that no user-defined encoding conversions exist.
+ */
+static void
+check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
+{
+ UpgradeTaskReport report;
+ UpgradeTask *task = upgrade_task_create();
+ const char *query;
- PQfinish(conn);
- }
+ prep_status("Checking for user-defined encoding conversions");
- if (script)
+ report.file = NULL;
+ snprintf(report.path, sizeof(report.path), "%s/%s",
+ log_opts.basedir,
+ "encoding_conversions.txt");
+
+ /*
+ * The query below hardcodes FirstNormalObjectId as 16384 rather than
+ * interpolating that C #define into the query because, if that #define is
+ * ever changed, the cutoff we want to use is the value used by
+ * pre-version 14 servers, not that of some future version.
+ */
+ query = "SELECT c.oid as conoid, c.conname, n.nspname "
+ "FROM pg_catalog.pg_conversion c, "
+ " pg_catalog.pg_namespace n "
+ "WHERE c.connamespace = n.oid AND "
+ " c.oid >= 16384";
+
+ upgrade_task_add_step(task, query,
+ process_user_defined_encoding_conversions,
+ true, &report);
+ upgrade_task_run(task, cluster);
+ upgrade_task_free(task);
+
+ if (report.file)
{
- fclose(script);
+ fclose(report.file);
pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains user-defined encoding conversions.\n"
"The conversion function parameters changed in PostgreSQL version 14\n"
"so this cluster cannot currently be upgraded. You can remove the\n"
"encoding conversions in the old cluster and restart the upgrade.\n"
"A list of user-defined encoding conversions is in the file:\n"
- " %s", output_path);
+ " %s", report.path);
}
else
check_ok();
--
2.39.3 (Apple Git-146)