wangyum opened a new pull request #27518: [SPARK-30768][SQL] Constraints should 
be inferred from inequality attributes
URL: https://github.com/apache/spark/pull/27518
 
 
   ### What changes were proposed in this pull request?
   This PR infers additional constraints from inequality predicates between attributes. Currently constraints are only inferred from equality conditions; with this change, an equality such as `t1.c1 = 3` combined with an inequality join condition such as `t1.c1 > t2.c1` yields a new filter (`3 > t2.c1`) that can be pushed down to the other side of the join. For example:
   ```sql
   create table SPARK_30768_1(c1 int, c2 int);
   create table SPARK_30768_2(c1 int, c2 int);
   
   explain select t1.* from SPARK_30768_1 t1 join SPARK_30768_2 t2 on (t1.c1 > 
t2.c1) where t1.c1 = 3;
   == Physical Plan ==
   *(3) Project [c1#5, c2#6]
   +- BroadcastNestedLoopJoin BuildRight, Inner, (c1#5 > c1#7)
      :- *(1) Project [c1#5, c2#6]
      :  +- *(1) Filter (isnotnull(c1#5) AND (c1#5 = 3))
      :     +- *(1) ColumnarToRow
      :        +- FileScan parquet default.spark_30768_1[c1#5,c2#6] Batched: 
true, DataFilters: [isnotnull(c1#5), (c1#5 = 3)], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous...,
 PartitionFilters: [], PushedFilters: [IsNotNull(c1), EqualTo(c1,3)], 
ReadSchema: struct<c1:int,c2:int>
      +- BroadcastExchange IdentityBroadcastMode, [id=#60]
         +- *(2) Project [c1#7]
            +- *(2) Filter isnotnull(c1#7)
               +- *(2) ColumnarToRow
                  +- FileScan parquet default.spark_30768_2[c1#7] Batched: 
true, DataFilters: [isnotnull(c1#7)], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous...,
 PartitionFilters: [], PushedFilters: [IsNotNull(c1)], ReadSchema: 
struct<c1:int>
   ```
   After this PR:
   ```sql
   == Physical Plan ==
   *(3) Project [c1#218, c2#219]
   +- BroadcastNestedLoopJoin BuildRight, Inner, (c1#218 > c1#220)
      :- *(1) Project [c1#218, c2#219]
      :  +- *(1) Filter (isnotnull(c1#218) AND (c1#218 = 3))
      :     +- *(1) ColumnarToRow
      :        +- FileScan parquet default.spark_30768_1[c1#218,c2#219] 
Batched: true, DataFilters: [isnotnull(c1#218), (c1#218 = 3)], Format: Parquet, 
Location: 
InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark....,
 PartitionFilters: [], PushedFilters: [IsNotNull(c1), EqualTo(c1,3)], 
ReadSchema: struct<c1:int,c2:int>
      +- BroadcastExchange IdentityBroadcastMode, [id=#120]
         +- *(2) Project [c1#220]
            +- *(2) Filter ((3 > c1#220) AND isnotnull(c1#220))
               +- *(2) ColumnarToRow
                  +- FileScan parquet default.spark_30768_2[c1#220] Batched: 
true, DataFilters: [(3 > c1#220), isnotnull(c1#220)], Format: Parquet, 
Location: 
InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark....,
 PartitionFilters: [], PushedFilters: [LessThan(c1,3), IsNotNull(c1)], 
ReadSchema: struct<c1:int>
   
   ```
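   The derivation above (from `t1.c1 = 3` and `t1.c1 > t2.c1` to `3 > t2.c1`) can be sketched as a simple rewrite rule: for every equality binding an attribute to a constant, substitute the constant into any inequality that references that attribute, producing a new constraint on the other attribute. This is a minimal standalone illustration, not Spark's actual `InferFiltersFromConstraints` implementation; the predicate encoding and function name are hypothetical:

   ```python
   # Hypothetical sketch of inferring constraints from inequality predicates.
   # Predicates are modeled as tuples: ("=", attr, const) or (">", attr, attr).

   def infer_inequality_constraints(predicates):
       """For each equality attr = const and each inequality attr > other,
       derive const > other, a new filter pushable to the other relation."""
       consts = {attr: value for op, attr, value in predicates if op == "="}
       inferred = []
       for op, left, right in predicates:
           if op == ">" and left in consts:
               # e.g. t1.c1 = 3 and t1.c1 > t2.c1  =>  3 > t2.c1
               inferred.append((">", consts[left], right))
       return inferred

   preds = [("=", "t1.c1", 3), (">", "t1.c1", "t2.c1")]
   print(infer_inequality_constraints(preds))  # [('>', 3, 't2.c1')]
   ```

   In the plan above this inferred constraint surfaces as `Filter ((3 > c1#220) AND isnotnull(c1#220))` and the pushed-down `LessThan(c1,3)`, which shrinks the build side of the join before the `BroadcastNestedLoopJoin` runs.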
   Hive already supports this feature:
   ```sql
   hive> explain select t1.* from SPARK_30768_1 t1 join SPARK_30768_2 t2 on 
(t1.c1 > t2.c1) where t1.c1 = 3;
   Warning: Map Join MAPJOIN[13][bigTable=?] in task 'Stage-3:MAPRED' is a 
cross product
   OK
   STAGE DEPENDENCIES:
     Stage-4 is a root stage
     Stage-3 depends on stages: Stage-4
     Stage-0 depends on stages: Stage-3
   
   STAGE PLANS:
     Stage: Stage-4
       Map Reduce Local Work
         Alias -> Map Local Tables:
           $hdt$_0:t1
             Fetch Operator
               limit: -1
         Alias -> Map Local Operator Tree:
           $hdt$_0:t1
             TableScan
               alias: t1
               filterExpr: (c1 = 3) (type: boolean)
               Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column 
stats: NONE
               Filter Operator
                 predicate: (c1 = 3) (type: boolean)
                 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL 
Column stats: NONE
                 Select Operator
                   expressions: c2 (type: int)
                   outputColumnNames: _col1
                   Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL 
Column stats: NONE
                   HashTable Sink Operator
                     keys:
                       0
                       1
   
     Stage: Stage-3
       Map Reduce
         Map Operator Tree:
             TableScan
               alias: t2
               filterExpr: (c1 < 3) (type: boolean)
               Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column 
stats: NONE
               Filter Operator
                 predicate: (c1 < 3) (type: boolean)
                 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL 
Column stats: NONE
                 Select Operator
                   Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL 
Column stats: NONE
                   Map Join Operator
                     condition map:
                          Inner Join 0 to 1
                     keys:
                       0
                       1
                     outputColumnNames: _col1
                     Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL 
Column stats: NONE
                     Select Operator
                       expressions: 3 (type: int), _col1 (type: int)
                       outputColumnNames: _col0, _col1
                       Statistics: Num rows: 1 Data size: 1 Basic stats: 
PARTIAL Column stats: NONE
                       File Output Operator
                         compressed: false
                         Statistics: Num rows: 1 Data size: 1 Basic stats: 
PARTIAL Column stats: NONE
                         table:
                             input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
                             output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                             serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Execution mode: vectorized
         Local Work:
           Map Reduce Local Work
   
     Stage: Stage-0
       Fetch Operator
         limit: -1
         Processor Tree:
           ListSink
   
   Time taken: 5.491 seconds, Fetched: 71 row(s)
   ```
   
   ### Why are the changes needed?
   Improve query performance: the inferred filter can be pushed down to the file scan on the other side of the join, reducing the number of rows that reach the join.
   
   
   ### Does this PR introduce any user-facing change?
   No.
   
   
   ### How was this patch tested?
   Unit test.
   
