On Fri, Jan 21, 2022 at 3:20 PM Robert Haas <robertmh...@gmail.com> wrote:
>
> On Fri, Jan 14, 2022 at 2:25 PM James Coleman <jtc...@gmail.com> wrote:
> > I've been chewing on this a bit, and I was about to go re-read the
> > code and see how easy it'd be to do exactly what you're suggesting in
> > generate_gather_paths() (and verifying it doesn't need to happen in
> > other places). However there's one (I think large) gotcha with that
> > approach (assuming it otherwise makes sense): it means we do
> > unnecessary work. In the current patch series we only need to recheck
> > parallel safety if we're in a situation where we might actually
> > benefit from doing that work (namely when we have a correlated
> > subquery we might otherwise be able to execute in a parallel plan). If
> > we don't track that status we'd have to recheck the full parallel
> > safety of the path for all paths -- even without correlated
> > subqueries.
>
> I don't think there's an intrinsic problem with the idea of making a
> tentative determination about parallel safety and then refining it
> later, but I'm not sure why you think it would be a lot of work to
> figure this out at the point where we generate gather paths. I think
> it's just a matter of testing whether the set of parameters that the
> path needs as input is the empty set. It may be that neither extParam
> nor allParam are precisely that thing, but I think both are very
> close, and it seems to me that there's no theoretical reason why we
> can't know for every path the set of inputs that it requires "from the
> outside."
As I understand it now (I'm not sure I realized this before), you're suggesting
that *even when there are required params* we mark the path as parallel safe and
then check the params themselves for parallel safety later. From a purely
theoretical perspective that seemed reasonable, so I took a pass at that
approach.

The first, and likely most interesting, thing I discovered was that the vast
majority of what the patch accomplishes, it accomplishes not via the delayed
params safety checking but rather via the required outer relids checks I'd
added to generate_useful_gather_paths. For that to happen I did have to mark
PARAM_EXEC params as presumed parallel safe. That means that parallel_safe now
doesn't strictly mean "parallel safe in the current context" but "parallel safe
as long as any required params are provided". That's a real change, but
probably acceptable as long as a project policy decision is made in that
direction.

There are a few concerns I have (and I'm not sure what level they rise to):

1. From what I can tell we don't have access on a path to the set of params
required by that path (I believe this is what Tom was referencing in his sister
reply at this point in the thread). That means we have to rely on checking that
the required outer relids are provided by the current query level. I'm not
quite sure yet whether that guarantees (or whether the rest of the path
construction logic guarantees for us) that the params provided by the outer rel
are used in a correlated way that isn't shared across workers. And because we
don't have the param information available we can't add additional checks (that
I can tell) to verify that.

2. Are we excluding any paths (by having a path that will always be invalid win
the cost comparisons in add_partial_path)? I suppose this danger actually
exists in the previous version of the patch as well, and I don't have any
examples of this being a problem. Also maybe this can only be a problem if (1)
reveals a bug.

3. The new patch series actually ends up allowing parallelization of correlated
params in a few more places than the original patch series. From what I can
tell all of those cases are in fact safe to execute in parallel, which, if
true, means this is a feature, not a concern. The changed query plans fall into
two categories: a) putting a Gather inside a subplan, and b) correlated param
usages in a subquery scan path on the inner side of a join. I've separated out
those specific changes into a separate patch to make it easier to tell which
ones I'm referencing.

On the other hand, this is a dramatically simpler patch series. Assuming the
approach is sound, it should be much easier to maintain than the previous
version.

The final patch in the series is a set of additional checks I could imagine
adding to be more explicit, but at least in the current test suite there isn't
anything at all they affect.

Does this look at least somewhat more like what you'd envisioned (granting the
need to squint hard given the relids checks instead of directly checking
params)?

Thanks,
James Coleman
From 6596bb6909b9dfaadd6e6e834be4032e7903c4e9 Mon Sep 17 00:00:00 2001 From: jcoleman <jtc...@gmail.com> Date: Mon, 30 Nov 2020 11:36:35 -0500 Subject: [PATCH v4 1/4] Allow parallel LATERAL subqueries with LIMIT/OFFSET The code that determined whether or not a rel should be considered for parallel query excluded subqueries with LIMIT/OFFSET. That's correct in the general case: as the comment notes that'd mean we have to guarantee ordering (and claims it's not worth checking that) for results to be consistent across workers. However there's a simpler case that hasn't been considered: LATERAL subqueries with LIMIT/OFFSET don't fall under the same reasoning since they're executed (when not converted to a JOIN) per tuple anyway, so consistency of results across workers isn't a factor. --- src/backend/optimizer/path/allpaths.c | 4 +++- src/test/regress/expected/select_parallel.out | 15 +++++++++++++++ src/test/regress/sql/select_parallel.sql | 6 ++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 169b1d53fc..6a581e20fa 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -682,11 +682,13 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, * inconsistent results at the top-level. (In some cases, where * the result is ordered, we could relax this restriction. But it * doesn't currently seem worth expending extra effort to do so.) 
+ * LATERAL is an exception: LIMIT/OFFSET is safe to execute within + * workers since the sub-select is executed per tuple */ { Query *subquery = castNode(Query, rte->subquery); - if (limit_needed(subquery)) + if (!rte->lateral && limit_needed(subquery)) return; } break; diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 4ea1aa7dfd..2303f70d6e 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -1040,6 +1040,21 @@ explain (costs off) Filter: (stringu1 ~~ '%AAAA'::text) (11 rows) +-- ...unless it's LATERAL +savepoint settings; +set parallel_tuple_cost=0; +explain (costs off) select t.unique1 from tenk1 t +join lateral (select t.unique1 from tenk1 offset 0) l on true; + QUERY PLAN +--------------------------------------------------------------------- + Gather + Workers Planned: 4 + -> Nested Loop + -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t + -> Index Only Scan using tenk1_hundred on tenk1 +(5 rows) + +rollback to savepoint settings; -- to increase the parallel query test coverage SAVEPOINT settings; SET LOCAL force_parallel_mode = 1; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index f924731248..019e17e751 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -390,6 +390,12 @@ explain (costs off, verbose) explain (costs off) select * from tenk1 a where two in (select two from tenk1 b where stringu1 like '%AAAA' limit 3); +-- ...unless it's LATERAL +savepoint settings; +set parallel_tuple_cost=0; +explain (costs off) select t.unique1 from tenk1 t +join lateral (select t.unique1 from tenk1 offset 0) l on true; +rollback to savepoint settings; -- to increase the parallel query test coverage SAVEPOINT settings; -- 2.17.1
From 200c4ed4118f92014253b49faa41a76ecec84c96 Mon Sep 17 00:00:00 2001 From: jcoleman <jtc...@gmail.com> Date: Sat, 22 Jan 2022 18:34:54 -0500 Subject: [PATCH v4 3/4] Changed queries --- src/test/regress/expected/partition_prune.out | 104 +++++++++--------- src/test/regress/expected/select_parallel.out | 26 +++-- 2 files changed, 69 insertions(+), 61 deletions(-) diff --git a/src/test/regress/expected/partition_prune.out b/src/test/regress/expected/partition_prune.out index 7555764c77..5c45f9c0a5 100644 --- a/src/test/regress/expected/partition_prune.out +++ b/src/test/regress/expected/partition_prune.out @@ -1284,60 +1284,64 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM part p(x) ORDER BY x; -- -- pruning won't work for mc3p, because some keys are Params explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = t1.b and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1; - QUERY PLAN ------------------------------------------------------------------------ - Nested Loop - -> Append - -> Seq Scan on mc2p1 t1_1 - Filter: (a = 1) - -> Seq Scan on mc2p2 t1_2 - Filter: (a = 1) - -> Seq Scan on mc2p_default t1_3 - Filter: (a = 1) - -> Aggregate - -> Append - -> Seq Scan on mc3p0 t2_1 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p1 t2_2 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p2 t2_3 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p3 t2_4 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p4 t2_5 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p5 t2_6 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p6 t2_7 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p7 t2_8 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p_default t2_9 - Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) -(28 rows) + QUERY PLAN 
+----------------------------------------------------------------------------- + Gather + Workers Planned: 2 + -> Nested Loop + -> Parallel Append + -> Parallel Seq Scan on mc2p1 t1_1 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p2 t1_2 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p_default t1_3 + Filter: (a = 1) + -> Aggregate + -> Append + -> Seq Scan on mc3p0 t2_1 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p1 t2_2 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p2 t2_3 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p3 t2_4 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p4 t2_5 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p5 t2_6 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p6 t2_7 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p7 t2_8 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p_default t2_9 + Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1)) +(30 rows) -- pruning should work fine, because values for a prefix of keys (a, b) are -- available explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.c = t1.b and abs(t2.b) = 1 and t2.a = 1) s where t1.a = 1; - QUERY PLAN ------------------------------------------------------------------------ - Nested Loop - -> Append - -> Seq Scan on mc2p1 t1_1 - Filter: (a = 1) - -> Seq Scan on mc2p2 t1_2 - Filter: (a = 1) - -> Seq Scan on mc2p_default t1_3 - Filter: (a = 1) - -> Aggregate - -> Append - -> Seq Scan on mc3p0 t2_1 - Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p1 t2_2 - Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) - -> Seq Scan on mc3p_default t2_3 - Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) -(16 rows) + QUERY PLAN +----------------------------------------------------------------------------- + Gather + Workers Planned: 2 + -> Nested 
Loop + -> Parallel Append + -> Parallel Seq Scan on mc2p1 t1_1 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p2 t1_2 + Filter: (a = 1) + -> Parallel Seq Scan on mc2p_default t1_3 + Filter: (a = 1) + -> Aggregate + -> Append + -> Seq Scan on mc3p0 t2_1 + Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p1 t2_2 + Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) + -> Seq Scan on mc3p_default t2_3 + Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1)) +(18 rows) -- also here, because values for all keys are provided explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = 1 and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1; diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 124fe9fec5..9bb60c2c1e 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -137,8 +137,8 @@ create table part_pa_test_p2 partition of part_pa_test for values from (0) to (m explain (costs off) select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a))) from part_pa_test pa2; - QUERY PLAN --------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------- Aggregate -> Gather Workers Planned: 3 @@ -148,12 +148,14 @@ explain (costs off) SubPlan 2 -> Result SubPlan 1 - -> Append - -> Seq Scan on part_pa_test_p1 pa1_1 - Filter: (a = pa2.a) - -> Seq Scan on part_pa_test_p2 pa1_2 - Filter: (a = pa2.a) -(14 rows) + -> Gather + Workers Planned: 3 + -> Parallel Append + -> Parallel Seq Scan on part_pa_test_p1 pa1_1 + Filter: (a = pa2.a) + -> Parallel Seq Scan on part_pa_test_p2 pa1_2 + Filter: (a = pa2.a) +(16 rows) drop table part_pa_test; -- test with leader participation disabled @@ -1330,8 +1332,10 @@ SELECT 1 FROM tenk1_vw_sec -> Parallel Index Only Scan using tenk1_unique1 on tenk1 SubPlan 1 -> Aggregate - -> Seq Scan on int4_tbl - 
Filter: (f1 < tenk1_vw_sec.unique1) -(9 rows) + -> Gather + Workers Planned: 1 + -> Parallel Seq Scan on int4_tbl + Filter: (f1 < tenk1_vw_sec.unique1) +(11 rows) rollback; -- 2.17.1
From c11d38b8a88e8f7a46528712fc9341de34171f8d Mon Sep 17 00:00:00 2001 From: jcoleman <jtc...@gmail.com> Date: Sat, 22 Jan 2022 17:33:23 -0500 Subject: [PATCH v4 4/4] Possible additional checks --- src/backend/optimizer/path/allpaths.c | 33 ++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 776f002054..7e30824320 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -2682,11 +2682,16 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) ListCell *lc; double rows; double *rowsp = NULL; + Relids required_outer = rel->lateral_relids; /* If there are no partial paths, there's nothing to do here. */ if (rel->partial_pathlist == NIL) return; + if (!bms_is_subset(required_outer, rel->relids)) + return; + + /* Should we override the rel's rowcount estimate? */ if (override_rows) rowsp = &rows; @@ -2697,12 +2702,16 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) * of partial_pathlist because of the way add_partial_path works. 
*/ cheapest_partial_path = linitial(rel->partial_pathlist); - rows = - cheapest_partial_path->rows * cheapest_partial_path->parallel_workers; - simple_gather_path = (Path *) - create_gather_path(root, rel, cheapest_partial_path, rel->reltarget, - rel->lateral_relids, rowsp); - add_path(rel, simple_gather_path); + if (cheapest_partial_path->param_info == NULL || + bms_is_subset(cheapest_partial_path->param_info->ppi_req_outer, rel->relids)) + { + rows = + cheapest_partial_path->rows * cheapest_partial_path->parallel_workers; + simple_gather_path = (Path *) + create_gather_path(root, rel, cheapest_partial_path, rel->reltarget, + rel->lateral_relids, rowsp); + add_path(rel, simple_gather_path); + } /* * For each useful ordering, we can consider an order-preserving Gather @@ -2716,6 +2725,10 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) if (subpath->pathkeys == NIL) continue; + if (subpath->param_info != NULL && + !bms_is_subset(subpath->param_info->ppi_req_outer, rel->relids)) + break; + rows = subpath->rows * subpath->parallel_workers; path = create_gather_merge_path(root, rel, subpath, rel->reltarget, subpath->pathkeys, rel->lateral_relids, rowsp); @@ -2888,6 +2901,10 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r { Path *tmp; + if (subpath->param_info != NULL && + !bms_is_subset(subpath->param_info->ppi_req_outer, rel->relids)) + break; + tmp = (Path *) create_sort_path(root, rel, subpath, @@ -2916,6 +2933,10 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r { Path *tmp; + if (subpath->param_info != NULL && + !bms_is_subset(subpath->param_info->ppi_req_outer, rel->relids)) + break; + /* * We should have already excluded pathkeys of length 1 * because then presorted_keys > 0 would imply is_sorted was -- 2.17.1
From ba785f2abca7c9f5199f0fc27c3b71f6d1d8010b Mon Sep 17 00:00:00 2001 From: jcoleman <jtc...@gmail.com> Date: Fri, 21 Jan 2022 22:38:46 -0500 Subject: [PATCH v4 2/4] Parallelize correlated subqueries When params are provided at the current query level (i.e., are generated within a single worker and not shared across workers) we can safely execute these in parallel. Alternative approach using just relids subset check --- doc/src/sgml/parallel.sgml | 3 +- src/backend/optimizer/path/allpaths.c | 18 ++- src/backend/optimizer/path/joinpath.c | 16 ++- src/backend/optimizer/util/clauses.c | 3 + src/backend/optimizer/util/pathnode.c | 2 + src/backend/utils/misc/guc.c | 1 + src/include/nodes/pathnodes.h | 2 +- .../regress/expected/incremental_sort.out | 28 ++-- src/test/regress/expected/select_parallel.out | 125 ++++++++++++++++++ src/test/regress/sql/select_parallel.sql | 25 ++++ 10 files changed, 197 insertions(+), 26 deletions(-) diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml index 13479d7e5e..2d924dd2ac 100644 --- a/doc/src/sgml/parallel.sgml +++ b/doc/src/sgml/parallel.sgml @@ -517,7 +517,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; <listitem> <para> - Plan nodes that reference a correlated <literal>SubPlan</literal>. + Plan nodes that reference a correlated <literal>SubPlan</literal> where + the result is shared between workers. </para> </listitem> </itemizedlist> diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 6a581e20fa..776f002054 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -556,7 +556,8 @@ set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, * (see grouping_planner). 
*/ if (rel->reloptkind == RELOPT_BASEREL && - bms_membership(root->all_baserels) != BMS_SINGLETON) + bms_membership(root->all_baserels) != BMS_SINGLETON + && (rel->subplan_params == NIL || rte->rtekind != RTE_SUBQUERY)) generate_useful_gather_paths(root, rel, false); /* Now find the cheapest of the paths for this rel */ @@ -2700,7 +2701,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) cheapest_partial_path->rows * cheapest_partial_path->parallel_workers; simple_gather_path = (Path *) create_gather_path(root, rel, cheapest_partial_path, rel->reltarget, - NULL, rowsp); + rel->lateral_relids, rowsp); add_path(rel, simple_gather_path); /* @@ -2717,7 +2718,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) rows = subpath->rows * subpath->parallel_workers; path = create_gather_merge_path(root, rel, subpath, rel->reltarget, - subpath->pathkeys, NULL, rowsp); + subpath->pathkeys, rel->lateral_relids, rowsp); add_path(rel, &path->path); } } @@ -2819,11 +2820,15 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r double *rowsp = NULL; List *useful_pathkeys_list = NIL; Path *cheapest_partial_path = NULL; + Relids required_outer = rel->lateral_relids; /* If there are no partial paths, there's nothing to do here. */ if (rel->partial_pathlist == NIL) return; + if (!bms_is_subset(required_outer, rel->relids)) + return; + /* Should we override the rel's rowcount estimate? 
*/ if (override_rows) rowsp = &rows; @@ -2895,7 +2900,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r tmp, rel->reltarget, tmp->pathkeys, - NULL, + required_outer, rowsp); add_path(rel, &path->path); @@ -2929,7 +2934,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r tmp, rel->reltarget, tmp->pathkeys, - NULL, + required_outer, rowsp); add_path(rel, &path->path); @@ -3108,7 +3113,8 @@ standard_join_search(PlannerInfo *root, int levels_needed, List *initial_rels) /* * Except for the topmost scan/join rel, consider gathering * partial paths. We'll do the same for the topmost scan/join rel - * once we know the final targetlist (see grouping_planner). + * once we know the final targetlist (see + * apply_scanjoin_target_to_paths). */ if (lev < levels_needed) generate_useful_gather_paths(root, rel, false); diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index f96fc9fd28..e85b5449ea 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -1791,16 +1791,24 @@ match_unsorted_outer(PlannerInfo *root, * partial path and the joinrel is parallel-safe. However, we can't * handle JOIN_UNIQUE_OUTER, because the outer path will be partial, and * therefore we won't be able to properly guarantee uniqueness. Nor can - * we handle joins needing lateral rels, since partial paths must not be - * parameterized. Similarly, we can't handle JOIN_FULL and JOIN_RIGHT, - * because they can produce false null extended rows. + * we handle JOIN_FULL and JOIN_RIGHT, because they can produce false null + * extended rows. 
+ * + * Partial paths may only have parameters in limited cases + * where the parameterization is fully satisfied without sharing state + * between workers, so we only allow lateral rels on inputs to the join + * if the resulting join contains no lateral rels, the inner rel's laterals + * are fully satisfied by the outer rel, and the outer rel doesn't depend + * on the inner rel to produce any laterals. */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && save_jointype != JOIN_FULL && save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && - bms_is_empty(joinrel->lateral_relids)) + bms_is_empty(joinrel->lateral_relids) && + bms_is_subset(innerrel->lateral_relids, outerrel->relids) && + (bms_is_empty(outerrel->lateral_relids) || !bms_is_subset(outerrel->lateral_relids, innerrel->relids))) { if (nestjoinOK) consider_parallel_nestloop(root, joinrel, outerrel, innerrel, diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index a707dc9f26..f0002f6887 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -822,6 +822,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) if (param->paramkind == PARAM_EXTERN) return false; + if (param->paramkind == PARAM_EXEC) + return false; + if (param->paramkind != PARAM_EXEC || !list_member_int(context->safe_param_ids, param->paramid)) { diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 5c32c96b71..144b2c485d 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -2418,6 +2418,8 @@ create_nestloop_path(PlannerInfo *root, NestPath *pathnode = makeNode(NestPath); Relids inner_req_outer = PATH_REQ_OUTER(inner_path); + /* TODO: Assert lateral relids subset safety? */ + /* * If the inner path is parameterized by the outer, we must drop any * restrict_clauses that are due to be moved into the inner path. 
We have diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index effb9d03a0..52cd3512b3 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -58,6 +58,7 @@ #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" +#include "optimizer/clauses.h" #include "optimizer/cost.h" #include "optimizer/geqo.h" #include "optimizer/optimizer.h" diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 1f33fe13c1..75681d6fb9 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -2478,7 +2478,7 @@ typedef struct MinMaxAggInfo * for conflicting purposes. * * In addition, PARAM_EXEC slots are assigned for Params representing outputs - * from subplans (values that are setParam items for those subplans). These + * from subplans (values that are setParam items for those subplans). [TODO: is this true, or only for init plans?] These * IDs need not be tracked via PlannerParamItems, since we do not need any * duplicate-elimination nor later processing of the represented expressions. 
* Instead, we just record the assignment of the slot number by appending to diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out index 545e301e48..8f9ca05e60 100644 --- a/src/test/regress/expected/incremental_sort.out +++ b/src/test/regress/expected/incremental_sort.out @@ -1614,16 +1614,16 @@ from tenk1 t, generate_series(1, 1000); QUERY PLAN --------------------------------------------------------------------------------- Unique - -> Sort - Sort Key: t.unique1, ((SubPlan 1)) - -> Gather - Workers Planned: 2 + -> Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: t.unique1, ((SubPlan 1)) -> Nested Loop -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t -> Function Scan on generate_series - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on tenk1 - Index Cond: (unique1 = t.unique1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on tenk1 + Index Cond: (unique1 = t.unique1) (11 rows) explain (costs off) select @@ -1633,16 +1633,16 @@ from tenk1 t, generate_series(1, 1000) order by 1, 2; QUERY PLAN --------------------------------------------------------------------------- - Sort - Sort Key: t.unique1, ((SubPlan 1)) - -> Gather - Workers Planned: 2 + Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: t.unique1, ((SubPlan 1)) -> Nested Loop -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t -> Function Scan on generate_series - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on tenk1 - Index Cond: (unique1 = t.unique1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on tenk1 + Index Cond: (unique1 = t.unique1) (10 rows) -- Parallel sort but with expression not available until the upper rel. 
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 2303f70d6e..124fe9fec5 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -311,6 +311,131 @@ select count(*) from tenk1 where (two, four) not in 10000 (1 row) +-- test parallel plans for queries containing correlated subplans +-- where the subplan only needs params available from the current +-- worker's scan. +explain (costs off, verbose) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t, generate_series(1, 10); + QUERY PLAN +---------------------------------------------------------------------------- + Gather + Output: ((SubPlan 1)) + Workers Planned: 4 + -> Nested Loop + Output: (SubPlan 1) + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: t.unique1 + -> Function Scan on pg_catalog.generate_series + Output: generate_series.generate_series + Function Call: generate_series(1, 10) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(14 rows) + +explain (costs off, verbose) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t; + QUERY PLAN +---------------------------------------------------------------------- + Gather + Output: ((SubPlan 1)) + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(9 rows) + +explain (analyze, costs off, summary off, verbose, timing off) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t + limit 1; + QUERY PLAN +---------------------------------------------------------------------------------------------------- + Limit (actual rows=1 loops=1) + Output: ((SubPlan 1)) + 
-> Gather (actual rows=1 loops=1) + Output: ((SubPlan 1)) + Workers Planned: 4 + Workers Launched: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t (actual rows=1 loops=5) + Output: (SubPlan 1) + Heap Fetches: 0 + Worker 0: actual rows=1 loops=1 + Worker 1: actual rows=1 loops=1 + Worker 2: actual rows=1 loops=1 + Worker 3: actual rows=1 loops=1 + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 (actual rows=1 loops=5) + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) + Heap Fetches: 0 + Worker 0: actual rows=1 loops=1 + Worker 1: actual rows=1 loops=1 + Worker 2: actual rows=1 loops=1 + Worker 3: actual rows=1 loops=1 +(22 rows) + +explain (costs off, verbose) select t.unique1 + from tenk1 t + where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); + QUERY PLAN +---------------------------------------------------------------------- + Gather + Output: t.unique1 + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: t.unique1 + Filter: (t.unique1 = (SubPlan 1)) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(10 rows) + +explain (costs off, verbose) select * + from tenk1 t + order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Merge + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1)) + Workers Planned: 4 + -> Sort + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, 
((SubPlan 1)) + Sort Key: ((SubPlan 1)) + -> Parallel Seq Scan on public.tenk1 t + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(12 rows) + +-- test subplan in join/lateral join +explain (costs off, verbose, timing off) select t.unique1, l.* + from tenk1 t + join lateral ( + select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0) + ) l on true; + QUERY PLAN +---------------------------------------------------------------------- + Gather + Output: t.unique1, ((SubPlan 1)) + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: t.unique1, (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(9 rows) + -- this is not parallel-safe due to use of random() within SubLink's testexpr: explain (costs off) select * from tenk1 where (unique1 + random())::integer not in diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 019e17e751..c49799b6d4 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -111,6 +111,31 @@ explain (costs off) (select hundred, thousand from tenk2 where thousand > 100); select count(*) from tenk1 where (two, four) not in (select hundred, thousand from tenk2 where thousand > 100); +-- test parallel plans for queries containing correlated subplans +-- where the subplan only needs params available from the current +-- worker's scan. 
+explain (costs off, verbose) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t, generate_series(1, 10); +explain (costs off, verbose) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t; +explain (analyze, costs off, summary off, verbose, timing off) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t + limit 1; +explain (costs off, verbose) select t.unique1 + from tenk1 t + where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); +explain (costs off, verbose) select * + from tenk1 t + order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); +-- test subplan in join/lateral join +explain (costs off, verbose, timing off) select t.unique1, l.* + from tenk1 t + join lateral ( + select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0) + ) l on true; -- this is not parallel-safe due to use of random() within SubLink's testexpr: explain (costs off) select * from tenk1 where (unique1 + random())::integer not in -- 2.17.1