Hi, This patch adds a new option (-J num, --jobs-per-disk=num) in pg_upgrade to speed up copy mode. This generates upto ${num} processes per tablespace to copy segments of the same relfilenode in parallel.
This can help when you have many multi gigabyte tables (each segment is 1GB by default) in different tablespaces (each tablespace in a different disk) and multiple processors. In a customer's database (~20Tb) it went down from 6h to 4h 45min. It lacks documentation and I need help with WIN32 part of it, I created this new mail to put the patch on the next commitfest. Original thread: https://www.postgresql.org/message-id/flat/YZVbtHKYP02AZDIO%40ahch-to -- Jaime Casanova Director de Servicios Profesionales SystemGuards - Consultores de PostgreSQL
>From 0d04f79cb51d6be0ced9c6561cfca5bfe18c4bdd Mon Sep 17 00:00:00 2001 From: Jaime Casanova <jcasa...@systemguards.com.ec> Date: Wed, 15 Dec 2021 12:14:44 -0500 Subject: [PATCH] Add --jobs-per-disk option to allow multiple processes per tablespace This option is independent of the --jobs one. It's will fork new processes to copy the different segments of a relfilenode in parallel. --- src/bin/pg_upgrade/option.c | 8 ++- src/bin/pg_upgrade/parallel.c | 93 ++++++++++++++++++++++++++++++++ src/bin/pg_upgrade/pg_upgrade.h | 4 ++ src/bin/pg_upgrade/relfilenode.c | 59 +++++++++++--------- 4 files changed, 139 insertions(+), 25 deletions(-) diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c index 66fe16964e..46b1913a42 100644 --- a/src/bin/pg_upgrade/option.c +++ b/src/bin/pg_upgrade/option.c @@ -54,6 +54,7 @@ parseCommandLine(int argc, char *argv[]) {"link", no_argument, NULL, 'k'}, {"retain", no_argument, NULL, 'r'}, {"jobs", required_argument, NULL, 'j'}, + {"jobs-per-disks", required_argument, NULL, 'J'}, {"socketdir", required_argument, NULL, 's'}, {"verbose", no_argument, NULL, 'v'}, {"clone", no_argument, NULL, 1}, @@ -103,7 +104,7 @@ parseCommandLine(int argc, char *argv[]) if (os_user_effective_id == 0) pg_fatal("%s: cannot be run as root\n", os_info.progname); - while ((option = getopt_long(argc, argv, "d:D:b:B:cj:kNo:O:p:P:rs:U:v", + while ((option = getopt_long(argc, argv, "d:D:b:B:cj:J:kNo:O:p:P:rs:U:v", long_options, &optindex)) != -1) { switch (option) @@ -132,6 +133,10 @@ parseCommandLine(int argc, char *argv[]) user_opts.jobs = atoi(optarg); break; + case 'J': + user_opts.jobs_per_disk = atoi(optarg); + break; + case 'k': user_opts.transfer_mode = TRANSFER_MODE_LINK; break; @@ -291,6 +296,7 @@ usage(void) printf(_(" -d, --old-datadir=DATADIR old cluster data directory\n")); printf(_(" -D, --new-datadir=DATADIR new cluster data directory\n")); printf(_(" -j, --jobs=NUM number of simultaneous processes or threads to use\n")); + printf(_(" -J, --jobs_per_disk=NUM number of simultaneous processes or threads to use per tablespace\n")); printf(_(" -k, --link link instead of copying files to new cluster\n")); printf(_(" -N, --no-sync do not wait for changes to be written safely to disk\n")); printf(_(" -o, --old-options=OPTIONS old cluster options to pass to the server\n")); diff --git a/src/bin/pg_upgrade/parallel.c b/src/bin/pg_upgrade/parallel.c index ee7364da3b..82f698a9ab 100644 --- a/src/bin/pg_upgrade/parallel.c +++ b/src/bin/pg_upgrade/parallel.c @@ -17,6 +17,9 @@ #include "pg_upgrade.h" static int parallel_jobs; +static int current_jobs = 0; + +static bool reap_subchild(bool wait_for_child); #ifdef WIN32 /* @@ -277,6 +280,60 @@ win32_transfer_all_new_dbs(transfer_thread_arg *args) #endif + +/* + * parallel_process_relfile_segment() + * + * Copy or link file from old cluster to new one. If vm_must_add_frozenbit + * is true, visibility map forks are converted and rewritten, even in link + * mode. + */ +void +parallel_process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file) +{ +#ifndef WIN32 + pid_t child; +#else + HANDLE child; + transfer_thread_arg *new_arg; +#endif + if (user_opts.jobs <= 1 || user_opts.jobs_per_disk <= 1) + process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file); + else + { + /* parallel */ + + /* harvest any dead children */ + while (reap_subchild(false) == true) + ; + + /* must we wait for a dead child? use a maximum of 3 childs per tablespace */ + if (current_jobs >= user_opts.jobs_per_disk) + reap_subchild(true); + + /* set this before we start the job */ + current_jobs++; + + /* Ensure stdio state is quiesced before forking */ + fflush(NULL); + +#ifndef WIN32 + child = fork(); + if (child == 0) + { + process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file); + /* use _exit to skip atexit() functions */ + _exit(0); + } + else if (child < 0) + /* fork failed */ + pg_fatal("could not create worker process: %s\n", strerror(errno)); +#endif + } +} + + + /* * collect status from a completed worker child */ @@ -345,3 +402,39 @@ reap_child(bool wait_for_child) return true; } + + + + +/* + * collect status from a completed worker subchild + */ +static bool +reap_subchild(bool wait_for_child) +{ +#ifndef WIN32 + int work_status; + pid_t child; +#else + int thread_num; + DWORD res; +#endif + + if (user_opts.jobs <= 1 || current_jobs == 0) + return false; + +#ifndef WIN32 + child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG); + if (child == (pid_t) -1) + pg_fatal("waitpid() failed: %s\n", strerror(errno)); + if (child == 0) + return false; /* no children, or no dead children */ + if (work_status != 0) + pg_fatal("child process exited abnormally: status %d\n", work_status); +#endif + + /* do this after job has been removed */ + current_jobs--; + + return true; +} diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 22169f1002..adcb24ffea 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -282,6 +282,7 @@ typedef struct bool do_sync; /* flush changes to disk */ transferMode transfer_mode; /* copy files or link them? */ int jobs; /* number of processes/threads to use */ + int jobs_per_disk; /* number of processes/threads to use */ char *socketdir; /* directory to use for Unix sockets */ } UserOpts; @@ -450,4 +451,7 @@ void parallel_exec_prog(const char *log_file, const char *opt_log_file, void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace); + +void process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file); +void parallel_process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file); bool reap_child(bool wait_for_child); diff --git a/src/bin/pg_upgrade/relfilenode.c b/src/bin/pg_upgrade/relfilenode.c index 5dbefbceaf..8a7c49efaa 100644 --- a/src/bin/pg_upgrade/relfilenode.c +++ b/src/bin/pg_upgrade/relfilenode.c @@ -17,6 +17,7 @@ static void transfer_single_new_db(FileNameMap *maps, int size, char *old_tablespace); static void transfer_relfile(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit); +void process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file); /* @@ -232,30 +233,40 @@ transfer_relfile(FileNameMap *map, const char *type_suffix, bool vm_must_add_fro /* Copying files might take some time, so give feedback. */ pg_log(PG_STATUS, "%s", old_file); - if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0) + parallel_process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file); + } +} + + + +void +process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file) +{ + + if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0) + { + /* Need to rewrite visibility map format */ + pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n", + old_file, new_file); + rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname); + } + else + switch (user_opts.transfer_mode) { - /* Need to rewrite visibility map format */ - pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n", - old_file, new_file); - rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname); + case TRANSFER_MODE_CLONE: + pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n", + old_file, new_file); + cloneFile(old_file, new_file, map->nspname, map->relname); + break; + case TRANSFER_MODE_COPY: + pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n", + old_file, new_file); + copyFile(old_file, new_file, map->nspname, map->relname); + break; + case TRANSFER_MODE_LINK: + pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n", + old_file, new_file); + linkFile(old_file, new_file, map->nspname, map->relname); + break; } - else - switch (user_opts.transfer_mode) - { - case TRANSFER_MODE_CLONE: - pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n", - old_file, new_file); - cloneFile(old_file, new_file, map->nspname, map->relname); - break; - case TRANSFER_MODE_COPY: - pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n", - old_file, new_file); - copyFile(old_file, new_file, map->nspname, map->relname); - break; - case TRANSFER_MODE_LINK: - pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n", - old_file, new_file); - linkFile(old_file, new_file, map->nspname, map->relname); - } - } } -- 2.20.1