leanken opened a new pull request #29304:
URL: https://github.com/apache/spark/pull/29304


   ### What changes were proposed in this pull request?
   
   This PR proposes a trade-off that supports multi-column hash lookups on the build side of a null-aware anti join (NAAJ), at the cost of duplicating build-side data. Each build-side row is expanded into 2^numKeys - 1 rows; for example, supporting NAAJ with a 3-column join key expands the build side by a factor of 2^3 - 1 = 7.
   
   For example, given an UnsafeRow key (1,2,3), in null-aware mode it is expanded into 7 keys: the original plus the extra C(3,1) and C(3,2) combinations, where the duplicated records are null-padded as follows.
   
   Original record
   
   (1,2,3) 
   
   Extra records to be appended into the HashedRelation
   
   (null, 2, 3) (1, null, 3) (1, 2, null)
   (null, null, 3) (null, 2, null) (1, null, null)
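   The expansion can be sketched as follows (a minimal Scala sketch with a hypothetical `expandKey` helper, not the PR's actual implementation): for an n-column key, emit every variant obtained by nulling out a strict subset of the columns, 2^n - 1 rows in total (the all-null variant is excluded because an all-null key is tracked separately).
   
   ```scala
   object NullPaddingExpansionSketch {
     // Bitmask `mask` selects which columns to keep; mask == 0 (all columns
     // nulled) is skipped because the all-null key is handled separately.
     def expandKey(key: Seq[Any]): Seq[Seq[Option[Any]]] =
       (1 until (1 << key.length)).map { mask =>
         key.zipWithIndex.map { case (v, i) =>
           if ((mask & (1 << i)) != 0) Some(v) else None // None models SQL NULL
         }
       }
   
     def main(args: Array[String]): Unit =
       // Prints 7 keys for the 3-column key (1, 2, 3), matching the example above.
       expandKey(Seq(1, 2, 3)).foreach(println)
   }
   ```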
   
   With the expanded data we can apply a common lookup pattern to both single- and multi-column keys; "allNull" below refers to an UnsafeRow whose columns are all null. A sketch of this logic follows the list.
   
   * buildSide is empty input => return all rows
   * an allNull column key exists in buildSide input => reject all rows
   * if streamedSideRow.allNull is true => drop the row
   * if streamedSideRow.allNull is false and a match is found in NullAwareHashedRelation => drop the row
   * if streamedSideRow.allNull is false and no match is found in NullAwareHashedRelation => return the row
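   A minimal sketch of this decision logic (hypothetical names such as `keepStreamedRow` and `hasAllNullKey`, not the PR's actual code):
   
   ```scala
   // Returns true if the streamed row should be emitted by the anti join.
   // `buildKeys` stands in for the expanded NullAwareHashedRelation lookup.
   def keepStreamedRow(
       row: Seq[Option[Any]],
       buildKeys: Set[Seq[Option[Any]]],
       buildSideEmpty: Boolean,
       hasAllNullKey: Boolean): Boolean = {
     if (buildSideEmpty) true              // empty build side: return all rows
     else if (hasAllNullKey) false         // all-null key in build side: reject all rows
     else if (row.forall(_.isEmpty)) false // streamed row is allNull: drop it
     else !buildKeys.contains(row)         // return the row only when no match is found
   }
   ```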
   
   ### Why are the changes needed?
   In real production usage of NAAJ, numKeys should not be large (typically 1 to 3 keys), so I think this trade-off is still worthwhile.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   
   1. SQLQueryTestSuite with NOT IN SQL queries, adding a CONFIG_DIM that runs with spark.sql.optimizeNullAwareAntiJoin on and off.
   2. Added cases in org.apache.spark.sql.JoinSuite.
   3. Added cases in org.apache.spark.sql.SubquerySuite.
   4. Added cases in org.apache.spark.sql.execution.joins.HashedRelationSuite to verify the data expansion logic.
   5. End-to-end tests (both single- and multi-column cases) against the following config combinations:
   
   ```
   Map(
     "spark.sql.optimizeNullAwareAntiJoin" -> "true",
     "spark.sql.adaptive.enabled" -> "false",
     "spark.sql.codegen.wholeStage" -> "false"
   ),
   Map(
     "sspark.sql.optimizeNullAwareAntiJoin" -> "true",
     "spark.sql.adaptive.enabled" -> "false",
     "spark.sql.codegen.wholeStage" -> "true"
   ),
   Map(
     "spark.sql.optimizeNullAwareAntiJoin" -> "true",
     "spark.sql.adaptive.enabled" -> "true",
     "spark.sql.codegen.wholeStage" -> "false"
   ),
   Map(
     "spark.sql.optimizeNullAwareAntiJoin" -> "true",
     "spark.sql.adaptive.enabled" -> "true",
     "spark.sql.codegen.wholeStage" -> "true"
   )
   ```

