Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/20069#discussion_r158697347
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -875,13 +875,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     }
     case filter @ Filter(condition, watermark: EventTimeWatermark) =>
-      // We can only push deterministic predicates which don't reference the watermark attribute.
-      // We could in theory span() only on determinism and pull out deterministic predicates
-      // on the watermark separately. But it seems unnecessary and a bit confusing to not simply
-      // use the prefix as we do for nondeterminism in other cases.
-
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
-        p => p.deterministic && !p.references.contains(watermark.eventTime))
+      val (pushDown, stayUp) = {
+        val pushDownCondition: Expression => Boolean =
+          p => p.deterministic && !p.references.contains(watermark.eventTime)
+        if (SQLConf.get.outOfOrderPredicateEvaluationEnabled) {
--- End diff ---
Nit: Create a function that accepts a parameter of type `Expression => Boolean` plus `predicates: Seq[Expression]`, so that `partitionByDeterminism` simply calls it with the function parameter `e => e.deterministic`.
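A minimal sketch of what that helper could look like, assuming the `outOfOrderPredicateEvaluationEnabled` flag and the `partitionByDeterminism` caller referenced above; the helper name `splitPredicates` and its exact placement are made up for illustration, and `Expression` / `SQLConf` are the catalyst types already appearing in the diff:

```scala
// Sketch only: `splitPredicates` is a hypothetical name for the suggested helper.
private def splitPredicates(
    canPushDown: Expression => Boolean,
    predicates: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
  if (SQLConf.get.outOfOrderPredicateEvaluationEnabled) {
    // Out-of-order evaluation allowed: push every predicate that satisfies the
    // condition, regardless of its position in the conjunction.
    predicates.partition(canPushDown)
  } else {
    // Preserve evaluation order: push only the prefix of predicates that
    // satisfy the condition, as the span()-based code did.
    predicates.span(canPushDown)
  }
}

// The watermark case above would then become
//   splitPredicates(
//     p => p.deterministic && !p.references.contains(watermark.eventTime),
//     splitConjunctivePredicates(condition))
// and partitionByDeterminism would pass `e => e.deterministic`.
```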
---