[GitHub] spark pull request #18790: [SPARK-21587][SS] Added pushdown through watermar...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/18790#discussion_r132238309

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -872,6 +886,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       pushDownPredicate(filter, u.child) { predicate =>
         u.withNewChildren(Seq(Filter(predicate, u.child)))
       }
+
+    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
--- End diff --

Done.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18790#discussion_r132095065

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -410,6 +413,14 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper {
       } else {
         p
       }
+
+    case p @ Project(projectList, watermark: EventTimeWatermark) =>
+      // Push as long as the project doesn't eliminate the attribute.
--- End diff --

Do we still need the `Project` when `projectList` is identical to the output of `EventTimeWatermark`, except for the metadata?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18790#discussion_r132094610

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -410,6 +413,14 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper {
       } else {
         p
       }
+
+    case p @ Project(projectList, watermark: EventTimeWatermark) =>
+      // Push as long as the project doesn't eliminate the attribute.
--- End diff --

Do we need to check `deterministic` here, as we do for the case above?
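[Editor's illustration] The `Project`-through-watermark condition under discussion in the two comments above can be sketched with toy plan classes. These are simplified stand-ins, not the real Catalyst `LogicalPlan` classes: attributes are modeled as plain column names, and the rule fires only while the projection still keeps the watermarked (event-time) attribute, since `EventTimeWatermark` needs that column to exist below it.

```scala
// Toy model of a logical plan; NOT the real Spark Catalyst classes.
sealed trait Plan
case class Relation(columns: Seq[String]) extends Plan
case class EventTimeWatermark(eventTime: String, child: Plan) extends Plan
case class Project(projectList: Seq[String], child: Plan) extends Plan

// Push a Project below EventTimeWatermark only if the projection
// preserves the watermarked attribute; otherwise leave the plan alone.
def pushProjectThroughWatermark(p: Plan): Plan = p match {
  case Project(list, w @ EventTimeWatermark(attr, child)) if list.contains(attr) =>
    w.copy(child = Project(list, child))
  case other => other
}
```

Under this sketch, `Project(Seq("ts"), EventTimeWatermark("ts", rel))` becomes `EventTimeWatermark("ts", Project(Seq("ts"), rel))`, while a projection that drops `ts` is left in place.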
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18790#discussion_r132093307

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -872,6 +886,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       pushDownPredicate(filter, u.child) { predicate =>
         u.withNewChildren(Seq(Filter(predicate, u.child)))
       }
+
+    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
--- End diff --

I see. How about moving this case above the `UnaryNode` one? The following two functions, `canPushThrough` and `pushDownPredicate`, are just for `UnaryNode`.
GitHub user joseph-torres reopened a pull request: https://github.com/apache/spark/pull/18790

[SPARK-21587][SS] Added pushdown through watermarks.

## What changes were proposed in this pull request?

* Filter predicates can be pushed through EventTimeWatermark if they're deterministic and do not reference the watermarked attribute.
* Projects can be pushed through EventTimeWatermark if they include the watermarked attribute.
* Limits can be pushed through EventTimeWatermark.

## How was this patch tested?

Unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-21587

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18790.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18790

commit 9cc8da5dbc5be9b7b663e002097214e3c0720801
Author: Jose Torres
Date: 2017-07-31T22:58:03Z

    Added pushdown below watermarks.

commit 154d34820be73f7d20bf1119fb385940d0ce6455
Author: Jose Torres
Date: 2017-08-01T16:47:32Z

    Push Project, Limit, and Filter through watermark when appropriate.

commit 84575b60609a3efc9824eba96541011a99313a63
Author: Jose Torres
Date: 2017-08-07T20:50:29Z

    Remove pushdown limit through watermark.

commit 4cae8973f52078afae2a9d92d59c91edaab0ba88
Author: Jose Torres
Date: 2017-08-09T01:56:46Z

    remove leaked import
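[Editor's illustration] The Filter condition stated in the PR description (deterministic, and not referencing the watermarked attribute) can be sketched with toy plan classes. These are simplified stand-ins for the real Catalyst `LogicalPlan` nodes: attribute references are modeled as a set of column names and determinism as a boolean flag.

```scala
// Toy model of a logical plan; NOT the real Spark Catalyst classes.
sealed trait Plan { def output: Set[String] }
case class Relation(output: Set[String]) extends Plan
case class EventTimeWatermark(eventTime: String, child: Plan) extends Plan {
  def output: Set[String] = child.output
}
case class Filter(references: Set[String], deterministic: Boolean, child: Plan) extends Plan {
  def output: Set[String] = child.output
}

// A Filter may move below the watermark only if it is deterministic and
// does not touch the watermarked (event-time) attribute.
def pushFilterThroughWatermark(p: Plan): Plan = p match {
  case Filter(refs, det, w @ EventTimeWatermark(attr, child)) if det && !refs.contains(attr) =>
    w.copy(child = Filter(refs, det, child))
  case other => other
}
```

In this sketch, a deterministic filter on `v` is pushed below a watermark on `ts`, while a filter referencing `ts` (or a non-deterministic one) stays above it.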
Github user joseph-torres closed the pull request at: https://github.com/apache/spark/pull/18790
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/18790#discussion_r132063427

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala ---
@@ -24,6 +24,7 @@
 import org.apache.spark.sql.catalyst.expressions.Add
 import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, PlanTest, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.unsafe.types.CalendarInterval
--- End diff --

Is this needed? There does not seem to be any change in this file.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/18790#discussion_r131761258

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -872,6 +886,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       pushDownPredicate(filter, u.child) { predicate =>
         u.withNewChildren(Seq(Filter(predicate, u.child)))
       }
+
+    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
--- End diff --

For filter, the logic has a subtle additional condition. We don't want to push down filters on the watermark attribute, because:

* they'll be at least as expensive to evaluate as the watermark operator
* partition pruning shouldn't apply, since there won't be useful partitions on an event time

For project, I don't see a rule for UnaryNode anywhere. I might have missed it.

For limit, I actually removed the rule for EventTimeWatermark that I originally added, since it does drop rows in some situations.

So I don't think that making EventTimeWatermark subclass UnaryNode would avoid any of the code in this PR. I agree it seems appropriate, but it also seems orthogonal.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18790#discussion_r131533778

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -872,6 +886,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       pushDownPredicate(filter, u.child) { predicate =>
         u.withNewChildren(Seq(Filter(predicate, u.child)))
       }
+
+    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
--- End diff --

Why not change `EventTimeWatermark` to a `UnaryNode`? Then we would not need to write a separate case only for `EventTimeWatermark`, and could reuse the existing `pushDownPredicate`, right? We also have other rules that already consider `UnaryNode`; do you think it makes sense to avoid duplicating the code for `EventTimeWatermark` only?