I promised to research allowing parallel execution of schema dump/restore, so I have developed the attached patch, with dramatic results:
     tables    git (sec)   patch (sec)
      1000       22.29        18.30
      2000       30.75        19.67
      4000       46.33        22.31
      8000       81.09        29.27
     16000      145.43        40.12
     32000      309.39        64.85
     64000      754.62       108.76

These performance results are best-case because the databases were all
the same size and their number matched the number of server cores.
(Test script attached.)

The patch uses fork()/processes on Unix and threads on Windows.  I need
someone to check my use of waitpid() on Unix, and I need the code
compiled and run-tested on Windows.

It basically adds a --jobs option, like the one pg_restore uses, to run
multiple schema dumps/restores in parallel.  I patterned it after the
pg_restore --jobs code in pg_backup_archiver.c.  However, I found the
pg_restore Windows code awkward because it puts everything in one
struct array that has gaps for dead children.  Because
WaitForMultipleObjects() requires an array of thread handles with no
gaps, the pg_restore code must build a temporary array for every call
to WaitForMultipleObjects().  Instead, I created a separate array just
for thread handles (rather than putting them in the same struct) and
swap the last entry into a dead child's slot to avoid gaps --- this
allows the thread handle array to be passed directly to
WaitForMultipleObjects().  (A simplified sketch of this reap-and-swap
logic appears below, just before the attached patch.)  Do people like
this approach?  Should we do the same in pg_restore?  I expect us to be
doing more parallelism in other areas, so I would like a consistent
approach.

The only other optimization I can think of is a parallel file copy per
tablespace (in non-link mode).

--
  Bruce Momjian  <br...@momjian.us>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

  + It's impossible for everything to be true. +
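For anyone who wants the shape of the reaping logic without reading the
whole patch, here is a simplified sketch.  It is illustration only, not
the patch code: the names reap_one_child(), worker_handles[],
worker_args[] and n_workers are invented here (the patch uses
reap_child(), thread_handles, thread_args and parallel_jobs), error
reporting is reduced to returning false, the GetExitCodeThread() check
is omitted, and the Windows arrays are given a fixed size instead of
being sized from --jobs.

#ifndef WIN32

#include <stdbool.h>
#include <sys/types.h>
#include <sys/wait.h>

static int  n_workers;          /* live child processes */

/*
 * Collect one finished child.  If wait_for_child is true, block until a
 * child exits; otherwise just poll (WNOHANG) and return false if none
 * has finished yet.
 */
static bool
reap_one_child(bool wait_for_child)
{
    int     status;
    pid_t   pid;

    if (n_workers == 0)
        return false;

    pid = waitpid(-1, &status, wait_for_child ? 0 : WNOHANG);
    if (pid <= 0)
        return false;           /* no child has exited (or error) */

    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0)
        return false;           /* the patch reports PG_FATAL here */

    n_workers--;
    return true;
}

#else                           /* WIN32 */

#include <stdbool.h>
#include <windows.h>

#define MAX_WORKERS 64          /* fixed size only for this sketch */

static HANDLE  worker_handles[MAX_WORKERS];  /* gap-free array of live threads */
static void   *worker_args[MAX_WORKERS];     /* per-thread argument structs */
static int     n_workers;

static bool
reap_one_child(bool wait_for_child)
{
    DWORD   which;

    if (n_workers == 0)
        return false;

    /* The handle array has no gaps, so it can be passed in directly. */
    which = WaitForMultipleObjects(n_workers, worker_handles,
                                   FALSE, wait_for_child ? INFINITE : 0);
    if (which == WAIT_TIMEOUT || which == WAIT_FAILED)
        return false;
    which -= WAIT_OBJECT_0;     /* index of the finished thread */

    CloseHandle(worker_handles[which]);

    /* Keep the array gap-free: move the last live entry into the hole. */
    if ((int) which != n_workers - 1)
    {
        void   *tmp = worker_args[which];

        worker_handles[which] = worker_handles[n_workers - 1];

        /* swap arg structs so the moved (still-running) thread keeps its own */
        worker_args[which] = worker_args[n_workers - 1];
        worker_args[n_workers - 1] = tmp;
    }

    n_workers--;
    return true;
}

#endif

The point of the swap at the bottom is that the handle array never
develops holes, so it can be handed straight to WaitForMultipleObjects()
with no per-call copying.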
diff --git a/contrib/pg_upgrade/Makefile b/contrib/pg_upgrade/Makefile new file mode 100644 index dec57a6..bbb14a1 *** a/contrib/pg_upgrade/Makefile --- b/contrib/pg_upgrade/Makefile *************** PGAPPICON = win32 *** 5,11 **** PROGRAM = pg_upgrade OBJS = check.o controldata.o dump.o exec.o file.o function.o info.o \ ! option.o page.o pg_upgrade.o relfilenode.o server.o \ tablespace.o util.o version.o version_old_8_3.o $(WIN32RES) PG_CPPFLAGS = -DFRONTEND -DDLSUFFIX=\"$(DLSUFFIX)\" -I$(srcdir) -I$(libpq_srcdir) --- 5,11 ---- PROGRAM = pg_upgrade OBJS = check.o controldata.o dump.o exec.o file.o function.o info.o \ ! option.o page.o parallel.o pg_upgrade.o relfilenode.o server.o \ tablespace.o util.o version.o version_old_8_3.o $(WIN32RES) PG_CPPFLAGS = -DFRONTEND -DDLSUFFIX=\"$(DLSUFFIX)\" -I$(srcdir) -I$(libpq_srcdir) diff --git a/contrib/pg_upgrade/dump.c b/contrib/pg_upgrade/dump.c new file mode 100644 index f35852b..a4b0127 *** a/contrib/pg_upgrade/dump.c --- b/contrib/pg_upgrade/dump.c *************** generate_old_dump(void) *** 33,50 **** /* create per-db dump files */ for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { ! char file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; pg_log(PG_STATUS, "%s", old_db->db_name); ! snprintf(file_name, sizeof(file_name), DB_DUMP_FILE_MASK, old_db->db_oid); ! exec_prog(RESTORE_LOG_FILE, NULL, true, "\"%s/pg_dump\" %s --schema-only --binary-upgrade --format=custom %s --file=\"%s\" \"%s\"", new_cluster.bindir, cluster_conn_opts(&old_cluster), ! log_opts.verbose ? "--verbose" : "", file_name, old_db->db_name); } end_progress_output(); check_ok(); } --- 33,55 ---- /* create per-db dump files */ for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { ! char sql_file_name[MAXPGPATH], log_file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; pg_log(PG_STATUS, "%s", old_db->db_name); ! snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); ! snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid); ! parallel_exec_prog(log_file_name, NULL, "\"%s/pg_dump\" %s --schema-only --binary-upgrade --format=custom %s --file=\"%s\" \"%s\"", new_cluster.bindir, cluster_conn_opts(&old_cluster), ! log_opts.verbose ? "--verbose" : "", sql_file_name, old_db->db_name); } + /* reap all children */ + while (reap_child(true) == true) + ; + end_progress_output(); check_ok(); } diff --git a/contrib/pg_upgrade/option.c b/contrib/pg_upgrade/option.c new file mode 100644 index 19053fa..88686c5 *** a/contrib/pg_upgrade/option.c --- b/contrib/pg_upgrade/option.c *************** parseCommandLine(int argc, char *argv[]) *** 52,57 **** --- 52,58 ---- {"check", no_argument, NULL, 'c'}, {"link", no_argument, NULL, 'k'}, {"retain", no_argument, NULL, 'r'}, + {"jobs", required_argument, NULL, 'j'}, {"verbose", no_argument, NULL, 'v'}, {NULL, 0, NULL, 0} }; *************** parseCommandLine(int argc, char *argv[]) *** 101,107 **** if ((log_opts.internal = fopen_priv(INTERNAL_LOG_FILE, "a")) == NULL) pg_log(PG_FATAL, "cannot write to log file %s\n", INTERNAL_LOG_FILE); ! while ((option = getopt_long(argc, argv, "d:D:b:B:cko:O:p:P:ru:v", long_options, &optindex)) != -1) { switch (option) --- 102,108 ---- if ((log_opts.internal = fopen_priv(INTERNAL_LOG_FILE, "a")) == NULL) pg_log(PG_FATAL, "cannot write to log file %s\n", INTERNAL_LOG_FILE); ! 
while ((option = getopt_long(argc, argv, "d:D:b:B:cj:ko:O:p:P:ru:v", long_options, &optindex)) != -1) { switch (option) *************** parseCommandLine(int argc, char *argv[]) *** 128,133 **** --- 129,138 ---- new_cluster.pgconfig = pg_strdup(optarg); break; + case 'j': + user_opts.jobs = atoi(optarg); + break; + case 'k': user_opts.transfer_mode = TRANSFER_MODE_LINK; break; *************** Options:\n\ *** 229,234 **** --- 234,240 ---- -c, --check check clusters only, don't change any data\n\ -d, --old-datadir=OLDDATADIR old cluster data directory\n\ -D, --new-datadir=NEWDATADIR new cluster data directory\n\ + -j, --jobs number of simultaneous processes or threads to use\n\ -k, --link link instead of copying files to new cluster\n\ -o, --old-options=OPTIONS old cluster options to pass to the server\n\ -O, --new-options=OPTIONS new cluster options to pass to the server\n\ diff --git a/contrib/pg_upgrade/parallel.c b/contrib/pg_upgrade/parallel.c new file mode 100644 index ...092d64f *** a/contrib/pg_upgrade/parallel.c --- b/contrib/pg_upgrade/parallel.c *************** *** 0 **** --- 1,217 ---- + /* + * parallel.c + * + * multi-process support + * + * Copyright (c) 2010-2012, PostgreSQL Global Development Group + * contrib/pg_upgrade/parallel.c + */ + + #include "postgres.h" + + #include "pg_upgrade.h" + + #include <stdlib.h> + #include <string.h> + #include <sys/types.h> + #include <sys/wait.h> + + #ifdef WIN32 + #include <io.h> + #endif + + static int parallel_jobs; + + #ifdef WIN32 + /* + * Array holding all active threads. There can't be any gaps/zeros so + * it can be passed to WaitForMultipleObjects(). We use two arrays + * so the thread_handles array can be passed to WaitForMultipleObjects(). + */ + HANDLE *thread_handles; + + typedef struct { + char log_file[MAXPGPATH]; + char *opt_log_file[MAXPGPATH]; + char *cmd[MAX_STRING]; + } thread_arg; + + thread_arg *thread_args; + + DWORD win32_exec_prog(thread_arg *args); + + #endif + + /* + * parallel_exec_prog + * + * This has the same API as exec_prog, except it does parallel execution, + * and therefore must throw errors and doesn't return an error status. + */ + void + parallel_exec_prog(const char *log_file, const char *opt_log_file, + const char *fmt,...) + { + va_list args; + char cmd[MAX_STRING]; + #ifndef WIN32 + pid_t child; + #else + HANDLE child; + thread_arg *new_arg; + #endif + + va_start(args, fmt); + vsnprintf(cmd, sizeof(cmd), fmt, args); + va_end(args); + + if (user_opts.jobs <= 1) + /* throw_error must be true to allow jobs */ + exec_prog(log_file, opt_log_file, true, "%s", cmd); + else + { + /* parallel */ + + /* harvest any dead children */ + while (reap_child(false) == true) + ; + + /* must we wait for a dead child? 
*/ + if (parallel_jobs >= user_opts.jobs) + reap_child(true); + + /* set this before we start the job */ + parallel_jobs++; + + /* Ensure stdio state is quiesced before forking */ + fflush(NULL); + + #ifndef WIN32 + child = fork(); + if (child == 0) + /* use _exit to skip atexit() functions */ + _exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd)); + else if (child < 0) + /* fork failed */ + pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno)); + #else + if (active_threads == NULL) + { + int i; + + thread_handles = pg_malloc(user_opts.jobs * sizeof(thread_handle)); + thread_args = pg_malloc(user_opts.jobs * sizeof(thread_arg)); + + /* + * For safety and performance, we keep the args allocated during + * the entire life of the process, and we don't free the args + * in a thread different from the one that allocated it. + */ + for (i = 0; i < user_opts.jobs; i++) + thread_args[i] = pg_malloc(sizeof(thread_arg)); + } + + /* use first empty array element */ + new_arg = thread_args[parallel_jobs-1]; + + /* Can only pass one pointer into the function, so use a struct */ + strcpy(new_arg->log_file, log_file); + strcpy(new_arg->opt_log_file, opt_log_file); + strcpy(new_arg->cmd, cmd); + + child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog, + new_arg, 0, NULL); + if (child == 0) + pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno)); + + thread_handles[parallel_jobs-1] = child; + #endif + } + + return; + } + + + #ifdef WIN32 + DWORD + win32_exec_prog(thread_arg *args) + { + int ret; + + ret = !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd); + + /* terminates thread */ + return ret; + } + #endif + + + /* + * collect status from a completed worker child + */ + bool + reap_child(bool wait_for_child) + { + int work_status; + int ret; + #ifdef WIN32 + int thread_num; + DWORD res; + #endif + + if (user_opts.jobs <= 1 || parallel_jobs == 0) + return false; + + #ifndef WIN32 + ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG); + + /* no children or, for WNOHANG, no dead children */ + if (ret <= 0 || !WIFEXITED(work_status)) + return false; + + if (WEXITSTATUS(work_status) != 0) + pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno)); + + #else + /* wait for one to finish */ + thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles, + false, wait_for_child ? INFINITE : 0); + + if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED) + return false; + + /* compute thread index in active_threads */ + thread_num -= WAIT_OBJECT_0; + + /* get the result */ + GetExitCodeThread(thread_handles[thread_num], &res); + if (res != 0) + pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno)); + + /* dispose of handle to stop leaks */ + CloseHandle(thread_handles[thread_num]); + + /* Move last slot into dead child's position */ + if (thread_num != parallel_jobs - 1) + { + thread_arg *tmp_args; + + thread_handles[thread_num] = thread_handles[parallel_jobs - 1]; + + /* + * We must swap the arg struct pointers because the thread we + * just moved is active, and we must make sure it is not + * reused by the next created thread. Instead, the new thread + * will use the arg struct of the thread that just died. 
+ */ + tmp_args = thread_args[thread_num]; + thread_args[thread_num] = thread_args[parallel_jobs - 1]; + thread_args[parallel_jobs - 1] = tmp_args; + } + #endif + + /* do this after job has been removed */ + parallel_jobs--; + + return true; + } diff --git a/contrib/pg_upgrade/pg_upgrade.c b/contrib/pg_upgrade/pg_upgrade.c new file mode 100644 index 2d4b678..8fa64b7 *** a/contrib/pg_upgrade/pg_upgrade.c --- b/contrib/pg_upgrade/pg_upgrade.c *************** char *output_files[] = { *** 61,67 **** /* unique file for pg_ctl start */ SERVER_START_LOG_FILE, #endif - RESTORE_LOG_FILE, UTILITY_LOG_FILE, INTERNAL_LOG_FILE, NULL --- 61,66 ---- *************** prepare_new_databases(void) *** 270,276 **** * support functions in template1 but pg_dumpall creates database using * the template0 template. */ ! exec_prog(RESTORE_LOG_FILE, NULL, true, "\"%s/psql\" " EXEC_PSQL_ARGS " %s -f \"%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), GLOBALS_DUMP_FILE); --- 269,275 ---- * support functions in template1 but pg_dumpall creates database using * the template0 template. */ ! exec_prog(UTILITY_LOG_FILE, NULL, true, "\"%s/psql\" " EXEC_PSQL_ARGS " %s -f \"%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), GLOBALS_DUMP_FILE); *************** create_new_objects(void) *** 307,328 **** for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { ! char file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; pg_log(PG_STATUS, "%s", old_db->db_name); ! snprintf(file_name, sizeof(file_name), DB_DUMP_FILE_MASK, old_db->db_oid); /* * Using pg_restore --single-transaction is faster than other * methods, like --jobs. pg_dump only produces its output at the * end, so there is little parallelism using the pipe. */ ! exec_prog(RESTORE_LOG_FILE, NULL, true, "\"%s/pg_restore\" %s --exit-on-error --single-transaction --verbose --dbname \"%s\" \"%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), ! old_db->db_name, file_name); } end_progress_output(); check_ok(); --- 306,333 ---- for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { ! char sql_file_name[MAXPGPATH], log_file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; pg_log(PG_STATUS, "%s", old_db->db_name); ! snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); ! snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid); /* * Using pg_restore --single-transaction is faster than other * methods, like --jobs. pg_dump only produces its output at the * end, so there is little parallelism using the pipe. */ ! parallel_exec_prog(log_file_name, NULL, "\"%s/pg_restore\" %s --exit-on-error --single-transaction --verbose --dbname \"%s\" \"%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), ! old_db->db_name, sql_file_name); } + + /* reap all children */ + while (reap_child(true) == true) + ; + end_progress_output(); check_ok(); *************** cleanup(void) *** 494,504 **** if (old_cluster.dbarr.dbs) for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { ! char file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; ! snprintf(file_name, sizeof(file_name), DB_DUMP_FILE_MASK, old_db->db_oid); ! unlink(file_name); } } } --- 499,512 ---- if (old_cluster.dbarr.dbs) for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { ! char sql_file_name[MAXPGPATH], log_file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; ! snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); ! unlink(sql_file_name); ! ! 
snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); ! unlink(log_file_name); } } } diff --git a/contrib/pg_upgrade/pg_upgrade.h b/contrib/pg_upgrade/pg_upgrade.h new file mode 100644 index cae1e46..81d9d51 *** a/contrib/pg_upgrade/pg_upgrade.h --- b/contrib/pg_upgrade/pg_upgrade.h *************** *** 32,39 **** #define GLOBALS_DUMP_FILE "pg_upgrade_dump_globals.sql" #define DB_DUMP_FILE_MASK "pg_upgrade_dump_%u.custom" #define SERVER_LOG_FILE "pg_upgrade_server.log" - #define RESTORE_LOG_FILE "pg_upgrade_restore.log" #define UTILITY_LOG_FILE "pg_upgrade_utility.log" #define INTERNAL_LOG_FILE "pg_upgrade_internal.log" --- 32,39 ---- #define GLOBALS_DUMP_FILE "pg_upgrade_dump_globals.sql" #define DB_DUMP_FILE_MASK "pg_upgrade_dump_%u.custom" + #define DB_DUMP_LOG_FILE_MASK "pg_upgrade_dump_%u.log" #define SERVER_LOG_FILE "pg_upgrade_server.log" #define UTILITY_LOG_FILE "pg_upgrade_utility.log" #define INTERNAL_LOG_FILE "pg_upgrade_internal.log" *************** typedef struct *** 264,269 **** --- 264,270 ---- bool check; /* TRUE -> ask user for permission to make * changes */ transferMode transfer_mode; /* copy files or link them? */ + int jobs; } UserOpts; *************** void old_8_3_invalidate_hash_gin_indexe *** 461,463 **** --- 462,472 ---- void old_8_3_invalidate_bpchar_pattern_ops_indexes(ClusterInfo *cluster, bool check_mode); char *old_8_3_create_sequence_script(ClusterInfo *cluster); + + /* parallel.c */ + void parallel_exec_prog(const char *log_file, const char *opt_log_file, + const char *fmt,...) + __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4))); + + bool reap_child(bool wait_for_child); + diff --git a/doc/src/sgml/pgupgrade.sgml b/doc/src/sgml/pgupgrade.sgml new file mode 100644 index 998cb2f..53781e4 *** a/doc/src/sgml/pgupgrade.sgml --- b/doc/src/sgml/pgupgrade.sgml *************** *** 113,118 **** --- 113,125 ---- </varlistentry> <varlistentry> + <term><option>-j</option></term> + <term><option>--jobs</option></term> + <listitem><para>number of simultaneous processes or threads to use + </para></listitem> + </varlistentry> + + <varlistentry> <term><option>-k</option></term> <term><option>--link</option></term> <listitem><para>use hard links instead of copying files to the new cluster</para></listitem> *************** NET STOP pgsql-8.3 (<productname>Postgr *** 331,340 **** requires that the old and new cluster data directories be in the same file system. See <literal>pg_upgrade --help</> for a full list of options. ! </para> ! <para> ! </para> <para> For Windows users, you must be logged into an administrative account, and --- 338,352 ---- requires that the old and new cluster data directories be in the same file system. See <literal>pg_upgrade --help</> for a full list of options. ! </para> ! <para> ! The <option>--jobs</> option allows multiple CPU cores to be used ! to dump and reload database schemas in parallel; a good place to ! start is the number of CPU cores on the server. This option can ! dramatically reduce the time to upgrade a multi-database server ! running on a multiprocessor machine. ! </para> <para> For Windows users, you must be logged into an administrative account, and
: . traprm

export QUIET=$((QUIET + 1))

> /rtmp/out

JOBLIMIT=16
REL=9.3
BRANCH=jobs

export PGOPTIONS="-c synchronous_commit=off"

for CYCLES in 1000 2000 4000 8000 16000 32000 64000
do
    echo "$CYCLES" >> /rtmp/out

    for JOBLIMIT in 1 16
    do
        cd /pgsql/$REL
        pgsw $BRANCH
        cd -

        tools/setup $REL $REL
        sleep 2

#       [ "$SSD" = "f" ] && tools/mv_to_archive

        # need for +16k
        for CONFIG in /u/pgsql.old/data/postgresql.conf /u/pgsql/data/postgresql.conf
        do
            pipe sed 's/#max_locks_per_transaction = 64/max_locks_per_transaction = 64000/' "$CONFIG"
            pipe sed 's/shared_buffers = 128MB/shared_buffers = 1GB/' "$CONFIG"
        done

        pgstart /u/pgsql.old/data
        sleep 2

        for DB in $(jot $JOBLIMIT)
        do
            newdb test$DB
            for JOT in $(jot $(( $CYCLES / $JOBLIMIT )) ); do echo "CREATE TABLE test$JOT (x SERIAL PRIMARY KEY);"; done |
                PGOPTIONS="-c synchronous_commit=off" sql --single-transaction test$DB
        done

        pgstop /u/pgsql.old/data
        sleep 2

        /usr/bin/time --output=/rtmp/out --append --format '%e' tools/upgrade -j $JOBLIMIT || exit
        sleep 2
    done
done

bell