From a47c0da05a8ffc9119ad5f939c0e0d2956b8b506 Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouhaud@free.fr>
Date: Fri, 28 Jun 2019 13:21:58 +0200
Subject: [PATCH 2/2] Add parallel processing to reindexdb

---
 doc/src/sgml/ref/reindexdb.sgml    |  23 +++
 src/bin/scripts/Makefile           |   2 +-
 src/bin/scripts/reindexdb.c        | 311 ++++++++++++++++++++++++-----
 src/bin/scripts/t/090_reindexdb.pl |  14 +-
 4 files changed, 300 insertions(+), 50 deletions(-)

diff --git a/doc/src/sgml/ref/reindexdb.sgml b/doc/src/sgml/ref/reindexdb.sgml
index 25b5a72770..a7031030b9 100644
--- a/doc/src/sgml/ref/reindexdb.sgml
+++ b/doc/src/sgml/ref/reindexdb.sgml
@@ -166,6 +166,29 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
+      <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
+      <listitem>
+       <para>
+        Execute the reindex commands in parallel by running
+        <replaceable class="parameter">njobs</replaceable>
+        commands simultaneously.  This option reduces the time of the
+        processing but it also increases the load on the database server.
+       </para>
+       <para>
+        <application>reindexdb</application> will open
+        <replaceable class="parameter">njobs</replaceable> connections to the
+        database, so make sure your <xref linkend="guc-max-connections"/>
+        setting is high enough to accommodate all connections.
+       </para>
+       <para>
+        Note that this mode is not compatible the <option>-i / --index</option>
+        or the <option>-s / --system</option> options.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-q</option></term>
       <term><option>--quiet</option></term>
diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile
index 3cd793b134..ede665090f 100644
--- a/src/bin/scripts/Makefile
+++ b/src/bin/scripts/Makefile
@@ -29,7 +29,7 @@ dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-
 dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
-reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
+reindexdb: reindexdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 
 install: all installdirs
diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c
index ca61348a0e..45b1f5b4c8 100644
--- a/src/bin/scripts/reindexdb.c
+++ b/src/bin/scripts/reindexdb.c
@@ -10,10 +10,14 @@
  */
 
 #include "postgres_fe.h"
+
+#include "catalog/pg_class_d.h"
 #include "common.h"
 #include "common/logging.h"
+#include "fe_utils/connect.h"
 #include "fe_utils/simple_list.h"
 #include "fe_utils/string_utils.h"
+#include "scripts_parallel.h"
 
 typedef enum ReindexType
 {
@@ -25,16 +29,25 @@ typedef enum ReindexType
 } ReindexType;
 
 
-static void reindex_one_database(const char *name, const char *dbname,
-								 ReindexType type, const char *host,
+static SimpleStringList *get_parallel_object_list(PGconn *conn,
+												  const char *progname,
+												  bool echo);
+static void reindex_one_database(const char *dbname, ReindexType type,
+								 SimpleStringList *user_list, const char *host,
 								 const char *port, const char *username,
 								 enum trivalue prompt_password, const char *progname,
-								 bool echo, bool verbose, bool concurrently);
+								 bool echo, bool verbose, bool concurrently,
+								 int concurrentCons);
 static void reindex_all_databases(const char *maintenance_db,
 								  const char *host, const char *port,
 								  const char *username, enum trivalue prompt_password,
 								  const char *progname, bool echo,
-								  bool quiet, bool verbose, bool concurrently);
+								  bool quiet, bool verbose, bool concurrently,
+								  int concurrentCons);
+static void run_reindex_command(PGconn *conn, ReindexType type,
+								const char *name, const char *progname, bool echo,
+								bool verbose, bool concurrently, bool async);
+
 static void help(const char *progname);
 
 int
@@ -54,6 +67,7 @@ main(int argc, char *argv[])
 		{"system", no_argument, NULL, 's'},
 		{"table", required_argument, NULL, 't'},
 		{"index", required_argument, NULL, 'i'},
+		{"jobs", required_argument, NULL, 'j'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"concurrently", no_argument, NULL, 1},
 		{"maintenance-db", required_argument, NULL, 2},
@@ -79,6 +93,9 @@ main(int argc, char *argv[])
 	SimpleStringList indexes = {NULL, NULL};
 	SimpleStringList tables = {NULL, NULL};
 	SimpleStringList schemas = {NULL, NULL};
+	int			concurrentCons = 1;
+	int			tbl_count = 0,
+				nsp_count = 0;
 
 	pg_logging_init(argv[0]);
 	progname = get_progname(argv[0]);
@@ -87,7 +104,7 @@ main(int argc, char *argv[])
 	handle_help_version_opts(argc, argv, "reindexdb", help);
 
 	/* process command-line options */
-	while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -114,6 +131,7 @@ main(int argc, char *argv[])
 				break;
 			case 'S':
 				simple_string_list_append(&schemas, optarg);
+				nsp_count++;
 				break;
 			case 'd':
 				dbname = pg_strdup(optarg);
@@ -126,10 +144,25 @@ main(int argc, char *argv[])
 				break;
 			case 't':
 				simple_string_list_append(&tables, optarg);
+				tbl_count++;
 				break;
 			case 'i':
 				simple_string_list_append(&indexes, optarg);
 				break;
+			case 'j':
+				concurrentCons = atoi(optarg);
+				if (concurrentCons <= 0)
+				{
+					pg_log_error("number of parallel jobs must be at least 1");
+					exit(1);
+				}
+				if (concurrentCons > FD_SETSIZE - 1)
+				{
+					pg_log_error("too many parallel jobs requested (maximum: %d)",
+								 FD_SETSIZE - 1);
+					exit(1);
+				}
+				break;
 			case 'v':
 				verbose = true;
 				break;
@@ -194,7 +227,8 @@ main(int argc, char *argv[])
 		}
 
 		reindex_all_databases(maintenance_db, host, port, username,
-							  prompt_password, progname, echo, quiet, verbose, concurrently);
+							  prompt_password, progname, echo, quiet, verbose,
+							  concurrently, 1);
 	}
 	else if (syscatalog)
 	{
@@ -214,6 +248,12 @@ main(int argc, char *argv[])
 			exit(1);
 		}
 
+		if (concurrentCons > 1)
+		{
+			pg_log_error("cannot use multiple jobs to reindex system catalogs");
+			exit(1);
+		}
+
 		if (dbname == NULL)
 		{
 			if (getenv("PGDATABASE"))
@@ -224,9 +264,9 @@ main(int argc, char *argv[])
 				dbname = get_user_name_or_exit(progname);
 		}
 
-		reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host,
+		reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host,
 							 port, username, prompt_password, progname,
-							 echo, verbose, concurrently);
+							 echo, verbose, concurrently, 1);
 	}
 	else
 	{
@@ -240,62 +280,60 @@ main(int argc, char *argv[])
 				dbname = get_user_name_or_exit(progname);
 		}
 
-		if (schemas.head != NULL)
+		if (indexes.head != NULL)
 		{
-			SimpleStringListCell *cell;
-
-			for (cell = schemas.head; cell; cell = cell->next)
+			if (concurrentCons > 1)
 			{
-				reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host,
-									 port, username, prompt_password, progname,
-									 echo, verbose, concurrently);
+				pg_log_error("cannot use multiple jobs to reindex multiple indexes");
+				exit(1);
 			}
+
+			reindex_one_database(dbname, REINDEX_INDEX, &indexes, host,
+								 port, username, prompt_password, progname,
+								 echo, verbose, concurrently,
+								 1);
 		}
 
-		if (indexes.head != NULL)
-		{
-			SimpleStringListCell *cell;
+		if (schemas.head != NULL)
+			reindex_one_database(dbname, REINDEX_SCHEMA, &schemas, host,
+								 port, username, prompt_password, progname,
+								 echo, verbose, concurrently,
+								 Min(concurrentCons, nsp_count));
 
-			for (cell = indexes.head; cell; cell = cell->next)
-			{
-				reindex_one_database(cell->val, dbname, REINDEX_INDEX, host,
-									 port, username, prompt_password, progname,
-									 echo, verbose, concurrently);
-			}
-		}
 		if (tables.head != NULL)
-		{
-			SimpleStringListCell *cell;
-
-			for (cell = tables.head; cell; cell = cell->next)
-			{
-				reindex_one_database(cell->val, dbname, REINDEX_TABLE, host,
-									 port, username, prompt_password, progname,
-									 echo, verbose, concurrently);
-			}
-		}
+			reindex_one_database(dbname, REINDEX_TABLE, &tables, host,
+								 port, username, prompt_password, progname,
+								 echo, verbose, concurrently,
+								 Min(concurrentCons, tbl_count));
 
 		/*
 		 * reindex database only if neither index nor table nor schema is
 		 * specified
 		 */
 		if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL)
-			reindex_one_database(NULL, dbname, REINDEX_DATABASE, host,
+			reindex_one_database(dbname, REINDEX_DATABASE, NULL, host,
 								 port, username, prompt_password, progname,
-								 echo, verbose, concurrently);
+								 echo, verbose, concurrently, concurrentCons);
 	}
 
 	exit(0);
 }
 
 static void
-reindex_one_database(const char *name, const char *dbname, ReindexType type,
-					 const char *host, const char *port, const char *username,
+reindex_one_database(const char *dbname, ReindexType type,
+					 SimpleStringList *user_list, const char *host,
+					 const char *port, const char *username,
 					 enum trivalue prompt_password, const char *progname, bool echo,
-					 bool verbose, bool concurrently)
+					 bool verbose, bool concurrently, int concurrentCons)
 {
-	PQExpBufferData sql;
 	PGconn	   *conn;
+	SimpleStringListCell *cell;
+	bool		parallel = concurrentCons > 1;
+	SimpleStringList *process_list = user_list;
+	ReindexType process_type = type;
+	ParallelSlot *slots;
+	int			i;
+	bool		failed = false;
 
 	conn = connectDatabase(dbname, host, port, username, prompt_password,
 						   progname, echo, false, false);
@@ -308,6 +346,101 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
 		exit(1);
 	}
 
+	if (!parallel)
+	{
+		if (user_list == NULL)
+		{
+			/*
+			 * Create a dummy list with an empty string, as user requires an
+			 * element.
+			 */
+			process_list = pg_malloc0(sizeof(SimpleStringList));
+			simple_string_list_append(process_list, "");
+		}
+	}
+	else
+	{
+		/*
+		 * Database-wide parallel reindex requires special processing.  If
+		 * multiple jobs were asked, we have to reindex system catalogs first,
+		 * as they can't be processed in parallel.
+		 */
+		if (process_type == REINDEX_DATABASE)
+		{
+			Assert(user_list == NULL);
+			run_reindex_command(conn, REINDEX_SYSTEM, NULL, progname, echo,
+								verbose, concurrently, false);
+
+			process_type = REINDEX_TABLE;
+			process_list = get_parallel_object_list(conn, progname, echo);
+
+			/* Bail out if nothing to process */
+			if (process_list == NULL)
+			{
+				PQfinish(conn);
+				return;
+			}
+		}
+		else
+			Assert(user_list != NULL);
+	}
+
+	slots = SetupParallelSlots(dbname, host, port, username, prompt_password,
+							   progname, echo, conn, concurrentCons);
+
+	cell = process_list->head;
+	do
+	{
+		const char *objname = cell->val;
+		ParallelSlot *free_slot = NULL;
+
+		if (CancelRequested)
+		{
+			failed = true;
+			goto finish;
+		}
+
+		free_slot = ConsumeIdleSlot(slots, concurrentCons, progname);
+		if (!free_slot)
+		{
+			failed = true;
+			goto finish;
+		}
+
+		run_reindex_command(conn, process_type, objname, progname, echo,
+							verbose, concurrently, parallel);
+
+		cell = cell->next;
+	} while (cell != NULL);
+
+	if (!WaitForSlotsCompletion(slots, concurrentCons, progname))
+		failed = true;
+
+finish:
+	for (i = 0; i < concurrentCons; i++)
+	{
+		PGconn	   *conn = slots[i].connection;
+
+		if (conn == NULL)
+			continue;
+
+		disconnectDatabase(conn);
+		conn = NULL;
+	}
+	pfree(slots);
+
+	if (failed)
+		exit(1);
+}
+
+static void
+run_reindex_command(PGconn *conn, ReindexType type, const char *name,
+					const char *progname, bool echo, bool verbose,
+					bool concurrently, bool async)
+{
+	PQExpBufferData sql;
+	bool		status;
+
 	/* build the REINDEX query */
 	initPQExpBuffer(&sql);
 
@@ -358,7 +491,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
 	/* finish the query */
 	appendPQExpBufferChar(&sql, ';');
 
-	if (!executeMaintenanceCommand(conn, sql.data, echo))
+	if (async)
+	{
+		if (echo)
+			printf("%s\n", sql.data);
+
+		status = PQsendQuery(conn, sql.data) == 1;
+	}
+	else
+		status = executeMaintenanceCommand(conn, sql.data, echo);
+
+	if (!status)
 	{
 		switch (type)
 		{
@@ -383,20 +526,90 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
 							 name, PQdb(conn), PQerrorMessage(conn));
 				break;
 		}
-		PQfinish(conn);
-		exit(1);
+		if (!async)
+		{
+			PQfinish(conn);
+			exit(1);
+		}
 	}
 
-	PQfinish(conn);
 	termPQExpBuffer(&sql);
 }
 
+/*
+ * Prepare the list of objects to process by querying the catalogs.
+ *
+ * This function will return a SimpleStringList object containing the entire
+ * list of tables in the given database that should be processed by a parallel
+ * database-wide reindex (excluding system tables), or NULL if there's no such
+ * table.
+ */
+static SimpleStringList *
+get_parallel_object_list(PGconn *conn, const char *progname, bool echo)
+{
+	PQExpBufferData catalog_query;
+	PQExpBufferData buf;
+	PGresult   *res;
+	SimpleStringList *tables;
+	int			ntups,
+				i;
+
+	initPQExpBuffer(&catalog_query);
+
+	/*
+	 * This query is run using a safe search_path, so there's no need to fully
+	 * qualify everything.
+	 */
+	appendPQExpBuffer(&catalog_query,
+					  "SELECT c.relname, ns.nspname\n"
+					  " FROM pg_catalog.pg_class c\n"
+					  " JOIN pg_catalog.pg_namespace ns"
+					  " ON c.relnamespace = ns.oid\n"
+					  " WHERE ns.nspname != 'pg_catalog'\n"
+					  "   AND c.relkind IN ("
+					  CppAsString2(RELKIND_RELATION) ", "
+					  CppAsString2(RELKIND_MATVIEW) ")\n"
+					  " ORDER BY c.relpages DESC;");
+
+	res = executeQuery(conn, catalog_query.data, progname, echo);
+	termPQExpBuffer(&catalog_query);
+
+	/*
+	 * If no rows are returned, there are no matching tables, so we are done.
+	 */
+	ntups = PQntuples(res);
+	if (ntups == 0)
+	{
+		PQclear(res);
+		PQfinish(conn);
+		return NULL;
+	}
+
+	tables = pg_malloc0(sizeof(SimpleStringList));
+
+	/* Build qualified identifiers for each table */
+	initPQExpBuffer(&buf);
+	for (i = 0; i < ntups; i++)
+	{
+		appendPQExpBufferStr(&buf,
+							 fmtQualifiedId(PQgetvalue(res, i, 1),
+											PQgetvalue(res, i, 0)));
+
+		simple_string_list_append(tables, buf.data);
+		resetPQExpBuffer(&buf);
+	}
+	termPQExpBuffer(&buf);
+	PQclear(res);
+
+	return tables;
+}
+
 static void
 reindex_all_databases(const char *maintenance_db,
 					  const char *host, const char *port,
 					  const char *username, enum trivalue prompt_password,
 					  const char *progname, bool echo, bool quiet, bool verbose,
-					  bool concurrently)
+					  bool concurrently, int concurrentCons)
 {
 	PGconn	   *conn;
 	PGresult   *result;
@@ -423,9 +636,10 @@ reindex_all_databases(const char *maintenance_db,
 		appendPQExpBufferStr(&connstr, "dbname=");
 		appendConnStrVal(&connstr, dbname);
 
-		reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host,
+		reindex_one_database(connstr.data, REINDEX_DATABASE, NULL, host,
 							 port, username, prompt_password,
-							 progname, echo, verbose, concurrently);
+							 progname, echo, verbose, concurrently,
+							 concurrentCons);
 	}
 	termPQExpBuffer(&connstr);
 
@@ -444,6 +658,7 @@ help(const char *progname)
 	printf(_("  -d, --dbname=DBNAME       database to reindex\n"));
 	printf(_("  -e, --echo                show the commands being sent to the server\n"));
 	printf(_("  -i, --index=INDEX         recreate specific index(es) only\n"));
+	printf(_("  -j, --jobs=NUM            use this many concurrent connections to reindex\n"));
 	printf(_("  -q, --quiet               don't write any messages\n"));
 	printf(_("  -s, --system              reindex system catalogs\n"));
 	printf(_("  -S, --schema=SCHEMA       reindex specific schema(s) only\n"));
diff --git a/src/bin/scripts/t/090_reindexdb.pl b/src/bin/scripts/t/090_reindexdb.pl
index 1af8ab70ad..7f52e50566 100644
--- a/src/bin/scripts/t/090_reindexdb.pl
+++ b/src/bin/scripts/t/090_reindexdb.pl
@@ -3,7 +3,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 34;
+use Test::More tests => 40;
 
 program_help_ok('reindexdb');
 program_version_ok('reindexdb');
@@ -77,3 +77,15 @@ $node->command_ok(
 $node->command_ok(
 	[qw(reindexdb --echo --system dbname=template1)],
 	'reindexdb system with connection string');
+
+# parallel processing
+$node->command_fails([qw(reindexdb -j2 -s)],
+   'reindexdb cannot process system catalogs in parallel');
+$node->command_fails([qw(reindexdb -j2 -i i1 -i i2)],
+   'reindexdb cannot process indexes in parallel');
+$node->issues_sql_like([qw(reindexdb -j2)],
+   qr/statement: REINDEX SYSTEM postgres/,
+   'Global and parallel reindex will issue a REINDEX SYSTEM');
+$node->issues_sql_like([qw(reindexdb -j2)],
+   qr/statement: REINDEX TABLE public.test1/,
+   'Global and parallel reindex will issue per-table REINDEX');
-- 
2.20.1

