cloud-fan commented on a change in pull request #24164: [SPARK-27225][SQL]
Implement join strategy hints
URL: https://github.com/apache/spark/pull/24164#discussion_r273340869
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
##########
@@ -30,29 +30,58 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
val pulledUp = plan transformUp {
case j: Join =>
- val leftHint = mergeHints(collectHints(j.left))
- val rightHint = mergeHints(collectHints(j.right))
- j.copy(hint = JoinHint(leftHint, rightHint))
+ val (newLeft, leftHints) = extractHintsFromPlan(j.left)
+ val (newRight, rightHints) = extractHintsFromPlan(j.right)
+ val newJoinHint = JoinHint(mergeHints(leftHints),
mergeHints(rightHints))
+ j.copy(left = newLeft, right = newRight, hint = newJoinHint)
}
pulledUp.transformUp {
- case h: ResolvedHint => h.child
+ case h: ResolvedHint =>
+ handleInvalidHintInfo(h.hints)
+ h.child
}
}
+ /**
+ * Combine a list of [[HintInfo]]s into one [[HintInfo]].
+ */
private def mergeHints(hints: Seq[HintInfo]): Option[HintInfo] = {
- hints.reduceOption((h1, h2) => h1.merge(h2))
+ hints.reduceOption((h1, h2) => h1.merge(h2, handleOverriddenHintInfo))
}
- private def collectHints(plan: LogicalPlan): Seq[HintInfo] = {
+ /**
+ * Extract all hints from the plan, returning a list of extracted hints and
the transformed plan
+ * with [[ResolvedHint]] nodes removed. The returned hint list comes in
top-down order.
+ * Note that hints can only be extracted from under certain nodes. Those
that cannot be extracted
+ * in this method will be cleaned up later by this rule, and may emit
warnings depending on the
+ * configurations.
+ */
+ private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan,
Seq[HintInfo]) = {
plan match {
- case h: ResolvedHint => h.hints +: collectHints(h.child)
- case u: UnaryNode => collectHints(u.child)
+ case h: ResolvedHint =>
+ val (plan, hints) = extractHintsFromPlan(h.child)
+ (plan, h.hints +: hints)
+ case u: UnaryNode =>
+ val (plan, hints) = extractHintsFromPlan(u.child)
+ (u.withNewChildren(Seq(plan)), hints)
// TODO revisit this logic:
// except and intersect are semi/anti-joins which won't return more data
then
- // their left argument, so the join strategy hint should be propagated
here
- case i: Intersect => collectHints(i.left)
- case e: Except => collectHints(e.left)
- case _ => Seq.empty
+ // their left argument, so the broadcast hint should be propagated here
+ case i: Intersect =>
+ val (plan, hints) = extractHintsFromPlan(i.left)
+ (i.copy(left = plan), hints)
+ case e: Except =>
+ val (plan, hints) = extractHintsFromPlan(e.left)
+ (e.copy(left = plan), hints)
+ case p: LogicalPlan => (p, Seq.empty)
}
}
+
+ private def handleInvalidHintInfo(hint: HintInfo): Unit = {
+ logWarning(s"A join hint $hint is specified but it is not part of a join
relation.")
+ }
+
+ private def handleOverriddenHintInfo(hint: HintInfo): Unit = {
Review comment:
It's a little weird to see this method (`handleOverriddenHintInfo`) being
defined twice. Can we just log the message inside `HintInfo.merge`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org