[jira] [Commented] (SPARK-30876) Optimizer cannot infer from inferred constraints with join
[ https://issues.apache.org/jira/browse/SPARK-30876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161616#comment-17161616 ]

Apache Spark commented on SPARK-30876:
--------------------------------------

User 'navinvishy' has created a pull request for this issue:
https://github.com/apache/spark/pull/29170

> Optimizer cannot infer from inferred constraints with join
> ----------------------------------------------------------
>
>                 Key: SPARK-30876
>                 URL: https://issues.apache.org/jira/browse/SPARK-30876
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> How to reproduce this issue:
> {code:sql}
> create table t1(a int, b int, c int);
> create table t2(a int, b int, c int);
> create table t3(a int, b int, c int);
> select count(*) from t1 join t2 join t3 on (t1.a = t2.b and t2.b = t3.c and t3.c = 1);
> {code}
> Spark 2.3+:
> {noformat}
> == Physical Plan ==
> *(4) HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition, true, [id=#102]
>    +- *(3) HashAggregate(keys=[], functions=[partial_count(1)])
>       +- *(3) Project
>          +- *(3) BroadcastHashJoin [b#10], [c#14], Inner, BuildRight
>             :- *(3) Project [b#10]
>             :  +- *(3) BroadcastHashJoin [a#6], [b#10], Inner, BuildRight
>             :     :- *(3) Project [a#6]
>             :     :  +- *(3) Filter isnotnull(a#6)
>             :     :     +- *(3) ColumnarToRow
>             :     :        +- FileScan parquet default.t1[a#6] Batched: true, DataFilters: [isnotnull(a#6)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/Downloads/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
>             :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#87]
>             :        +- *(1) Project [b#10]
>             :           +- *(1) Filter (isnotnull(b#10) AND (b#10 = 1))
>             :              +- *(1) ColumnarToRow
>             :                 +- FileScan parquet default.t2[b#10] Batched: true, DataFilters: [isnotnull(b#10), (b#10 = 1)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/Downloads/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(b), EqualTo(b,1)], ReadSchema: struct<b:int>
>             +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#96]
>                +- *(2) Project [c#14]
>                   +- *(2) Filter (isnotnull(c#14) AND (c#14 = 1))
>                      +- *(2) ColumnarToRow
>                         +- FileScan parquet default.t3[c#14] Batched: true, DataFilters: [isnotnull(c#14), (c#14 = 1)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/Downloads/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(c), EqualTo(c,1)], ReadSchema: struct<c:int>
> Time taken: 3.785 seconds, Fetched 1 row(s)
> {noformat}
> Spark 2.2.x:
> {noformat}
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition
>    +- *HashAggregate(keys=[], functions=[partial_count(1)])
>       +- *Project
>          +- *SortMergeJoin [b#19], [c#23], Inner
>             :- *Project [b#19]
>             :  +- *SortMergeJoin [a#15], [b#19], Inner
>             :     :- *Sort [a#15 ASC NULLS FIRST], false, 0
>             :     :  +- Exchange hashpartitioning(a#15, 200)
>             :     :     +- *Filter (isnotnull(a#15) && (a#15 = 1))
>             :     :        +- HiveTableScan [a#15], HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#15, b#16, c#17]
>             :     +- *Sort [b#19 ASC NULLS FIRST], false, 0
>             :        +- Exchange hashpartitioning(b#19, 200)
>             :           +- *Filter (isnotnull(b#19) && (b#19 = 1))
>             :              +- HiveTableScan [b#19], HiveTableRelation `default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#18, b#19, c#20]
>             +- *Sort [c#23 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(c#23, 200)
>                   +- *Filter (isnotnull(c#23) && (c#23 = 1))
>                      +- HiveTableScan [c#23], HiveTableRelation `default`.`t3`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#21, b#22, c#23]
> Time taken: 0.728 seconds, Fetched 1 row(s)
> {noformat}
> Spark 2.2 can infer {{(a#15 = 1)}}, but Spark 2.3+ can't.
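To spell out the inference the report expects: the join condition supplies t1.a = t2.b, t2.b = t3.c and t3.c = 1, so by transitivity the optimizer should also derive t2.b = 1 and t1.a = 1 and push them down to the scans, which is exactly what the Spark 2.2 plan shows for a#15. A minimal standalone sketch of that transitive closure follows; Attr is a hypothetical stand-in for Catalyst's AttributeReference, not the real API:

{code:scala}
// Sketch only: the real optimizer rule (InferFiltersFromConstraints)
// operates on Catalyst Expression trees, not on this toy representation.
object ConstraintClosure {
  case class Attr(name: String)

  // Propagate attribute-to-literal bindings across attribute equalities
  // until no new binding can be derived (a fixed point).
  def inferLiterals(
      equalities: Seq[(Attr, Attr)],
      literals: Map[Attr, Int]): Map[Attr, Int] = {
    var known = literals
    var changed = true
    while (changed) {
      changed = false
      for ((l, r) <- equalities) {
        (known.get(l), known.get(r)) match {
          case (Some(v), None) => known += (r -> v); changed = true
          case (None, Some(v)) => known += (l -> v); changed = true
          case _ =>
        }
      }
    }
    known
  }

  def main(args: Array[String]): Unit = {
    val a = Attr("t1.a"); val b = Attr("t2.b"); val c = Attr("t3.c")
    // t1.a = t2.b, t2.b = t3.c, t3.c = 1
    println(inferLiterals(Seq(a -> b, b -> c), Map(c -> 1)))
    // Map(Attr(t3.c) -> 1, Attr(t2.b) -> 1, Attr(t1.a) -> 1)
  }
}
{code}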
[jira] [Commented] (SPARK-30876) Optimizer cannot infer from inferred constraints with join
[ https://issues.apache.org/jira/browse/SPARK-30876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140898#comment-17140898 ]

Navin Viswanath commented on SPARK-30876:
-----------------------------------------

[~yumwang] would this be in the logical plan optimization? I was looking into the logical plans and got this.

Unoptimized:
{noformat}
'Filter ((('x.a = 'y.b) AND ('y.b = 'z.c)) AND ('z.c = 1))
+- 'Join Inner
   :- Join Inner
   :  :- SubqueryAlias x
   :  :  +- LocalRelation <empty>, [a#0, b#1, c#2]
   :  +- SubqueryAlias y
   :     +- LocalRelation <empty>, [d#3]
   +- SubqueryAlias z
      +- LocalRelation <empty>, [a#0, b#1, c#2]
{noformat}
Optimized:
{noformat}
'Filter ((('x.a = 'y.b) AND ('y.b = 'z.c)) AND ('z.c = 1))
+- 'Join Inner
   :- Join Inner
   :  :- LocalRelation <empty>, [a#0, b#1, c#2]
   :  +- LocalRelation <empty>, [d#3]
   +- LocalRelation <empty>, [a#0, b#1, c#2]
{noformat}
Or was this supposed to be in the physical plan? Any pointers would help. Thanks!
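For context on the question above: yes, this inference belongs to logical optimization. In Catalyst it is the InferFiltersFromConstraints rule (gated by spark.sql.constraintPropagation.enabled), and its output is visible in the optimized logical plan, before any physical planning. Note also that the plans quoted above still contain unresolved references ('Filter, 'x.a), which suggests they were printed before analysis. A small sketch for inspecting the analyzed vs. optimized plans, assuming a local SparkSession and one-row temp views in place of the Hive tables from the repro:

{code:scala}
import org.apache.spark.sql.SparkSession

// Sketch: compare the plan before and after the optimizer runs. If the
// inference worked, the optimized plan would show an extra (a = 1)
// predicate on the t1 branch.
object InspectPlans {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("SPARK-30876").getOrCreate()
    import spark.implicits._

    // One-row relations so empty-relation rules don't collapse the join.
    Seq((1, 1, 1)).toDF("a", "b", "c").createOrReplaceTempView("t1")
    Seq((1, 1, 1)).toDF("a", "b", "c").createOrReplaceTempView("t2")
    Seq((1, 1, 1)).toDF("a", "b", "c").createOrReplaceTempView("t3")

    val df = spark.sql(
      """select count(*) from t1 join t2 join t3
        |on (t1.a = t2.b and t2.b = t3.c and t3.c = 1)""".stripMargin)

    println(df.queryExecution.analyzed)      // resolved plan, before optimizer rules
    println(df.queryExecution.optimizedPlan) // after InferFiltersFromConstraints etc.

    spark.stop()
  }
}
{code}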
[jira] [Commented] (SPARK-30876) Optimizer cannot infer from inferred constraints with join
[ https://issues.apache.org/jira/browse/SPARK-30876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138072#comment-17138072 ]

Yuming Wang commented on SPARK-30876:
-------------------------------------

Thank you [~navinvishy].
[jira] [Commented] (SPARK-30876) Optimizer cannot infer from inferred constraints with join
[ https://issues.apache.org/jira/browse/SPARK-30876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136753#comment-17136753 ]

Navin Viswanath commented on SPARK-30876:
-----------------------------------------

Hi [~yumwang] I'd be happy to take a look at this if that's ok.