[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...
Github user joseph-torres closed the pull request at: https://github.com/apache/spark/pull/18925 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18925#discussion_r132795536

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -779,10 +780,16 @@ case object OneRowRelation extends LeafNode {
 }

 /** A logical plan for `dropDuplicates`. */
+case object Deduplicate {
+  def apply(keys: Seq[Attribute], child: LogicalPlan): Deduplicate = {
+    Deduplicate(keys, child, child.outputMode)
+  }
+}
+
 case class Deduplicate(
     keys: Seq[Attribute],
     child: LogicalPlan,
-    streaming: Boolean) extends UnaryNode {
+    originalOutputMode: OutputMode) extends UnaryNode {
--- End diff --

The intent here is that callers who need a Deduplicate will use the two-argument form in the object, which will then use the constructor to preserve the output mode of the child. A val defined inside the case class isn't accounted for by copy(), which caused test failures when I tried it.
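The copy() behaviour described in the comment above can be reproduced outside Spark. What follows is a minimal sketch, not the PR's code: `Mode`, `Plan`, `DedupBodyVal`, and `DedupParam` are hypothetical stand-ins for `OutputMode`, `LogicalPlan`, and the two candidate shapes of `Deduplicate`. It assumes the test failures came from a tree rewrite swapping the child through copy(), which is only one plausible reading of the comment.

```scala
object CopySketch {
  // Toy stand-ins for Spark's types; every name here is hypothetical.
  sealed trait Mode
  case object Append extends Mode
  case object Complete extends Mode

  final case class Plan(name: String, mode: Mode)

  // Variant A: the mode is a val in the class body, derived from the child.
  // copy() only carries constructor parameters, so the val is recomputed
  // from whichever child the copy ends up with.
  final case class DedupBodyVal(keys: Seq[String], child: Plan) {
    val originalMode: Mode = child.mode
  }

  // Variant B: the mode is a constructor parameter, so copy() carries it over.
  final case class DedupParam(keys: Seq[String], child: Plan, originalMode: Mode)

  object DedupParam {
    // Two-argument form: captures the child's mode at construction time.
    def apply(keys: Seq[String], child: Plan): DedupParam =
      DedupParam(keys, child, child.mode)
  }

  val streamingChild: Plan = Plan("stream", Append)
  val rewrittenChild: Plan = Plan("rewritten", Complete)

  // Simulate a rewrite that replaces the child after construction.
  val viaBodyVal: DedupBodyVal =
    DedupBodyVal(Seq("id"), streamingChild).copy(child = rewrittenChild)
  val viaParam: DedupParam =
    DedupParam(Seq("id"), streamingChild).copy(child = rewrittenChild)
}
```

In this sketch `viaBodyVal.originalMode` follows the new child, while `viaParam.originalMode` keeps the mode captured when the node was first built. That difference is the reason given in the comment for making the mode a constructor parameter with a two-argument `apply` on the object.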
[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18925#discussion_r132791950

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -779,10 +780,16 @@ case object OneRowRelation extends LeafNode {
 }

 /** A logical plan for `dropDuplicates`. */
+case object Deduplicate {
+  def apply(keys: Seq[Attribute], child: LogicalPlan): Deduplicate = {
+    Deduplicate(keys, child, child.outputMode)
+  }
+}
+
 case class Deduplicate(
     keys: Seq[Attribute],
     child: LogicalPlan,
-    streaming: Boolean) extends UnaryNode {
+    originalOutputMode: OutputMode) extends UnaryNode {
--- End diff --

Can we drop this? Can it just preserve the output mode of its child?
[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18925#discussion_r132791711

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1185,7 +1186,7 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
  */
 object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Deduplicate(keys, child, streaming) if !streaming =>
+    case d @ Deduplicate(keys, child, _) if d.originalOutputMode != OutputMode.Append() =>
--- End diff --

existing: It's kind of odd that this decision is made in the optimizer and not the query planner. I think our aggregate operator is actually worse than the specialized deduplication operator (since the specialized one is non-blocking). It doesn't have to be in this PR, but we should probably move this to the planner eventually.
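The guarded pattern match in the diff above can be illustrated on a toy plan tree. This is a rough sketch under stated assumptions, not Spark's optimizer: `Mode`, `Plan`, `Relation`, `Aggregate`, and `rewrite` are hypothetical names, the aggregate simply groups by the deduplication keys, and the recursion is hand-written instead of using Catalyst's `transform`.

```scala
object RuleSketch {
  // Toy logical-plan ADT; every name below is a hypothetical stand-in.
  sealed trait Mode
  case object Append extends Mode
  case object Complete extends Mode

  sealed trait Plan
  final case class Relation(name: String, mode: Mode) extends Plan
  final case class Deduplicate(keys: Seq[String], child: Plan, originalMode: Mode) extends Plan
  final case class Aggregate(groupBy: Seq[String], child: Plan) extends Plan

  // Replace a Deduplicate with an Aggregate unless it was built over an
  // Append-mode child, mirroring the guard `originalOutputMode != Append`.
  def rewrite(plan: Plan): Plan = plan match {
    case d @ Deduplicate(keys, child, _) if d.originalMode != Append =>
      Aggregate(keys, rewrite(child))
    case d: Deduplicate =>
      // Append-mode case: keep the specialized operator, rewrite below it.
      d.copy(child = rewrite(d.child))
    case Aggregate(groupBy, child) =>
      Aggregate(groupBy, rewrite(child))
    case r: Relation =>
      r
  }
}
```

Under these assumptions, a `Deduplicate` whose recorded mode is `Complete` becomes an `Aggregate`, and one whose recorded mode is `Append` is left in place. Moving the decision to the planner, as the review comment suggests, would keep the `Deduplicate` node through optimization in both cases and choose the physical operator later.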
[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/18925

    [SPARK-21713][SC] Replace streaming bit with OutputMode

    ## What changes were proposed in this pull request?

    * Replace LogicalPlan.isStreaming with output mode.
    * Replace Deduplicate.streaming with output mode.

    Note that this is an implementation-only change, so it deliberately does not change isStreaming in the Dataset API.

    ## How was this patch tested?

    refactoring only - ran existing unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SC-8027

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18925.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18925

commit 4d8fe6e3e0300b847cbf11a8c29b9cda696bb238
Author: Jose Torres
Date:   2017-08-10T16:59:40Z

    Rename FlatMapGroupsWithState.outputMode to funcOutputMode.

commit aacf0592e7720e4784377714673cc4d2151be66d
Author: Jose Torres
Date:   2017-08-10T16:24:56Z

    partial

commit e23b1d0c7563eee61697b7a3a4a1a3f6fe1d
Author: Jose Torres
Date:   2017-08-10T17:54:54Z

    Replace LogicalPlan.isStreaming with outputMode Append() and Complete().

commit d2f7e604f58071f7f14c68f9760e1b5d0b705487
Author: Jose Torres
Date:   2017-08-11T17:25:52Z

    Replace Deduplicate.streaming with output mode.