agrawaldevesh commented on a change in pull request #29104:
URL: https://github.com/apache/spark/pull/29104#discussion_r459804718
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -81,6 +81,34 @@ private[execution] sealed trait HashedRelation extends
KnownSizeEstimation {
*/
def asReadOnlyCopy(): HashedRelation
+
+ /**
+ * Normally HashedRelation is built from a source (input:
Iterator[InternalRow]).
+ * This indicates whether the original input is empty.
+ * Note that the hashed relation can be empty even though the input is not
empty,
+ * since the hashed relation skips over null keys.
+ */
+ var isOriginalInputEmpty: Boolean
+
+ def setOriginInputEmtpy(isOriginalInputEmpty: Boolean): HashedRelation = {
Review comment:
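Oops, you forgot to rename the setter: `setOriginInputEmtpy` is misspelled and should match the field name, i.e. something like `setIsOriginalInputEmpty`.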
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -454,6 +491,48 @@ case class BroadcastHashJoinExec(
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows")
+ // fast stop if isOriginalInputEmpty = true, should accept all rows in
streamedSide
+ if (broadcastRelation.value.isOriginalInputEmpty) {
+ return s"""
+ |// Anti Join isOriginalInputEmpty(true) accept all
+ |$numOutput.add(1);
+ |${consume(ctx, input)}
+ """.stripMargin
+ }
+
+ if (isNullAwareAntiJoin) {
+ if (broadcastRelation.value.allNullColumnKeyExistsInOriginalInput) {
+ return s"""
+ |// NAAJ
Review comment:
It would be good to verify, using
`org.apache.spark.sql.execution.debug.DebugQuery#debugCodegen`, that this case
does not generate an empty spin loop, i.e., a loop with no executable body.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -323,11 +374,18 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
val keyGenerator = UnsafeProjection.create(key)
var numFields = 0
+ val numKeys = key.length
+ val isOriginalInputEmpty = !input.hasNext
+ var allNullColumnKeyExistsInOriginalInput: Boolean = false
while (input.hasNext) {
val row = input.next().asInstanceOf[UnsafeRow]
numFields = row.numFields()
val key = keyGenerator(row)
- if (!key.anyNull) {
+ if ((0 until numKeys).forall(key.isNullAt)) {
Review comment:
I think you should only do this check when `isNullAware` is true. Why pay
the performance penalty of checking the row again otherwise?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -323,11 +374,18 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
val keyGenerator = UnsafeProjection.create(key)
var numFields = 0
+ val numKeys = key.length
+ val isOriginalInputEmpty = !input.hasNext
+ var allNullColumnKeyExistsInOriginalInput: Boolean = false
while (input.hasNext) {
val row = input.next().asInstanceOf[UnsafeRow]
numFields = row.numFields()
val key = keyGenerator(row)
- if (!key.anyNull) {
+ if ((0 until numKeys).forall(key.isNullAt)) {
+ allNullColumnKeyExistsInOriginalInput = true
+ }
+
+ if (isNullAware || (!isNullAware && !key.anyNull)) {
Review comment:
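Can we simplify this to `isNullAware || !key.anyNull`? The `!isNullAware &&` conjunct is redundant: because `||` short-circuits, the right-hand side is only evaluated when `isNullAware` is already false.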
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -896,22 +967,32 @@ private[joins] object LongHashedRelation {
// Create a mapping of key -> rows
var numFields = 0
+ val isOriginalInputEmpty: Boolean = !input.hasNext
+ var allNullColumnKeyExistsInOriginalInput: Boolean = false
while (input.hasNext) {
val unsafeRow = input.next().asInstanceOf[UnsafeRow]
numFields = unsafeRow.numFields()
val rowKey = keyGenerator(unsafeRow)
if (!rowKey.isNullAt(0)) {
+ // LongToUnsafeRowMap can't insert null key
val key = rowKey.getLong(0)
map.append(key, unsafeRow)
+ } else {
+ // LongHashedRelation is single-column key
Review comment:
Suggest rewording this comment to: "LongHashedRelation only stores single-column keys."
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]