GitHub user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20069#discussion_r158697347
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -875,13 +875,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
           }
     
         case filter @ Filter(condition, watermark: EventTimeWatermark) =>
    -      // We can only push deterministic predicates which don't reference the watermark attribute.
    -      // We could in theory span() only on determinism and pull out deterministic predicates
    -      // on the watermark separately. But it seems unnecessary and a bit confusing to not simply
    -      // use the prefix as we do for nondeterminism in other cases.
    -
    -      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
    -        p => p.deterministic && !p.references.contains(watermark.eventTime))
    +      val (pushDown, stayUp) = {
    +        val pushDownCondition: Expression => Boolean =
    +          p => p.deterministic && !p.references.contains(watermark.eventTime)
    +        if (SQLConf.get.outOfOrderPredicateEvaluationEnabled) {
    --- End diff --
    
    Nit: Create a function that accepts a condition of type `Expression => Boolean` 
    and `predicates: Seq[Expression]`; `partitionByDeterminism` can then call it 
    with `e => e.deterministic` as the condition.
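
A minimal sketch of that refactoring, offered as an illustration rather than the code in the PR. `Expr` is a hypothetical stand-in for Catalyst's `Expression`, `splitBy` is a hypothetical helper name, and the `outOfOrderEnabled` flag stands in for `SQLConf.get.outOfOrderPredicateEvaluationEnabled`. The sketch assumes the flag switches between `partition` and `span`; the quoted diff is truncated before that branch, so this is a guess.

```scala
// Hypothetical stand-in for Catalyst's Expression; only the field this sketch needs.
final case class Expr(name: String, deterministic: Boolean)

object PredicateSplitSketch {
  // Hypothetical helper: splits `predicates` into (matching, remaining) by `condition`.
  // Assumption: with out-of-order evaluation enabled, every matching predicate is
  // selected (partition); otherwise only the leading run of matches is (span).
  def splitBy(
      condition: Expr => Boolean,
      predicates: Seq[Expr],
      outOfOrderEnabled: Boolean): (Seq[Expr], Seq[Expr]) =
    if (outOfOrderEnabled) predicates.partition(condition)
    else predicates.span(condition)

  // The determinism split becomes one caller of the generic helper.
  def partitionByDeterminism(
      predicates: Seq[Expr],
      outOfOrderEnabled: Boolean): (Seq[Expr], Seq[Expr]) =
    splitBy(e => e.deterministic, predicates, outOfOrderEnabled)
}
```

For predicates `a, b, c` where only `b` is nondeterministic, the `partition` branch yields `(a, c)` and `(b)`, while the `span` branch yields `(a)` and `(b, c)`. The watermark case could pass its own condition to the same helper.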


---
