[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...

2017-08-22 Thread joseph-torres
Github user joseph-torres closed the pull request at:

https://github.com/apache/spark/pull/18925


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...

2017-08-11 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18925#discussion_r132795536
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -779,10 +780,16 @@ case object OneRowRelation extends LeafNode {
 }
 
 /** A logical plan for `dropDuplicates`. */
+case object Deduplicate {
+  def apply(keys: Seq[Attribute], child: LogicalPlan): Deduplicate = {
+Deduplicate(keys, child, child.outputMode)
+  }
+}
+
 case class Deduplicate(
 keys: Seq[Attribute],
 child: LogicalPlan,
-streaming: Boolean) extends UnaryNode {
+originalOutputMode: OutputMode) extends UnaryNode {
--- End diff --

The intent here is that callers who need a Deduplicate will use the 
two-argument apply in the companion object, which then calls the 
three-argument constructor with the child's output mode, so the mode is 
preserved.

A val defined inside the case class body isn't a constructor parameter, so 
copy() doesn't carry it over. That caused test failures when I tried it.
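
To illustrate the copy() point, here is a minimal sketch using hypothetical stand-in types (`Mode`, `Child`, `DedupBodyVal`, `Dedup`) rather than Spark's actual classes. It assumes the body val was derived from the child, which may not match exactly what was tried in the PR.

```scala
// Hypothetical stand-ins for OutputMode and LogicalPlan; not Spark's API.
sealed trait Mode
case object Append extends Mode
case object Complete extends Mode

final case class Child(name: String, outputMode: Mode)

// Variant A: the mode is a val in the class body. It is not a constructor
// parameter, so copy() cannot carry it over; it is recomputed from whatever
// child the new instance receives.
final case class DedupBodyVal(keys: Seq[String], child: Child) {
  val originalOutputMode: Mode = child.outputMode
}

// Variant B: the mode is a constructor parameter, so copy() preserves it.
final case class Dedup(keys: Seq[String], child: Child, originalOutputMode: Mode)

object Dedup {
  // Two-argument form: capture the child's mode once, at construction time.
  def apply(keys: Seq[String], child: Child): Dedup =
    Dedup(keys, child, child.outputMode)
}

object CopyDemo {
  val streamingChild: Child = Child("stream", Append)
  val rewrittenChild: Child = Child("rewritten", Complete)

  // The body val follows the new child, so the original mode is lost.
  val a: DedupBodyVal = DedupBodyVal(Seq("id"), streamingChild).copy(child = rewrittenChild)

  // The constructor parameter survives the copy.
  val b: Dedup = Dedup(Seq("id"), streamingChild).copy(child = rewrittenChild)
}
```

In this sketch `CopyDemo.a.originalOutputMode` ends up as `Complete`, while `CopyDemo.b.originalOutputMode` stays `Append`.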





[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...

2017-08-11 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/18925#discussion_r132791950
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -779,10 +780,16 @@ case object OneRowRelation extends LeafNode {
 }
 
 /** A logical plan for `dropDuplicates`. */
+case object Deduplicate {
+  def apply(keys: Seq[Attribute], child: LogicalPlan): Deduplicate = {
+Deduplicate(keys, child, child.outputMode)
+  }
+}
+
 case class Deduplicate(
 keys: Seq[Attribute],
 child: LogicalPlan,
-streaming: Boolean) extends UnaryNode {
+originalOutputMode: OutputMode) extends UnaryNode {
--- End diff --

Can we drop this?  Can it just preserve the output mode of its child?





[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...

2017-08-11 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/18925#discussion_r132791711
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1185,7 +1186,7 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
  */
 object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case Deduplicate(keys, child, streaming) if !streaming =>
+case d @ Deduplicate(keys, child, _) if d.originalOutputMode != OutputMode.Append() =>
--- End diff --

Existing issue: it's kind of odd that this decision is made in the optimizer 
and not the query planner.  I think our aggregate operator is actually worse 
than the specialized deduplication operator, since the specialized one is 
non-blocking.

It doesn't have to be in this PR, but we should probably move this to the 
planner eventually.
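
For readers unfamiliar with the pattern in the diff, here is a rough sketch of a guarded rewrite on a toy plan tree. All names (`OutMode`, `Plan`, `Source`, `Dedupe`, `Agg`, `ToyRule`) are hypothetical stand-ins, not Catalyst classes, and the recursion is hand-written rather than using `transform`.

```scala
// Toy output modes and plan nodes; hypothetical, not Spark's API.
sealed trait OutMode
case object AppendMode extends OutMode
case object CompleteMode extends OutMode

sealed trait Plan
final case class Source(name: String) extends Plan
final case class Dedupe(keys: Seq[String], child: Plan, originalOutputMode: OutMode) extends Plan
final case class Agg(groupBy: Seq[String], child: Plan) extends Plan

object ToyRule {
  // Replace a deduplication with an aggregate unless its recorded mode is
  // Append, mirroring the guard in the diff above.
  def rewrite(plan: Plan): Plan = plan match {
    case d @ Dedupe(keys, child, _) if d.originalOutputMode != AppendMode =>
      Agg(keys, rewrite(child))
    case Dedupe(keys, child, mode) =>
      Dedupe(keys, rewrite(child), mode)
    case Agg(groupBy, child) =>
      Agg(groupBy, rewrite(child))
    case s: Source =>
      s
  }
}
```

With these toy types, a `Dedupe` recorded as `CompleteMode` is rewritten to an `Agg`, while one recorded as `AppendMode` is left as a `Dedupe`.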





[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...

2017-08-11 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/18925

[SPARK-21713][SC] Replace streaming bit with OutputMode

## What changes were proposed in this pull request?

* Replace LogicalPlan.isStreaming with output mode.
* Replace Deduplicate.streaming with output mode.

Note that this is an implementation-only change, so it deliberately does 
not change isStreaming in the Dataset API.

## How was this patch tested?

Refactoring only; ran the existing unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SC-8027

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18925


commit 4d8fe6e3e0300b847cbf11a8c29b9cda696bb238
Author: Jose Torres 
Date:   2017-08-10T16:59:40Z

Rename FlatMapGroupsWithState.outputMode to funcOutputMode.

commit aacf0592e7720e4784377714673cc4d2151be66d
Author: Jose Torres 
Date:   2017-08-10T16:24:56Z

partial

commit e23b1d0c7563eee61697b7a3a4a1a3f6fe1d
Author: Jose Torres 
Date:   2017-08-10T17:54:54Z

Replace LogicalPlan.isStreaming with outputMode Append() and Complete().

commit d2f7e604f58071f7f14c68f9760e1b5d0b705487
Author: Jose Torres 
Date:   2017-08-11T17:25:52Z

Replace Deduplicate.streaming with output mode.



