unigof commented on code in PR #37630:
URL: https://github.com/apache/spark/pull/37630#discussion_r1427878186
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -223,87 +224,383 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
}
}
- // Recursively traverse down and try merging 2 plans. If merge is possible then return the merged
- // plan with the attribute mapping from the new to the merged version.
- // Please note that merging arbitrary plans can be complicated, the current version supports only
- // some of the most important nodes.
+ /**
+ * Recursively traverse down and try merging 2 plans.
+ *
+ * Please note that merging arbitrary plans can be complicated, the current version supports only
+ * some of the most important nodes.
+ *
+ * @param newPlan a new plan that we want to merge to an already processed plan
+ * @param cachedPlan a plan that we already processed, it can be either an original plan or a
+ * merged version of 2 or more plans
+ * @param filterPropagationSupported a boolean flag that we propagate down to signal we have seen
+ * an `Aggregate` node where propagated filters can be merged
+ * @return A tuple of:
+ * - the merged plan,
+ * - the attribute mapping from the new to the merged version,
+ * - the 2 optional filters of both plans that we need to propagate up and merge in
+ * an ancestor `Aggregate` node if possible,
+ * - the optional accumulated extra cost of merge that we need to propagate up and
+ * check in the ancestor `Aggregate` node.
+ * The cost is optional to signal if the cost needs to be taken into account up in the
+ * `Aggregate` node to decide about merge.
+ */
private def tryMergePlans(
newPlan: LogicalPlan,
- cachedPlan: LogicalPlan): Option[(LogicalPlan, AttributeMap[Attribute])] = {
- checkIdenticalPlans(newPlan, cachedPlan).map(cachedPlan -> _).orElse(
+ cachedPlan: LogicalPlan,
+ filterPropagationSupported: Boolean):
+ Option[(LogicalPlan, AttributeMap[Attribute], Option[Expression], Option[Expression],
+ Option[Double])] = {
+ checkIdenticalPlans(newPlan, cachedPlan).map { outputMap =>
+ // Currently the cost is always propagated up when `filterPropagationSupported` is true but
+ // later we can address cases when we don't need to take cost into account. Please find the
+ // details at the `Filter` node handling.
+ val mergeCost = if (filterPropagationSupported) Some(0d) else None
+
+ (cachedPlan, outputMap, None, None, mergeCost)
+ }.orElse(
(newPlan, cachedPlan) match {
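For readers of this thread, here is a minimal Scala sketch of the identical-plan branch in the hunk above, under simplified assumptions: plans are plain strings and the attribute mapping is a `Map[String, String]`. `TryMergeSketch`, `tryMergeIdentical` and the string-based `checkIdenticalPlans` are hypothetical stand-ins, not Spark's `LogicalPlan`, `AttributeMap` or catalyst code. The sketch only illustrates the shape of the returned 5-tuple and how `filterPropagationSupported` decides whether a zero merge cost is reported or left as `None`.

```scala
// Hypothetical, simplified model of the identical-plan branch of `tryMergePlans`.
// Plans are plain strings and attribute mappings are Map[String, String];
// neither type comes from Spark.
object TryMergeSketch {
  type Plan = String
  type AttrMap = Map[String, String]
  // (merged plan, new -> merged attribute mapping, new-plan filter, cached-plan filter, extra cost)
  type MergeResult = (Plan, AttrMap, Option[String], Option[String], Option[Double])

  // Stand-in for `checkIdenticalPlans`: identical plans map every output attribute to itself.
  def checkIdenticalPlans(newPlan: Plan, cachedPlan: Plan, output: Seq[String]): Option[AttrMap] =
    if (newPlan == cachedPlan) Some(output.map(a => a -> a).toMap) else None

  def tryMergeIdentical(
      newPlan: Plan,
      cachedPlan: Plan,
      output: Seq[String],
      filterPropagationSupported: Boolean): Option[MergeResult] =
    checkIdenticalPlans(newPlan, cachedPlan, output).map { outputMap =>
      // Merging identical plans adds no extra cost; the cost is only reported (Some)
      // when an ancestor Aggregate may need it to decide about the merge.
      val mergeCost: Option[Double] = if (filterPropagationSupported) Some(0d) else None
      // Identical plans have no differing filters to propagate up.
      (cachedPlan, outputMap, None, None, mergeCost)
    }

  def main(args: Array[String]): Unit = {
    println(tryMergeIdentical("Scan t", "Scan t", Seq("a"), filterPropagationSupported = true))
    println(tryMergeIdentical("Scan t", "Scan t", Seq("a"), filterPropagationSupported = false))
    println(tryMergeIdentical("Scan t", "Scan u", Seq("a"), filterPropagationSupported = true))
  }
}
```

In the real rule the non-identical case falls through to the `(newPlan, cachedPlan) match` via `orElse`; the sketch simply returns `None` there.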
Review Comment:
Could you add DSv2 support (especially for Parquet) to this PR?
I can test its performance in our production environment. Thank you very much!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]