Hi,
I propose a patch that speeds up pgbench -i through multithreading.
To enable it, pass -j followed by the number of worker threads to use.
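For example, to initialize at scale factor 100 with 8 worker threads
(the database name "bench" here is just a placeholder):

  pgbench -i -s 100 -j 8 bench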
Here are some results I got on my laptop:
master
---
-i -s 100
done in 20.95 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 14.51 s, vacuum 0.27 s, primary keys 6.16 s).
-i -s 100 --partitions=10
done in 29.73 s (drop tables 0.00 s, create tables 0.02 s, client-side generate 16.33 s, vacuum 8.72 s, primary keys 4.67 s).
patch (-j 10)
---
-i -s 100 -j 10
done in 18.64 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 5.82 s, vacuum 6.89 s, primary keys 5.93 s).
-i -s 100 -j 10 --partitions=10
done in 14.66 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 8.42 s, vacuum 1.55 s, primary keys 4.68 s).
The speedup is more significant for the partitioned case: each worker
creates and loads its own standalone partitions, so every worker can use
COPY FREEZE, which in turn lowers the vacuum cost.
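For reference, COPY FREEZE is only accepted when the target table was
created or truncated in the same transaction, which is what each worker
arranges for the partitions it owns. Roughly, per worker:

  BEGIN;
  -- standalone partition, created in this same transaction
  CREATE TABLE pgbench_accounts_1 (aid int not null, bid int,
                                   abalance int, filler character(84));
  COPY pgbench_accounts_1 FROM STDIN (FREEZE ON);
  COMMIT;
  -- all partitions are attached to pgbench_accounts once loading is done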
For the non-partitioned case the speedup is smaller, but I observe that it
improves somewhat at larger scale factors. Once parallel vacuum support is
merged, the total time should drop further.
I still need to update the docs and tests and better integrate the code
with its surroundings. In the meantime, I'd appreciate any feedback on
what I have so far. Thanks!
Kind regards,
Mircea Cadariu
From 18d91ec9c22d43522dc1cd83c16359c36b3dc58d Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Sun, 9 Nov 2025 10:41:51 +0000
Subject: [PATCH v1] wip
---
src/bin/pgbench/pgbench.c | 455 +++++++++++++++++++++++++++++++++++---
1 file changed, 420 insertions(+), 35 deletions(-)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index a425176ecd..ef4e05678a 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -78,6 +78,8 @@
#define ERRCODE_T_R_DEADLOCK_DETECTED "40P01"
#define ERRCODE_UNDEFINED_TABLE "42P01"
+#define COPY_BATCH_SIZE (1024 * 1024) /* send COPY data in ~1 MB batches */
+
/*
* Hashing constants
*/
@@ -825,7 +827,6 @@ static const BuiltinScript builtin_script[] =
}
};
-
/* Function prototypes */
static void setNullValue(PgBenchValue *pv);
static void setBoolValue(PgBenchValue *pv, bool bval);
@@ -848,6 +849,8 @@ static void clear_socket_set(socket_set *sa);
static void add_socket_to_set(socket_set *sa, int fd, int idx);
static int wait_on_socket_set(socket_set *sa, int64 usecs);
static bool socket_has_input(socket_set *sa, int fd, int idx);
+static void createPartitions(PGconn *con, int part_start, int part_end);
+static void attachPartitions(PGconn *con);
/* callback used to build rows for COPY during data loading */
typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
@@ -856,6 +859,19 @@ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
static const PsqlScanCallbacks pgbench_callbacks = {
NULL, /* don't need get_variable functionality */
};
+/* Worker thread data for parallel table loading */
+typedef struct WorkerTaskDescription
+{
+ PGconn *con; /* connection this worker uses */
+ const char *table; /* target table name */
+ int64 start_row; /* first row to load (inclusive) */
+ int64 end_row; /* one past the last row to load */
+ initRowMethod append_row; /* row-generation callback */
+ int worker_id; /* 0-based worker index */
+ int part_start; /* first partition (1-based, 0 if not partitioned) */
+ int part_end; /* last partition (inclusive) */
+ int64 part_size; /* rows per partition */
+} WorkerTaskDescription;
static char
get_table_relkind(PGconn *con, const char *table)
@@ -1631,6 +1647,277 @@ doConnect(void)
return conn;
}
+/*
+ * Truncate specified table(s)
+ * tableName can be a single table or comma-separated list of tables
+ */
+static void
+truncateTable(PGconn *con, const char *tableName)
+{
+ PQExpBufferData query;
+
+ initPQExpBuffer(&query);
+ printfPQExpBuffer(&query, "TRUNCATE TABLE %s", tableName);
+ executeStatement(con, query.data);
+ termPQExpBuffer(&query);
+}
+
+/*
+ * Parameters needed for COPY operations.
+ */
+typedef struct CopyTarget
+{
+ const char *table_name;
+ int64 start_row;
+ int64 end_row;
+ bool use_freeze;
+} CopyTarget;
+
+/*
+ * Perform COPY operation for a single table or partition.
+ * Batches rows into larger buffers before sending to reduce overhead.
+ */
+static void
+performCopy(PGconn *conn, WorkerTaskDescription *wtd, CopyTarget *target)
+{
+ PGresult *res;
+ char copy_statement[NAMEDATALEN + 32];
+ int64 row;
+ PQExpBufferData batch_buffer;
+
+ /* Build the COPY command */
+ if (target->use_freeze)
+ snprintf(copy_statement, sizeof(copy_statement),
+ "COPY %s FROM STDIN (FREEZE ON)",
target->table_name);
+ else
+ snprintf(copy_statement, sizeof(copy_statement),
+ "COPY %s FROM STDIN", target->table_name);
+
+ /* Initiate COPY mode */
+ res = PQexec(conn, copy_statement);
+ if (PQresultStatus(res) != PGRES_COPY_IN)
+ pg_fatal("COPY command failed for table \"%s\": %s",
+ target->table_name, PQerrorMessage(conn));
+ PQclear(res);
+
+ /* Pre-allocate buffer to avoid repeated reallocs */
+ initPQExpBuffer(&batch_buffer);
+ enlargePQExpBuffer(&batch_buffer, COPY_BATCH_SIZE);
+
+ /* Generate and send rows in batches using append_row */
+ for (row = target->start_row; row < target->end_row; row++)
+ {
+ /* Use append_row to accumulate multiple rows in the buffer */
+ wtd->append_row(&batch_buffer, row);
+
+ /* Send batch when buffer reaches size threshold */
+ if (batch_buffer.len >= COPY_BATCH_SIZE)
+ {
+ if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+ pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+
+ resetPQExpBuffer(&batch_buffer);
+ }
+ }
+
+ /* Send any remaining buffered data */
+ if (batch_buffer.len > 0)
+ {
+ if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+ pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+ }
+
+ /* Finalize the COPY operation */
+ if (PQputCopyEnd(conn, NULL) <= 0)
+ pg_fatal("error in PQputCopyEnd: %s", PQerrorMessage(conn));
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("COPY failed for table \"%s\": %s",
+ target->table_name, PQerrorMessage(conn));
+ PQclear(res);
+
+ termPQExpBuffer(&batch_buffer);
+}
+
+
+static void
+assignWorkerRows(WorkerTaskDescription *wtd, int num_workers, int64 total_rows)
+{
+ int64 rows_per_worker = total_rows / num_workers;
+
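+ /* the last worker also picks up any remainder from the division */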
+ wtd->start_row = wtd->worker_id * rows_per_worker;
+ wtd->end_row = (wtd->worker_id == num_workers - 1) ?
+ total_rows :
+ (wtd->worker_id + 1) * rows_per_worker;
+}
+
+/*
+ * Covers only multiple partitions per worker (workers <= partitions) for now.
+ * Each worker loads complete partitions independently and can use COPY FREEZE.
+ */
+static void
+assignWorkerPartitions(WorkerTaskDescription *wtd, int num_workers, int64 total_rows,
+ int num_parts)
+{
+ int parts_per_worker = num_parts / num_workers;
+ int extra_parts = num_parts % num_workers;
+
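+ /* hand one extra partition to each of the first extra_parts workers */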
+ wtd->part_start = wtd->worker_id * parts_per_worker + 1 +
+ (wtd->worker_id < extra_parts ? wtd->worker_id : extra_parts);
+ wtd->part_end = wtd->part_start + parts_per_worker - 1 +
+ (wtd->worker_id < extra_parts ? 1 : 0);
+
+ wtd->start_row = (wtd->part_start - 1) * wtd->part_size;
+ wtd->end_row = (wtd->part_end == num_parts) ?
+ total_rows :
+ wtd->part_end * wtd->part_size;
+}
+
+
+/* Load data into partitioned table */
+static void
+loadPartitionedTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+ int p;
+
+ for (p = wtd->part_start; p <= wtd->part_end; p++)
+ {
+ CopyTarget target;
+ int64 part_start_row = (p - 1) * wtd->part_size;
+ int64 part_end_row = (p == partitions) ? (naccounts * (int64) scale) : (p * wtd->part_size);
+ char partition_table[NAMEDATALEN];
+
+ snprintf(partition_table, sizeof(partition_table), "pgbench_accounts_%d", p);
+
+ target.table_name = partition_table;
+ target.start_row = part_start_row;
+ target.end_row = part_end_row;
+ target.use_freeze = true;
+
+ performCopy(conn, wtd, &target);
+ }
+}
+
+/* Load data into non-partitioned table */
+static void
+loadRegularTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+ CopyTarget target;
+
+ target.table_name = wtd->table;
+ target.start_row = wtd->start_row;
+ target.end_row = wtd->end_row;
+ target.use_freeze = (wtd->worker_id == 0);
+
+ performCopy(conn, wtd, &target);
+}
+
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initWorkerThread(void *arg)
+{
+ WorkerTaskDescription *wtd = (WorkerTaskDescription *) arg;
+ PGconn *conn;
+
+ /* Connection is pre-created, just use it */
+ conn = wtd->con;
+
+ /*
+ * Start a new transaction for this worker, except for worker 0 on
+ * non-partitioned tables. Worker 0 continues the transaction from the
+ * main thread that already did the truncate (to enable COPY FREEZE).
+ */
+ if (wtd->part_start > 0 || wtd->worker_id > 0)
+ executeStatement(conn, "begin");
+
+ if (wtd->part_start > 0)
+ {
+ createPartitions(conn, wtd->part_start, wtd->part_end);
+ loadPartitionedTable(conn, wtd);
+ }
+ else
+ loadRegularTable(conn, wtd);
+
+ executeStatement(conn, "commit");
+
+ return NULL;
+}
+
+static void
+initPopulateTableParallel(PGconn *connection, int num_workers,
+ const char *table, int64 total_rows,
+ initRowMethod append_row)
+{
+ THREAD_T *worker_threads;
+ WorkerTaskDescription *worker_data;
+ PGconn **connections;
+ bool is_partitioned;
+ int i;
+
+ /* Allocate worker data and threads */
+ worker_threads = pg_malloc(num_workers * sizeof(THREAD_T));
+ worker_data = pg_malloc0(num_workers * sizeof(WorkerTaskDescription));
+ connections = pg_malloc(num_workers * sizeof(PGconn *));
+
+ /* Reuse main connection for worker 0, create new ones for others */
+ connections[0] = connection;
+ for (i = 1; i < num_workers; i++)
+ connections[i] = doConnect();
+
+ /* Works only for pgbench_accounts and the range partitioning option */
+ is_partitioned = strcmp(table, "pgbench_accounts") == 0 && partition_method == PART_RANGE;
+
+ /* For partitioned tables, we handle only num_workers <= partitions for now */
+ if (is_partitioned && num_workers > partitions)
+ pg_fatal("number of threads (%d) must not exceed the number of
partitions (%d)",
+ num_workers, partitions);
+
+ executeStatement(connections[0], "begin");
+ truncateTable(connections[0], table);
+
+ if (is_partitioned)
+ {
+ executeStatement(connections[0], "commit");
+ }
+
+ /* Create and start worker threads */
+ for (i = 0; i < num_workers; i++)
+ {
+ worker_data[i].con = connections[i];
+ worker_data[i].table = table;
+ worker_data[i].append_row = append_row;
+ worker_data[i].worker_id = i;
+
+ if (!is_partitioned)
+ assignWorkerRows(&worker_data[i], num_workers, total_rows);
+ else
+ {
+ worker_data[i].part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+ assignWorkerPartitions(&worker_data[i], num_workers, total_rows,
+ partitions);
+ }
+
+ THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]);
+ }
+
+ for (i = 0; i < num_workers; i++)
+ THREAD_JOIN(worker_threads[i]);
+
+ /*
+ * For partitioned tables, attach all partitions now that data is loaded.
+ */
+ if (is_partitioned)
+ attachPartitions(connection);
+
+ /* Clean up worker connections (skip index 0, which is the main connection) */
+ for (i = 1; i < num_workers; i++)
+ PQfinish(connections[i]);
+
+ free(connections);
+ free(worker_threads);
+ free(worker_data);
+}
+
/* qsort comparator for Variable array */
static int
compareVariableNames(const void *v1, const void *v2)
@@ -4869,14 +5156,58 @@ initDropTables(PGconn *con)
* with a known size, so we choose to partition it.
*/
static void
-createPartitions(PGconn *con)
+createPartitions(PGconn *con, int part_start, int part_end)
{
PQExpBufferData query;
/* we must have to create some partitions */
Assert(partitions > 0);
- fprintf(stderr, "creating %d partitions...\n", partitions);
+ /* If called with -1, create all partitions */
+ if (part_start == -1)
+ {
+ part_start = 1;
+ part_end = partitions;
+ fprintf(stderr, "creating %d partitions...\n", partitions);
+ }
+
+ initPQExpBuffer(&query);
+
+ for (int p = part_start; p <= part_end; p++)
+ {
+ /*
+ * Create standalone tables (not attached to parent yet).
+ * This avoids AccessExclusiveLock on the parent table, allowing
+ * parallel creation. Tables will be attached after data loading.
+ */
+ printfPQExpBuffer(&query,
+ "create%s table pgbench_accounts_%d\n"
+ " (aid int not null,\n"
+ " bid int,\n"
+ " abalance int,\n"
+ " filler character(84))\n"
+ " with (fillfactor=%d)",
+ unlogged_tables ? " unlogged" : "", p,
+ fillfactor);
+
+ executeStatement(con, query.data);
+ }
+
+ termPQExpBuffer(&query);
+}
+
+/*
+ * Attach standalone partition tables to the parent table.
+ * Called after all data has been loaded in parallel.
+ */
+static void
+attachPartitions(PGconn *con)
+{
+ PQExpBufferData query;
+
+ Assert(partitions > 0);
+
+ fprintf(stderr, "attaching %d partitions...\n", partitions);
initPQExpBuffer(&query);
@@ -4884,13 +5215,12 @@ createPartitions(PGconn *con)
{
if (partition_method == PART_RANGE)
{
- int64 part_size = (naccounts * (int64) scale
- + partitions - 1) / partitions;
+ int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
printfPQExpBuffer(&query,
- "create%s table
pgbench_accounts_%d\n"
- " partition of
pgbench_accounts\n"
- " for values from (",
- unlogged_tables ? "
unlogged" : "", p);
+ "alter table
pgbench_accounts\n"
+ " attach partition
pgbench_accounts_%d\n"
+ " for values from
(", p);
/*
* For RANGE, we use open-ended partitions at the beginning and
@@ -4913,21 +5243,16 @@ createPartitions(PGconn *con)
appendPQExpBufferChar(&query, ')');
}
else if (partition_method == PART_HASH)
+ {
printfPQExpBuffer(&query,
- "create%s table
pgbench_accounts_%d\n"
- " partition of
pgbench_accounts\n"
+ "alter table
pgbench_accounts\n"
+ " attach partition
pgbench_accounts_%d\n"
" for values with
(modulus %d, remainder %d)",
- unlogged_tables ? "
unlogged" : "", p,
- partitions, p - 1);
+ p, partitions, p - 1);
+ }
else /* cannot get there */
Assert(0);
- /*
- * Per ddlinfo in initCreateTables, fillfactor is needed on table
- * pgbench_accounts.
- */
- appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
-
executeStatement(con, query.data);
}
@@ -5025,8 +5350,8 @@ initCreateTables(PGconn *con)
termPQExpBuffer(&query);
- if (partition_method != PART_NONE)
- createPartitions(con);
+ if (partition_method != PART_NONE && (nthreads == 1 || partition_method == PART_HASH))
+ createPartitions(con, -1, -1);
}
/*
@@ -5035,11 +5360,7 @@ initCreateTables(PGconn *con)
static void
initTruncateTables(PGconn *con)
{
- executeStatement(con, "truncate table "
- "pgbench_accounts, "
- "pgbench_branches, "
- "pgbench_history, "
- "pgbench_tellers");
+ truncateTable(con, "pgbench_accounts, pgbench_branches,
pgbench_history, pgbench_tellers");
}
static void
@@ -5069,8 +5390,40 @@ initAccount(PQExpBufferData *sql, int64 curr)
curr + 1, curr / naccounts + 1);
}
+/*
+ * Append-based versions to enable batching.
+ * These use appendPQExpBuffer instead of printfPQExpBuffer to allow
+ * multiple rows to be accumulated in a single buffer.
+ */
+static void
+appendBranch(PQExpBufferData *sql, int64 curr)
+{
+ /* "filler" column uses NULL */
+ appendPQExpBuffer(sql,
+ INT64_FORMAT "\t0\t\\N\n",
+ curr + 1);
+}
+
+static void
+appendTeller(PQExpBufferData *sql, int64 curr)
+{
+ /* "filler" column uses NULL */
+ appendPQExpBuffer(sql,
+ INT64_FORMAT "\t" INT64_FORMAT
"\t0\t\\N\n",
+ curr + 1, curr / ntellers + 1);
+}
+
static void
-initPopulateTable(PGconn *con, const char *table, int64 base,
+appendAccount(PQExpBufferData *sql, int64 curr)
+{
+ /* "filler" column defaults to blank padded empty string */
+ appendPQExpBuffer(sql,
+ INT64_FORMAT "\t" INT64_FORMAT
"\t0\t\n",
+ curr + 1, curr / naccounts + 1);
+}
+
+static void
+initPopulateTableSerial(PGconn *con, const char *table, int64 base,
initRowMethod init_row)
{
int n;
@@ -5079,7 +5432,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
int prev_chars = 0;
PGresult *res;
PQExpBufferData sql;
- char copy_statement[256];
+ char copy_statement[NAMEDATALEN + 32];
const char *copy_statement_fmt = "copy %s from stdin";
int64 total = base * scale;
@@ -5188,6 +5541,27 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
termPQExpBuffer(&sql);
}
+static void
+initPopulateTable(PGconn *con, const char *table, int64 total_rows,
+ initRowMethod init_row, initRowMethod append_row, bool use_parallel)
+{
+ bool is_accounts = (strcmp(table, "pgbench_accounts") == 0);
+
+ if (use_parallel && nthreads > 1)
+ initPopulateTableParallel(con, nthreads, table, total_rows * scale, append_row);
+ else
+ {
+ /*
+ * For single-threaded mode with partitioned tables, attach partitions
+ * before loading data so COPY to the parent table can route rows.
+ */
+ if (is_accounts && partitions > 0 && partition_method != PART_NONE)
+ attachPartitions(con);
+
+ initPopulateTableSerial(con, table, total_rows, init_row);
+ }
+}
+
/*
* Fill the standard tables with some data generated and sent from the client.
*
@@ -5200,8 +5574,9 @@ initGenerateDataClientSide(PGconn *con)
fprintf(stderr, "generating data (client-side)...\n");
/*
- * we do all of this in one transaction to enable the backend's
- * data-loading optimizations
+ * For single-threaded mode, do everything in one transaction.
+ * For multi-threaded mode, do branches/tellers/history in one transaction,
+ * then accounts in parallel (each thread handles its own transaction).
*/
executeStatement(con, "begin");
@@ -5212,11 +5587,16 @@ initGenerateDataClientSide(PGconn *con)
* fill branches, tellers, accounts in that order in case foreign keys
* already exist
*/
- initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
- initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
- initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+ initPopulateTable(con, "pgbench_branches", nbranches, initBranch,
appendBranch, false);
+ initPopulateTable(con, "pgbench_tellers", ntellers, initTeller,
appendTeller, false);
- executeStatement(con, "commit");
+ if (nthreads > 1)
+ executeStatement(con, "commit");
+
+ initPopulateTable(con, "pgbench_accounts", naccounts, initAccount,
appendAccount, nthreads > 1);
+
+ if (nthreads == 1)
+ executeStatement(con, "commit");
}
/*
@@ -5242,6 +5622,9 @@ initGenerateDataServerSide(PGconn *con)
/* truncate away any old data */
initTruncateTables(con);
+ if (partitions > 0 && partition_method != PART_NONE)
+ attachPartitions(con);
+
initPQExpBuffer(&sql);
printfPQExpBuffer(&sql,
@@ -6989,7 +7372,6 @@ main(int argc, char **argv)
initialization_option_set = true;
break;
case 'j': /* jobs */
- benchmarking_option_set = true;
if (!option_parse_int(optarg, "-j/--jobs", 1,
INT_MAX,
&nthreads))
{
@@ -7221,7 +7603,7 @@ main(int argc, char **argv)
* optimization; throttle_delay is calculated incorrectly below if some
* threads have no clients assigned to them.)
*/
- if (nthreads > nclients)
+ if (nthreads > nclients && !is_init_mode)
nthreads = nclients;
/*
@@ -7266,6 +7648,9 @@ main(int argc, char **argv)
if (partitions > 0 && partition_method == PART_NONE)
partition_method = PART_RANGE;
+ if (partition_method == PART_HASH && nthreads > 1)
+ pg_fatal("parallel data loading (-j) is not supported
with hash partitioning");
+
if (initialize_steps == NULL)
initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
--
2.39.5 (Apple Git-154)