On 19.11.2025 21:03, Tomas Vondra wrote:
> Right, that's why I suggested to have a function the nodes would call in
> suitable places.
>
>>>> I like that idea, even though it would still not work while a node is
>>>> doing the crunching. That is after it has pulled all rows and before it
>>>> can return the first row. During this time the node won't call
>>>> ExecProcNode().
>>>>
>>> True. Perhaps we could provide a function nodes could call in suitable
>>> places to check whether to end?
>> This function would then also be required by the base relation scans.
>> And we would have to call it more or less in the same places
>> CHECK_FOR_INTERRUPTS() is called today.
>>
>
> Yes, but I don't think CHECK_FOR_INTERRUPTS() would be a good place to
> manipulate the executor state. Maybe you could do some magic with
> siglongjmp(), but I have "funny" feeling about that - I wouldn't be
> surprised if that interfered with elog(), which is the only other place
> using siglongjmp() AFAICS.
You had the right intuition. siglongjmp()-ing out of the executor
leaves the per-node instrumentation state and CurrentMemoryContext in
an unexpected state. Example: jumping out of the executor after
InstrStartNode() has been called but before InstrStopNode() is
reached; a subsequent call to InstrEndLoop() then gives the error you
encountered. A similar problem exists for CurrentMemoryContext, which
is not properly reset.

I didn't encounter these issues during my testing because they're
largely timing-dependent: execution can end before the other workers
have even started executing, in which case the stopping logic never
kicks in.

Both issues could be accounted for when jumping out, but that seems
rather hacky.
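For reference, here's a rough sketch of the sequence that trips up the
instrumentation code (InstrStartNode(), InstrStopNode() and
InstrEndLoop() are the real functions from instrument.c; the
surrounding control flow is paraphrased, not taken from the patch):

    Instrumentation *instr = planstate->instrument;

    InstrStartNode(instr);      /* starttime is now set */

    /* ... node does its work, the stop arrives, and we
     * siglongjmp() out of the executor ... */

    /* InstrStopNode(instr, nTuples) is never reached, so starttime
     * stays set and the counters are never accumulated. */

    /* Later, when the worker finalizes its instrumentation,
     * InstrEndLoop(instr) sees the dangling starttime and errors
     * out with "InstrEndLoop called on running node". */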
> Which is why I suggested maybe it should be handled in execProcnode
> (which would take care of cases where we produce a tuple), and then let
> nodes to optionally check the flag too (through a new function).
>
> I haven't tried doing this, so maybe I'm missing something ...
>
>> Beyond that, code such as heap_getnextslot() or index_getnext_slot()
>> doesn't have access to the PlanState which would contain the stop
>> flag. So that would have to be propagated downwards as well.
>>
>> All of that would make for a fairly big patch, which the initial patch
>> avoids.
This turned out to be false. See below.
> Right. I don't think we can set the flag in plan/executor state, because
> that's not available in signal handler / ProcessInterrupts() ... It'd
> need to be a global variable, I guess.
What we can do is use a global variable. That also makes checking the
flag a lot easier, because we don't need to pass it down through
multiple abstraction layers.

What we do need to take care of, though, is to only bail out of scans
that are actually initiated by plan nodes. There are many places in
the code that use e.g. the table AM API directly, and we don't want to
bail out of those scans. Without flagging whether a scan should bail
out, e.g. ScanPgRelation() returns no tuple and the relcache code
starts failing.
The new patch accounts for that by introducing a new TableScanDescData
flag, SO_OBEY_PARALLEL_WORKER_STOP, which indicates whether the scan
should honor the parallel worker stop.
A stop request is broadcast to all parallel workers via
SendProcSignal(). The stop variable is checked with the new
IS_PARALLEL_WORKER_STOP() macro.
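The end-to-end flow, condensed from the attached patch, looks like
this:

    /* leader, nodeGather.c, once the tuple bound is reached: */
    SendProcSignal(pid, PROCSIG_PARALLEL_STOP, INVALID_PROC_NUMBER);

    /* worker, signal handler, via HandleParallelStop(): */
    InterruptPending = true;
    ParallelStopPending = true;
    SetLatch(MyLatch);

    /* worker, next ProcessInterrupts(), via ProcessParallelStop(): */
    parallel_worker_stop = true;

    /* worker, scan loop in heapgettup()/heapgettup_pagemode(): */
    if ((scan->rs_base.rs_flags & SO_OBEY_PARALLEL_WORKER_STOP) != 0 &&
        IS_PARALLEL_WORKER_STOP())
        break;

    /* only scans started from plan nodes opt in, e.g. in
     * ExecSeqScanInitializeWorker(): */
    node->ss.ss_currentScanDesc->rs_flags |= SO_OBEY_PARALLEL_WORKER_STOP;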
In the PoC implementation I have, for now, only changed nodeSeqscan.c.
If there's agreement to pursue this approach, I'll change the other
places as well. The naming can likely still be improved.
--
David Geier
From d056801ec55eaf95829f9f09e1675a47e8d57b5b Mon Sep 17 00:00:00 2001
From: David Geier <[email protected]>
Date: Tue, 2 Sep 2025 13:24:45 +0200
Subject: [PATCH] Parallel workers stop quicker
In the presence of a LIMIT N clause, the executor stops as soon as it
has gotten enough rows and shuts down the plan. In the serial case the
query ends immediately. If the query happens to be parallel, each
worker only exits once it has itself produced N rows, regardless of
how many rows the other participants have already produced.

Worst-case example: a query has a LIMIT 1 clause and scans a large
table where only a single row qualifies. The first worker that finds
the matching row returns that row and terminates. All other workers
keep running until the table is scanned to completion.

This change signals all workers to stop execution once the leader has
gotten enough rows.
---
src/backend/access/heap/heapam.c | 7 ++++++
src/backend/executor/execParallel.c | 23 +++++++++++++++---
src/backend/executor/nodeGather.c | 36 ++++++++++++++++++++++++++++
src/backend/executor/nodeSeqscan.c | 2 ++
src/backend/storage/ipc/procsignal.c | 4 ++++
src/backend/tcop/postgres.c | 4 ++++
src/include/access/tableam.h | 3 +++
src/include/executor/execParallel.h | 8 +++++++
src/include/nodes/execnodes.h | 1 +
src/include/storage/procsignal.h | 1 +
10 files changed, 86 insertions(+), 3 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4b0c49f4bb0..b3bb17c7aad 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -39,6 +39,7 @@
#include "access/syncscan.h"
#include "access/valid.h"
#include "access/visibilitymap.h"
+#include "executor/execParallel.h"
#include "access/xloginsert.h"
#include "catalog/pg_database.h"
#include "catalog/pg_database_d.h"
@@ -917,6 +918,9 @@ heapgettup(HeapScanDesc scan,
*/
while (true)
{
+ if ((scan->rs_base.rs_flags & SO_OBEY_PARALLEL_WORKER_STOP) != 0 && IS_PARALLEL_WORKER_STOP())
+ break;
+
heap_fetch_next_buffer(scan, dir);
/* did we run out of blocks to scan? */
@@ -1034,6 +1038,9 @@ heapgettup_pagemode(HeapScanDesc scan,
*/
while (true)
{
+ if ((scan->rs_base.rs_flags & SO_OBEY_PARALLEL_WORKER_STOP) != 0 && IS_PARALLEL_WORKER_STOP())
+ break;
+
heap_fetch_next_buffer(scan, dir);
/* did we run out of blocks to scan? */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f098a5557cf..5e20e33efcf 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -1410,6 +1410,23 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
pwcxt);
}
+volatile sig_atomic_t ParallelStopPending = false;
+volatile bool parallel_worker_stop = false;
+
+void HandleParallelStop(void)
+{
+ InterruptPending = true;
+ ParallelStopPending = true;
+ SetLatch(MyLatch);
+}
+
+void ProcessParallelStop(void)
+{
+ ParallelStopPending = false;
+ parallel_worker_stop = true;
+}
+
+
/*
* Main entrypoint for parallel query worker processes.
*
@@ -1493,9 +1510,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
InstrStartParallelQuery();
/*
- * Run the plan. If we specified a tuple bound, be careful not to demand
- * more tuples than that.
- */
+ * Run the plan. If we specified a tuple bound, be careful not to demand
+ * more tuples than that.
+ */
ExecutorRun(queryDesc,
ForwardScanDirection,
fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index dc7d1830259..31745e86ced 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,6 +36,7 @@
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
+#include "storage/procsignal.h"
#include "utils/wait_event.h"
@@ -71,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
gatherstate->need_to_scan_locally =
!node->single_copy && parallel_leader_participation;
gatherstate->tuples_needed = -1;
+ gatherstate->tuples_produced = 0;
/*
* Miscellaneous initialization
@@ -126,6 +128,36 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
return gatherstate;
}
+/* ----------------------------------------------------------------
+ * Workers only stop when they themselves reach the LIMIT.
+ * They don't stop if other workers in total produced already
+ * enough rows to reach the LIMIT. Hence, we need to stop them
+ * explicitly.
+ * ----------------------------------------------------------------
+ */
+static void
+StopWorkersIfLimitReached(GatherState *node)
+{
+ if (node->tuples_needed != -1 && node->tuples_produced == node->tuples_needed)
+ {
+ if (node->pei != NULL)
+ {
+ ParallelContext *pcxt = node->pei->pcxt;
+ int i;
+
+ if (pcxt->worker != NULL)
+ {
+ for (i = 0; i < pcxt->nworkers_launched; ++i)
+ {
+ pid_t pid;
+ GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+ SendProcSignal(pid, PROCSIG_PARALLEL_STOP, INVALID_PROC_NUMBER);
+ }
+ }
+ }
+ }
+}
+
/* ----------------------------------------------------------------
* ExecGather(node)
*
@@ -212,6 +244,7 @@ ExecGather(PlanState *pstate)
/* Run plan locally if no workers or enabled and not single-copy. */
node->need_to_scan_locally = (node->nreaders == 0)
|| (!gather->single_copy && parallel_leader_participation);
+ node->tuples_produced = 0;
node->initialized = true;
}
@@ -230,6 +263,9 @@ ExecGather(PlanState *pstate)
if (TupIsNull(slot))
return NULL;
+ node->tuples_produced++;
+ StopWorkersIfLimitReached(node);
+
/* If no projection is required, we're done. */
if (node->ps.ps_ProjInfo == NULL)
return slot;
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 94047d29430..cb31ee8bbd3 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -375,6 +375,7 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
node->ss.ss_currentScanDesc =
table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+ node->ss.ss_currentScanDesc->rs_flags |= SO_OBEY_PARALLEL_WORKER_STOP;
}
/* ----------------------------------------------------------------
@@ -408,4 +409,5 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->ss.ss_currentScanDesc =
table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+ node->ss.ss_currentScanDesc->rs_flags |= SO_OBEY_PARALLEL_WORKER_STOP;
}
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 087821311cc..8f99ecebe2f 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -19,6 +19,7 @@
#include "access/parallel.h"
#include "commands/async.h"
+#include "executor/execParallel.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
@@ -694,6 +695,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
HandleParallelApplyMessageInterrupt();
+ if (CheckProcSignal(PROCSIG_PARALLEL_STOP))
+ HandleParallelStop();
+
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
HandleRecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7dd75a490aa..bd74d381d67 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -40,6 +40,7 @@
#include "commands/explain_state.h"
#include "commands/prepare.h"
#include "common/pg_prng.h"
+#include "executor/execParallel.h"
#include "jit/jit.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -3541,6 +3542,9 @@ ProcessInterrupts(void)
if (ParallelApplyMessagePending)
ProcessParallelApplyMessages();
+
+ if (ParallelStopPending)
+ ProcessParallelStop();
}
/*
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index e16bf025692..239852627fc 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -63,6 +63,9 @@ typedef enum ScanOptions
/* unregister snapshot at scan end? */
SO_TEMP_SNAPSHOT = 1 << 9,
+
+ /* Bail out from this scan if parallel bailing activated */
+ SO_OBEY_PARALLEL_WORKER_STOP = 1 << 10,
} ScanOptions;
/*
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5e7106c397a..45d86b804f0 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -37,6 +37,14 @@ typedef struct ParallelExecutorInfo
struct TupleQueueReader **reader; /* tuple reader/writer support */
} ParallelExecutorInfo;
+extern PGDLLIMPORT volatile sig_atomic_t ParallelStopPending;
+extern PGDLLIMPORT volatile bool parallel_worker_stop;
+
+#define IS_PARALLEL_WORKER_STOP() (parallel_worker_stop)
+
+extern void HandleParallelStop(void);
+extern void ProcessParallelStop(void);
+
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, Bitmapset *sendParams, int nworkers,
int64 tuples_needed);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 18ae8f0d4bb..9315fa6c942 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2743,6 +2743,7 @@ typedef struct GatherState
bool initialized; /* workers launched? */
bool need_to_scan_locally; /* need to read from local plan? */
int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
+ int64 tuples_produced; /* tuples already produced */
/* these fields are set up once: */
TupleTableSlot *funnel_slot;
struct ParallelExecutorInfo *pei;
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index afeeb1ca019..f7f4ee85154 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -36,6 +36,7 @@ typedef enum
PROCSIG_BARRIER, /* global barrier interrupt */
PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
+ PROCSIG_PARALLEL_STOP, /* Instruct parallel worker to stop */
/* Recovery conflict reasons */
PROCSIG_RECOVERY_CONFLICT_FIRST,
--
2.51.0