Hi,

This patch adds a new option (-J num, --jobs-per-disk=num) to
pg_upgrade to speed up copy mode. It spawns up to ${num}
processes per tablespace to copy segments of the same relfilenode
in parallel.

This can help when you have many multi-gigabyte tables (each segment
is 1GB by default) in different tablespaces (each tablespace on a
different disk) and multiple processors.

In a customer's database (~20 TB), the pg_upgrade run time went down from 6h to 4h 45min.

The patch still lacks documentation, and I need help with the WIN32 part
of it. I am starting this new thread so the patch can be added to the next commitfest.

Original thread: 
https://www.postgresql.org/message-id/flat/YZVbtHKYP02AZDIO%40ahch-to

-- 
Jaime Casanova
Director de Servicios Profesionales
SystemGuards - Consultores de PostgreSQL
>From 0d04f79cb51d6be0ced9c6561cfca5bfe18c4bdd Mon Sep 17 00:00:00 2001
From: Jaime Casanova <jcasa...@systemguards.com.ec>
Date: Wed, 15 Dec 2021 12:14:44 -0500
Subject: [PATCH] Add --jobs-per-disk option to allow multiple processes per
 tablespace

This option is independent of the --jobs one. It will fork new processes
to copy the different segments of a relfilenode in parallel.
---
 src/bin/pg_upgrade/option.c      |  8 ++-
 src/bin/pg_upgrade/parallel.c    | 93 ++++++++++++++++++++++++++++++++
 src/bin/pg_upgrade/pg_upgrade.h  |  4 ++
 src/bin/pg_upgrade/relfilenode.c | 59 +++++++++++---------
 4 files changed, 139 insertions(+), 25 deletions(-)

diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 66fe16964e..46b1913a42 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -54,6 +54,7 @@ parseCommandLine(int argc, char *argv[])
 		{"link", no_argument, NULL, 'k'},
 		{"retain", no_argument, NULL, 'r'},
 		{"jobs", required_argument, NULL, 'j'},
+		{"jobs-per-disks", required_argument, NULL, 'J'},
 		{"socketdir", required_argument, NULL, 's'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
@@ -103,7 +104,7 @@ parseCommandLine(int argc, char *argv[])
 	if (os_user_effective_id == 0)
 		pg_fatal("%s: cannot be run as root\n", os_info.progname);
 
-	while ((option = getopt_long(argc, argv, "d:D:b:B:cj:kNo:O:p:P:rs:U:v",
+	while ((option = getopt_long(argc, argv, "d:D:b:B:cj:J:kNo:O:p:P:rs:U:v",
 								 long_options, &optindex)) != -1)
 	{
 		switch (option)
@@ -132,6 +133,10 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.jobs = atoi(optarg);
 				break;
 
+			case 'J':
+				user_opts.jobs_per_disk = atoi(optarg);
+				break;
+
 			case 'k':
 				user_opts.transfer_mode = TRANSFER_MODE_LINK;
 				break;
@@ -291,6 +296,7 @@ usage(void)
 	printf(_("  -d, --old-datadir=DATADIR     old cluster data directory\n"));
 	printf(_("  -D, --new-datadir=DATADIR     new cluster data directory\n"));
 	printf(_("  -j, --jobs=NUM                number of simultaneous processes or threads to use\n"));
+	printf(_("  -J, --jobs_per_disk=NUM       number of simultaneous processes or threads to use per tablespace\n"));
 	printf(_("  -k, --link                    link instead of copying files to new cluster\n"));
 	printf(_("  -N, --no-sync                 do not wait for changes to be written safely to disk\n"));
 	printf(_("  -o, --old-options=OPTIONS     old cluster options to pass to the server\n"));
diff --git a/src/bin/pg_upgrade/parallel.c b/src/bin/pg_upgrade/parallel.c
index ee7364da3b..82f698a9ab 100644
--- a/src/bin/pg_upgrade/parallel.c
+++ b/src/bin/pg_upgrade/parallel.c
@@ -17,6 +17,9 @@
 #include "pg_upgrade.h"
 
 static int	parallel_jobs;
+static int	current_jobs = 0;
+
+static bool      reap_subchild(bool wait_for_child);
 
 #ifdef WIN32
 /*
@@ -277,6 +280,60 @@ win32_transfer_all_new_dbs(transfer_thread_arg *args)
 #endif
 
 
+
+/*
+ * parallel_process_relfile_segment()
+ *
+ * Copy or link file from old cluster to new one.  If vm_must_add_frozenbit
+ * is true, visibility map forks are converted and rewritten, even in link
+ * mode.
+ */
+void
+parallel_process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
+{
+#ifndef WIN32
+	pid_t		child;
+#else
+	HANDLE		child;
+	transfer_thread_arg *new_arg;
+#endif
+	if (user_opts.jobs <= 1 || user_opts.jobs_per_disk <= 1)
+		process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+	else
+	{
+		/* parallel */
+
+		/* harvest any dead children */
+		while (reap_subchild(false) == true)
+			;
+
+		/* must we wait for a dead child? use a maximum of 3 childs per tablespace */
+		if (current_jobs >= user_opts.jobs_per_disk)
+			reap_subchild(true);
+
+		/* set this before we start the job */
+		current_jobs++;
+
+		/* Ensure stdio state is quiesced before forking */
+		fflush(NULL);
+
+#ifndef WIN32
+		child = fork();
+		if (child == 0)
+		{
+			process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+			/* use _exit to skip atexit() functions */
+			_exit(0);
+		}
+		else if (child < 0)
+			/* fork failed */
+			pg_fatal("could not create worker process: %s\n", strerror(errno));
+#endif
+	}
+}
+
+
+
 /*
  *	collect status from a completed worker child
  */
@@ -345,3 +402,39 @@ reap_child(bool wait_for_child)
 
 	return true;
 }
+
+
+
+
+/*
+ *	collect status from a completed worker subchild
+ */
+static bool
+reap_subchild(bool wait_for_child)
+{
+#ifndef WIN32
+	int			work_status;
+	pid_t		child;
+#else
+	int			thread_num;
+	DWORD		res;
+#endif
+
+	if (user_opts.jobs <= 1 || current_jobs == 0)
+		return false;
+
+#ifndef WIN32
+	child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
+	if (child == (pid_t) -1)
+		pg_fatal("waitpid() failed: %s\n", strerror(errno));
+	if (child == 0)
+		return false;			/* no children, or no dead children */
+	if (work_status != 0)
+		pg_fatal("child process exited abnormally: status %d\n", work_status);
+#endif
+
+	/* do this after job has been removed */
+	current_jobs--;
+
+	return true;
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 22169f1002..adcb24ffea 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -282,6 +282,7 @@ typedef struct
 	bool		do_sync;		/* flush changes to disk */
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
+	int			jobs_per_disk;			/* number of processes/threads to use */
 	char	   *socketdir;		/* directory to use for Unix sockets */
 } UserOpts;
 
@@ -450,4 +451,7 @@ void		parallel_exec_prog(const char *log_file, const char *opt_log_file,
 void		parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
 										  char *old_pgdata, char *new_pgdata,
 										  char *old_tablespace);
+
+void 		process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
+void 		parallel_process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
 bool		reap_child(bool wait_for_child);
diff --git a/src/bin/pg_upgrade/relfilenode.c b/src/bin/pg_upgrade/relfilenode.c
index 5dbefbceaf..8a7c49efaa 100644
--- a/src/bin/pg_upgrade/relfilenode.c
+++ b/src/bin/pg_upgrade/relfilenode.c
@@ -17,6 +17,7 @@
 
 static void transfer_single_new_db(FileNameMap *maps, int size, char *old_tablespace);
 static void transfer_relfile(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit);
+void process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
 
 
 /*
@@ -232,30 +233,40 @@ transfer_relfile(FileNameMap *map, const char *type_suffix, bool vm_must_add_fro
 		/* Copying files might take some time, so give feedback. */
 		pg_log(PG_STATUS, "%s", old_file);
 
-		if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
+		parallel_process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+	}
+}
+
+
+
+void
+process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
+{
+
+	if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
+	{
+		/* Need to rewrite visibility map format */
+		pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
+			   old_file, new_file);
+		rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
+	}
+	else
+		switch (user_opts.transfer_mode)
 		{
-			/* Need to rewrite visibility map format */
-			pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
-				   old_file, new_file);
-			rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
+			case TRANSFER_MODE_CLONE:
+				pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
+					   old_file, new_file);
+				cloneFile(old_file, new_file, map->nspname, map->relname);
+				break;
+			case TRANSFER_MODE_COPY:
+				pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
+					   old_file, new_file);
+				copyFile(old_file, new_file, map->nspname, map->relname);
+				break;
+			case TRANSFER_MODE_LINK:
+				pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
+					   old_file, new_file);
+				linkFile(old_file, new_file, map->nspname, map->relname);
+				break;
 		}
-		else
-			switch (user_opts.transfer_mode)
-			{
-				case TRANSFER_MODE_CLONE:
-					pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
-						   old_file, new_file);
-					cloneFile(old_file, new_file, map->nspname, map->relname);
-					break;
-				case TRANSFER_MODE_COPY:
-					pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
-						   old_file, new_file);
-					copyFile(old_file, new_file, map->nspname, map->relname);
-					break;
-				case TRANSFER_MODE_LINK:
-					pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
-						   old_file, new_file);
-					linkFile(old_file, new_file, map->nspname, map->relname);
-			}
-	}
 }
-- 
2.20.1

Reply via email to