Andy Fan <[email protected]> writes:
Hi,
After some coding on this subject, I think it is better to redefine
the problem and solution.
Problem:
--------
A SubPlan is commonly ineffective, *AND* recently I found it is hard to
make it work with the parallel framework. e.g.
create table bigt (a int, b int, c int);
insert into bigt select i, i, i from generate_series(1, 1000000)i;
analyze bigt;
q1:
select * from bigt o where b = 1
and c > (select avg(c) from bigt i where c = o.c);
We get plan:
QUERY PLAN
-------------------------------------------
Seq Scan on bigt o
Filter: ((b = 1) AND (c > (SubPlan 1)))
SubPlan 1
-> Aggregate
-> Seq Scan on bigt i
Filter: (c = o.c)
(6 rows)
Here we can see there is no parallelism at all. However, if we split query
q1 into queries q2 and q3, both of them can be parallelized.
q2:
explain (costs off) select * from bigt o where b = 1 and c > 2;
QUERY PLAN
---------------------------------------
Gather
Workers Planned: 2
-> Parallel Seq Scan on bigt o
Filter: ((c > 2) AND (b = 1))
(4 rows)
q3:
explain (costs off) select avg(c) from bigt o where c = 2;
QUERY PLAN
-----------------------------------------
Aggregate
-> Gather
Workers Planned: 2
-> Parallel Seq Scan on bigt o
Filter: (c = 2)
(5 rows)
Analysis
--------
The major reason q1 can't be parallelized is that the subplan is parameterized.
The comment from add_partial_path:
* We don't generate parameterized partial paths for several reasons.  Most
* importantly, they're not safe to execute, because there's nothing to
* make sure that a parallel scan within the parameterized portion of the
* plan is running with the same value in every worker at the same time.
The comment from max_parallel_hazard_walker:
* We can't pass Params to workers at the moment either .. unless
* they are listed in safe_param_ids, meaning they could be
* either generated within workers or can be computed by the leader and
* then their value can be passed to workers.
Solutions
----------
There are two foundations for this solution in my mind:
1. It is not safe to execute a partial parameterized plan with different
parameter values, as we have handled and documented well. But this
doesn't apply to a parameterized complete plan: in that case each
worker runs a complete plan, so they always generate the same result
no matter whether it runs in a parallel worker or in the leader.
2. A subplan is never a partial plan. In make_subplan:
best_path = get_cheapest_fractional_path(final_rel, tuple_fraction);
plan = create_plan(subroot, best_path);
/* And convert to SubPlan or InitPlan format. */
result = build_subplan(root, plan, best_path,
subroot, plan_params,
subLinkType, subLinkId,
testexpr, NIL, isTopQual);
get_cheapest_fractional_path never reads rel->partial_pathlist.
So I think it is safe to ignore the PARAM_EXEC check
(max_parallel_hazard_context.safe_param_ids) for subplans. See the
attached patch 1.
Benefit:
--------
After this patch, we could get the below plan -- the correlated subplan
is parallelized.
explain (costs off) select * from bigt o where b = 1
and c > (select avg(c) from bigt i where c = o.c);
QUERY PLAN
------------------------------------------------------
Seq Scan on bigt o
Filter: ((b = 1) AND ((c)::numeric > (SubPlan 1)))
SubPlan 1
-> Aggregate
-> Gather
Workers Planned: 2
-> Parallel Seq Scan on bigt i
Filter: (c = o.c)
(8 rows)
Continuing the test to show the impact of this patch: by removing the
"Gather" in the SubPlan, we get the below plan -- a scan with a
parallel-safe SubPlan is parallelized.
create table t (a int, b int);
explain (costs off) select * from bigt o where b = 1
and c > (select avg(a) from t i where b = o.c);
QUERY PLAN
------------------------------------------------------------
Gather
Workers Planned: 2
-> Parallel Seq Scan on bigt o
Filter: ((b = 1) AND ((c)::numeric > (SubPlan 1)))
SubPlan 1
-> Aggregate
-> Seq Scan on t i
Filter: (b = o.c)
(8 rows)
incremental_sort.sql shows another impact of this patch: it is
helpful for parallel sort.
Query:
select distinct
unique1,
(select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
from tenk1 t, generate_series(1, 1000);
From (master)
QUERY PLAN
----------------------------------------------------------------------------------------
Unique
Output: t.unique1, ((SubPlan 1))
-> Sort
Output: t.unique1, ((SubPlan 1))
Sort Key: t.unique1, ((SubPlan 1))
-> Gather
Output: t.unique1, (SubPlan 1)
Workers Planned: 2
-> Nested Loop
Output: t.unique1
-> Parallel Index Only Scan using tenk1_unique1 on
public.tenk1 t
Output: t.unique1
-> Function Scan on pg_catalog.generate_series
Output: generate_series.generate_series
Function Call: generate_series(1, 1000)
SubPlan 1
-> Index Only Scan using tenk1_unique1 on public.tenk1
Output: t.unique1
Index Cond: (tenk1.unique1 = t.unique1)
(19 rows)
To (patched)
QUERY PLAN
----------------------------------------------------------------------------------------------
Unique
Output: t.unique1, ((SubPlan 1))
-> Gather Merge * Gather Merge at last *
Output: t.unique1, ((SubPlan 1))
Workers Planned: 2
-> Unique
Output: t.unique1, ((SubPlan 1))
-> Sort * Sort in worker *
Output: t.unique1, ((SubPlan 1))
Sort Key: t.unique1, ((SubPlan 1))
-> Nested Loop
* SubPlan in worker *
Output: t.unique1, (SubPlan 1)
-> Parallel Index Only Scan using tenk1_unique1 on
public.tenk1 t
Output: t.unique1
-> Function Scan on pg_catalog.generate_series
Output: generate_series.generate_series
Function Call: generate_series(1, 1000)
SubPlan 1
-> Index Only Scan using tenk1_unique1 on
public.tenk1
Output: t.unique1
Index Cond: (tenk1.unique1 = t.unique1)
(21 rows)
The execution time for the above query also decreased from 13351.928 ms
to 4814.043 ms, a 64% reduction. The major difference is:
(1) master: the correlated subquery is parallel unsafe, so it runs in the
leader only, followed by the sort.
(2) patched: the correlated subquery is parallel safe, so it runs in the
workers (Nested Loop), then *sorts in the parallel workers*, and then runs
"Gather Merge".
About the implementation, I know of at least 2 issues (the state is PoC
now).
1. Query.is_in_sublink should be set in the parser and kept unchanged
afterwards.
2. The below comment in incremental_sort.sql should be changed; it
conflicts with this patch.
"""
-- Parallel sort but with expression (correlated subquery) that
-- is prohibited in parallel plans.
"""
I hope I have made myself clear; any feedback is welcome!
--
Best Regards
Andy Fan
From 19ef904ab5ed0d2b2dcc5141fc894e1f33ef9ab0 Mon Sep 17 00:00:00 2001
From: Andy Fan <[email protected]>
Date: Wed, 2 Jul 2025 06:28:02 +0000
Subject: [PATCH v0 1/1] Revisit Subplan's parallel safety.
---
src/backend/optimizer/plan/planner.c | 2 +
src/backend/optimizer/plan/subselect.c | 2 +
src/backend/optimizer/util/clauses.c | 32 ++++++++++-----
src/include/nodes/parsenodes.h | 9 ++++
src/include/nodes/pathnodes.h | 3 ++
.../regress/expected/incremental_sort.out | 41 ++++++++++---------
src/test/regress/expected/select_parallel.out | 26 +++++++-----
7 files changed, 73 insertions(+), 42 deletions(-)
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 549aedcfa99..3a1fa618e4f 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -708,6 +708,8 @@ subquery_planner(PlannerGlobal *glob, Query *parse, PlannerInfo *parent_root,
*/
root->join_domains = list_make1(makeNode(JoinDomain));
+ root->isSubPlan = parse->is_in_sublink;
+
/*
* If there is a WITH list, process each WITH query and either convert it
* to RTE_SUBQUERY RTE(s) or build an initplan SubPlan structure for it.
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index e7cb3fede66..4de880de9ef 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -181,6 +181,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery,
*/
subquery = copyObject(orig_subquery);
+ subquery->is_in_sublink = true;
+
/*
* If it's an EXISTS subplan, we might be able to simplify it.
*/
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 26a3e050086..63816d8a733 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -92,6 +92,7 @@ typedef struct
char max_hazard; /* worst proparallel hazard found so far */
char max_interesting; /* worst proparallel hazard of interest */
List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
+ bool is_subplan;
} max_parallel_hazard_context;
static bool contain_agg_clause_walker(Node *node, void *context);
@@ -770,23 +771,26 @@ is_parallel_safe(PlannerInfo *root, Node *node)
context.max_hazard = PROPARALLEL_SAFE;
context.max_interesting = PROPARALLEL_RESTRICTED;
context.safe_param_ids = NIL;
+ context.is_subplan = root->isSubPlan;
- /*
- * The params that refer to the same or parent query level are considered
- * parallel-safe. The idea is that we compute such params at Gather or
- * Gather Merge node and pass their value to workers.
- */
- for (proot = root; proot != NULL; proot = proot->parent_root)
+ if (!context.is_subplan)
{
- foreach(l, proot->init_plans)
+ /*
+ * The params that refer to the same or parent query level are
+ * considered parallel-safe. The idea is that we compute such params
+ * at Gather or Gather Merge node and pass their value to workers.
+ */
+ for (proot = root; proot != NULL; proot = proot->parent_root)
{
- SubPlan *initsubplan = (SubPlan *) lfirst(l);
+ foreach(l, proot->init_plans)
+ {
+ SubPlan *initsubplan = (SubPlan *) lfirst(l);
- context.safe_param_ids = list_concat(context.safe_param_ids,
- initsubplan->setParam);
+ context.safe_param_ids = list_concat(context.safe_param_ids,
+ initsubplan->setParam);
+ }
}
}
-
return !max_parallel_hazard_walker(node, &context);
}
@@ -936,6 +940,12 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
if (param->paramkind == PARAM_EXTERN)
return false;
+ /*
* A subplan is always a complete (non-partial) plan, so its parameters
* are always computed by the leader.
+ */
+ if (context->is_subplan)
+ return false;
if (param->paramkind != PARAM_EXEC ||
!list_member_int(context->safe_param_ids, param->paramid))
{
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index daa285ca62f..cc36dc72eb3 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -244,6 +244,15 @@ typedef struct Query
/* a list of WithCheckOption's (added during rewrite) */
List *withCheckOptions pg_node_attr(query_jumble_ignore);
+ /*
* XXX: currently we don't know where a Query comes from, e.g. a
* SubQuery or a SubLink. Providing such information to the planner might
* be helpful sometimes. For now only the SubLink case is interesting and
* it is only set during make_subplan. We can do it in the parser if it
* is worth the trouble (there are many members in enum SubLinkType).
+ */
+ bool is_in_sublink;
+
/*
* The following two fields identify the portion of the source text string
* containing this query. They are typically only populated in top-level
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 6567759595d..053ace26e74 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -583,6 +583,9 @@ struct PlannerInfo
/* PartitionPruneInfos added in this query's plan. */
List *partPruneInfos;
+
+ /* is building a subplan */
+ bool isSubPlan;
};
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index b00219643b9..9ba232c4c67 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1605,20 +1605,21 @@ explain (costs off) select distinct
unique1,
(select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
from tenk1 t, generate_series(1, 1000);
- QUERY PLAN
----------------------------------------------------------------------------------
+ QUERY PLAN
+---------------------------------------------------------------------------------------
Unique
- -> Sort
- Sort Key: t.unique1, ((SubPlan 1))
- -> Gather
- Workers Planned: 2
- -> Nested Loop
- -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t
- -> Function Scan on generate_series
- SubPlan 1
- -> Index Only Scan using tenk1_unique1 on tenk1
- Index Cond: (unique1 = t.unique1)
-(11 rows)
+ -> Gather Merge
+ Workers Planned: 2
+ -> Unique
+ -> Sort
+ Sort Key: t.unique1, ((SubPlan 1))
+ -> Nested Loop
+ -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t
+ -> Function Scan on generate_series
+ SubPlan 1
+ -> Index Only Scan using tenk1_unique1 on tenk1
+ Index Cond: (unique1 = t.unique1)
+(12 rows)
explain (costs off) select
unique1,
@@ -1627,16 +1628,16 @@ from tenk1 t, generate_series(1, 1000)
order by 1, 2;
QUERY PLAN
---------------------------------------------------------------------------
- Sort
- Sort Key: t.unique1, ((SubPlan 1))
- -> Gather
- Workers Planned: 2
+ Gather Merge
+ Workers Planned: 2
+ -> Sort
+ Sort Key: t.unique1, ((SubPlan 1))
-> Nested Loop
-> Parallel Index Only Scan using tenk1_unique1 on tenk1 t
-> Function Scan on generate_series
- SubPlan 1
- -> Index Only Scan using tenk1_unique1 on tenk1
- Index Cond: (unique1 = t.unique1)
+ SubPlan 1
+ -> Index Only Scan using tenk1_unique1 on tenk1
+ Index Cond: (unique1 = t.unique1)
(10 rows)
-- Parallel sort but with expression not available until the upper rel.
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 0185ef661b1..189a77a4f5f 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -148,8 +148,8 @@ create table part_pa_test_p2 partition of part_pa_test for values from (0) to (m
explain (costs off)
select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a)))
from part_pa_test pa2;
- QUERY PLAN
---------------------------------------------------------------
+ QUERY PLAN
+----------------------------------------------------------------
Aggregate
-> Gather
Workers Planned: 3
@@ -159,12 +159,14 @@ explain (costs off)
SubPlan 2
-> Result
SubPlan 1
- -> Append
- -> Seq Scan on part_pa_test_p1 pa1_1
- Filter: (a = pa2.a)
- -> Seq Scan on part_pa_test_p2 pa1_2
- Filter: (a = pa2.a)
-(14 rows)
+ -> Gather
+ Workers Planned: 3
+ -> Parallel Append
+ -> Parallel Seq Scan on part_pa_test_p1 pa1_1
+ Filter: (a = pa2.a)
+ -> Parallel Seq Scan on part_pa_test_p2 pa1_2
+ Filter: (a = pa2.a)
+(16 rows)
drop table part_pa_test;
-- test with leader participation disabled
@@ -1339,9 +1341,11 @@ SELECT 1 FROM tenk1_vw_sec
-> Parallel Index Only Scan using tenk1_unique1 on tenk1
SubPlan 1
-> Aggregate
- -> Seq Scan on int4_tbl
- Filter: (f1 < tenk1_vw_sec.unique1)
-(9 rows)
+ -> Gather
+ Workers Planned: 1
+ -> Parallel Seq Scan on int4_tbl
+ Filter: (f1 < tenk1_vw_sec.unique1)
+(11 rows)
rollback;
-- test that a newly-created session role propagates to workers.
--
2.45.1