wangyum opened a new pull request #27518: [SPARK-30768][SQL] Constraints should be inferred from inequality attributes
URL: https://github.com/apache/spark/pull/27518

### What changes were proposed in this pull request?

Constraints should be inferred from inequality attributes, for example:

```sql
create table SPARK_30768_1(c1 int, c2 int);
create table SPARK_30768_2(c1 int, c2 int);

explain select t1.* from SPARK_30768_1 t1 join SPARK_30768_2 t2 on (t1.c1 > t2.c1) where t1.c1 = 3;
```

Before this PR:
```
== Physical Plan ==
*(3) Project [c1#5, c2#6]
+- BroadcastNestedLoopJoin BuildRight, Inner, (c1#5 > c1#7)
   :- *(1) Project [c1#5, c2#6]
   :  +- *(1) Filter (isnotnull(c1#5) AND (c1#5 = 3))
   :     +- *(1) ColumnarToRow
   :        +- FileScan parquet default.spark_30768_1[c1#5,c2#6] Batched: true, DataFilters: [isnotnull(c1#5), (c1#5 = 3)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(c1), EqualTo(c1,3)], ReadSchema: struct<c1:int,c2:int>
   +- BroadcastExchange IdentityBroadcastMode, [id=#60]
      +- *(2) Project [c1#7]
         +- *(2) Filter isnotnull(c1#7)
            +- *(2) ColumnarToRow
               +- FileScan parquet default.spark_30768_2[c1#7] Batched: true, DataFilters: [isnotnull(c1#7)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(c1)], ReadSchema: struct<c1:int>
```

After this PR:
```
== Physical Plan ==
*(3) Project [c1#218, c2#219]
+- BroadcastNestedLoopJoin BuildRight, Inner, (c1#218 > c1#220)
   :- *(1) Project [c1#218, c2#219]
   :  +- *(1) Filter (isnotnull(c1#218) AND (c1#218 = 3))
   :     +- *(1) ColumnarToRow
   :        +- FileScan parquet default.spark_30768_1[c1#218,c2#219] Batched: true, DataFilters: [isnotnull(c1#218), (c1#218 = 3)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(c1), EqualTo(c1,3)], ReadSchema: struct<c1:int,c2:int>
   +- BroadcastExchange IdentityBroadcastMode, [id=#120]
      +- *(2) Project [c1#220]
         +- *(2) Filter ((3 > c1#220) AND isnotnull(c1#220))
            +- *(2) ColumnarToRow
               +- FileScan parquet default.spark_30768_2[c1#220] Batched: true, DataFilters: [(3 > c1#220), isnotnull(c1#220)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [LessThan(c1,3), IsNotNull(c1)], ReadSchema: struct<c1:int>
```

Note the inferred filter `(3 > c1#220)` on `spark_30768_2`, which is pushed down to the scan as `LessThan(c1,3)`.

Hive supports this feature:
```
hive> explain select t1.* from SPARK_30768_1 t1 join SPARK_30768_2 t2 on (t1.c1 > t2.c1) where t1.c1 = 3;
Warning: Map Join MAPJOIN[13][bigTable=?] in task 'Stage-3:MAPRED' is a cross product
OK
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:t1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:t1
          TableScan
            alias: t1
            filterExpr: (c1 = 3) (type: boolean)
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: (c1 = 3) (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                expressions: c2 (type: int)
                outputColumnNames: _col1
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0
                    1

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: t2
            filterExpr: (c1 < 3) (type: boolean)
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: (c1 < 3) (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0
                    1
                  outputColumnNames: _col1
                  Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL Column stats: NONE
                  Select Operator
                    expressions: 3 (type: int), _col1 (type: int)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Execution mode: vectorized
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 5.491 seconds, Fetched: 71 row(s)
```

### Why are the changes needed?

Improve query performance.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit test.
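The inference illustrated above can be sketched in a few lines. This is a hedged, standalone Python illustration of the idea (not Spark's actual Catalyst implementation; the function and data shapes are invented for this sketch): when an attribute is pinned to a constant by an equality constraint (`t1.c1 = 3`) and also appears in an inequality join condition (`t1.c1 > t2.c1`), substituting the constant yields a new constraint on the other side (`3 > t2.c1`, i.e. `t2.c1 < 3`) that can be pushed down to the other relation's scan.

```python
def infer_inequality_constraints(equalities, inequalities):
    """Illustrative sketch of inferring constraints from inequality attributes.

    equalities:   dict mapping attribute name -> constant value (from a = const)
    inequalities: list of (left_attr, op, right_attr) tuples,
                  where op is one of '>', '<', '>=', '<='
    Returns inferred (constant, op, attr) constraints.
    """
    flipped = {'>': '<', '<': '>', '>=': '<=', '<=': '>='}
    inferred = []
    for left, op, right in inequalities:
        if left in equalities:
            # a = 3 and a > b  ==>  3 > b
            inferred.append((equalities[left], op, right))
        if right in equalities:
            # a = 3 and b > a  ==>  b > 3, rewritten as 3 < b
            inferred.append((equalities[right], flipped[op], left))
    return inferred

# The PR's example: t1.c1 = 3 and join condition t1.c1 > t2.c1
print(infer_inequality_constraints({'t1.c1': 3}, [('t1.c1', '>', 't2.c1')]))
# -> [(3, '>', 't2.c1')], i.e. t2.c1 < 3, pushed down as LessThan(c1,3)
```

The inferred predicate filters `t2` before the (broadcast nested loop) join, which is where the query-performance benefit comes from.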
