agrawaldevesh commented on a change in pull request #29104:
URL: https://github.com/apache/spark/pull/29104#discussion_r460176730
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -323,11 +374,20 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
val keyGenerator = UnsafeProjection.create(key)
var numFields = 0
+ val numKeys = key.length
+ val isOriginalInputEmpty = !input.hasNext
+ var allNullColumnKeyExistsInOriginalInput: Boolean = false
while (input.hasNext) {
val row = input.next().asInstanceOf[UnsafeRow]
numFields = row.numFields()
val key = keyGenerator(row)
- if (!key.anyNull) {
+ if (isNullAware &&
+ !allNullColumnKeyExistsInOriginalInput &&
+ (0 until numKeys).forall(key.isNullAt)) {
Review comment:
Dumb question, which I know I have asked before: can `numKeys` be more
than 1 now, given that we are only considering the single-key case? If not,
should `allNullColumnKeyExistsInOriginalInput` be renamed to something like
`keyWithSingleNullColumnExistsInOriginalInput`, in addition to changing the
`forall` above to simply check `key.isNullAt(0)`? If you do make this
change, just a reminder to rename `allNullColumnKeyExistsInOriginalInput`
in `LongHashedRelation` too.
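For illustration, roughly what I have in mind (a sketch only, assuming the key
really is always single-column here; the name and the elided branch body are
just placeholders):

```scala
// With a single-column key the forall collapses to a single isNullAt(0) check.
if (isNullAware &&
    !keyWithSingleNullColumnExistsInOriginalInput &&
    key.isNullAt(0)) {
  // ... same handling as in the PR
}
```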
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -454,6 +491,48 @@ case class BroadcastHashJoinExec(
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows")
+ // fast stop if isOriginalInputEmpty = true, should accept all rows in streamedSide
+ if (broadcastRelation.value.isOriginalInputEmpty) {
+ return s"""
+ |// Anti Join isOriginalInputEmpty(true) accept all
+ |$numOutput.add(1);
+ |${consume(ctx, input)}
+ """.stripMargin
+ }
+
+ if (isNullAwareAntiJoin) {
+ if (broadcastRelation.value.allNullColumnKeyExistsInOriginalInput) {
+ return s"""
+ |// NAAJ
+ |// isOriginalInputEmpty(false) allNullColumnKeyExistsInOriginalInput(true)
+ |// reject all
+ """.stripMargin
+ } else {
+ val found = ctx.freshName("found")
+ return s"""
+ |// NAAJ
+ |// isOriginalInputEmpty(false) allNullColumnKeyExistsInOriginalInput(false)
+ |boolean $found = false;
+ |// generate join key for stream side
+ |${keyEv.code}
+ |// anyNull is equivalent to allNull since it's a single-column key.
Review comment:
nit: same, this reference to allNull is distracting now.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -133,10 +142,38 @@ case class BroadcastHashJoinExec(
val numOutputRows = longMetric("numOutputRows")
val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
- streamedPlan.execute().mapPartitions { streamedIter =>
- val hashed = broadcastRelation.value.asReadOnlyCopy()
- TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
- join(streamedIter, hashed, numOutputRows)
+ if (isNullAwareAntiJoin) {
+ streamedPlan.execute().mapPartitionsInternal { streamedIter =>
+ val hashed = broadcastRelation.value.asReadOnlyCopy()
+ TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
+ if (hashed.isOriginalInputEmpty) {
+ streamedIter
+ } else if (hashed.allNullColumnKeyExistsInOriginalInput) {
+ Iterator.empty
+ } else {
+ val keyGenerator = UnsafeProjection.create(
+ BindReferences.bindReferences[Expression](
+ leftKeys,
+ AttributeSeq(left.output))
+ )
+ streamedIter.filter(row => {
+ val lookupKey: UnsafeRow = keyGenerator(row)
+ // anyNull is equivalent to allNull since it's a single-column key.
Review comment:
nit: I think this comment is distracting now that there is no such thing
as 'allNull'.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -896,22 +969,32 @@ private[joins] object LongHashedRelation {
// Create a mapping of key -> rows
var numFields = 0
+ val isOriginalInputEmpty: Boolean = !input.hasNext
+ var allNullColumnKeyExistsInOriginalInput: Boolean = false
while (input.hasNext) {
val unsafeRow = input.next().asInstanceOf[UnsafeRow]
numFields = unsafeRow.numFields()
val rowKey = keyGenerator(unsafeRow)
if (!rowKey.isNullAt(0)) {
+ // LongToUnsafeRowMap can't insert null key
val key = rowKey.getLong(0)
map.append(key, unsafeRow)
+ } else if (!allNullColumnKeyExistsInOriginalInput) {
Review comment:
Should this check also incorporate `isNullAware`:
`else if (isNullAware && !allNullColumnKeyExistsInOriginalInput)`?
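Roughly like this (a sketch; I am assuming the branch body just records the
flag):

```scala
} else if (isNullAware && !allNullColumnKeyExistsInOriginalInput) {
  // LongHashedRelation stores single-column key
  allNullColumnKeyExistsInOriginalInput = true
}
```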
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -81,6 +81,34 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
*/
def asReadOnlyCopy(): HashedRelation
+
+ /**
+ * Normally HashedRelation is built from an Source (input: Iterator[InternalRow]).
+ * This indicates the original input is empty.
+ * Note that, the hashed relation can be empty despite the input being not empty,
+ * since the hashed relation skips over null keys.
+ */
+ var isOriginalInputEmpty: Boolean
+
+ def setOriginalInputEmtpy(isOriginalInputEmpty: Boolean): HashedRelation = {
+ this.isOriginalInputEmpty = isOriginalInputEmpty
+ this
+ }
+
+ /**
+ * It's only used in null aware anti join.
+ * This will be set true if Source (input: Iterator[InternalRow]) contains a key,
+ * which is allNullColumn.
Review comment:
nit: Can you further expand on allNullColumn in the comment? E.g., "contains
a key which has all null columns".
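Something along these lines, for example (wording is only a suggestion):

```scala
/**
 * Only used in null-aware anti join.
 * Set to true if the source input (Iterator[InternalRow]) contains at least
 * one key whose columns are all null.
 */
```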
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -81,6 +81,34 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
*/
def asReadOnlyCopy(): HashedRelation
+
+ /**
+ * Normally HashedRelation is built from an Source (input: Iterator[InternalRow]).
+ * This indicates the original input is empty.
+ * Note that, the hashed relation can be empty despite the input being not empty,
+ * since the hashed relation skips over null keys.
+ */
+ var isOriginalInputEmpty: Boolean
+
+ def setOriginalInputEmtpy(isOriginalInputEmpty: Boolean): HashedRelation = {
+ this.isOriginalInputEmpty = isOriginalInputEmpty
+ this
+ }
+
+ /**
+ * It's only used in null aware anti join.
+ * This will be set true if Source (input: Iterator[InternalRow]) contains a key,
+ * which is allNullColumn.
+ */
+ var allNullColumnKeyExistsInOriginalInput: Boolean
+
+ def setAllNullColumnKeyExistsInOriginalInput(
+ allNullColumnKeyExistsInOriginalInput: Boolean): HashedRelation = {
+ this.allNullColumnKeyExistsInOriginalInput =
Review comment:
nit: I kind of think this line does not need to be split into two.
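i.e., I would expect something like the following to fit on a single line
(assuming it stays within the line-length limit):

```scala
this.allNullColumnKeyExistsInOriginalInput = allNullColumnKeyExistsInOriginalInput
```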
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -923,9 +1006,9 @@ case class HashedRelationBroadcastMode(key: Seq[Expression])
sizeHint: Option[Long]): HashedRelation = {
sizeHint match {
case Some(numRows) =>
- HashedRelation(rows, canonicalized.key, numRows.toInt)
+ HashedRelation(rows, canonicalized.key, numRows.toInt, isNullAware = isNullAware)
Review comment:
nit: Should this simply be `isNullAware` (i.e., passed in without a named
argument)?
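i.e., something like the following, assuming `isNullAware` is the next
positional parameter of `HashedRelation.apply` at this point:

```scala
case Some(numRows) =>
  HashedRelation(rows, canonicalized.key, numRows.toInt, isNullAware)
```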
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -896,22 +969,32 @@ private[joins] object LongHashedRelation {
// Create a mapping of key -> rows
var numFields = 0
+ val isOriginalInputEmpty: Boolean = !input.hasNext
+ var allNullColumnKeyExistsInOriginalInput: Boolean = false
while (input.hasNext) {
val unsafeRow = input.next().asInstanceOf[UnsafeRow]
numFields = unsafeRow.numFields()
val rowKey = keyGenerator(unsafeRow)
if (!rowKey.isNullAt(0)) {
+ // LongToUnsafeRowMap can't insert null key
val key = rowKey.getLong(0)
map.append(key, unsafeRow)
+ } else if (!allNullColumnKeyExistsInOriginalInput) {
+ // LongHashedRelation stores single-column key
Review comment:
nit: I think this comment on line 983 is more globally applicable to
LongHashedRelation: either remove it or fold it into the class description.
The same might be said for the comment on line 979 above.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]