Hi Lakshmi, Hayato,
Thanks a lot for your input!
I'm not sure why the VACUUM phase takes longer than in the serial run.
We can potentially get a clue with a profiler. I know there is an
ongoing effort to introduce parallel heap vacuum [1], which I expect
will help with this.
I've applied the code comments you provided in the attached v2 patch.
Below are my answers to your questions.
> Also, why is -j accepted in case of non-partitions?
For non-partitioned tables, each worker loads a separate range of rows
via its own connection in parallel.
> Copying seems to be divided into chunks per COPY_BATCH_SIZE. Is it really
> essential to parallelize the initialization? I feel it may optimize even
> serialized case thus can be discussed independently.
You're right that the COPY batching is an optimization that's
independent. I wanted to see how fast I can get this patch, so I looked
for bottlenecks in the new code with a profiler and this was one of
them. I agree it makes sense to apply this for the serialised case
separately.
> Per my understanding, each thread creates its tables, and all of them are
> attached to the parent table. Is it right? I think it needs more code
> changes, and I am not sure it is critical to make initialization faster.
Yes, that's correct. Each worker creates its assigned partitions as
standalone tables, loads data into them, and then the main thread
attaches them all to the parent after loading completes. It's to avoid
AccessExclusiveLock contention on the parent table during parallel
loading and allow each worker to use COPY FREEZE on its standalone table.
> So I suggest using the incremental approach. The first patch only
> parallelizes the data load, and the second patch implements the CREATE
> TABLE and ALTER TABLE ATTACH PARTITION. You can benchmark three
> patterns, master, 0001, and 0001 + 0002, then compare the results.
> IIUC, this is the common approach to reduce the patch size and make
> them more reviewable.
Thanks for the recommendation; I have extracted 0001 and 0002 as per your
suggestion. I will see if I can split it further, as that does indeed
help with review.
Results are similar to the previous runs.
master

pgbench -i -s 100 -j 10
done in 20.95 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 14.51 s, vacuum 0.27 s, primary keys 6.16 s).

pgbench -i -s 100 -j 10 --partitions=10
done in 29.73 s (drop tables 0.00 s, create tables 0.02 s, client-side generate 16.33 s, vacuum 8.72 s, primary keys 4.67 s).

0001

pgbench -i -s 100 -j 10
done in 18.75 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 6.51 s, vacuum 5.73 s, primary keys 6.50 s).

pgbench -i -s 100 -j 10 --partitions=10
done in 29.33 s (drop tables 0.00 s, create tables 0.02 s, client-side generate 16.48 s, vacuum 7.59 s, primary keys 5.24 s).

0001 + 0002

pgbench -i -s 100 -j 10
done in 18.12 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 6.64 s, vacuum 5.81 s, primary keys 5.65 s).

pgbench -i -s 100 -j 10 --partitions=10
done in 14.38 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 7.97 s, vacuum 1.55 s, primary keys 4.85 s).
Looking forward to your feedback.
[1]:
https://www.postgresql.org/message-id/CAD21AoAEfCNv-GgaDheDJ%2Bs-p_Lv1H24AiJeNoPGCmZNSwL1YA%40mail.gmail.com
--
Thanks,
Mircea Cadariu
From e4619e03cf3c30653c2de4eafaaf9391eba34f3f Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Sun, 8 Mar 2026 20:05:41 +0000
Subject: [PATCH 1/2] Add parallel data loading support to pgbench
Add support for parallel data loading during pgbench initialization
(-i mode) using multiple threads (-j/--jobs). When multiple threads
are specified, pgbench_accounts data generation is split across
worker threads, each loading its portion via a separate connection.
Parallel loading is currently supported for non-partitioned tables only;
partitioned tables fall back to serial loading.
---
doc/src/sgml/ref/pgbench.sgml | 7 +
src/bin/pgbench/pgbench.c | 299 +++++++++++++++++--
src/bin/pgbench/t/001_pgbench_with_server.pl | 20 ++
src/tools/pgindent/typedefs.list | 2 +
4 files changed, 311 insertions(+), 17 deletions(-)
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 2e401d1ceb..41772442d1 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -502,6 +502,13 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
Clients are distributed as evenly as possible among available threads.
Default is 1.
</para>
+ <para>
+ In initialization mode (<option>-i</option>), this option controls
+ the number of threads used to load data into
+ <literal>pgbench_accounts</literal> in parallel.
+ Parallel data loading is currently supported for
+ non-partitioned tables only.
+ </para>
</listitem>
</varlistentry>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..1ecd44c4ab 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -78,6 +78,8 @@
#define ERRCODE_T_R_DEADLOCK_DETECTED "40P01"
#define ERRCODE_UNDEFINED_TABLE "42P01"
+#define COPY_BATCH_SIZE (1024 * 1024)
+
/*
* Hashing constants
*/
@@ -825,7 +827,6 @@ static const BuiltinScript builtin_script[] =
}
};
-
/* Function prototypes */
static void setNullValue(PgBenchValue *pv);
static void setBoolValue(PgBenchValue *pv, bool bval);
@@ -857,6 +858,17 @@ static const PsqlScanCallbacks pgbench_callbacks = {
 	NULL,						/* don't need get_variable functionality */
};
+/* Worker thread data for parallel table loading */
+typedef struct WorkerTask
+{
+ PGconn *con;
+ const char *table;
+ int64 start_row;
+ int64 end_row;
+ initRowMethod append_row;
+ int worker_id;
+} WorkerTask;
+
static char
get_table_relkind(PGconn *con, const char *table)
{
@@ -1586,6 +1598,216 @@ doConnect(void)
return conn;
}
+/*
+ * Truncate specified table(s)
+ * tableName can be a single table or comma-separated list of tables
+ */
+static void
+truncateTable(PGconn *con, const char *tableName)
+{
+ PQExpBufferData query;
+ initPQExpBuffer(&query);
+ printfPQExpBuffer(&query, "TRUNCATE TABLE %s", tableName);
+ executeStatement(con, query.data);
+ termPQExpBuffer(&query);
+}
+
+/*
+ * Parameters needed for COPY operations.
+ */
+typedef struct CopyTarget
+{
+ const char *table_name;
+ int64 start_row;
+ int64 end_row;
+ bool use_freeze;
+} CopyTarget;
+
+/*
+ * Perform COPY operation for a single table or partition.
+ * Batches rows into larger buffers before sending to reduce overhead.
+ */
+static void
+performCopy(PGconn *conn, WorkerTask *wt, CopyTarget *ct)
+{
+ PGresult *res;
+ char copy_statement[NAMEDATALEN + 32];
+ int64 row;
+ PQExpBufferData batch_buffer;
+
+ /* Build the COPY command */
+ if (ct->use_freeze)
+ snprintf(copy_statement, sizeof(copy_statement),
+				 "COPY %s FROM STDIN (FREEZE ON)", ct->table_name);
+ else
+ snprintf(copy_statement, sizeof(copy_statement),
+ "COPY %s FROM STDIN", ct->table_name);
+
+ /* Initiate COPY mode */
+ res = PQexec(conn, copy_statement);
+ if (PQresultStatus(res) != PGRES_COPY_IN)
+ pg_fatal("COPY command failed for table \"%s\": %s",
+ ct->table_name, PQerrorMessage(conn));
+ PQclear(res);
+
+ /* Pre-allocate buffer to avoid repeated reallocs */
+ initPQExpBuffer(&batch_buffer);
+ enlargePQExpBuffer(&batch_buffer, COPY_BATCH_SIZE);
+
+ /* Generate and send rows in batches using append_row */
+ for (row = ct->start_row; row < ct->end_row; row++)
+ {
+ /* Use append_row to accumulate multiple rows in the buffer */
+ wt->append_row(&batch_buffer, row);
+
+ /* Send batch when buffer reaches size threshold */
+ if (batch_buffer.len >= COPY_BATCH_SIZE)
+ {
+			if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+				pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+
+ resetPQExpBuffer(&batch_buffer);
+ }
+ }
+
+ /* Send any remaining buffered data */
+ if (batch_buffer.len > 0)
+ {
+		if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+			pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+ }
+
+ /* Finalize the COPY operation */
+ if (PQputCopyEnd(conn, NULL) <= 0)
+ pg_fatal("error in PQputCopyEnd: %s", PQerrorMessage(conn));
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("COPY failed for table \"%s\": %s",
+ ct->table_name, PQerrorMessage(conn));
+ PQclear(res);
+
+ termPQExpBuffer(&batch_buffer);
+}
+
+static void
+assignWorkerRows(WorkerTask *wt, int num_workers, int64 total_rows)
+{
+ int64 rows_per_worker = total_rows / num_workers;
+
+ wt->start_row = wt->worker_id * rows_per_worker;
+ wt->end_row = (wt->worker_id == num_workers - 1) ?
+ total_rows :
+ (wt->worker_id + 1) * rows_per_worker;
+}
+
+/*
+ * Load data into non-partitioned table.
+ *
+ * Only worker 0 can use COPY FREEZE, because it inherits the transaction
+ * that truncated the table. Other workers use plain COPY in their own
+ * transactions.
+ */
+static void
+loadRegularTable(PGconn *conn, WorkerTask *wt)
+{
+ CopyTarget target;
+
+ target.table_name = wt->table;
+ target.start_row = wt->start_row;
+ target.end_row = wt->end_row;
+ target.use_freeze = (wt->worker_id == 0);
+
+ performCopy(conn, wt, &target);
+}
+
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initWorkerThread(void *arg)
+{
+ WorkerTask *wt = (WorkerTask *) arg;
+ PGconn *conn;
+
+ /* Connection is pre-created, just use it */
+ conn = wt->con;
+
+ /*
+ * Start a new transaction for this worker, except for worker 0.
+ * Worker 0 continues the transaction from the main thread that already
+ * did the truncate (to enable COPY FREEZE).
+ */
+ if (wt->worker_id > 0)
+ executeStatement(conn, "begin");
+
+ loadRegularTable(conn, wt);
+
+ executeStatement(conn, "commit");
+
+ THREAD_FUNC_RETURN;
+}
+
+static void
+initPopulateTableParallel(PGconn *connection, int num_workers,
+						  const char *table, int64 total_rows,
+ initRowMethod append_row)
+{
+ THREAD_T *worker_threads;
+ WorkerTask *worker_data;
+ PGconn **connections;
+ int i;
+
+ /* Allocate worker data and threads */
+ worker_threads = pg_malloc(num_workers * sizeof(THREAD_T));
+ worker_data = pg_malloc0(num_workers * sizeof(WorkerTask));
+ connections = pg_malloc(num_workers * sizeof(PGconn *));
+
+ /* Reuse main connection for worker 0, create new ones for others */
+ connections[0] = connection;
+ for (i = 1; i < num_workers; i++)
+ {
+ connections[i] = doConnect();
+ if (connections[i] == NULL)
+			pg_fatal("could not create connection for worker %d", i);
+ }
+
+ executeStatement(connections[0], "begin");
+ truncateTable(connections[0], table);
+
+ fprintf(stderr, "loading %s with %d threads...\n", table, num_workers);
+
+ /* Create and start worker threads */
+ for (i = 0; i < num_workers; i++)
+ {
+ worker_data[i].con = connections[i];
+ worker_data[i].table = table;
+ worker_data[i].append_row = append_row;
+ worker_data[i].worker_id = i;
+
+ assignWorkerRows(&worker_data[i], num_workers, total_rows);
+
+		errno = THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread for worker %d: %m", i);
+ }
+
+ /*
+ * Wait for all workers to finish. Any worker failure calls pg_fatal(),
+	 * which terminates the process, so if we get here all workers succeeded.
+ */
+ for (i = 0; i < num_workers; i++)
+ THREAD_JOIN(worker_threads[i]);
+
+ /*
+ * Clean up worker connections (skip index 0, which is the main
+ * connection)
+ */
+ for (i = 1; i < num_workers; i++)
+ PQfinish(connections[i]);
+
+ free(connections);
+ free(worker_threads);
+ free(worker_data);
+}
+
/* qsort comparator for Variable array */
static int
compareVariableNames(const void *v1, const void *v2)
@@ -4990,11 +5212,7 @@ initCreateTables(PGconn *con)
static void
initTruncateTables(PGconn *con)
{
- executeStatement(con, "truncate table "
- "pgbench_accounts, "
- "pgbench_branches, "
- "pgbench_history, "
- "pgbench_tellers");
+	truncateTable(con, "pgbench_accounts, pgbench_branches, pgbench_history, pgbench_tellers");
}
static void
@@ -5024,9 +5242,41 @@ initAccount(PQExpBufferData *sql, int64 curr)
curr + 1, curr / naccounts + 1);
}
+/*
+ * Append-based versions to enable batching.
+ * These use appendPQExpBuffer instead of printfPQExpBuffer to allow
+ * multiple rows to be accumulated in a single buffer.
+ */
static void
-initPopulateTable(PGconn *con, const char *table, int64 base,
- initRowMethod init_row)
+appendBranch(PQExpBufferData *sql, int64 curr)
+{
+ /* "filler" column uses NULL */
+ appendPQExpBuffer(sql,
+ INT64_FORMAT "\t0\t\\N\n",
+ curr + 1);
+}
+
+static void
+appendTeller(PQExpBufferData *sql, int64 curr)
+{
+ /* "filler" column uses NULL */
+ appendPQExpBuffer(sql,
+					  INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
+ curr + 1, curr / ntellers + 1);
+}
+
+static void
+appendAccount(PQExpBufferData *sql, int64 curr)
+{
+ /* "filler" column defaults to blank padded empty string */
+ appendPQExpBuffer(sql,
+					  INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
+ curr + 1, curr / naccounts + 1);
+}
+
+static void
+initPopulateTableSerial(PGconn *con, const char *table, int64 base,
+ initRowMethod init_row)
{
int n;
int64 k;
@@ -5034,7 +5284,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
int prev_chars = 0;
PGresult *res;
PQExpBufferData sql;
- char copy_statement[256];
+ char copy_statement[NAMEDATALEN + 32];
const char *copy_statement_fmt = "copy %s from stdin";
int64 total = base * scale;
@@ -5143,6 +5393,16 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
termPQExpBuffer(&sql);
}
+static void
+initPopulateTable(PGconn *con, const char *table, int64 total_rows,
+				  initRowMethod init_row, initRowMethod append_row, bool use_parallel)
+{
+ if (use_parallel && nthreads > 1 && partition_method == PART_NONE)
+		initPopulateTableParallel(con, nthreads, table, total_rows * scale, append_row);
+ else
+ initPopulateTableSerial(con, table, total_rows, init_row);
+}
+
/*
* Fill the standard tables with some data generated and sent from the client.
*
@@ -5155,8 +5415,9 @@ initGenerateDataClientSide(PGconn *con)
fprintf(stderr, "generating data (client-side)...\n");
/*
- * we do all of this in one transaction to enable the backend's
- * data-loading optimizations
+ * For single-threaded mode, do everything in one transaction. For
+ * multi-threaded mode, do branches/tellers/history in one transaction,
+ * then accounts in parallel (each thread handles its own transaction).
*/
executeStatement(con, "begin");
@@ -5167,11 +5428,16 @@ initGenerateDataClientSide(PGconn *con)
* fill branches, tellers, accounts in that order in case foreign keys
* already exist
*/
- initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
- initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
- initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+ initPopulateTable(con, "pgbench_branches", nbranches, initBranch,
appendBranch, false);
+ initPopulateTable(con, "pgbench_tellers", ntellers, initTeller,
appendTeller, false);
- executeStatement(con, "commit");
+ if (nthreads > 1)
+ executeStatement(con, "commit");
+
+	initPopulateTable(con, "pgbench_accounts", naccounts, initAccount, appendAccount, nthreads > 1);
+
+ if (nthreads == 1)
+ executeStatement(con, "commit");
}
/*
@@ -6944,7 +7210,6 @@ main(int argc, char **argv)
initialization_option_set = true;
break;
case 'j': /* jobs */
- benchmarking_option_set = true;
 			if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
 								  &nthreads))
{
@@ -7176,7 +7441,7 @@ main(int argc, char **argv)
* optimization; throttle_delay is calculated incorrectly below if some
* threads have no clients assigned to them.)
*/
- if (nthreads > nclients)
+ if (nthreads > nclients && !is_init_mode)
nthreads = nclients;
/*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..b0a4b973b4 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -217,6 +217,26 @@ my $nthreads = 2;
 	$nthreads = 1 if $stderr =~ m/threads are not supported on this platform/;
}
+# Test parallel initialization (requires thread support)
+if ($nthreads > 1)
+{
+ # Parallel init without partitions
+ $node->pgbench(
+ '-i -j 2 --scale=1',
+ 0,
+ [qr{^$}],
+ [
+ qr{creating tables},
+ qr{loading pgbench_accounts with 2 threads},
+ qr{vacuuming},
+ qr{creating primary keys},
+ qr{done in \d+\.\d\d s }
+ ],
+ 'pgbench parallel initialization without partitions');
+
+ check_data_state($node, 'parallel-no-partitions');
+}
+
# run custom scripts
$node->pgbench(
"-t 100 -c 1 -j $nthreads -M prepared -n",
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 3da19d4141..f2ba9c75a6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -541,6 +541,7 @@ CopyOnErrorChoice
CopySeqResult
CopySource
CopyStmt
+CopyTarget
CopyToRoutine
CopyToState
CopyToStateData
@@ -3383,6 +3384,7 @@ WorkerInfoData
WorkerInstrumentation
WorkerJobDumpPtrType
WorkerJobRestorePtrType
+WorkerTask
Working_State
WriteBufPtrType
WriteBytePtrType
--
2.39.5 (Apple Git-154)
From d8d0b4e92b118290c895be3f9ff21ff2ad79f0fa Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Wed, 11 Mar 2026 16:55:50 +0000
Subject: [PATCH 2/2] Extend pgbench parallel data loading to range-partitioned
tables
Each worker thread creates and loads complete partitions as standalone
tables, then partitions are attached to the parent after all data is
loaded. This avoids AccessExclusiveLock contention on the parent
table during parallel loading and allows COPY FREEZE on each partition.
The number of threads is capped to the number of partitions. Hash
partitioning with parallel loading is not supported and raises an error.
---
doc/src/sgml/ref/pgbench.sgml | 8 +-
src/bin/pgbench/pgbench.c | 186 ++++++++++++++++---
src/bin/pgbench/t/001_pgbench_with_server.pl | 17 ++
src/bin/pgbench/t/002_pgbench_no_server.pl | 5 +
4 files changed, 189 insertions(+), 27 deletions(-)
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 41772442d1..8dcee8f423 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -506,8 +506,12 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
In initialization mode (<option>-i</option>), this option controls
the number of threads used to load data into
<literal>pgbench_accounts</literal> in parallel.
- Parallel data loading is currently supported for
- non-partitioned tables only.
+ With <option>--partitions</option> using
+ <literal>range</literal> partitioning, each thread loads one or more
+ complete partitions independently.
+ The number of threads is limited to the number of partitions.
+ Parallel data loading is not currently supported with
+ <literal>hash</literal> partitioning.
</para>
</listitem>
</varlistentry>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1ecd44c4ab..938fec1cc5 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -849,6 +849,8 @@ static void clear_socket_set(socket_set *sa);
static void add_socket_to_set(socket_set *sa, int fd, int idx);
static int wait_on_socket_set(socket_set *sa, int64 usecs);
static bool socket_has_input(socket_set *sa, int fd, int idx);
+static void createPartitions(PGconn *con, int part_start, int part_end);
+static void attachPartitions(PGconn *con);
/* callback used to build rows for COPY during data loading */
typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
@@ -867,6 +869,9 @@ typedef struct WorkerTask
int64 end_row;
initRowMethod append_row;
int worker_id;
+ int part_start;
+ int part_end;
+ int64 part_size;
} WorkerTask;
static char
@@ -1701,6 +1706,52 @@ assignWorkerRows(WorkerTask *wt, int num_workers, int64 total_rows)
(wt->worker_id + 1) * rows_per_worker;
}
+/*
+ * Covers only multiple partitions per worker (workers <= partitions) for now.
+ * Each worker loads complete partitions independently and can use COPY FREEZE.
+ */
+static void
+assignWorkerPartitions(WorkerTask *wt, int num_workers, int64 total_rows,
+ int num_parts)
+{
+ int parts_per_worker = num_parts / num_workers;
+ int extra_parts = num_parts % num_workers;
+
+ wt->part_start = wt->worker_id * parts_per_worker + 1 +
+ (wt->worker_id < extra_parts ? wt->worker_id : extra_parts);
+ wt->part_end = wt->part_start + parts_per_worker - 1 +
+ (wt->worker_id < extra_parts ? 1 : 0);
+
+ wt->start_row = (wt->part_start - 1) * wt->part_size;
+ wt->end_row = (wt->part_end == num_parts) ?
+ total_rows :
+ wt->part_end * wt->part_size;
+}
+
+/* Load data into partitioned table */
+static void
+loadPartitionedTable(PGconn *conn, WorkerTask *wt)
+{
+ int p;
+
+ for (p = wt->part_start; p <= wt->part_end; p++)
+ {
+ CopyTarget target;
+ int64 part_start_row = (p - 1) * wt->part_size;
+		int64		part_end_row = (p == partitions) ? (naccounts * (int64) scale) : (p * wt->part_size);
+ char partition_table[NAMEDATALEN];
+
+		snprintf(partition_table, sizeof(partition_table), "pgbench_accounts_%d", p);
+
+ target.table_name = partition_table;
+ target.start_row = part_start_row;
+ target.end_row = part_end_row;
+ target.use_freeze = true;
+
+ performCopy(conn, wt, &target);
+ }
+}
+
/*
* Load data into non-partitioned table.
*
@@ -1731,14 +1782,20 @@ initWorkerThread(void *arg)
conn = wt->con;
/*
- * Start a new transaction for this worker, except for worker 0.
- * Worker 0 continues the transaction from the main thread that already
- * did the truncate (to enable COPY FREEZE).
+ * Start a new transaction for this worker, except for worker 0 on
+ * non-partitioned tables. Worker 0 continues the transaction from the
+ * main thread that already did the truncate (to enable COPY FREEZE).
*/
- if (wt->worker_id > 0)
+ if (wt->part_start > 0 || wt->worker_id > 0)
executeStatement(conn, "begin");
- loadRegularTable(conn, wt);
+ if (wt->part_start > 0)
+ {
+ createPartitions(conn, wt->part_start, wt->part_end);
+ loadPartitionedTable(conn, wt);
+ }
+ else
+ loadRegularTable(conn, wt);
executeStatement(conn, "commit");
@@ -1753,6 +1810,7 @@ initPopulateTableParallel(PGconn *connection, int num_workers,
THREAD_T *worker_threads;
WorkerTask *worker_data;
PGconn **connections;
+ bool is_partitioned;
int i;
/* Allocate worker data and threads */
@@ -1769,9 +1827,17 @@ initPopulateTableParallel(PGconn *connection, int num_workers,
 			pg_fatal("could not create connection for worker %d", i);
}
+ /* Works only for pgbench_accounts and the range partitioning option */
+	is_partitioned = strcmp(table, "pgbench_accounts") == 0 && partition_method == PART_RANGE;
+
executeStatement(connections[0], "begin");
truncateTable(connections[0], table);
+ if (is_partitioned)
+ {
+ executeStatement(connections[0], "commit");
+ }
+
fprintf(stderr, "loading %s with %d threads...\n", table, num_workers);
/* Create and start worker threads */
@@ -1782,7 +1848,14 @@ initPopulateTableParallel(PGconn *connection, int num_workers,
worker_data[i].append_row = append_row;
worker_data[i].worker_id = i;
- assignWorkerRows(&worker_data[i], num_workers, total_rows);
+ if (!is_partitioned)
+			assignWorkerRows(&worker_data[i], num_workers, total_rows);
+ else
+ {
+			worker_data[i].part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+			assignWorkerPartitions(&worker_data[i], num_workers, total_rows,
+								   partitions);
+ }
 		errno = THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]);
if (errno != 0)
@@ -1796,6 +1869,12 @@ initPopulateTableParallel(PGconn *connection, int num_workers,
for (i = 0; i < num_workers; i++)
THREAD_JOIN(worker_threads[i]);
+ /*
+	 * For partitioned tables, attach all partitions now that data is loaded.
+ */
+ if (is_partitioned)
+ attachPartitions(connection);
+
/*
* Clean up worker connections (skip index 0, which is the main
* connection)
@@ -5046,14 +5125,50 @@ initDropTables(PGconn *con)
* with a known size, so we choose to partition it.
*/
static void
-createPartitions(PGconn *con)
+createPartitions(PGconn *con, int part_start, int part_end)
{
PQExpBufferData query;
/* we must have to create some partitions */
Assert(partitions > 0);
- fprintf(stderr, "creating %d partitions...\n", partitions);
+ initPQExpBuffer(&query);
+
+ for (int p = part_start; p <= part_end; p++)
+ {
+ /*
+		 * Create standalone tables (not attached to parent yet). This avoids
+ * AccessExclusiveLock on the parent table, allowing parallel
+ * creation. Tables will be attached after data loading.
+ */
+ printfPQExpBuffer(&query,
+						  "create%s table pgbench_accounts_%d\n"
+						  "  (aid int not null,\n"
+						  "  bid int,\n"
+						  "  abalance int,\n"
+						  "  filler character(84))\n"
+						  " with (fillfactor=%d)",
+						  unlogged_tables ? " unlogged" : "", p,
+ fillfactor);
+
+ executeStatement(con, query.data);
+ }
+
+ termPQExpBuffer(&query);
+}
+
+/*
+ * Attach standalone partition tables to the parent table.
+ * Called after all data has been loaded in parallel.
+ */
+static void
+attachPartitions(PGconn *con)
+{
+ PQExpBufferData query;
+
+ Assert(partitions > 0);
+
+ fprintf(stderr, "attaching %d partitions...\n", partitions);
initPQExpBuffer(&query);
@@ -5064,10 +5179,9 @@ createPartitions(PGconn *con)
 		int64		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
printfPQExpBuffer(&query,
-						  "create%s table pgbench_accounts_%d\n"
-						  "  partition of pgbench_accounts\n"
-						  "  for values from (",
-						  unlogged_tables ? " unlogged" : "", p);
+						  "alter table pgbench_accounts\n"
+						  "  attach partition pgbench_accounts_%d\n"
+						  "  for values from (", p);
/*
 		 * For RANGE, we use open-ended partitions at the beginning and
@@ -5090,21 +5204,16 @@ createPartitions(PGconn *con)
appendPQExpBufferChar(&query, ')');
}
else if (partition_method == PART_HASH)
+ {
printfPQExpBuffer(&query,
-						  "create%s table pgbench_accounts_%d\n"
-						  "  partition of pgbench_accounts\n"
+						  "alter table pgbench_accounts\n"
+						  "  attach partition pgbench_accounts_%d\n"
 						  "  for values with (modulus %d, remainder %d)",
-						  unlogged_tables ? " unlogged" : "", p,
-						  partitions, p - 1);
+						  p, partitions, p - 1);
+ }
else /* cannot get there */
Assert(0);
- /*
-	 * Per ddlinfo in initCreateTables, fillfactor is needed on table
- * pgbench_accounts.
- */
- appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
-
executeStatement(con, query.data);
}
@@ -5202,8 +5311,11 @@ initCreateTables(PGconn *con)
termPQExpBuffer(&query);
- if (partition_method != PART_NONE)
- createPartitions(con);
+	if (partition_method != PART_NONE && (nthreads == 1 || partition_method == PART_HASH))
+ {
+ fprintf(stderr, "creating %d partitions...\n", partitions);
+ createPartitions(con, 1, partitions);
+ }
}
/*
@@ -5397,10 +5509,21 @@ static void
 initPopulateTable(PGconn *con, const char *table, int64 total_rows,
 				  initRowMethod init_row, initRowMethod append_row, bool use_parallel)
{
- if (use_parallel && nthreads > 1 && partition_method == PART_NONE)
+ bool is_accounts = (strcmp(table, "pgbench_accounts") == 0);
+
+ if (use_parallel && nthreads > 1)
 		initPopulateTableParallel(con, nthreads, table, total_rows * scale, append_row);
else
+ {
+ /*
+		 * For single-threaded mode with partitioned tables, attach partitions
+		 * before loading data so COPY to the parent table can route rows.
+ */
+		if (is_accounts && partitions > 0 && partition_method != PART_NONE)
+ attachPartitions(con);
+
initPopulateTableSerial(con, table, total_rows, init_row);
+ }
}
/*
@@ -5463,6 +5586,9 @@ initGenerateDataServerSide(PGconn *con)
/* truncate away any old data */
initTruncateTables(con);
+ if (partitions > 0 && partition_method != PART_NONE)
+ attachPartitions(con);
+
initPQExpBuffer(&sql);
printfPQExpBuffer(&sql,
@@ -7486,6 +7612,16 @@ main(int argc, char **argv)
if (partitions > 0 && partition_method == PART_NONE)
partition_method = PART_RANGE;
+ if (partition_method == PART_HASH && nthreads > 1)
+			pg_fatal("parallel data loading (-j) is not supported with hash partitioning");
+
+ /*
+	 * For partitioned tables, limit the number of threads to the number of
+ * partitions, since each worker handles at least one partition.
+ */
+ if (partition_method == PART_RANGE && nthreads > partitions)
+ nthreads = partitions;
+
if (initialize_steps == NULL)
initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b0a4b973b4..c64ca622d7 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -235,6 +235,23 @@ if ($nthreads > 1)
'pgbench parallel initialization without partitions');
check_data_state($node, 'parallel-no-partitions');
+
+ # Parallel init with range partitions
+ $node->pgbench(
+ '-i -j 2 --scale=1 --partitions=4 --partition-method=range',
+ 0,
+ [qr{^$}],
+ [
+ qr{creating tables},
+ qr{loading pgbench_accounts with 2 threads},
+ qr{attaching 4 partitions},
+ qr{vacuuming},
+ qr{creating primary keys},
+ qr{done in \d+\.\d\d s }
+ ],
+ 'pgbench parallel initialization with range partitions');
+
+ check_data_state($node, 'parallel-range-partitions');
}
# run custom scripts
diff --git a/src/bin/pgbench/t/002_pgbench_no_server.pl b/src/bin/pgbench/t/002_pgbench_no_server.pl
index e694e9ef0f..d67f26e422 100644
--- a/src/bin/pgbench/t/002_pgbench_no_server.pl
+++ b/src/bin/pgbench/t/002_pgbench_no_server.pl
@@ -187,6 +187,11 @@ my @options = (
'-i --partition-method=hash',
[qr{partition-method requires greater than zero --partitions}]
],
+ [
+ 'parallel data loading with hash partitioning',
+ '-i -j 2 --partitions=4 --partition-method=hash',
+		[qr{parallel data loading \(-j\) is not supported with hash partitioning}]
+ ],
[
'bad maximum number of tries',
'--max-tries -10',
--
2.39.5 (Apple Git-154)