Good day, hackers.

Problem:
- An Append path is created with parallel_aware = true set explicitly.
- It has two children: one is trivial, the other has parallel_aware = false.
  The trivial child is dropped.
- A Gather/GatherMerge path takes the Append path as its child and assumes
  that child is parallel_aware = true.
- But the Append path is removed at the end, since it has only one child.
- Now Gather/GatherMerge assumes its child is parallel_aware, but it
  is not.
  Gather/GatherMerge runs its child twice, once in a worker and once in the
  leader, and gathers the same rows twice.
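
To make the duplication concrete, here is a toy model of the steps above. It
is only a sketch: ToyNode, toy_rows_for_participant and toy_gather_rows are
names made up for this mail, not PostgreSQL's Path/Plan structures or executor
code. The only assumption it encodes is that a parallel-aware child splits its
rows between participants, while a non-parallel-aware child returns all of its
rows to every participant that runs it.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-in for a plan node; NOT PostgreSQL's real Path/Plan structs. */
typedef struct ToyNode
{
    bool parallel_aware;    /* do participants split the rows between them? */
    int  nrows;             /* rows the node yields when scanned in full */
} ToyNode;

/* Rows a single participant (worker or leader) gets out of the child. */
static int
toy_rows_for_participant(const ToyNode *child, int participant,
                         int nparticipants)
{
    int share;

    assert(nparticipants > 0);
    assert(participant >= 0 && participant < nparticipants);

    /* A non-parallel-aware child is scanned in full by every participant. */
    if (!child->parallel_aware)
        return child->nrows;

    /* A parallel-aware child hands each row to exactly one participant. */
    share = child->nrows / nparticipants;
    if (participant < child->nrows % nparticipants)
        share++;
    return share;
}

/* Toy Gather: adds up what the workers and the leader each return. */
static int
toy_gather_rows(const ToyNode *child, int nworkers)
{
    int nparticipants = nworkers + 1;   /* workers plus the leader */
    int total = 0;

    for (int i = 0; i < nparticipants; i++)
        total += toy_rows_for_participant(child, i, nparticipants);
    return total;
}
```

With one worker plus the leader, a parallel-aware child of 200 rows still
gathers 200 rows in this model, while the same child without parallel
awareness gathers 400.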

Reproduction code is attached (repro.sql); it is also included in the patch
as a regression test.

Suggested quick (and valid) fix in the attached patch:
- If Append has a single child, copy that child's parallel awareness.
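
The idea of the fix can be sketched outside the planner as follows. This is
an illustration under assumptions, not the patched create_append_path():
ToyPath and toy_make_append are hypothetical names, and the struct keeps only
the three fields the patch copies.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy path with only the parallel-related fields; NOT PostgreSQL's Path. */
typedef struct ToyPath
{
    bool parallel_aware;
    bool parallel_safe;
    int  parallel_workers;
} ToyPath;

/*
 * Build a toy Append over the given children.  The caller states the
 * parallel awareness and worker count it wants for the Append itself.
 */
static ToyPath
toy_make_append(const ToyPath *children, size_t nchildren,
                bool want_parallel_aware, int want_workers)
{
    ToyPath append;

    assert(children != NULL && nchildren > 0);

    append.parallel_aware = want_parallel_aware;
    append.parallel_workers = want_workers;

    /* The Append is parallel safe only if every child is. */
    append.parallel_safe = true;
    for (size_t i = 0; i < nchildren; i++)
        append.parallel_safe = append.parallel_safe &&
            children[i].parallel_safe;

    if (nchildren == 1)
    {
        /*
         * A single-child Append is dropped from the final plan, so its
         * parent ends up on top of the child.  Make the Append describe
         * the child instead of what the caller asked for.
         */
        append.parallel_aware = children[0].parallel_aware;
        append.parallel_safe = children[0].parallel_safe;
        append.parallel_workers = children[0].parallel_workers;
    }
    return append;
}
```

In this sketch, a parent that looks at the Append's flags sees exactly what it
will get once the single-child Append has been removed.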

The bug was introduced with commit 8edd0e79460b414b1d971895312e549e95e12e4f
"Suppress Append and MergeAppend plan nodes that have a single child."

During that discussion, it was suggested [1] that those fields should be copied:

> I haven't looked into whether this does the right things for parallel
> planning --- possibly create_[merge]append_path need to propagate up
> parallel-related path fields from the single child?

But it was not so obvious [2].

A better fix could also remove the Gather/GatherMerge node if
its child is not parallel aware.

The bug is reported in
https://postgr.es/m/flat/17335-4dc92e1aea3a78af%40postgresql.org
Since there is no way to add a thread from pgsql-bugs to the commitfest, I am
writing here.

[1] https://postgr.es/m/17500.1551669976%40sss.pgh.pa.us
[2] https://postgr.es/m/CAKJS1f_Wt_tL3S32R3wpU86zQjuHfbnZbFt0eqm%3DqcRFcdbLvw%40mail.gmail.com

---- 
regards
Yura Sokolov
y.soko...@postgrespro.ru
funny.fal...@gmail.com
From 47c6e161de4fc9d2d6eff45f427ebf49b4c9d11c Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Mon, 20 Dec 2021 11:48:10 +0300
Subject: [PATCH v1] Quick fix to duplicate result rows after Append path
 removal.

It can happen that an Append path is created with the "parallel_aware" flag
set while its single child is not parallel_aware. The Append path's parent
(Gather or Gather Merge) thinks its child is parallel_aware, but after the
Append path is removed, Gather's child is no longer parallel_aware. Then, when
Gather/Gather Merge decides to run the child in several workers, or in a
worker plus the participating leader, it gathers duplicate result rows from
the several invocations of the child path.

With this fix the Append path copies its single child's parallel_aware /
parallel_safe / parallel_workers values, alongside the cost values.

A copied `num_workers == 0` triggers the assert `num_workers > 0` in the
cost_gather_merge function. Changing this assert to `num_workers >= 0`
does not lead to any runtime or logical error.

Fixes bug 17335
https://postgr.es/m/flat/17335-4dc92e1aea3a78af%40postgresql.org
---
 src/backend/optimizer/path/costsize.c         |   2 +-
 src/backend/optimizer/util/pathnode.c         |   3 +
 .../expected/gather_removed_append.out        | 131 ++++++++++++++++++
 src/test/regress/parallel_schedule            |   1 +
 .../regress/sql/gather_removed_append.sql     |  82 +++++++++++
 5 files changed, 218 insertions(+), 1 deletion(-)
 create mode 100644 src/test/regress/expected/gather_removed_append.out
 create mode 100644 src/test/regress/sql/gather_removed_append.sql

diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 1e4d404f024..9949c3ab555 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -440,7 +440,7 @@ cost_gather_merge(GatherMergePath *path, PlannerInfo *root,
 	 * be overgenerous since the leader will do less work than other workers
 	 * in typical cases, but we'll go with it for now.
 	 */
-	Assert(path->num_workers > 0);
+	Assert(path->num_workers >= 0);
 	N = (double) path->num_workers + 1;
 	logN = LOG2(N);
 
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index af5e8df26b4..2ff4678937a 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1340,6 +1340,9 @@ create_append_path(PlannerInfo *root,
 		pathnode->path.startup_cost = child->startup_cost;
 		pathnode->path.total_cost = child->total_cost;
 		pathnode->path.pathkeys = child->pathkeys;
+		pathnode->path.parallel_aware = child->parallel_aware;
+		pathnode->path.parallel_safe = child->parallel_safe;
+		pathnode->path.parallel_workers = child->parallel_workers;
 	}
 	else
 		cost_append(pathnode);
diff --git a/src/test/regress/expected/gather_removed_append.out b/src/test/regress/expected/gather_removed_append.out
new file mode 100644
index 00000000000..f6e861ce59d
--- /dev/null
+++ b/src/test/regress/expected/gather_removed_append.out
@@ -0,0 +1,131 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+NOTICE:  table "gather_append_1" does not exist, skipping
+NOTICE:  table "gather_append_2" does not exist, skipping
+CREATE TABLE gather_append_1 (
+    fk int,
+    f bool
+);
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+CREATE TABLE gather_append_2 (
+    fk int,
+    val serial
+);
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+ANALYZE gather_append_1, gather_append_2;
+SET max_parallel_workers_per_gather = 0;
+-- Find correct rows count
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count 
+-------
+   200
+(1 row)
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count 
+-------
+   200
+(1 row)
+
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+                                               QUERY PLAN                                                
+---------------------------------------------------------------------------------------------------------
+ Finalize Aggregate (actual rows=1 loops=1)
+   ->  Gather (actual rows=1 loops=1)
+         Workers Planned: 1
+         Workers Launched: 1
+         Single Copy: true
+         ->  Partial Aggregate (actual rows=1 loops=1)
+               ->  Parallel Hash Left Join (actual rows=200 loops=1)
+                     Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+                     ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+                           Index Cond: (f = true)
+                     ->  Parallel Hash (actual rows=10000 loops=1)
+                           Buckets: 16384  Batches: 1  Memory Usage: 544kB
+                           ->  Parallel Seq Scan on gather_append_2 (actual rows=10000 loops=1)
+(13 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+                                            QUERY PLAN                                             
+---------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+   Workers Planned: 0
+   Workers Launched: 0
+   ->  Sort (actual rows=200 loops=1)
+         Sort Key: gather_append_2.val
+         Sort Method: quicksort  Memory: 25kB
+         ->  Parallel Hash Left Join (actual rows=200 loops=1)
+               Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+               ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+                     Index Cond: (f = true)
+               ->  Parallel Hash (actual rows=10000 loops=1)
+                     Buckets: 16384  Batches: 1  Memory Usage: 519kB
+                     ->  Parallel Seq Scan on gather_append_2 (actual rows=10000 loops=1)
+(13 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+                                            QUERY PLAN                                             
+---------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+   Workers Planned: 0
+   Workers Launched: 0
+   ->  Sort (actual rows=200 loops=1)
+         Sort Key: gather_append_2.val
+         Sort Method: quicksort  Memory: 25kB
+         ->  Parallel Hash Left Join (actual rows=200 loops=1)
+               Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+               ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+                     Index Cond: (f = true)
+               ->  Parallel Hash (actual rows=10000 loops=1)
+                     Buckets: 16384  Batches: 1  Memory Usage: 519kB
+                     ->  Parallel Seq Scan on gather_append_2 (actual rows=10000 loops=1)
+(13 rows)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5b0c73d7e37..84f2f81255d 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -100,6 +100,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
 test: select_parallel
 test: write_parallel
 test: vacuum_parallel
+test: gather_removed_append
 
 # no relation related tests can be put in this group
 test: publication subscription
diff --git a/src/test/regress/sql/gather_removed_append.sql b/src/test/regress/sql/gather_removed_append.sql
new file mode 100644
index 00000000000..df1b796a4f6
--- /dev/null
+++ b/src/test/regress/sql/gather_removed_append.sql
@@ -0,0 +1,82 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+
+CREATE TABLE gather_append_1 (
+    fk int,
+    f bool
+);
+
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+
+CREATE TABLE gather_append_2 (
+    fk int,
+    val serial
+);
+
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+
+ANALYZE gather_append_1, gather_append_2;
+
+SET max_parallel_workers_per_gather = 0;
+
+-- Find correct rows count
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
-- 
2.34.1

Attachment: repro.sql
Description: application/sql
