leanken opened a new pull request #29301: URL: https://github.com/apache/spark/pull/29301
### What changes were proposed in this pull request?

This is a follow-up to [SPARK-32290](https://issues.apache.org/jira/browse/SPARK-32290). In SPARK-32290 we only supported single-column NAAJ because of the complexity of multi-column support; see http://www.vldb.org/pvldb/vol2/vldb09-423.pdf, Section 6.

This PR proposes a trade-off that also supports multi-column keys for the hash lookup on the build side, at the cost of duplicating build-side data. The duplication factor is 2^numKeys - 1; for example, to support NAAJ with a 3-column join key, the build side is expanded to (2^3 - 1) = 7 times its original size.

Concretely, an UnsafeRow key (1, 2, 3) in null-aware mode is expanded into 7 keys covering the extra C(3,1) and C(3,2) combinations, where each combination duplicates the record with null padding:

Original record:
(1, 2, 3)

Extra records appended into the HashedRelation:
(null, 2, 3)
(1, null, 3)
(1, 2, null)
(null, null, 3)
(null, 2, null)
(1, null, null)

With the expanded data we can apply a common pattern to both single- and multi-column keys, where "allNull" refers to an UnsafeRow whose columns are all null (illustrative sketches of the expansion and of these rules follow at the end of this description):

* build side is empty input => return all rows
* an allNull column key exists in the build side input => reject all rows
* if streamedSideRow.allNull is true => drop the row
* if streamedSideRow.allNull is false & a match is found in NullAwareHashedRelation => drop the row
* if streamedSideRow.allNull is false & no match is found in NullAwareHashedRelation => return the row

### Why are the changes needed?

Considering real production usage of NAAJ, numKeys should not be that big, normally 1~3 keys, so I think the trade-off is still worthwhile.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1. SQLQueryTestSuite with NOT IN keyword SQL, adding a CONFIG_DIM with spark.sql.optimizeNullAwareAntiJoin on and off.
2. Added cases in org.apache.spark.sql.JoinSuite.
3. Added cases in org.apache.spark.sql.SubquerySuite.
4. Added cases in org.apache.spark.sql.execution.joins.HashedRelationSuite to verify the data expansion logic.
5. Config combinations for end-to-end tests (both single- and multi-column cases):

```
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "false",
  "spark.sql.codegen.wholeStage" -> "false"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "false",
  "spark.sql.codegen.wholeStage" -> "true"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.codegen.wholeStage" -> "false"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.codegen.wholeStage" -> "true"
)
```
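For illustration only, here is a minimal standalone sketch of the build-side key expansion described above. It uses plain Scala collections instead of Spark's UnsafeRow, and `NullPaddedExpansion` / `expand` are hypothetical names, not code from this PR:

```scala
// Hypothetical sketch: expand one build-side key into its null-padded variants.
// Uses Seq[Any] instead of Spark's UnsafeRow for clarity; names are illustrative.
object NullPaddedExpansion {
  // For an n-column key, emit the original key plus the variants where a
  // non-empty, proper subset of columns is replaced by null. The all-null
  // variant is excluded, giving 2^n - 1 keys in total, matching the 7x
  // expansion described above for a 3-column key.
  def expand(key: Seq[Any]): Seq[Seq[Any]] = {
    val n = key.length
    (0 until (1 << n))                        // each bitmask selects columns to null out
      .filter(mask => mask != (1 << n) - 1)   // skip the all-null combination
      .map { mask =>
        key.zipWithIndex.map { case (value, i) =>
          if ((mask & (1 << i)) != 0) null else value
        }
      }
  }

  def main(args: Array[String]): Unit = {
    // Expanding (1, 2, 3) yields 7 keys: the original plus 6 null-padded variants.
    expand(Seq(1, 2, 3)).foreach(println)
  }
}
```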

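And a minimal sketch of the streamed-side decision rules listed above, again with hypothetical types (`NullAwareRelation`, `Decision`) standing in for Spark's actual NullAwareHashedRelation API:

```scala
// Hypothetical stand-in for the build-side relation after expansion.
final case class NullAwareRelation(
    buildIsEmpty: Boolean,            // build side had no rows
    buildHasAllNullKey: Boolean,      // an all-null key exists on the build side
    lookup: Seq[Any] => Boolean)      // hash lookup against the expanded build-side keys

sealed trait Decision
case object ReturnRow extends Decision
case object DropRow extends Decision

object NullAwareAntiJoinSketch {
  // Mirrors the common pattern described in the PR text.
  def decide(relation: NullAwareRelation, streamedKey: Seq[Any]): Decision = {
    val allNull = streamedKey.forall(_ == null)
    if (relation.buildIsEmpty) ReturnRow            // empty build side: keep every streamed row
    else if (relation.buildHasAllNullKey) DropRow   // all-null build key: reject every streamed row
    else if (allNull) DropRow                       // all-null streamed key: drop the row
    else if (relation.lookup(streamedKey)) DropRow  // match found (possibly via null padding): drop
    else ReturnRow                                  // no match: return the row
  }
}
```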