agrawaldevesh commented on a change in pull request #29304:
URL: https://github.com/apache/spark/pull/29304#discussion_r463936613
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -245,7 +244,7 @@ case class BroadcastHashJoinExec(
|boolean $found = false;
|// generate join key for stream side
|${keyEv.code}
- |if ($anyNull) {
+ |if (${ if (isLongHashedRelation) s"$anyNull" else s"${keyEv.value}.allNull()"}) {
Review comment:
Perhaps add a comment here?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -327,11 +327,27 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
val keyGenerator = UnsafeProjection.create(key)
var numFields = 0
+ val nullPaddingCombinations: Seq[UnsafeProjection] = if (isNullAware) {
Review comment:
Can you add a small example here to illustrate what this block of code is
doing? An example with, say, 3 keys would work well. What do you think of
extracting this into a method for better readability? It's a bit big to be
inline.
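To make the request concrete, here is a minimal sketch of the kind of 3-key example that could go in the comment. It is not the PR's code: `NullPaddingSketch` and `nullPaddedCombinations` are hypothetical names, keys are modelled as plain `Option` values rather than `UnsafeRow`s, and it assumes the block is meant to produce every variant of a key in which some (but not all) columns are replaced with null.

```scala
// Hypothetical illustration only; it does not use Spark's UnsafeProjection.
object NullPaddingSketch {
  // None stands in for a SQL NULL in a key column.
  def nullPaddedCombinations[A](key: Seq[Option[A]]): Seq[Seq[Option[A]]] = {
    val n = key.length
    // Bit i of `mask` decides whether column i is kept (1) or nulled out (0).
    // Starting at 1 skips mask 0, which would null out every column
    // (assumption: the all-null key is handled separately).
    (1 until (1 << n)).map { mask =>
      key.zipWithIndex.map { case (value, i) =>
        if ((mask & (1 << i)) != 0) value else None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // For a 3-column key this yields 2^3 - 1 = 7 variants, including the original key.
    nullPaddedCombinations(Seq(Some(1), Some(2), Some(3))).foreach(println)
  }
}
```

Under these assumptions a key `(1, 2, 3)` expands to 7 entries such as `(1, null, null)`, `(null, 2, 3)` and `(1, 2, 3)` itself, which is the sort of listing a code comment could spell out.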
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -342,8 +358,19 @@ private[joins] object UnsafeHashedRelation {
throw new SparkOutOfMemoryError("There is not enough memory to build hash map")
// scalastyle:on throwerror
}
- } else if (isNullAware) {
- return EmptyHashedRelationWithAllNullKeys
+ }
+
+ val row = input.next().asInstanceOf[UnsafeRow]
+ numFields = row.numFields()
+ val key = keyGenerator(row)
+ if (isNullAware) {
+ // fast stop when all null column key found.
+ if (key.allNull()) {
+ return EmptyHashedRelationWithAllNullKeys
+ }
+ nullPaddingCombinations.foreach(project => append(project(row).copy(), row))
Review comment:
nit: `project` needs a better name here. How about
`nullPaddedKeySubsequence` or `nullPaddedKeyCombination`, or something similar?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -245,7 +244,7 @@ case class BroadcastHashJoinExec(
|boolean $found = false;
|// generate join key for stream side
|${keyEv.code}
- |if ($anyNull) {
+ |if (${if (isLongHashedRelation) s"$anyNull" else s"${keyEv.value}.allNull()"}) {
Review comment:
Oh, you can combine multiple keys into a single long key? I believe the
reason for this special case is that the long hashed relation can only take a
single long key. But it would be interesting if we could get multiple real
keys packed into it.
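As a rough illustration of what "packing" could mean here, the sketch below combines two 32-bit keys into one 64-bit value. This is purely hypothetical: `PackedKeySketch`, `pack` and `unpack` are made-up names, and nothing in this thread says the long hashed relation does or should work this way.

```scala
// Hypothetical illustration only; unrelated to Spark's actual LongHashedRelation code.
object PackedKeySketch {
  // Place `hi` in the upper 32 bits and `lo` in the lower 32 bits.
  // Masking `lo` stops sign extension from overwriting the upper half.
  def pack(hi: Int, lo: Int): Long =
    (hi.toLong << 32) | (lo.toLong & 0xFFFFFFFFL)

  // Recover both 32-bit keys from the packed value.
  def unpack(packed: Long): (Int, Int) =
    ((packed >>> 32).toInt, packed.toInt)

  def main(args: Array[String]): Unit = {
    val packed = pack(1, 2)
    println(s"packed = $packed, unpacked = ${unpack(packed)}")
  }
}
```

Note that a plain packing like this uses all 64 bits for key data, so it leaves no room to mark an individual key as null. A null-aware join would need some separate way to track that.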
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -327,11 +327,27 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
val keyGenerator = UnsafeProjection.create(key)
var numFields = 0
+ val nullPaddingCombinations: Seq[UnsafeProjection] = if (isNullAware) {
Review comment:
Also, do we use `nullPaddingCombinations` when all the keys are null?