On Fri, Jan 21, 2022 at 3:20 PM Robert Haas <robertmh...@gmail.com> wrote:
>
> On Fri, Jan 14, 2022 at 2:25 PM James Coleman <jtc...@gmail.com> wrote:
> > I've been chewing on this a bit, and I was about to go re-read the
> > code and see how easy it'd be to do exactly what you're suggesting in
> > generate_gather_paths() (and verifying it doesn't need to happen in
> > other places). However there's one (I think large) gotcha with that
> > approach (assuming it otherwise makes sense): it means we do
> > unnecessary work. In the current patch series we only need to recheck
> > parallel safety if we're in a situation where we might actually
> > benefit from doing that work (namely when we have a correlated
> > subquery we might otherwise be able to execute in a parallel plan). If
> > we don't track that status we'd have to recheck the full parallel
> > safety of the path for all paths -- even without correlated
> > subqueries.
>
> I don't think there's an intrinsic problem with the idea of making a
> tentative determination about parallel safety and then refining it
> later, but I'm not sure why you think it would be a lot of work to
> figure this out at the point where we generate gather paths. I think
> it's just a matter of testing whether the set of parameters that the
> path needs as input is the empty set. It may be that neither extParam
> nor allParam are precisely that thing, but I think both are very
> close, and it seems to me that there's no theoretical reason why we
> can't know for every path the set of inputs that it requires "from the
> outside."

As I understand it now (not sure I realized this before), you're
suggesting that *even when there are required params* we mark the path
as parallel safe, and then check the params for parallel safety later.
From a purely theoretical perspective that seemed reasonable, so I
took a pass at that approach.

The first, and likely most interesting, thing I discovered was that
the vast majority of what the patch accomplishes comes not from the
delayed param safety checking but rather from the required outer
relids checks I'd added to generate_useful_gather_paths.

For that to happen I did have to mark PARAM_EXEC params as presumed
parallel safe. That means that parallel_safe now doesn't strictly mean
"parallel safe in the current context" but "parallel safe as long as
any required params are provided". That's a real change, but probably
acceptable as long as a project policy decision is made in that
direction.

There are a few concerns I have (and I'm not sure what level they rise to):

1. From what I can tell we don't have access, on a path, to the set of
params required by that path (I believe this is what Tom was
referencing in his sibling reply at this point in the thread). That
means we have to rely on checking that the required outer relids are
provided by the current query level. I'm not quite sure yet whether
that guarantees (or whether the rest of the path construction logic
guarantees for us) that the params provided by the outer rel are used
in a correlated way that isn't shared across workers. And because we
don't have the param information available, we can't (as far as I can
tell) add additional checks to verify that.
2. Are we excluding any paths (by having a path that will always be
invalid win the cost comparisons in add_partial_path)? I suppose this
danger actually exists in the previous version of the patch as well,
and I don't have any examples of it being a problem. Also, maybe this
can only be a problem if (1) reveals a bug.
3. The new patch series actually ends up allowing parallelization of
correlated params in a few more places than the original patch series.
From what I can tell, all of those cases are in fact safe to execute
in parallel, which, if true, makes this a feature rather than a
concern. The changed query plans fall into two categories: (a) putting
a Gather inside a subplan, and (b) correlated param usages in a
subquery scan path on the inner side of a join. I've separated out
those specific changes into a separate patch to make it easier to tell
which ones I'm referencing.

On the other hand, this is a dramatically simpler patch series.
Assuming the approach is sound, it should be much easier to maintain
than the previous version.

The final patch in the series is a set of additional checks I could
imagine adding to be more explicit, but at least in the current test
suite there's nothing they affect.

Does this look at least somewhat more like what you'd envisioned
(granting the need to squint hard given the relids checks instead of
directly checking params)?

Thanks,
James Coleman
From 6596bb6909b9dfaadd6e6e834be4032e7903c4e9 Mon Sep 17 00:00:00 2001
From: jcoleman <jtc...@gmail.com>
Date: Mon, 30 Nov 2020 11:36:35 -0500
Subject: [PATCH v4 1/4] Allow parallel LATERAL subqueries with LIMIT/OFFSET

The code that determined whether or not a rel should be considered for
parallel query excluded subqueries with LIMIT/OFFSET. That's correct in
the general case: as the comment notes, we'd have to guarantee ordering
(and the comment claims that's not worth checking for) for results to
be consistent across workers. However there's a simpler case that
hadn't been considered: LATERAL subqueries with LIMIT/OFFSET don't fall
under the same reasoning, since they're executed (when not converted to
a JOIN) per outer tuple anyway, so consistency of results across
workers isn't a factor.
---
 src/backend/optimizer/path/allpaths.c         |  4 +++-
 src/test/regress/expected/select_parallel.out | 15 +++++++++++++++
 src/test/regress/sql/select_parallel.sql      |  6 ++++++
 3 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 169b1d53fc..6a581e20fa 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -682,11 +682,13 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
 			 * inconsistent results at the top-level.  (In some cases, where
 			 * the result is ordered, we could relax this restriction.  But it
 			 * doesn't currently seem worth expending extra effort to do so.)
+			 * LATERAL is an exception: LIMIT/OFFSET is safe to execute within
+			 * workers since the sub-select is executed per tuple
 			 */
 			{
 				Query	   *subquery = castNode(Query, rte->subquery);
 
-				if (limit_needed(subquery))
+				if (!rte->lateral && limit_needed(subquery))
 					return;
 			}
 			break;
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 4ea1aa7dfd..2303f70d6e 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -1040,6 +1040,21 @@ explain (costs off)
                            Filter: (stringu1 ~~ '%AAAA'::text)
 (11 rows)
 
+-- ...unless it's LATERAL
+savepoint settings;
+set parallel_tuple_cost=0;
+explain (costs off) select t.unique1 from tenk1 t
+join lateral (select t.unique1 from tenk1 offset 0) l on true;
+                             QUERY PLAN                              
+---------------------------------------------------------------------
+ Gather
+   Workers Planned: 4
+   ->  Nested Loop
+         ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
+         ->  Index Only Scan using tenk1_hundred on tenk1
+(5 rows)
+
+rollback to savepoint settings;
 -- to increase the parallel query test coverage
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index f924731248..019e17e751 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -390,6 +390,12 @@ explain (costs off, verbose)
 explain (costs off)
   select * from tenk1 a where two in
     (select two from tenk1 b where stringu1 like '%AAAA' limit 3);
+-- ...unless it's LATERAL
+savepoint settings;
+set parallel_tuple_cost=0;
+explain (costs off) select t.unique1 from tenk1 t
+join lateral (select t.unique1 from tenk1 offset 0) l on true;
+rollback to savepoint settings;
 
 -- to increase the parallel query test coverage
 SAVEPOINT settings;
-- 
2.17.1

From 200c4ed4118f92014253b49faa41a76ecec84c96 Mon Sep 17 00:00:00 2001
From: jcoleman <jtc...@gmail.com>
Date: Sat, 22 Jan 2022 18:34:54 -0500
Subject: [PATCH v4 3/4] Changed queries

---
 src/test/regress/expected/partition_prune.out | 104 +++++++++---------
 src/test/regress/expected/select_parallel.out |  26 +++--
 2 files changed, 69 insertions(+), 61 deletions(-)

diff --git a/src/test/regress/expected/partition_prune.out b/src/test/regress/expected/partition_prune.out
index 7555764c77..5c45f9c0a5 100644
--- a/src/test/regress/expected/partition_prune.out
+++ b/src/test/regress/expected/partition_prune.out
@@ -1284,60 +1284,64 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM part p(x) ORDER BY x;
 --
 -- pruning won't work for mc3p, because some keys are Params
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = t1.b and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1;
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Nested Loop
-   ->  Append
-         ->  Seq Scan on mc2p1 t1_1
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p2 t1_2
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p_default t1_3
-               Filter: (a = 1)
-   ->  Aggregate
-         ->  Append
-               ->  Seq Scan on mc3p0 t2_1
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p1 t2_2
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p2 t2_3
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p3 t2_4
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p4 t2_5
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p5 t2_6
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p6 t2_7
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p7 t2_8
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p_default t2_9
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-(28 rows)
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Nested Loop
+         ->  Parallel Append
+               ->  Parallel Seq Scan on mc2p1 t1_1
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p2 t1_2
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p_default t1_3
+                     Filter: (a = 1)
+         ->  Aggregate
+               ->  Append
+                     ->  Seq Scan on mc3p0 t2_1
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p1 t2_2
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p2 t2_3
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p3 t2_4
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p4 t2_5
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p5 t2_6
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p6 t2_7
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p7 t2_8
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p_default t2_9
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+(30 rows)
 
 -- pruning should work fine, because values for a prefix of keys (a, b) are
 -- available
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.c = t1.b and abs(t2.b) = 1 and t2.a = 1) s where t1.a = 1;
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Nested Loop
-   ->  Append
-         ->  Seq Scan on mc2p1 t1_1
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p2 t1_2
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p_default t1_3
-               Filter: (a = 1)
-   ->  Aggregate
-         ->  Append
-               ->  Seq Scan on mc3p0 t2_1
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p1 t2_2
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p_default t2_3
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-(16 rows)
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Nested Loop
+         ->  Parallel Append
+               ->  Parallel Seq Scan on mc2p1 t1_1
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p2 t1_2
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p_default t1_3
+                     Filter: (a = 1)
+         ->  Aggregate
+               ->  Append
+                     ->  Seq Scan on mc3p0 t2_1
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p1 t2_2
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p_default t2_3
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+(18 rows)
 
 -- also here, because values for all keys are provided
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = 1 and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1;
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 124fe9fec5..9bb60c2c1e 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -137,8 +137,8 @@ create table part_pa_test_p2 partition of part_pa_test for values from (0) to (m
 explain (costs off)
 	select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a)))
 	from part_pa_test pa2;
-                          QUERY PLAN                          
---------------------------------------------------------------
+                           QUERY PLAN                           
+----------------------------------------------------------------
  Aggregate
    ->  Gather
          Workers Planned: 3
@@ -148,12 +148,14 @@ explain (costs off)
    SubPlan 2
      ->  Result
    SubPlan 1
-     ->  Append
-           ->  Seq Scan on part_pa_test_p1 pa1_1
-                 Filter: (a = pa2.a)
-           ->  Seq Scan on part_pa_test_p2 pa1_2
-                 Filter: (a = pa2.a)
-(14 rows)
+     ->  Gather
+           Workers Planned: 3
+           ->  Parallel Append
+                 ->  Parallel Seq Scan on part_pa_test_p1 pa1_1
+                       Filter: (a = pa2.a)
+                 ->  Parallel Seq Scan on part_pa_test_p2 pa1_2
+                       Filter: (a = pa2.a)
+(16 rows)
 
 drop table part_pa_test;
 -- test with leader participation disabled
@@ -1330,8 +1332,10 @@ SELECT 1 FROM tenk1_vw_sec
          ->  Parallel Index Only Scan using tenk1_unique1 on tenk1
    SubPlan 1
      ->  Aggregate
-           ->  Seq Scan on int4_tbl
-                 Filter: (f1 < tenk1_vw_sec.unique1)
-(9 rows)
+           ->  Gather
+                 Workers Planned: 1
+                 ->  Parallel Seq Scan on int4_tbl
+                       Filter: (f1 < tenk1_vw_sec.unique1)
+(11 rows)
 
 rollback;
-- 
2.17.1

From c11d38b8a88e8f7a46528712fc9341de34171f8d Mon Sep 17 00:00:00 2001
From: jcoleman <jtc...@gmail.com>
Date: Sat, 22 Jan 2022 17:33:23 -0500
Subject: [PATCH v4 4/4] Possible additional checks

---
 src/backend/optimizer/path/allpaths.c | 33 ++++++++++++++++++++++-----
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 776f002054..7e30824320 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -2682,11 +2682,16 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	ListCell   *lc;
 	double		rows;
 	double	   *rowsp = NULL;
+	Relids		required_outer = rel->lateral_relids;
 
 	/* If there are no partial paths, there's nothing to do here. */
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	if (!bms_is_subset(required_outer, rel->relids))
+		return;
+
+
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
@@ -2697,12 +2702,16 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	 * of partial_pathlist because of the way add_partial_path works.
 	 */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
-	rows =
-		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
-	simple_gather_path = (Path *)
-		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
-						   rel->lateral_relids, rowsp);
-	add_path(rel, simple_gather_path);
+	if (cheapest_partial_path->param_info == NULL ||
+			bms_is_subset(cheapest_partial_path->param_info->ppi_req_outer, rel->relids))
+	{
+		rows =
+			cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
+		simple_gather_path = (Path *)
+			create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
+							   rel->lateral_relids, rowsp);
+		add_path(rel, simple_gather_path);
+	}
 
 	/*
 	 * For each useful ordering, we can consider an order-preserving Gather
@@ -2716,6 +2725,10 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		if (subpath->pathkeys == NIL)
 			continue;
 
+		if (subpath->param_info != NULL &&
+			!bms_is_subset(subpath->param_info->ppi_req_outer, rel->relids))
+			break;
+
 		rows = subpath->rows * subpath->parallel_workers;
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
 										subpath->pathkeys, rel->lateral_relids, rowsp);
@@ -2888,6 +2901,10 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 			{
 				Path	   *tmp;
 
+				if (subpath->param_info != NULL &&
+					!bms_is_subset(subpath->param_info->ppi_req_outer, rel->relids))
+					break;
+
 				tmp = (Path *) create_sort_path(root,
 												rel,
 												subpath,
@@ -2916,6 +2933,10 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 			{
 				Path	   *tmp;
 
+				if (subpath->param_info != NULL &&
+					!bms_is_subset(subpath->param_info->ppi_req_outer, rel->relids))
+					break;
+
 				/*
 				 * We should have already excluded pathkeys of length 1
 				 * because then presorted_keys > 0 would imply is_sorted was
-- 
2.17.1

From ba785f2abca7c9f5199f0fc27c3b71f6d1d8010b Mon Sep 17 00:00:00 2001
From: jcoleman <jtc...@gmail.com>
Date: Fri, 21 Jan 2022 22:38:46 -0500
Subject: [PATCH v4 2/4] Parallelize correlated subqueries

When params are provided at the current query level (i.e., are generated
within a single worker and not shared across workers) we can safely
execute these in parallel.

Alternative approach using just relids subset check
---
 doc/src/sgml/parallel.sgml                    |   3 +-
 src/backend/optimizer/path/allpaths.c         |  18 ++-
 src/backend/optimizer/path/joinpath.c         |  16 ++-
 src/backend/optimizer/util/clauses.c          |   3 +
 src/backend/optimizer/util/pathnode.c         |   2 +
 src/backend/utils/misc/guc.c                  |   1 +
 src/include/nodes/pathnodes.h                 |   2 +-
 .../regress/expected/incremental_sort.out     |  28 ++--
 src/test/regress/expected/select_parallel.out | 125 ++++++++++++++++++
 src/test/regress/sql/select_parallel.sql      |  25 ++++
 10 files changed, 197 insertions(+), 26 deletions(-)

diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 13479d7e5e..2d924dd2ac 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -517,7 +517,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
 
     <listitem>
       <para>
-        Plan nodes that reference a correlated <literal>SubPlan</literal>.
+        Plan nodes that reference a correlated <literal>SubPlan</literal> where
+        the result is shared between workers.
       </para>
     </listitem>
   </itemizedlist>
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 6a581e20fa..776f002054 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -556,7 +556,8 @@ set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	 * (see grouping_planner).
 	 */
 	if (rel->reloptkind == RELOPT_BASEREL &&
-		bms_membership(root->all_baserels) != BMS_SINGLETON)
+		bms_membership(root->all_baserels) != BMS_SINGLETON
+		&& (rel->subplan_params == NIL || rte->rtekind != RTE_SUBQUERY))
 		generate_useful_gather_paths(root, rel, false);
 
 	/* Now find the cheapest of the paths for this rel */
@@ -2700,7 +2701,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
 	simple_gather_path = (Path *)
 		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
-						   NULL, rowsp);
+						   rel->lateral_relids, rowsp);
 	add_path(rel, simple_gather_path);
 
 	/*
@@ -2717,7 +2718,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 
 		rows = subpath->rows * subpath->parallel_workers;
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
-										subpath->pathkeys, NULL, rowsp);
+										subpath->pathkeys, rel->lateral_relids, rowsp);
 		add_path(rel, &path->path);
 	}
 }
@@ -2819,11 +2820,15 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 	double	   *rowsp = NULL;
 	List	   *useful_pathkeys_list = NIL;
 	Path	   *cheapest_partial_path = NULL;
+	Relids		required_outer = rel->lateral_relids;
 
 	/* If there are no partial paths, there's nothing to do here. */
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	if (!bms_is_subset(required_outer, rel->relids))
+		return;
+
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
@@ -2895,7 +2900,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 												tmp,
 												rel->reltarget,
 												tmp->pathkeys,
-												NULL,
+												required_outer,
 												rowsp);
 
 				add_path(rel, &path->path);
@@ -2929,7 +2934,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 												tmp,
 												rel->reltarget,
 												tmp->pathkeys,
-												NULL,
+												required_outer,
 												rowsp);
 
 				add_path(rel, &path->path);
@@ -3108,7 +3113,8 @@ standard_join_search(PlannerInfo *root, int levels_needed, List *initial_rels)
 			/*
 			 * Except for the topmost scan/join rel, consider gathering
 			 * partial paths.  We'll do the same for the topmost scan/join rel
-			 * once we know the final targetlist (see grouping_planner).
+			 * once we know the final targetlist (see
+			 * apply_scanjoin_target_to_paths).
 			 */
 			if (lev < levels_needed)
 				generate_useful_gather_paths(root, rel, false);
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index f96fc9fd28..e85b5449ea 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1791,16 +1791,24 @@ match_unsorted_outer(PlannerInfo *root,
 	 * partial path and the joinrel is parallel-safe.  However, we can't
 	 * handle JOIN_UNIQUE_OUTER, because the outer path will be partial, and
 	 * therefore we won't be able to properly guarantee uniqueness.  Nor can
-	 * we handle joins needing lateral rels, since partial paths must not be
-	 * parameterized. Similarly, we can't handle JOIN_FULL and JOIN_RIGHT,
-	 * because they can produce false null extended rows.
+	 * we handle JOIN_FULL and JOIN_RIGHT, because they can produce false null
+	 * extended rows.
+	 *
+	 * Partial paths may only have parameters in limited cases
+	 * where the parameterization is fully satisfied without sharing state
+	 * between workers, so we only allow lateral rels on inputs to the join
+	 * if the resulting join contains no lateral rels, the inner rel's laterals
+	 * are fully satisfied by the outer rel, and the outer rel doesn't depend
+	 * on the inner rel to produce any laterals.
 	 */
 	if (joinrel->consider_parallel &&
 		save_jointype != JOIN_UNIQUE_OUTER &&
 		save_jointype != JOIN_FULL &&
 		save_jointype != JOIN_RIGHT &&
 		outerrel->partial_pathlist != NIL &&
-		bms_is_empty(joinrel->lateral_relids))
+		bms_is_empty(joinrel->lateral_relids) &&
+		bms_is_subset(innerrel->lateral_relids, outerrel->relids) &&
+		(bms_is_empty(outerrel->lateral_relids) || !bms_is_subset(outerrel->lateral_relids, innerrel->relids)))
 	{
 		if (nestjoinOK)
 			consider_parallel_nestloop(root, joinrel, outerrel, innerrel,
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index a707dc9f26..f0002f6887 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -822,6 +822,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 		if (param->paramkind == PARAM_EXTERN)
 			return false;
 
+		if (param->paramkind == PARAM_EXEC)
+			return false;
+
 		if (param->paramkind != PARAM_EXEC ||
 			!list_member_int(context->safe_param_ids, param->paramid))
 		{
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 5c32c96b71..144b2c485d 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -2418,6 +2418,8 @@ create_nestloop_path(PlannerInfo *root,
 	NestPath   *pathnode = makeNode(NestPath);
 	Relids		inner_req_outer = PATH_REQ_OUTER(inner_path);
 
+	/* TODO: Assert lateral relids subset safety? */
+
 	/*
 	 * If the inner path is parameterized by the outer, we must drop any
 	 * restrict_clauses that are due to be moved into the inner path.  We have
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index effb9d03a0..52cd3512b3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -58,6 +58,7 @@
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
+#include "optimizer/clauses.h"
 #include "optimizer/cost.h"
 #include "optimizer/geqo.h"
 #include "optimizer/optimizer.h"
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 1f33fe13c1..75681d6fb9 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -2478,7 +2478,7 @@ typedef struct MinMaxAggInfo
  * for conflicting purposes.
  *
  * In addition, PARAM_EXEC slots are assigned for Params representing outputs
- * from subplans (values that are setParam items for those subplans).  These
+ * from subplans (values that are setParam items for those subplans). [TODO: is this true, or only for init plans?]  These
  * IDs need not be tracked via PlannerParamItems, since we do not need any
  * duplicate-elimination nor later processing of the represented expressions.
  * Instead, we just record the assignment of the slot number by appending to
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index 545e301e48..8f9ca05e60 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1614,16 +1614,16 @@ from tenk1 t, generate_series(1, 1000);
                                    QUERY PLAN                                    
 ---------------------------------------------------------------------------------
  Unique
-   ->  Sort
-         Sort Key: t.unique1, ((SubPlan 1))
-         ->  Gather
-               Workers Planned: 2
+   ->  Gather Merge
+         Workers Planned: 2
+         ->  Sort
+               Sort Key: t.unique1, ((SubPlan 1))
                ->  Nested Loop
                      ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                      ->  Function Scan on generate_series
-               SubPlan 1
-                 ->  Index Only Scan using tenk1_unique1 on tenk1
-                       Index Cond: (unique1 = t.unique1)
+                     SubPlan 1
+                       ->  Index Only Scan using tenk1_unique1 on tenk1
+                             Index Cond: (unique1 = t.unique1)
 (11 rows)
 
 explain (costs off) select
@@ -1633,16 +1633,16 @@ from tenk1 t, generate_series(1, 1000)
 order by 1, 2;
                                 QUERY PLAN                                 
 ---------------------------------------------------------------------------
- Sort
-   Sort Key: t.unique1, ((SubPlan 1))
-   ->  Gather
-         Workers Planned: 2
+ Gather Merge
+   Workers Planned: 2
+   ->  Sort
+         Sort Key: t.unique1, ((SubPlan 1))
          ->  Nested Loop
                ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                ->  Function Scan on generate_series
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on tenk1
-                 Index Cond: (unique1 = t.unique1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on tenk1
+                       Index Cond: (unique1 = t.unique1)
 (10 rows)
 
 -- Parallel sort but with expression not available until the upper rel.
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 2303f70d6e..124fe9fec5 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -311,6 +311,131 @@ select count(*) from tenk1 where (two, four) not in
  10000
 (1 row)
 
+-- test parallel plans for queries containing correlated subplans
+-- where the subplan only needs params available from the current
+-- worker's scan.
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t, generate_series(1, 10);
+                                 QUERY PLAN                                 
+----------------------------------------------------------------------------
+ Gather
+   Output: ((SubPlan 1))
+   Workers Planned: 4
+   ->  Nested Loop
+         Output: (SubPlan 1)
+         ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+               Output: t.unique1
+         ->  Function Scan on pg_catalog.generate_series
+               Output: generate_series.generate_series
+               Function Call: generate_series(1, 10)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(14 rows)
+
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
+   Output: ((SubPlan 1))
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(9 rows)
+
+explain (analyze, costs off, summary off, verbose, timing off) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t
+  limit 1;
+                                             QUERY PLAN                                             
+----------------------------------------------------------------------------------------------------
+ Limit (actual rows=1 loops=1)
+   Output: ((SubPlan 1))
+   ->  Gather (actual rows=1 loops=1)
+         Output: ((SubPlan 1))
+         Workers Planned: 4
+         Workers Launched: 4
+         ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t (actual rows=1 loops=5)
+               Output: (SubPlan 1)
+               Heap Fetches: 0
+               Worker 0:  actual rows=1 loops=1
+               Worker 1:  actual rows=1 loops=1
+               Worker 2:  actual rows=1 loops=1
+               Worker 3:  actual rows=1 loops=1
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1 (actual rows=1 loops=5)
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
+                       Heap Fetches: 0
+                       Worker 0:  actual rows=1 loops=1
+                       Worker 1:  actual rows=1 loops=1
+                       Worker 2:  actual rows=1 loops=1
+                       Worker 3:  actual rows=1 loops=1
+(22 rows)
+
+explain (costs off, verbose) select t.unique1
+  from tenk1 t
+  where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
+   Output: t.unique1
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: t.unique1
+         Filter: (t.unique1 = (SubPlan 1))
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(10 rows)
+
+explain (costs off, verbose) select *
+  from tenk1 t
+  order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+                                                                                                QUERY PLAN                                                                                                
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Gather Merge
+   Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
+   Workers Planned: 4
+   ->  Sort
+         Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
+         Sort Key: ((SubPlan 1))
+         ->  Parallel Seq Scan on public.tenk1 t
+               Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
+(12 rows)
+
+-- test subplan in join/lateral join
+explain (costs off, verbose, timing off) select t.unique1, l.*
+  from tenk1 t
+  join lateral (
+    select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0)
+  ) l on true;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
+   Output: t.unique1, ((SubPlan 1))
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: t.unique1, (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(9 rows)
+
 -- this is not parallel-safe due to use of random() within SubLink's testexpr:
 explain (costs off)
 	select * from tenk1 where (unique1 + random())::integer not in
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 019e17e751..c49799b6d4 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -111,6 +111,31 @@ explain (costs off)
 	(select hundred, thousand from tenk2 where thousand > 100);
 select count(*) from tenk1 where (two, four) not in
 	(select hundred, thousand from tenk2 where thousand > 100);
+-- test parallel plans for queries containing correlated subplans
+-- where the subplan only needs params available from the current
+-- worker's scan.
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t, generate_series(1, 10);
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t;
+explain (analyze, costs off, summary off, verbose, timing off) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t
+  limit 1;
+explain (costs off, verbose) select t.unique1
+  from tenk1 t
+  where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+explain (costs off, verbose) select *
+  from tenk1 t
+  order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+-- test subplan in join/lateral join
+explain (costs off, verbose, timing off) select t.unique1, l.*
+  from tenk1 t
+  join lateral (
+    select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0)
+  ) l on true;
 -- this is not parallel-safe due to use of random() within SubLink's testexpr:
 explain (costs off)
 	select * from tenk1 where (unique1 + random())::integer not in
-- 
2.17.1