Hi Tomas!

On 26.11.2025 18:30, Tomas Vondra wrote:
> The question is if this are the only two such issues possible, and I'm
> afraid the answer is "no" :-(
> The question is if "exiting" from any place calling CFI leaves the
> execution state in a valid state. Valid enough so that we can call
> ExecEndNode() on all the nodes, including the one from which we exited.
> But I don't think we can rely on that. The node can do multiple steps,
> interleaved with CFI, not expecting that only one of them happens. I
> assume this would cause a lot of issues ...

Yes, it seems like this is the wrong approach, even though it has the
appeal of being able to rely on all the existing calls to CFI.

Not using CFI means we need to add explicit checks to all the places we
care about. I would start with all base relation scans and Sort, and take
it from there.
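
Concretely, the check I have in mind looks roughly like this, placed at
the top of each node's tuple-producing loop (generic names here; the
concrete field names are in the attached patch):

    if (stop_flag != NULL && !pg_atomic_unlocked_test_flag(stop_flag))
        break;    /* the leader already has all the rows it needs */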

>> The new patch accounts for that by introducing a new TableScanDescData
>> flag SO_OBEY_PARALLEL_WORKER_STOP, which indicates if the scan should
>> adhere to the parallel worker stop or not.
>>
>> Stopping is broadcasted to all parallel workers via SendProcSignal().
>> The stop variable is checked with the new
>> CHECK_FOR_PARALLEL_WORKER_STOP() macro.

It turned out that with this approach parallel workers can miss the
signal if they start up only after the leader has already found enough
rows and broadcast the stop signal.

I've fixed that by instead storing a stop flag in shared memory, which
the workers check to see whether they're supposed to stop. The flag has
to be propagated into the various places that need to check it, e.g. into
the heap or index scan code.
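
For reference, the lifecycle of the flag is roughly the following
(simplified from the attached patch; FixedParallelExecutorState lives in
the DSM segment, so leader and workers all see the same flag):

    /* leader, while setting up the parallel plan (ExecInitParallelPlan) */
    pg_atomic_init_flag(&fpes->stop_flag);

    /* leader, once it has received enough rows (ExecShutdownGather) */
    pg_atomic_test_set_flag(pei->stop_flag);

    /* worker, at the top of its scan loops (heapgettup and friends) */
    if (scan->rs_base.rs_stop_flag != NULL &&
        !pg_atomic_unlocked_test_flag(scan->rs_base.rs_stop_flag))
        break;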

>> In the PoC implementation I've for now only changed nodeSeqScan.c. If
>> there's agreement to pursue this approach, I'll change the other places
>> as well. Naming can also likely be still improved.
>>
> 
> This assumes we need to "exit" only from a heapam scan. That's true for
> the example, but is that enough in general? What if the worker already
> finished it's plan, and is now busy doing something else expensive?
> Could be a big sort, aggregation, ... Can we do something about these
> cases too?

This is only to illustrate the overall approach. Other places can easily
be made to work in the spirit of nodeSeqscan.c and heapam.c, by passing
the stop flag into the looping code and bailing out once it is set.
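
For Sort, for example, the check would go into the loop in ExecSort()
that feeds tuples into the tuplesort, along these lines (sketch only, not
part of the attached patch):

    for (;;)
    {
        if (node->ss.ps.parallel_stop_flag != NULL &&
            !pg_atomic_unlocked_test_flag(node->ss.ps.parallel_stop_flag))
            break;    /* the leader is done, stop feeding the sort */

        slot = ExecProcNode(outerNode);
        if (TupIsNull(slot))
            break;
        tuplesort_puttupleslot(tuplesortstate, slot);
    }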

But I would first like to agree on the overall approach before I spend
more time changing the other places.

--
David Geier
From e3019009a359939873a05637ad030485e132d551 Mon Sep 17 00:00:00 2001
From: David Geier <[email protected]>
Date: Wed, 19 Nov 2025 11:23:08 +0100
Subject: [PATCH v3] Parallel workers stop earlier in presence of LIMIT

In the presence of a LIMIT N clause, the executor stops as soon as it has
produced enough rows and shuts down the plan. In the serial case the query
ends immediately. If the query is parallel, however, each worker only exits
once it has itself produced N rows, regardless of how many rows have already
been produced by the other participants.

Worst-case example: a query has a LIMIT 1 clause and scans a large table
where only a single row qualifies. The first worker that finds the
matching row will return that row and terminate. All other workers will
keep running until the table is scanned to completion.

This change informs all workers to stop execution once the leader has
received enough rows. The stop request is passed to the workers via shared
memory. Using signals, e.g. via the SendProcSignal() API, doesn't work
reliably because the signal is lost if a worker starts up only after the
signal has already been sent.
---
 src/backend/access/heap/heapam.c    |  8 ++++++
 src/backend/executor/execParallel.c | 39 +++++++++++++++++++++++++---
 src/backend/executor/nodeGather.c   |  3 +++
 src/backend/executor/nodeSeqscan.c  |  2 ++
 src/include/access/relscan.h        |  6 +++++
 src/include/executor/execParallel.h |  2 ++
 src/include/nodes/execnodes.h       |  1 +
 7 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index ad9d6338ec2..5e115472e61 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -39,6 +39,7 @@
 #include "access/syncscan.h"
 #include "access/valid.h"
 #include "access/visibilitymap.h"
+#include "executor/execParallel.h"
 #include "access/xloginsert.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_database_d.h"
@@ -935,6 +936,9 @@ heapgettup(HeapScanDesc scan,
 	 */
 	while (true)
 	{
+		if (scan->rs_base.rs_stop_flag != NULL && !pg_atomic_unlocked_test_flag(scan->rs_base.rs_stop_flag))
+			break;
+
 		heap_fetch_next_buffer(scan, dir);
 
 		/* did we run out of blocks to scan? */
@@ -1052,6 +1056,9 @@ heapgettup_pagemode(HeapScanDesc scan,
 	 */
 	while (true)
 	{
+		if (scan->rs_base.rs_stop_flag != NULL && !pg_atomic_unlocked_test_flag(scan->rs_base.rs_stop_flag))
+			break;
+
 		heap_fetch_next_buffer(scan, dir);
 
 		/* did we run out of blocks to scan? */
@@ -1153,6 +1160,7 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_base.rs_flags = flags;
 	scan->rs_base.rs_parallel = parallel_scan;
 	scan->rs_strategy = NULL;	/* set in initscan */
+	scan->rs_base.rs_stop_flag = NULL;
 	scan->rs_cbuf = InvalidBuffer;
 
 	/*
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 772e81f3154..2bbb3cc5c89 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -74,10 +74,11 @@
  */
 typedef struct FixedParallelExecutorState
 {
-	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
-	dsa_pointer param_exec;
-	int			eflags;
-	int			jit_flags;
+	int64			tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+	dsa_pointer		param_exec;
+	int				eflags;
+	int				jit_flags;
+	pg_atomic_flag	stop_flag;
 } FixedParallelExecutorState;
 
 /*
@@ -603,6 +604,20 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 	return responseq;
 }
 
+/*
+ * Propagate the terminate flag pointer to all nodes in the plan tree.
+ * This ensures that all nodes can check for termination requests from the leader.
+ */
+static bool
+ExecParallelPropagateTerminateFlag(PlanState *planstate, pg_atomic_flag *stop_flag)
+{
+	if (planstate == NULL)
+		return false;
+
+	planstate->parallel_stop_flag = stop_flag;
+	return planstate_tree_walker(planstate, ExecParallelPropagateTerminateFlag, stop_flag);
+}
+
 /*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
@@ -768,6 +783,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	fpes->param_exec = InvalidDsaPointer;
 	fpes->eflags = estate->es_top_eflags;
 	fpes->jit_flags = estate->es_jit_flags;
+	pg_atomic_init_flag(&fpes->stop_flag);
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
 
 	/* Store query string */
@@ -803,6 +819,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	/* We don't need the TupleQueueReaders yet, though. */
 	pei->reader = NULL;
 
+	pei->stop_flag = &fpes->stop_flag;
+
 	/*
 	 * If instrumentation options were supplied, allocate space for the data.
 	 * It only gets partially initialized here; the rest happens during
@@ -1165,6 +1183,16 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
 	memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
 }
 
+/*
+ * Signal all parallel workers to terminate execution by setting the shared
+ * terminate flag. This causes ExecProcNode() to return NULL in all workers.
+ */
+void
+ExecParallelTerminate(ParallelExecutorInfo *pei)
+{
+	pg_atomic_test_set_flag(pei->stop_flag);
+}
+
 /*
  * Finish parallel execution.  We wait for parallel workers to finish, and
  * accumulate their buffer/WAL usage.
@@ -1488,6 +1516,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
 	ExecutorStart(queryDesc, fpes->eflags);
 
+	/* Propagate the terminate flag pointer to all nodes in the worker's plan tree */
+	ExecParallelPropagateTerminateFlag(queryDesc->planstate, &fpes->stop_flag);
+
 	/* Special executor initialization steps for parallel workers */
 	queryDesc->planstate->state->es_query_dsa = area;
 	if (DsaPointerIsValid(fpes->param_exec))
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 4105f1d1968..b7cbb3d9ab8 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -417,6 +417,9 @@ ExecShutdownGatherWorkers(GatherState *node)
 void
 ExecShutdownGather(GatherState *node)
 {
+	if (node->pei != NULL)
+		ExecParallelTerminate(node->pei);
+
 	ExecShutdownGatherWorkers(node);
 
 	/* Now destroy the parallel context. */
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index b8119face43..a5a99a4dd84 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -375,6 +375,7 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+	node->ss.ss_currentScanDesc->rs_stop_flag = node->ss.ps.parallel_stop_flag;
 }
 
 /* ----------------------------------------------------------------
@@ -408,4 +409,5 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
 	pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+	node->ss.ss_currentScanDesc->rs_stop_flag = node->ss.ps.parallel_stop_flag;
 }
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 9b342d5bd80..8b39ff7200c 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -63,6 +63,12 @@ typedef struct TableScanDescData
 	 */
 	uint32		rs_flags;
 
+	/*
+	 * Flag used to indicate if the parallel workers participating in the scan
+	 * should bail out and stop scanning.
+	 */
+	pg_atomic_flag *rs_stop_flag;
+
 	struct ParallelTableScanDescData *rs_parallel;	/* parallel scan
 													 * information */
 } TableScanDescData;
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a2034811d5..4e7858e533c 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -32,6 +32,7 @@ typedef struct ParallelExecutorInfo
 	dsa_area   *area;			/* points to DSA area in DSM */
 	dsa_pointer param_exec;		/* serialized PARAM_EXEC parameters */
 	bool		finished;		/* set true by ExecParallelFinish */
+	pg_atomic_flag	*stop_flag;	/* pointer to shared terminate flag */
 	/* These two arrays have pcxt->nworkers_launched entries: */
 	shm_mq_handle **tqueue;		/* tuple queues for worker output */
 	struct TupleQueueReader **reader;	/* tuple reader/writer support */
@@ -41,6 +42,7 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 												  EState *estate, Bitmapset *sendParams, int nworkers,
 												  int64 tuples_needed);
 extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei);
+extern void ExecParallelTerminate(ParallelExecutorInfo *pei);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(PlanState *planstate,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 02265456978..2e59f9809cf 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1248,6 +1248,7 @@ typedef struct PlanState
 	bool		outeropsset;
 	bool		inneropsset;
 	bool		resultopsset;
+	pg_atomic_flag	*parallel_stop_flag;
 } PlanState;
 
 /* ----------------
-- 
2.51.0
