leanken opened a new pull request #29304:
URL: https://github.com/apache/spark/pull/29304
### What changes were proposed in this pull request?
In this PR, I propose a trade-off that also supports multi-column hash lookups
on the buildSide, at the cost of extra duplicate data on the buildSide: the
duplication factor is 2^numKeys - 1. For example, to support NAAJ with a
3-column join key, the buildSide is expanded (2^3 - 1) times, i.e. 7X.
For example, given an UnsafeRow key (1, 2, 3), in NullAware mode it is expanded
into 7 keys: the original record plus the C(3,1) and C(3,2) combinations, in
which the selected columns are replaced by null padding, as shown below (a
sketch of the expansion follows the records).
Original record
(1, 2, 3)
Extra records to be appended into HashedRelation
(null, 2, 3) (1, null, 3) (1, 2, null)
(null, null, 3) (null, 2, null) (1, null, null)
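A minimal sketch of this expansion, assuming keys as plain Scala Seq[Any]
values rather than the PR's actual UnsafeRow handling; the function name
expandKey is hypothetical:
```scala
// For an n-column key, emit 2^n - 1 keys: the original record plus every
// copy in which a non-empty proper subset of the columns is nulled out.
def expandKey(key: Seq[Any]): Seq[Seq[Any]] = {
  val n = key.length
  // Mask 0 keeps the original record; the all-ones mask (all columns null)
  // is excluded, leaving 2^n - 1 keys in total.
  (0 until (1 << n) - 1).map { mask =>
    key.zipWithIndex.map { case (v, i) =>
      if ((mask & (1 << i)) != 0) null else v
    }
  }
}

// expandKey(Seq(1, 2, 3)) produces 7 keys, matching the example above:
// (1,2,3), (null,2,3), (1,null,3), (null,null,3),
// (1,2,null), (null,2,null), (1,null,null)
```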
With the expanded data we can apply a common lookup pattern to both the
single- and multi-column cases. Here allNull refers to an UnsafeRow whose key
columns are all null; a sketch of this decision logic follows the list.
* buildSide is empty input => return all rows
* an allNull column key exists in the buildSide input => reject all rows
* if streamedSideRow.allNull is true => drop the row
* if streamedSideRow.allNull is false and a match is found in
NullAwareHashedRelation => drop the row
* if streamedSideRow.allNull is false and no match is found in
NullAwareHashedRelation => return the row
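As a hedged illustration of the rules above, the per-row decision could be
written as follows; all parameter names are hypothetical stand-ins for state
the PR derives from the build side and the streamed row, not the PR's actual
API:
```scala
// Returns true if the streamed row survives the null-aware anti join.
def keepStreamedRow(
    buildSideIsEmpty: Boolean,
    buildSideHasAllNullKey: Boolean,
    streamedRowAllNull: Boolean,
    foundMatch: Boolean): Boolean = {
  if (buildSideIsEmpty) true              // empty build side => return all rows
  else if (buildSideHasAllNullKey) false  // allNull key in build side => reject all rows
  else if (streamedRowAllNull) false      // allNull streamed row => drop the row
  else !foundMatch                        // keep the row only when no match is found
}
```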
### Why are the changes needed?
Considering real production usage of NAAJ, numKeys should not be large,
normally 1~3 keys, so I think the trade-off is still worthwhile.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
1. SQLQueryTestSuite with NOT IN keyword SQL, adding a CONFIG_DIM with
spark.sql.optimizeNullAwareAntiJoin on and off.
2. Added cases in org.apache.spark.sql.JoinSuite.
3. Added cases in org.apache.spark.sql.SubquerySuite.
4. Added cases in org.apache.spark.sql.execution.joins.HashedRelationSuite to
verify the data expansion logic.
5. End-to-end tests with both single- and multi-column cases against the
following config combinations (a possible driver loop is sketched after the
block):
```
Map(
"spark.sql.optimizeNullAwareAntiJoin" -> "true",
"spark.sql.adaptive.enabled" -> "false",
"spark.sql.codegen.wholeStage" -> "false"
),
Map(
"sspark.sql.optimizeNullAwareAntiJoin" -> "true",
"spark.sql.adaptive.enabled" -> "false",
"spark.sql.codegen.wholeStage" -> "true"
),
Map(
"spark.sql.optimizeNullAwareAntiJoin" -> "true",
"spark.sql.adaptive.enabled" -> "true",
"spark.sql.codegen.wholeStage" -> "false"
),
Map(
"spark.sql.optimizeNullAwareAntiJoin" -> "true",
"spark.sql.adaptive.enabled" -> "true",
"spark.sql.codegen.wholeStage" -> "true"
)
```
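A possible driver loop for these combinations, assuming Spark's test helpers
withSQLConf and checkAnswer; configCombinations, the table names, and the
expected rows are illustrative only:
```scala
// configCombinations holds the four Maps listed above.
configCombinations.foreach { conf =>
  withSQLConf(conf.toSeq: _*) {
    // A representative multi-column NOT IN query; t1, t2, and expectedRows
    // are placeholders for the actual e2e fixtures.
    checkAnswer(
      sql("SELECT * FROM t1 WHERE (a, b) NOT IN (SELECT x, y FROM t2)"),
      expectedRows)
  }
}
```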