pg_dump runs parallel workers in two completely different ways. On
non-Windows platforms, they're forked processes communicating with the
leader over pipes-- which is what pipes and processes are for. Windows
has no fork(), so there the workers are threads. But instead of
coordinating those threads as threads, the Windows port runs the same
process-based protocol on top of them, unchanged. Each worker's channel
to the leader is a loopback TCP socketpair on 127.0.0.1, opened when the
worker starts. So to tell a worker "dump table 1234," the leader
serializes the command to a string and writes it down that socket; the
worker reads it back a byte at a time, and the leader watches the
sockets with select() to see who's done. All of it to hand work to a
thread a few megabytes away in the same address space.
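Here is a minimal sketch of the framing described above, for anyone who hasn't read parallel.c. Each message is a NUL-terminated string, and the receiver pulls it off the socket one byte per read() call until it sees the NUL. It assumes POSIX socketpair() as a stand-in for pg_dump's pgpipe(), which on Windows builds a loopback TCP pair instead. send_msg() and recv_msg() are hypothetical names, loosely modelled on sendMessageToWorker() and readMessageFromPipe() but not copied from them.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Write str plus its terminating NUL; the NUL marks the end of the message. */
static int
send_msg(int fd, const char *str)
{
	size_t		len = strlen(str) + 1;

	return write(fd, str, len) == (ssize_t) len ? 0 : -1;
}

/*
 * Read a NUL-terminated message one byte per read() call, growing the buffer
 * as needed.  Returns a malloc'd string, or NULL on EOF or error.
 */
static char *
recv_msg(int fd)
{
	size_t		size = 64;
	size_t		used = 0;
	char	   *buf = malloc(size);

	while (buf != NULL)
	{
		if (used == size)
		{
			char	   *bigger = realloc(buf, size * 2);

			if (bigger == NULL)
				break;
			buf = bigger;
			size *= 2;
		}
		if (read(fd, buf + used, 1) != 1)
			break;				/* EOF (or error) before the message ended */
		if (buf[used++] == '\0')
			return buf;
	}
	free(buf);
	return NULL;
}

/* Send cmd over a fresh socket pair and check that it arrives intact. */
static int
roundtrip_ok(const char *cmd)
{
	int			sv[2];
	char	   *got = NULL;
	int			ok;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
		return 0;
	if (send_msg(sv[0], cmd) == 0)
		got = recv_msg(sv[1]);
	ok = got != NULL && strcmp(got, cmd) == 0;
	free(got);
	close(sv[0]);
	close(sv[1]);
	return ok;
}

/* Closing the writing end is how the reader learns there is nothing more. */
static int
eof_seen(void)
{
	int			sv[2];
	char	   *got;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
		return 0;
	close(sv[0]);
	got = recv_msg(sv[1]);
	close(sv[1]);
	return got == NULL;
}
```

Every received byte costs a read() call, and closing the writing end is the only way the reader learns the sender is gone. Both are reasonable between processes and pure overhead between threads.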
And because the leader waits for its workers to exit with a single
WaitForMultipleObjects call, which takes at most 64 handles, you can't
run more than 64 jobs on Windows. The parallelism limit there is really
a limit of the wait call. (The
non-Windows side doesn't have this limit-- it reaps workers with wait()
rather than WaitForMultipleObjects, so PG_MAX_JOBS is INT_MAX.)
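The cap belongs to that one call, not to threads. Below is a minimal sketch of the batching approach 0004 takes, written with POSIX threads so it runs anywhere. A per-worker done flag plays the part of a Windows thread handle: it is set when the worker exits and stays set. The hypothetical wait_any() stands in for WaitForMultipleObjects() with bWaitAll = FALSE, and MAX_WAIT mirrors MAXIMUM_WAIT_OBJECTS. None of these names come from pg_dump.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

#define MAX_WAIT 64				/* stands in for MAXIMUM_WAIT_OBJECTS */

typedef struct
{
	pthread_t	tid;
	bool		done;			/* set at exit, never cleared: a "signaled handle" */
	bool		reaped;			/* leader-only: already joined */
} Worker;

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t exited = PTHREAD_COND_INITIALIZER;

/* Every worker has already been told to exit, so each just reports that it has. */
static void *
worker_main(void *arg)
{
	Worker	   *w = arg;

	pthread_mutex_lock(&lock);
	w->done = true;
	pthread_cond_broadcast(&exited);
	pthread_mutex_unlock(&lock);
	return NULL;
}

/*
 * Like WaitForMultipleObjects(n, handles, FALSE, INFINITE): block until some
 * worker in batch[0..n-1] is done, and return its position in the batch.
 */
static int
wait_any(Worker *w, const int *batch, int n)
{
	int			pos = -1;

	pthread_mutex_lock(&lock);
	while (pos < 0)
	{
		for (int k = 0; k < n && pos < 0; k++)
			if (w[batch[k]].done)
				pos = k;
		if (pos < 0)
			pthread_cond_wait(&exited, &lock);
	}
	pthread_mutex_unlock(&lock);
	return pos;
}

/*
 * Start nworkers threads, then reap them, waiting on at most MAX_WAIT per
 * pass.  Returns the number of passes (one worker is reaped per pass), or -1
 * if not every thread could be started.  *max_batch gets the largest batch.
 */
static int
reap_in_batches(int nworkers, int *max_batch)
{
	Worker	   *w = calloc(nworkers, sizeof(Worker));
	int			batch[MAX_WAIT];
	int			started = 0;
	int			passes = 0;

	*max_batch = 0;
	if (w == NULL)
		return -1;
	while (started < nworkers &&
		   pthread_create(&w[started].tid, NULL, worker_main, &w[started]) == 0)
		started++;

	for (int left = started; left > 0; left--, passes++)
	{
		int			n = 0;
		int			pos;

		/* Rebuild the batch from workers not yet reaped, capped at MAX_WAIT. */
		for (int i = 0; i < started && n < MAX_WAIT; i++)
			if (!w[i].reaped)
				batch[n++] = i;
		if (n > *max_batch)
			*max_batch = n;
		pos = wait_any(w, batch, n);
		pthread_join(w[batch[pos]].tid, NULL);
		w[batch[pos]].reaped = true;
	}
	free(w);
	return started == nworkers ? passes : -1;
}
```

A finished worker's flag stays set, so a worker that exits while outside the current batch is simply found on a later pass. That is why the batching is safe once every worker has been told to exit.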
None of this is broken; it works. It's threads pretending to be
processes because the code was written for processes, and the port kept
the protocol rather than rethinking it. I'd like to stop.
What I'd like instead is one model everywhere: threads on all platforms,
coordinated by an in-process work queue-- a mutex and a couple of
condition variables-- instead of two worker models bridged by an
inter-process protocol.
To be clear, the point is to unify on the queue, not on what Windows does
today. Teaching the non-Windows side to talk to its own threads over a
socket, a byte at a time, would just be the same trick on more
platforms-- that's the part worth deleting, not copying.
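Here is a minimal sketch of that coordination, modelled on the Windows channel in 0003 but written with POSIX threads so it runs anywhere. pthread_mutex_t and pthread_cond_t stand in for CRITICAL_SECTION and CONDITION_VARIABLE, and Slot, worker_main(), get_from_worker() and run_demo() are hypothetical names. One lock guards every slot. One shared condition variable wakes the workers and another wakes the leader, and a died flag stands in for the EOF a pipe would have delivered. For brevity, the messages are borrowed string pointers rather than the malloc'd copies the patch uses. The worker simply echoes its command back, and the command "FAIL" is invented to simulate a worker hitting a fatal error.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define NWORKERS 4
typedef struct
{
	const char *cmd;			/* command pending for the worker, or NULL */
	const char *resp;			/* response pending for the leader, or NULL */
	bool		closed;			/* no more commands: the channel's "EOF" */
	bool		died;			/* worker exited without responding */
	pthread_t	tid;
} Slot;

static Slot slots[NWORKERS];
static pthread_mutex_t msg_lock = PTHREAD_MUTEX_INITIALIZER;	/* guards all slots */
static pthread_cond_t worker_cv = PTHREAD_COND_INITIALIZER;	/* wakes workers */
static pthread_cond_t leader_cv = PTHREAD_COND_INITIALIZER;	/* wakes the leader */

/* Worker: echo each command back as its response; "FAIL" simulates an error. */
static void *
worker_main(void *arg)
{
	Slot	   *s = arg;
	pthread_mutex_lock(&msg_lock);
	while (!s->died)
	{
		while (s->cmd == NULL && !s->closed)
			pthread_cond_wait(&worker_cv, &msg_lock);
		if (s->cmd == NULL)
			break;				/* channel closed and drained: normal exit */
		if (strcmp(s->cmd, "FAIL") == 0)
			s->died = true;		/* the death notice, in place of a reply */
		else
			s->resp = s->cmd;	/* real work would happen before replying */
		s->cmd = NULL;
		pthread_cond_broadcast(&leader_cv);
	}
	pthread_mutex_unlock(&msg_lock);
	return NULL;
}

/* Leader: first pending response (slot in *worker), or NULL if a worker died. */
static const char *
get_from_worker(int *worker)
{
	const char *msg = NULL;
	bool		any_died = false;
	pthread_mutex_lock(&msg_lock);
	while (msg == NULL && !any_died)
	{
		for (int i = 0; i < NWORKERS && msg == NULL; i++)
		{
			if ((msg = slots[i].resp) != NULL)
			{
				slots[i].resp = NULL;
				*worker = i;
			}
			any_died |= slots[i].died;
		}
		if (msg == NULL && !any_died)
			pthread_cond_wait(&leader_cv, &msg_lock);
	}
	pthread_mutex_unlock(&msg_lock);
	return msg;
}

/* Start workers, send one command each, collect replies, close channels. */
static int
run_demo(const char *const cmds[NWORKERS])
{
	int			ok = 0;
	memset(slots, 0, sizeof(slots));
	for (int i = 0; i < NWORKERS; i++)
		if (pthread_create(&slots[i].tid, NULL, worker_main, &slots[i]) != 0)
			abort();
	pthread_mutex_lock(&msg_lock);
	for (int i = 0; i < NWORKERS; i++)
		slots[i].cmd = cmds[i];	/* dispatch is a store, not a socket write */
	pthread_cond_broadcast(&worker_cv);	/* shared CV: each worker rechecks its slot */
	pthread_mutex_unlock(&msg_lock);
	for (int n = 0; n < NWORKERS && ok >= 0; n++)
	{
		int			w = 0;
		const char *resp = get_from_worker(&w);
		ok = resp ? ok + (strcmp(resp, cmds[w]) == 0) : -1;	/* NULL: a worker died */
	}
	pthread_mutex_lock(&msg_lock);
	for (int i = 0; i < NWORKERS; i++)
		slots[i].closed = true;	/* the "EOF" that lets idle workers exit */
	pthread_cond_broadcast(&worker_cv);
	pthread_mutex_unlock(&msg_lock);
	for (int i = 0; i < NWORKERS; i++)
		pthread_join(slots[i].tid, NULL);
	return ok;
}
```

run_demo() returns the number of correct replies, or -1 once the leader notices a dead worker. The real leader would pg_fatal() at that point rather than return.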
I've done the Windows half, both to prove it out and because it's the
coordination layer the non-Windows side would adopt. The socket protocol
is gone; the leader hands work to a worker in memory instead of down a
loopback connection. The 64-job cap is gone. The unchecked
_beginthreadex return-- which on failure recorded a thread that didn't
exist as an idle worker-- is fixed. Dump and restore are byte-for-byte
identical to stock from -j2 through -j250. The non-Windows port to
threads isn't written yet, and I won't start it until the direction is
settled.
One piece I deliberately left for the unified version: the queue still
passes the command as a string-- "DUMP 1234"-- and the worker parses it
back and looks the ID up to recover the TocEntry it already came from.
In one address space that's ceremony; the queue could carry a {
T_Action, TocEntry * } and drop the serialize/parse/lookup entirely. I
didn't do it on Windows because the non-Windows side still forks, and a
pointer is meaningless across a process boundary. There, the string is
the serializer that path needs, so switching only Windows to a struct
would add a second message format instead of removing one. With threading in
place across Windows and non-Windows, we can pass the work item directly
and delete buildWorkerCommand/parseWorkerCommand and the matching
response pair, plus the dumpId lookup, on every platform at once.
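The following minimal sketch makes the serialize/parse/lookup round trip concrete. It uses illustrative stand-ins: the TocEntry struct, build_command() and parse_command() below are hypothetical and much simpler than pg_dump's TocEntry, buildWorkerCommand() and parseWorkerCommand(). The "DUMP <id>" / "RESTORE <id>" text format is assumed from the example above. WorkItem is the { T_Action, TocEntry * } shape the queue could carry instead.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Illustrative stand-ins only; pg_dump's TocEntry has many more fields. */
typedef enum { ACT_DUMP, ACT_RESTORE } T_Action;
typedef struct { int dumpId; const char *tag; } TocEntry;

/* What an in-memory queue could carry: no text format at all. */
typedef struct
{
	T_Action	act;
	TocEntry   *te;
} WorkItem;

/* The string path, step 1 (leader): flatten the action and ID into text. */
static void
build_command(T_Action act, const TocEntry *te, char *buf, size_t len)
{
	snprintf(buf, len, "%s %d", act == ACT_DUMP ? "DUMP" : "RESTORE", te->dumpId);
}

/*
 * Steps 2 and 3 (worker): parse the text back, then look the ID up to find
 * the entry the leader started from.  Returns NULL on malformed input.
 */
static TocEntry *
parse_command(const char *msg, TocEntry *toc, int ntoc, T_Action *act)
{
	char		verb[16];
	int			id;
	char		extra;

	if (sscanf(msg, "%15s %d %c", verb, &id, &extra) != 2)
		return NULL;			/* missing ID, or trailing junk */
	if (strcmp(verb, "DUMP") == 0)
		*act = ACT_DUMP;
	else if (strcmp(verb, "RESTORE") == 0)
		*act = ACT_RESTORE;
	else
		return NULL;
	for (int i = 0; i < ntoc; i++)
		if (toc[i].dumpId == id)
			return &toc[i];
	return NULL;
}
```

With WorkItem, the worker reads item.te directly. There is no text format to keep in sync between two functions, and no lookup to recover a pointer the leader already had.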
The cost is crash isolation. fork() gives each non-Windows worker its
own address space, so one that segfaults can't corrupt the leader or its
siblings; threads give that up. What it actually buys today is narrow.
The moment any worker dies, the leader pg_fatal()s and the whole dump
comes down, so processes don't give you recovery-- only the guarantee
that a corrupt worker can't scribble on a sibling's output before
everyone exits. Windows has run without even that for years. It's an
acceptable trade for a single implementation, but it's the real cost.
I'll say plainly that this fixes no user-visible bug, and nothing is
broken today. It's consolidation. There are two implementations of
parallel dump right now, and they drift: the Windows one grew the socket
emulation and the 64-job cap out of running a process protocol on
threads, and it carried bugs the non-Windows side never had, like the
_beginthreadex one above. One model means a fix or a feature lands once,
on a path exercised on every platform, instead of twice, with a seam
down the middle. Windows already shows the threaded model works here,
and threads are the half both sides can share, since Windows can't fork.
The Windows port originally kept the process shape to stay consistent
with the other platforms; this keeps that same goal, on the model that
actually ports.
The strongest evidence that the worker-reachable code is thread-safe is
that Windows has run that path on threads for years. With 0002, fmtId's
static return buffer is _Thread_local on every platform. The global
state a worker reads is built during the single-threaded catalog phase,
before any worker exists.
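Here is the pattern 0002 relies on, sketched outside pg_dump. A function-scope static marked C11 _Thread_local gives each thread its own copy, so two threads calling a fmtId()-style function at the same time never read each other's results. local_buffer() and quote_name() are hypothetical stand-ins that use a plain char array where pg_dump uses a PQExpBuffer, and POSIX threads stand in for the Windows worker threads.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define BUFLEN 64

/*
 * Per-thread scratch buffer, in the spirit of defaultGetLocalPQExpBuffer():
 * allocated on a thread's first call, wiped and reused afterwards, and never
 * freed, so each thread leaks exactly one buffer.
 */
static char *
local_buffer(void)
{
	static _Thread_local char *buf = NULL;

	if (buf == NULL && (buf = malloc(BUFLEN)) == NULL)
		abort();
	buf[0] = '\0';
	return buf;
}

/* A toy fmtId(): double-quote a name into the calling thread's buffer. */
static const char *
quote_name(const char *name)
{
	char	   *buf = local_buffer();

	snprintf(buf, BUFLEN, "\"%s\"", name);
	return buf;
}

typedef struct
{
	const char *name;			/* what this thread quotes */
	const char *bufaddr;		/* buffer address this thread was handed */
	int			mismatches;		/* results clobbered by another thread */
} Job;

static void *
hammer(void *arg)
{
	Job		   *job = arg;
	char		expect[BUFLEN];

	snprintf(expect, sizeof(expect), "\"%s\"", job->name);
	for (int i = 0; i < 100000; i++)
	{
		const char *q = quote_name(job->name);

		job->mismatches += strcmp(q, expect) != 0;
		job->bufaddr = q;
	}
	return NULL;
}

/* Run two hammer() threads concurrently; returns 0 on success. */
static int
run_pair(Job *a, Job *b)
{
	pthread_t	ta,
				tb;

	if (pthread_create(&ta, NULL, hammer, a) != 0)
		return -1;
	if (pthread_create(&tb, NULL, hammer, b) != 0)
	{
		pthread_join(ta, NULL);
		return -1;
	}
	pthread_join(ta, NULL);
	pthread_join(tb, NULL);
	return 0;
}
```

If you drop _Thread_local from local_buffer(), both threads share one buffer again. That is a data race, and the mismatch counts can then become nonzero.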
Patches are attached-- 0001 and 0002 are independent and can be
committed separately; 0004 depends on 0003.
--
Bryan Green
EDB: https://www.enterprisedb.com
From 9b4696bb7f24b2ef22e204aa3617c9ad81137476 Mon Sep 17 00:00:00 2001
From: Bryan Green <[email protected]>
Date: Sun, 28 Jun 2026 10:54:54 -0500
Subject: [PATCH v1 1/4] pg_dump: check for _beginthreadex() failure in parallel dump
ParallelBackupStart() stored _beginthreadex()'s return value as the
worker's thread handle without checking it. On failure that value is 0,
which would later reach WaitForMultipleObjects() as a null handle, caught
only by an Assert. The fork() path already calls pg_fatal() when it
fails; do the same for _beginthreadex(), as pgbench does.
---
src/bin/pg_dump/parallel.c | 2 ++
1 file changed, 2 insertions(+)
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 3e84b881ca..b77d2650df 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -976,6 +976,8 @@ ParallelBackupStart(ArchiveHandle *AH)
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
wi, 0, &(slot->threadId));
+ if (handle == 0)
+ pg_fatal("could not create worker thread: %m");
slot->hThread = handle;
slot->workerStatus = WRKR_IDLE;
#else /* !WIN32 */
--
2.54.0.windows.1
From 3c1b8510a546a504dd583c7f0979f645c30b8ac6 Mon Sep 17 00:00:00 2001
From: Bryan Green <[email protected]>
Date: Sun, 28 Jun 2026 11:40:58 -0500
Subject: [PATCH v1 2/4] Give fmtId()'s temporary buffer thread-local storage
The default getLocalPQExpBuffer() returns a function-scope static
buffer, which is unsafe when fmtId() and fmtQualifiedId() run in
multiple threads. On Windows, where
pg_dump's parallel workers are threads, this was patched over at runtime
by swapping in a TLS-based buffer from ParallelBackupStart().
Mark the default buffer C11 _Thread_local instead. Each thread gets its
own, so the Windows TlsAlloc/TlsGetValue workaround and the
getLocalPQExpBuffer function-pointer swap can go.
---
src/bin/pg_dump/parallel.c | 52 -------------------------------------
src/fe_utils/string_utils.c | 7 ++---
2 files changed, 4 insertions(+), 55 deletions(-)
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index b77d2650df..12b462375d 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -193,9 +193,6 @@ static CRITICAL_SECTION signal_info_lock;
#ifdef WIN32
-/* file-scope variables */
-static DWORD tls_index;
-
/* globally visible variables (needed by exit_nicely) */
bool parallel_init_done = false;
DWORD mainThreadId;
@@ -243,8 +240,6 @@ init_parallel_dump_utils(void)
WSADATA wsaData;
int err;
- /* Prepare for threaded operation */
- tls_index = TlsAlloc();
mainThreadId = GetCurrentThreadId();
/* Initialize socket access */
@@ -280,48 +275,6 @@ GetMyPSlot(ParallelState *pstate)
return NULL;
}
-/*
- * A thread-local version of getLocalPQExpBuffer().
- *
- * Non-reentrant but reduces memory leakage: we'll consume one buffer per
- * thread, which is much better than one per fmtId/fmtQualifiedId call.
- */
-#ifdef WIN32
-static PQExpBuffer
-getThreadLocalPQExpBuffer(void)
-{
- /*
- * The Tls code goes awry if we use a static var, so we provide for both
- * static and auto, and omit any use of the static var when using Tls. We
- * rely on TlsGetValue() to return 0 if the value is not yet set.
- */
- static PQExpBuffer s_id_return = NULL;
- PQExpBuffer id_return;
-
- if (parallel_init_done)
- id_return = (PQExpBuffer) TlsGetValue(tls_index);
- else
- id_return = s_id_return;
-
- if (id_return) /* first time through? */
- {
- /* same buffer, just wipe contents */
- resetPQExpBuffer(id_return);
- }
- else
- {
- /* new buffer */
- id_return = createPQExpBuffer();
- if (parallel_init_done)
- TlsSetValue(tls_index, id_return);
- else
- s_id_return = id_return;
- }
-
- return id_return;
-}
-#endif /* WIN32 */
-
/*
* pg_dump and pg_restore call this to register the cleanup handler
* as soon as they've created the ArchiveHandle.
@@ -918,11 +871,6 @@ ParallelBackupStart(ArchiveHandle *AH)
pstate->parallelSlot =
pg_malloc0_array(ParallelSlot, pstate->numWorkers);
-#ifdef WIN32
- /* Make fmtId() and fmtQualifiedId() use thread-local storage */
- getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
-#endif
-
/*
* Set the pstate in shutdown_info, to tell the exit handler that it must
* clean up workers as well as the main database connection. But we don't
diff --git a/src/fe_utils/string_utils.c b/src/fe_utils/string_utils.c
index 7a762251f3..b3738c25db 100644
--- a/src/fe_utils/string_utils.c
+++ b/src/fe_utils/string_utils.c
@@ -34,14 +34,15 @@ static int fmtIdEncoding = -1;
* Returns a temporary PQExpBuffer, valid until the next call to the function.
* This is used by fmtId and fmtQualifiedId.
*
- * Non-reentrant and non-thread-safe but reduces memory leakage. You can
- * replace this with a custom version by setting the getLocalPQExpBuffer
+ * The buffer is thread-local, so this is safe to call from multiple threads
+ * at once; each thread gets its own. Not reentrant within a thread, but it
+ * reduces memory leakage. Replace it by setting the getLocalPQExpBuffer
* function pointer.
*/
static PQExpBuffer
defaultGetLocalPQExpBuffer(void)
{
- static PQExpBuffer id_return = NULL;
+ static _Thread_local PQExpBuffer id_return = NULL;
if (id_return) /* first time through? */
{
--
2.54.0.windows.1
From 4051dfd56fa68948c3864634f8e20fa4d386e94e Mon Sep 17 00:00:00 2001
From: Bryan Green <[email protected]>
Date: Sun, 28 Jun 2026 12:05:10 -0500
Subject: [PATCH v1 3/4] pg_dump: dispatch parallel workers in-process on Windows
On Windows the parallel workers are threads in the leader's process, but
they reached the leader through the same transport as Unix worker
processes: pgpipe() (a loopback TCP socket pair), select(), and a
byte-at-a-time string protocol. That is pointless when the workers share
the leader's address space.
Give each ParallelSlot an in-process channel instead -- a command slot
and a response slot, all guarded by one critical section, with one shared
condition variable to wake the workers and one to wake the leader.
Messages pass as malloc'd strings; dispatch writes to memory rather than
a socket.
The pipe also signalled worker death through EOF. The channel has none,
and exit_nicely() ends only the worker thread, so a failed worker -- for
instance one whose connection fails when -j exceeds max_connections --
would hang the leader forever. A dying worker now sets slot->workerDied
and wakes the leader, and getMessageFromWorker() returns NULL just as the
Unix path does on EOF. So that the exit handler can tell a failing
worker from the leader (GetMyPSlot() matches on thread id), each worker
records its own thread id on entry rather than relying on
_beginthreadex()'s output argument, which may not be stored yet when
the thread starts running.
The Unix path is untouched; pgpipe(), select_loop(), and
readMessageFromPipe(), which Windows no longer uses, stay until the
worker model is unified on threads.
---
src/bin/pg_dump/parallel.c | 197 +++++++++++++++++++++++++++++++------
1 file changed, 167 insertions(+), 30 deletions(-)
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 12b462375d..552a8c149e 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -107,6 +107,19 @@ struct ParallelSlot
int pipeRevRead; /* child's end of the pipes */
int pipeRevWrite;
+#ifdef WIN32
+
+ /*
+ * In-process channel used instead of the pipes when workers are threads.
+ * A message is a malloc'd string; ownership passes to the receiver.
+ * Protected by msg_lock.
+ */
+ char *cmdMsg; /* command pending for the worker, or NULL */
+ char *respMsg; /* response pending for the leader, or NULL */
+ bool chanClosed; /* leader closed the command channel (EOF) */
+ bool workerDied; /* worker exited without sending a response */
+#endif
+
/* Child process/thread identity info: */
#ifdef WIN32
uintptr_t hThread;
@@ -176,6 +189,15 @@ static volatile DumpSignalInformation signal_info;
#ifdef WIN32
static CRITICAL_SECTION signal_info_lock;
+
+/*
+ * Synchronization for the in-process channels (see struct ParallelSlot).
+ * msg_lock protects the per-slot cmdMsg/respMsg/chanClosed/workerDied fields;
+ * worker_cv wakes a worker, leader_cv wakes the leader.
+ */
+static CRITICAL_SECTION msg_lock;
+static CONDITION_VARIABLE worker_cv;
+static CONDITION_VARIABLE leader_cv;
#endif
/*
@@ -210,11 +232,11 @@ static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
static int GetIdleWorker(ParallelState *pstate);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
-static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
+static void WaitForCommands(ArchiveHandle *AH, ParallelSlot *slot);
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
bool do_wait);
-static char *getMessageFromLeader(int pipefd[2]);
-static void sendMessageToLeader(int pipefd[2], const char *str);
+static char *getMessageFromLeader(ParallelSlot *slot);
+static void sendMessageToLeader(ParallelSlot *slot, const char *str);
static int select_loop(int maxFd, fd_set *workerset);
static char *getMessageFromWorker(ParallelState *pstate,
bool do_wait,
int *worker);
@@ -242,6 +264,11 @@ init_parallel_dump_utils(void)
mainThreadId = GetCurrentThreadId();
+ /* Initialize the in-process message-channel synchronization */
+ InitializeCriticalSection(&msg_lock);
+ InitializeConditionVariable(&worker_cv);
+ InitializeConditionVariable(&leader_cv);
+
/* Initialize socket access */
err = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (err != 0)
@@ -314,20 +341,21 @@ archive_close_connection(int code, void *arg)
else
{
/*
- * We're a worker. Shut down our own DB connection if any. On
- * Windows, we also have to close our communication sockets, to
- * emulate what will happen on Unix when the worker process exits.
- * (Without this, if this is a premature exit, the leader would
- * fail to detect it because there would be no EOF condition on
- * the other end of the pipe.)
+ * We're a worker. Shut down our own DB connection if any.
*/
if (slot->AH)
DisconnectDatabase(&(slot->AH->public));
-#ifdef WIN32
- closesocket(slot->pipeRevRead);
- closesocket(slot->pipeRevWrite);
-#endif
+ /*
+ * Tell the leader we're gone so it stops waiting for our reply. On
+ * Unix the worker's exit closes the pipe and the leader sees EOF;
+ * the channel has no EOF, and exit_nicely() ends only this thread,
+ * so signal it here.
+ */
+ EnterCriticalSection(&msg_lock);
+ slot->workerDied = true;
+ WakeAllConditionVariable(&leader_cv);
+ LeaveCriticalSection(&msg_lock);
}
}
else
@@ -351,6 +379,17 @@ ShutdownWorkersHard(ParallelState *pstate)
{
int i;
+ /*
+ * Tell any workers that are waiting for commands that they can exit.
+ */
+#ifdef WIN32
+ EnterCriticalSection(&msg_lock);
+ for (i = 0; i < pstate->numWorkers; i++)
+ pstate->parallelSlot[i].chanClosed = true;
+ WakeAllConditionVariable(&worker_cv);
+ LeaveCriticalSection(&msg_lock);
+#else
+
/*
* Close our write end of the sockets so that any workers waiting for
* commands know they can exit. (Note: some of the pipeWrite fields might
@@ -359,6 +398,7 @@ ShutdownWorkersHard(ParallelState *pstate)
*/
for (i = 0; i < pstate->numWorkers; i++)
closesocket(pstate->parallelSlot[i].pipeWrite);
+#endif
/*
* Force early termination of any commands currently in progress.
@@ -783,12 +823,6 @@ set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
static void
RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
{
- int pipefd[2];
-
- /* fetch child ends of pipes */
- pipefd[PIPE_READ] = slot->pipeRevRead;
- pipefd[PIPE_WRITE] = slot->pipeRevWrite;
-
/*
* Clone the archive so that we have our own state to work with, and in
* particular our own database connection.
@@ -811,7 +845,7 @@ RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
/*
* Execute commands until done.
*/
- WaitForCommands(AH, pipefd);
+ WaitForCommands(AH, slot);
/*
* Disconnect from database and clean up.
@@ -834,6 +868,15 @@ init_spawned_worker_win32(WorkerInfo *wi)
/* Don't need WorkerInfo anymore */
free(wi);
+ /*
+ * Record our thread id so GetMyPSlot() can tell us from the leader. Do
+ * this before anything that might call exit_nicely(): the cleanup handler
+ * uses GetMyPSlot(), and mistaking a failing worker for the leader
+ * deadlocks shutdown. We can't trust the leader to have stored the id
+ * from _beginthreadex() yet, since this thread may run before that returns.
+ */
+ slot->threadId = GetCurrentThreadId();
+
/* Run the worker ... */
RunWorker(AH, slot);
@@ -899,11 +942,12 @@ ParallelBackupStart(ArchiveHandle *AH)
uintptr_t handle;
#else
pid_t pid;
-#endif
- ParallelSlot *slot = &(pstate->parallelSlot[i]);
int pipeMW[2],
pipeWM[2];
+#endif
+ ParallelSlot *slot = &(pstate->parallelSlot[i]);
+#ifndef WIN32
/* Create communication pipes for this worker */
if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
pg_fatal("could not create communication channels: %m");
@@ -914,6 +958,7 @@ ParallelBackupStart(ArchiveHandle *AH)
/* child's ends of the pipes */
slot->pipeRevRead = pipeMW[PIPE_READ];
slot->pipeRevWrite = pipeWM[PIPE_WRITE];
+#endif
#ifdef WIN32
/* Create transient structure to pass args to worker function */
@@ -922,8 +967,13 @@ ParallelBackupStart(ArchiveHandle *AH)
wi->AH = AH;
wi->slot = slot;
+ /*
+ * The worker stores its own thread id (see init_spawned_worker_win32),
+ * so don't ask _beginthreadex() to report it -- that would race its
+ * store.
+ */
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
- wi, 0, &(slot->threadId));
+ wi, 0, NULL);
if (handle == 0)
pg_fatal("could not create worker thread: %m");
slot->hThread = handle;
@@ -1019,12 +1069,21 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
/* There should not be any unfinished jobs */
Assert(IsEveryWorkerIdle(pstate));
+ /* Tell the workers they can exit */
+#ifdef WIN32
+ EnterCriticalSection(&msg_lock);
+ for (i = 0; i < pstate->numWorkers; i++)
+ pstate->parallelSlot[i].chanClosed = true;
+ WakeAllConditionVariable(&worker_cv);
+ LeaveCriticalSection(&msg_lock);
+#else
/* Close the sockets so that the workers know they can exit */
for (i = 0; i < pstate->numWorkers; i++)
{
closesocket(pstate->parallelSlot[i].pipeRead);
closesocket(pstate->parallelSlot[i].pipeWrite);
}
+#endif
/* Wait for them to exit */
WaitForTerminatingWorkers(pstate);
@@ -1285,7 +1344,7 @@ lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
* Read and execute commands from the leader until we see EOF on the pipe.
*/
static void
-WaitForCommands(ArchiveHandle *AH, int pipefd[2])
+WaitForCommands(ArchiveHandle *AH, ParallelSlot *slot)
{
char *command;
TocEntry *te;
@@ -1295,7 +1354,7 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
for (;;)
{
- if (!(command = getMessageFromLeader(pipefd)))
+ if (!(command = getMessageFromLeader(slot)))
{
/* EOF, so done */
return;
@@ -1323,7 +1382,7 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
/* Return status to leader */
buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
- sendMessageToLeader(pipefd, buf);
+ sendMessageToLeader(slot, buf);
/* command was pg_malloc'd and we are responsible for free()ing
it. */
free(command);
@@ -1465,9 +1524,21 @@ WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
* This function is executed in worker processes.
*/
static char *
-getMessageFromLeader(int pipefd[2])
+getMessageFromLeader(ParallelSlot *slot)
{
- return readMessageFromPipe(pipefd[PIPE_READ]);
+#ifdef WIN32
+ char *msg;
+
+ EnterCriticalSection(&msg_lock);
+ while (slot->cmdMsg == NULL && !slot->chanClosed)
+ SleepConditionVariableCS(&worker_cv, &msg_lock, INFINITE);
+ msg = slot->cmdMsg; /* NULL here means the channel was closed */
+ slot->cmdMsg = NULL;
+ LeaveCriticalSection(&msg_lock);
+ return msg;
+#else
+ return readMessageFromPipe(slot->pipeRevRead);
+#endif
}
/*
@@ -1476,12 +1547,20 @@ getMessageFromLeader(int pipefd[2])
* This function is executed in worker processes.
*/
static void
-sendMessageToLeader(int pipefd[2], const char *str)
+sendMessageToLeader(ParallelSlot *slot, const char *str)
{
+#ifdef WIN32
+ EnterCriticalSection(&msg_lock);
+ Assert(slot->respMsg == NULL);
+ slot->respMsg = pg_strdup(str);
+ WakeAllConditionVariable(&leader_cv);
+ LeaveCriticalSection(&msg_lock);
+#else
int len = strlen(str) + 1;
- if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
+ if (pipewrite(slot->pipeRevWrite, str, len) != len)
pg_fatal("could not write to the communication channel: %m");
+#endif
}
/*
@@ -1530,6 +1609,53 @@ select_loop(int maxFd, fd_set *workerset)
static char *
getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
{
+#ifdef WIN32
+ int i;
+
+ /*
+ * Return the first pending response; if none and do_wait, sleep on
+ * leader_cv until a worker posts one.
+ */
+ EnterCriticalSection(&msg_lock);
+ for (;;)
+ {
+ bool anyDied = false;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ char *msg;
+
+ if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
+ continue;
+ msg = pstate->parallelSlot[i].respMsg;
+ if (msg != NULL)
+ {
+ pstate->parallelSlot[i].respMsg = NULL;
+ LeaveCriticalSection(&msg_lock);
+ *worker = i;
+ return msg;
+ }
+ if (pstate->parallelSlot[i].workerDied)
+ anyDied = true;
+ }
+
+ /*
+ * A worker died without responding: return NULL as the Unix path does
+ * on EOF, so the caller reports the failure instead of hanging.
+ */
+ if (anyDied)
+ {
+ LeaveCriticalSection(&msg_lock);
+ return NULL;
+ }
+ if (!do_wait)
+ {
+ LeaveCriticalSection(&msg_lock);
+ return NULL;
+ }
+ SleepConditionVariableCS(&leader_cv, &msg_lock, INFINITE);
+ }
+#else
int i;
fd_set workerset;
int maxFd = -1;
@@ -1585,6 +1711,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
}
Assert(false);
return NULL;
+#endif
}
/*
@@ -1595,12 +1722,22 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
static void
sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
{
+#ifdef WIN32
+ ParallelSlot *slot = &pstate->parallelSlot[worker];
+
+ EnterCriticalSection(&msg_lock);
+ Assert(slot->cmdMsg == NULL);
+ slot->cmdMsg = pg_strdup(str);
+ WakeAllConditionVariable(&worker_cv);
+ LeaveCriticalSection(&msg_lock);
+#else
int len = strlen(str) + 1;
if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
{
pg_fatal("could not write to the communication channel: %m");
}
+#endif
}
/*
--
2.54.0.windows.1
From 7faf95ec97cf1228674338db9cf1565d8ca236ed Mon Sep 17 00:00:00 2001
From: Bryan Green <[email protected]>
Date: Sun, 28 Jun 2026 13:24:28 -0500
Subject: [PATCH v1 4/4] pg_dump: allow more than 64 parallel jobs on Windows
The Windows worker join passed every running worker's handle to one
WaitForMultipleObjects() call, which waits on at most
MAXIMUM_WAIT_OBJECTS (64). That capped PG_MAX_JOBS at 64 on Windows.
With dispatch no longer using select()/FD_SETSIZE, this was the last
thing forcing the limit.
Wait on at most MAXIMUM_WAIT_OBJECTS handles per iteration. One worker
is reaped per iteration and the handle set is rebuilt from those still
running, so workers past the first batch are caught on later passes;
this is safe because every worker has been told to exit before the wait
begins. Raise PG_MAX_JOBS to INT_MAX on all platforms.
---
src/bin/pg_dump/parallel.c | 12 +++++++++---
src/bin/pg_dump/parallel.h | 8 --------
2 files changed, 9 insertions(+), 11 deletions(-)
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 552a8c149e..d98ae70883 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -461,13 +461,19 @@ WaitForTerminatingWorkers(ParallelState *pstate)
}
}
#else /* WIN32 */
- /* On Windows, we must use WaitForMultipleObjects() */
- HANDLE *lpHandles = pg_malloc_array(HANDLE, pstate->numWorkers);
+
+ /*
+ * WaitForMultipleObjects() waits on at most MAXIMUM_WAIT_OBJECTS
+ * handles. Watch the first batch; we reap one per iteration and
+ * rescan, so any beyond it are caught on later passes. Safe because
+ * every worker has already been told to exit.
+ */
+ HANDLE *lpHandles = pg_malloc_array(HANDLE, MAXIMUM_WAIT_OBJECTS);
int nrun = 0;
DWORD ret;
uintptr_t hThread;
- for (j = 0; j < pstate->numWorkers; j++)
+ for (j = 0; j < pstate->numWorkers && nrun < MAXIMUM_WAIT_OBJECTS; j++)
{
if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
{
diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h
index f7557cd089..5754e13eaa 100644
--- a/src/bin/pg_dump/parallel.h
+++ b/src/bin/pg_dump/parallel.h
@@ -37,16 +37,8 @@ typedef enum
/*
* Maximum number of parallel jobs allowed.
- *
- * On Windows we can only have at most MAXIMUM_WAIT_OBJECTS (= 64 usually)
- * parallel jobs because that's the maximum limit for the
- * WaitForMultipleObjects() call.
*/
-#ifdef WIN32
-#define PG_MAX_JOBS MAXIMUM_WAIT_OBJECTS
-#else
#define PG_MAX_JOBS INT_MAX
-#endif
/* ParallelSlot is an opaque struct known only within parallel.c */
typedef struct ParallelSlot ParallelSlot;
--
2.54.0.windows.1