agrawaldevesh commented on a change in pull request #29455:
URL: https://github.com/apache/spark/pull/29455#discussion_r472579634
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
import
org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import
org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
/**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is
EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
*/
object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
- private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+ private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean =
plan match {
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if
stage.resultOption.get().isDefined
&& stage.broadcast.relationFuture.get().value ==
EmptyHashedRelationWithAllNullKeys => true
+ case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if
stage.resultOption.get().isDefined =>
+ // Equivalent to EmptyHashedRelationWithAllNullKeys
+ val nullPartitionKeyMetrics =
stage.shuffle.asInstanceOf[ShuffleExchangeExec]
+ .metrics.get(ShuffleExchangeExec.NULL_PARTITION_KEY_RECORDS_WRITTEN)
+ if (nullPartitionKeyMetrics.isDefined &&
nullPartitionKeyMetrics.get.value > 0L) {
+ true
+ } else {
+ false
+ }
+ case _ => false
+ }
+
+ private def canConvertToLeftPlan(plan: LogicalPlan): Boolean = plan match {
+ case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if
stage.resultOption.get().isDefined =>
+ // Equivalent to EmptyHashedRelation
Review comment:
same comment as above
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
import
org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import
org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
/**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is
EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
*/
object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
- private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+ private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean =
plan match {
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if
stage.resultOption.get().isDefined
&& stage.broadcast.relationFuture.get().value ==
EmptyHashedRelationWithAllNullKeys => true
+ case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if
stage.resultOption.get().isDefined =>
+ // Equivalent to EmptyHashedRelationWithAllNullKeys
+ val nullPartitionKeyMetrics =
stage.shuffle.asInstanceOf[ShuffleExchangeExec]
+ .metrics.get(ShuffleExchangeExec.NULL_PARTITION_KEY_RECORDS_WRITTEN)
+ if (nullPartitionKeyMetrics.isDefined &&
nullPartitionKeyMetrics.get.value > 0L) {
Review comment:
nit: I think the if/else can be removed and the expression changed to
(return) `nullPartitionKeyMetrics.exists(_.value > 0L)` :-P
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
import
org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import
org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
/**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is
EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
*/
object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
- private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+ private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean =
plan match {
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if
stage.resultOption.get().isDefined
&& stage.broadcast.relationFuture.get().value ==
EmptyHashedRelationWithAllNullKeys => true
+ case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if
stage.resultOption.get().isDefined =>
+ // Equivalent to EmptyHashedRelationWithAllNullKeys
Review comment:
nit: I am not sure if this comment is still very clear. It reflects the
history of the code but not its current state.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
##########
@@ -275,8 +282,17 @@ object ShuffleExchangeExec {
position
}
case h: HashPartitioning =>
- val projection = UnsafeProjection.create(h.partitionIdExpression ::
Nil, outputAttributes)
- row => projection(row).getInt(0)
+ val extraExprs = if (isNullAwareAntiJoin) h.expressions.head :: Nil
else Nil
Review comment:
Can we give `extraExprs` a more explicit name? I am also wondering whether
making this a separate, explicit case branch (i.e. `case h: HashPartitioning
if isNullAwareAntiJoin =>`) would in fact be clearer?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
##########
@@ -83,15 +83,18 @@ trait ShuffleExchangeLike extends Exchange {
case class ShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan,
- noUserSpecifiedNumPartition: Boolean = true) extends ShuffleExchangeLike {
+ noUserSpecifiedNumPartition: Boolean = true,
+ isNullAwareAntiJoin: Boolean = false) extends ShuffleExchangeLike {
Review comment:
> sorry I think I should be talking with data in first place. I just
checked and found in our case, less than 10% of join is using
`BroadcastNestedLoopJoin` and amount of CPU is also single digit percentage.
Our workload by no means is representative for industry, would like to hear
more of based on data.
First, some of the interest in improving single key NAAJ stems from
TPCH-Q16: Benchmarketing. Without this optimization that one query can easily
be a huge portion of the total TPCH runtime and skew the results.
Having said that, one of the chief reasons to optimize NOT-IN is to avoid
customer surprises. Often, people will trip over this query after an
innocuous change. This is particularly true when customers are submitting
queries via a UI tool (where you don't have control over the query generated by
the tool) or when they innocently introduce this query via a Notebook.
It's not very interesting to optimize in the batch sense, because you have
some flexibility to change the pipeline to use Not-Exists instead of Not-In
(which is not planned as an NAAJ).
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
import
org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import
org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
/**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is
EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
*/
object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
- private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+ private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean =
plan match {
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if
stage.resultOption.get().isDefined
&& stage.broadcast.relationFuture.get().value ==
EmptyHashedRelationWithAllNullKeys => true
+ case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if
stage.resultOption.get().isDefined =>
+ // Equivalent to EmptyHashedRelationWithAllNullKeys
+ val nullPartitionKeyMetrics =
stage.shuffle.asInstanceOf[ShuffleExchangeExec]
Review comment:
How accurate are metrics in Spark? Can they be double-counted or
under-counted?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -52,7 +54,14 @@ case class EnsureRequirements(conf: SQLConf) extends
Rule[SparkPlan] {
case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
.getOrElse(conf.numShufflePartitions)
- ShuffleExchangeExec(distribution.createPartitioning(numPartitions),
child)
+
+ operator match {
+ case shj @ ShuffledHashJoinExec(_, _, LeftAnti, BuildRight, _, _, _,
true)
Review comment:
A one line comment explaining this would be awesome.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
import
org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import
org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
/**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is
EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
*/
object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
- private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+ private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean =
plan match {
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if
stage.resultOption.get().isDefined
&& stage.broadcast.relationFuture.get().value ==
EmptyHashedRelationWithAllNullKeys => true
+ case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if
stage.resultOption.get().isDefined =>
+ // Equivalent to EmptyHashedRelationWithAllNullKeys
+ val nullPartitionKeyMetrics =
stage.shuffle.asInstanceOf[ShuffleExchangeExec]
+ .metrics.get(ShuffleExchangeExec.NULL_PARTITION_KEY_RECORDS_WRITTEN)
+ if (nullPartitionKeyMetrics.isDefined &&
nullPartitionKeyMetrics.get.value > 0L) {
+ true
+ } else {
+ false
+ }
+ case _ => false
+ }
+
+ private def canConvertToLeftPlan(plan: LogicalPlan): Boolean = plan match {
Review comment:
nit: I somehow expect to see a `canConvertToRightPlan` when I encounter
a method called `canConvertToLeftPlan`. I am wondering if this can be renamed
to `canRemoveAntiJoin` ?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
##########
@@ -225,7 +231,8 @@ object ShuffleExchangeExec {
outputAttributes: Seq[Attribute],
newPartitioning: Partitioning,
serializer: Serializer,
- writeMetrics: Map[String, SQLMetric])
+ writeMetrics: Map[String, SQLMetric],
+ isNullAwareAntiJoin: Boolean = false)
Review comment:
I am wondering if this should be renamed to something like
`countRowsWithNullPartitionKey` ? In the context of an exchange, having a
variable that is very join specific is a bit weird, because join is but one of
the things that causes an exchange.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -317,4 +318,41 @@ case class ShuffledHashJoinExec(
v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline =
true)
HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
}
+
+ /**
+ * Generates the code for anti join.
+ * Handles NULL-aware anti join (NAAJ) separately here.
+ */
+ protected override def codegenAnti(ctx: CodegenContext, input:
Seq[ExprCode]): String = {
+ if (isNullAwareAntiJoin) {
+ val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+ val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
+ val (matched, _, _) = getJoinCondition(ctx, input)
+ val numOutput = metricTerm(ctx, "numOutputRows")
+ val found = ctx.freshName("found")
+
+ // Skip code check on EmtpyHashedRelation and
EmptyHashedRelationWithAllNullKeys,
Review comment:
Did you mean: skip code generation for the EmptyHashedRelation and
EmptyHashedRelationWithAllNullKeys cases?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]