c21 commented on a change in pull request #29455:
URL: https://github.com/apache/spark/pull/29455#discussion_r473377778
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -317,4 +318,41 @@ case class ShuffledHashJoinExec(
v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline =
true)
HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
}
+
+ /**
+ * Generates the code for anti join.
+ * Handles NULL-aware anti join (NAAJ) separately here.
+ */
+ protected override def codegenAnti(ctx: CodegenContext, input:
Seq[ExprCode]): String = {
+ if (isNullAwareAntiJoin) {
+ val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+ val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
+ val (matched, _, _) = getJoinCondition(ctx, input)
+ val numOutput = metricTerm(ctx, "numOutputRows")
+ val found = ctx.freshName("found")
+
+ // Skip code generation for EmtpyHashedRelation and
EmptyHashedRelationWithAllNullKeys cases,
+ // since it is already handled by AQE rewrite.
+ s"""
+ |boolean $found = false;
+ |// generate join key for stream side
+ |${keyEv.code}
+ |if ($anyNull) {
+ | $found = true;
+ |} else {
+ | UnsafeRow $matched =
(UnsafeRow)$relationTerm.getValue(${keyEv.value});
Review comment:
`getValue()` is intended only for retrieving the value of a unique key. How about
changing this to `get()` to be consistent with the existing non-codegen code path?
Retrieving a single value for a non-unique key causes no issue today, but it could
become a bug later if someone adds a uniqueness check inside
`HashedRelation`.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -317,4 +318,41 @@ case class ShuffledHashJoinExec(
v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline =
true)
HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
}
+
+ /**
+ * Generates the code for anti join.
+ * Handles NULL-aware anti join (NAAJ) separately here.
+ */
+ protected override def codegenAnti(ctx: CodegenContext, input:
Seq[ExprCode]): String = {
Review comment:
Given that we refactored the non-codegen `nullAwareAntiJoin` into `HashJoin`, I
think we can also move the codegen `codegenNullAwareAntiJoin` from
`BroadcastHashJoinExec` and `ShuffledHashJoinExec` into `HashJoin` to avoid code
duplication.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##########
@@ -277,6 +286,34 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
}
}
+ private def nullAwareAntiJoin(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation): Iterator[InternalRow] = {
+ if (hashedRelation == EmptyHashedRelation) {
+ streamIter
+ } else if (hashedRelation == EmptyHashedRelationWithAllNullKeys) {
+ // This code branch should only be used in
BroadcastHashJoinExec(Non-AQE).
Review comment:
Can we enforce this assumption with something stronger than a comment? For example,
we could override `nullAwareAntiJoin` in `BroadcastHashJoinExec` with:
```
/**
 * Null-aware anti join for the broadcast path.
 *
 * An `EmptyHashedRelationWithAllNullKeys` build side is expected to reach this operator only
 * when AQE is disabled. With AQE enabled, the join is rewritten into an empty `LocalRelation`
 * before it runs. In the non-AQE case this returns no rows, matching the existing `HashJoin`
 * branch for that relation. Every other case is delegated to `HashJoin.nullAwareAntiJoin`.
 *
 * @param streamIter     rows from the streamed side
 * @param hashedRelation the build-side relation
 * @return the rows that survive the null-aware anti join
 */
protected override def nullAwareAntiJoin(
    streamIter: Iterator[InternalRow],
    hashedRelation: HashedRelation): Iterator[InternalRow] = {
  // Negated on purpose: the all-null-keys relation is expected here only when AQE is OFF.
  // With AQE ON it should never get this far, so we let the parent's check reject it.
  if (hashedRelation == EmptyHashedRelationWithAllNullKeys &&
      !sqlContext.conf.adaptiveExecutionEnabled) {
    Iterator.empty
  } else {
    super.nullAwareAntiJoin(streamIter, hashedRelation)
  }
}
```
and then add a stronger assertion in `nullAwareAntiJoin` in `HashJoin`:
```
protected def nullAwareAntiJoin(
streamIter: Iterator[InternalRow],
hashedRelation: HashedRelation): Iterator[InternalRow] = {
if (hashedRelation == EmptyHashedRelation) {
streamIter
} else {
require(hashedRelation != EmptyHashedRelationWithAllNullKeys)
...
}
}
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##########
@@ -277,6 +286,34 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
}
}
+ private def nullAwareAntiJoin(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation): Iterator[InternalRow] = {
+ if (hashedRelation == EmptyHashedRelation) {
+ streamIter
+ } else if (hashedRelation == EmptyHashedRelationWithAllNullKeys) {
+ // This code branch should only be used in
BroadcastHashJoinExec(Non-AQE).
+ // In BroadcastHashJoinExec(AQE) and ShuffledHashJoinExec(AQE),
+ // the JoinExec would be rewritten into a Empty LocalRelation.
+ Iterator.empty
+ } else {
+ val keyGenerator = UnsafeProjection.create(
+ BindReferences.bindReferences[Expression](
+ leftKeys,
+ AttributeSeq(left.output))
+ )
Review comment:
Why can't we reuse the same `streamSideKeyGenerator()` method call here?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -52,7 +54,15 @@ case class EnsureRequirements(conf: SQLConf) extends
Rule[SparkPlan] {
case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
.getOrElse(conf.numShufflePartitions)
- ShuffleExchangeExec(distribution.createPartitioning(numPartitions),
child)
+
+ operator match {
+ // Ensure collectNullPartitionKeyMetrics is passed through via
instance creation
+ case shj @ ShuffledHashJoinExec(_, _, LeftAnti, BuildRight, _, _, _,
true)
+ if shj.right.fastEquals(child) =>
+ ShuffleExchangeExec(distribution.createPartitioning(numPartitions),
+ child, statSpecs = Seq(NullPartitionKeyStatisticsSpec))
+ case _ =>
ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+ }
Review comment:
I feel this is not enough to pass the `isNullAwareAntiJoin` information
from `ShuffledHashJoinExec` to `ShuffleExchangeExec`. It only works when we
need to insert a shuffle exchange on the build side. It does not seem to cover the case
where no shuffle exchange is needed (i.e. the build side is already partitioned on the
join keys).
Examples:
```
-- Build side is explicitly distributed on the join key, so no extra
-- shuffle exchange should be needed on that side.
SELECT *
FROM t1
WHERE t1.key NOT IN (
  SELECT t2.key
  FROM t2
  DISTRIBUTE BY t2.key
)
```
or
```
SELECT *
FROM t1
WHERE t1.key NOT IN (
  SELECT t2.key
  FROM t2 -- t2 is a table bucketed on column key
)
```
I feel we probably need a separate physical plan rule, applied after
`EnsureRequirements`, to pass the null-aware anti join information from the shuffled
hash join to its child shuffle exchange operator.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]