cloud-fan commented on code in PR #52149: URL: https://github.com/apache/spark/pull/52149#discussion_r2318853541
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ########## @@ -1191,6 +1194,67 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { } } + private def cheapToInlineProducer( + producer: NamedExpression, + relatedConsumers: Iterable[Expression]) = trimAliases(producer) match { + // These collection creation functions are not cheap as a producer, but we have + // optimizer rules that can optimize them out if they are only consumed by + // ExtractValue (See SimplifyExtractValueOps), so we need to allow to inline them to + // avoid perf regression. As an example: + // Project(s.a, s.b, Project(create_struct(a, b, c) as s, child)) + // We should collapse these two projects and eventually get Project(a, b, child) + case e @ (_: CreateNamedStruct | _: UpdateFields | _: CreateMap | _: CreateArray) => + // We can inline the collection creation producer if at most one of its access + // is non-cheap. Cheap access here means the access can be optimized by + // `SimplifyExtractValueOps` and become a cheap expression. For example, + // `create_struct(a, b, c).a` is a cheap access as it can be optimized to `a`. + // For a query: + // Project(s.a, s, Project(create_struct(a, b, c) as s, child)) + // We should collapse these two projects and eventually get + // Project(a, create_struct(a, b, c) as s, child) + var nonCheapAccessSeen = false + def nonCheapAccessVisitor(): Boolean = { + // Returns true for all calls after the first. 
+ try { + nonCheapAccessSeen + } finally { + nonCheapAccessSeen = true + } + } + + !relatedConsumers + .exists(findNonCheapAccesses(_, producer.toAttribute, e, nonCheapAccessVisitor)) + + case other => isCheap(other) + } + + private def mergeProjectExpressions( + consumers: Seq[NamedExpression], + producers: Seq[NamedExpression], + alwaysInline: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = { + lazy val producerAttributes = AttributeSet(producers.collect { case a: Alias => a.toAttribute }) + lazy val producerReferences = AttributeMap(consumers + .flatMap(e => collectReferences(e).filter(producerAttributes.contains).map(_ -> e)) + .groupMap(_._1)(_._2) + .transform((_, v) => (v.size, ExpressionSet(v)))) + + val (substitute, keep) = producers.partition { + case a: Alias if producerReferences.contains(a.toAttribute) => + val (count, relatedConsumers) = producerReferences(a.toAttribute) + a.deterministic && + (alwaysInline || count == 1 || cheapToInlineProducer(a, relatedConsumers)) + + case _ => true + } + + val substituted = buildCleanedProjectList(consumers, substitute) + if (keep.isEmpty) { + (Seq.empty, substituted) + } else { + (substituted, keep ++ AttributeSet(substitute.flatMap(_.references))) Review Comment: I do like this new approach that allows partial merging of two Projects. It's a net win and we should merge it first. But we should also reconsider the cost model for Python UDFs. Merging them into a single Project is a huge win, and we should do it even if we need to replicate some expensive expressions. We can discuss it in a follow-up PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org