[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r52209170
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in a
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
+  protected override def doExecute(): RDD[InternalRow] = {
+val shuffled = new ShuffledRowRDD(
+  Exchange.prepareShuffleDependency(child.execute(), child.output, 
SinglePartition, serializer))
--- End diff --

In this case, the plan shown in the Spark UI (which has no exchange) will not 
match the actual plan (which has an exchange).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r52213727
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in a
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
+  protected override def doExecute(): RDD[InternalRow] = {
+val shuffled = new ShuffledRowRDD(
+  Exchange.prepareShuffleDependency(child.execute(), child.output, 
SinglePartition, serializer))
--- End diff --

Since the current implementation also adds a ShuffledRowRDD without an 
Exchange, this is not a regression.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-08 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-181533917
  
LGTM, merging this into master.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/7334





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180568890
  
**[Test build #50844 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50844/consoleFull)**
 for PR 7334 at commit 
[`c4b0a53`](https://github.com/apache/spark/commit/c4b0a5303b4de328e11e8bfb79e8b3e795bd1174).





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180608495
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180608496
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50844/
Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r52056901
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in a
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = 
sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

As discussed upthread, simply always having `GlobalLimit` would lead to 
regressions when calling `df.limit(10).collect()`, since it would now have to 
compute all partitions of `df` whereas before it might be able to compute only 
a subset.
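
The point about partial evaluation can be made concrete with a hypothetical, 
Spark-free Scala sketch (the `IncrementalTake` object and its `partitions` 
parameter are illustrative stand-ins, not Spark APIs): an incremental take 
only touches partitions until it has gathered `limit` rows, while planning the 
same limit as a shuffle over the whole child must compute every partition 
first.

```scala
// Illustrative only: a take() that scans partitions one at a time and
// stops early, mirroring why executeTake(limit) can avoid computing
// every partition of the child plan.
object IncrementalTake {
  def take[T](partitions: Seq[Seq[T]], limit: Int): (Seq[T], Int) = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[T]
    var partitionsComputed = 0
    val it = partitions.iterator
    while (buf.size < limit && it.hasNext) {
      buf ++= it.next().take(limit - buf.size)
      partitionsComputed += 1 // counts only partitions actually scanned
    }
    (buf.toSeq, partitionsComputed)
  }
}
```

With 100 partitions of 10 rows each, a take of 10 touches only the first 
partition; that early-exit behavior is what a root-level `CollectLimit` 
preserves and an unconditional `GlobalLimit` would give up.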





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r52056035
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in a
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = 
sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

Maybe having a special case for Limit at the root is not a good idea. I think 
we could always have GlobalLimit and call `executeTake(limit)` in it.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r52053966
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in a
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = 
sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

I've started working on this and ran into one minor snag: what happens if a 
user has a DataFrame whose root logical plan is an RDD, then calls `.rdd()` on 
it? In that case, `CollectLimit` will still be the root and we'll need it to 
have a functioning `doExecute()`.

On reflection, I agree that the current `doExecute()` here is bad because 
it breaks lineage and laziness. I'll see about switching back to an approach 
that invokes Exchange directly.
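
The lineage/laziness concern can be sketched without Spark (hypothetical 
names; `expensiveRow` stands in for evaluating the child plan): collecting to 
the driver and re-parallelizing, as `makeRDD(executeCollect(), 1)` does, 
forces all the work eagerly, whereas keeping the computation as a deferred 
transformation does no work until the result is actually consumed.

```scala
// Illustrative only (not Spark code): eager materialization vs a
// deferred pipeline that performs no work until it is consumed.
object LazinessSketch {
  var evals = 0
  def expensiveRow(i: Int): Int = { evals += 1; i * 2 }

  // Eager: computes every row immediately, like collect-then-parallelize.
  def eager(n: Int): Seq[Int] = (1 to n).map(expensiveRow)

  // Deferred: merely building the pipeline evaluates nothing.
  def deferred(n: Int): Iterator[Int] = (1 to n).iterator.map(expensiveRow)
}
```

Building `deferred(3)` leaves the evaluation counter at zero; only draining 
the iterator runs `expensiveRow`, which is the property a plan keeps when it 
stays inside `child.execute()`'s lineage.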





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180598004
  
LGTM, pending tests.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180608312
  
**[Test build #50844 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50844/consoleFull)**
 for PR 7334 at commit 
[`c4b0a53`](https://github.com/apache/spark/commit/c4b0a5303b4de328e11e8bfb79e8b3e795bd1174).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180527530
  
Alright, just pushed an update which I think should fix the laziness 
problems with `.rdd` and `.cache`. The key idea is to move the `ReturnAnswer` 
injection into `DataFrame.collect()` rather than always putting it at the root 
of the logical plan before invoking the planner.
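
The `ReturnAnswer` idea can be sketched with a toy planner (all types below 
are simplified stand-ins, not Spark's actual classes): the marker wraps the 
root only on the collect path, so a pattern match can plan a root-level limit 
as `CollectLimit` while planning the same limit anywhere else as a global 
limit.

```scala
// Illustrative only: a toy planner in which a ReturnAnswer marker at the
// root lets the planner special-case a final Limit.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Limit(n: Int, child: Plan) extends Plan
case class ReturnAnswer(child: Plan) extends Plan

object ToyPlanner {
  def plan(p: Plan): String = p match {
    // Limit directly under the marker: results go to the driver anyway,
    // so an early-exiting collect is safe.
    case ReturnAnswer(Limit(n, child)) => s"CollectLimit($n, ${plan(child)})"
    case ReturnAnswer(child)           => plan(child)
    // Limit anywhere else must stay a regular distributed operator.
    case Limit(n, child)               => s"GlobalLimit($n, ${plan(child)})"
    case Scan(t)                       => s"Scan($t)"
  }
}
```

Under this sketch's assumptions, a plan built without the marker (as for 
`.rdd` or `.cache()`) never hits the `CollectLimit` case, which is the fix to 
the laziness problem described above.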





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180561524
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180534132
  
**[Test build #50837 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50837/consoleFull)**
 for PR 7334 at commit 
[`c4b0a53`](https://github.com/apache/spark/commit/c4b0a5303b4de328e11e8bfb79e8b3e795bd1174).





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180554345
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180554215
  
**[Test build #50837 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50837/consoleFull)**
 for PR 7334 at commit 
[`c4b0a53`](https://github.com/apache/spark/commit/c4b0a5303b4de328e11e8bfb79e8b3e795bd1174).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180554346
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50837/
Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180629560
  
**[Test build #50850 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50850/consoleFull)**
 for PR 7334 at commit 
[`b8c9e47`](https://github.com/apache/spark/commit/b8c9e47d5b0220c1074d686fcc62b2bb547f61ec).





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180658750
  
This should now be ready for a final sign-off.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180646598
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180646223
  
**[Test build #50850 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50850/consoleFull)**
 for PR 7334 at commit 
[`b8c9e47`](https://github.com/apache/spark/commit/b8c9e47d5b0220c1074d686fcc62b2bb547f61ec).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180646611
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50850/
Test PASSed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-180062121
  
cc @davies for a more detailed review





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51939570
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -44,7 +44,7 @@ class QueryExecution(val sqlContext: SQLContext, val 
logical: LogicalPlan) {
 
   lazy val sparkPlan: SparkPlan = {
 SQLContext.setActive(sqlContext)
-sqlContext.planner.plan(optimizedPlan).next()
+sqlContext.planner.plan(ReturnAnswer(optimizedPlan)).next()
--- End diff --

Yeah, this looks good to me.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51957998
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in an
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = 
sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

This actually _can_ get called if you call `df.limit(10).cache()`.
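To make this concrete, here is a toy Scala sketch of the two code paths in `CollectLimit` (plain collections standing in for RDD partitions; the function names mirror the operator's methods but this is not actual Spark API):

```scala
// Toy model of CollectLimit (not Spark code): a dataset is a Seq of partitions.
// executeCollect() takes the first `limit` rows on the "driver";
// doExecute() rebuilds a single-partition dataset from that result,
// mirroring sparkContext.makeRDD(executeCollect(), 1).
def executeCollect(partitions: Seq[Seq[Int]], limit: Int): Seq[Int] =
  partitions.flatten.take(limit) // models child.executeTake(limit)

def doExecute(partitions: Seq[Seq[Int]], limit: Int): Seq[Seq[Int]] =
  Seq(executeCollect(partitions, limit)) // one partition holding the rows

val parts = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
val collected = executeCollect(parts, 4)  // Seq(1, 2, 3, 4)
val cachedInput = doExecute(parts, 4)     // Seq(Seq(1, 2, 3, 4))
```

Caching `df.limit(10)` materializes the `doExecute` path, which is why it has to produce real rows rather than throw.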





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51959428
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,116 @@
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in an
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = 
sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

This is actually a tricky problem to address. If we want to actually plan 
`GlobalLimit` here instead then we need to be able to distinguish between the 
collect-to-driver and cache cases earlier in the planning process, which I 
think is going to be a large and non-trivial change.

If we want to avoid collecting back to the driver and re-broadcasting then 
I suppose we can do something similar to what the old `Limit` operator's 
`doExecute` did. The problem with the old `doExecute()` was that it involved a 
lot of inefficient copying and serialization. Instead, it might be worth seeing 
whether the inner logic of the `Exchange` operator could be called from 
`CollectLimit.doExecute`.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51959415
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -337,8 +337,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 execution.Sample(lb, ub, withReplacement, seed, planLater(child)) 
:: Nil
   case logical.LocalRelation(output, data) =>
 LocalTableScan(output, data) :: Nil
+  case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), 
child)) =>
+execution.CollectLimit(limit, planLater(child)) :: Nil
   case logical.Limit(IntegerLiteral(limit), child) =>
-execution.Limit(limit, planLater(child)) :: Nil
+val perPartitionLimit = execution.LocalLimit(limit, 
planLater(child))
--- End diff --

I don't think we have a test case for this, right?





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51957757
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,116 @@
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in an
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = 
sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

This should not be called; should we throw an UnsupportedOperationException?





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51959015
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -337,8 +337,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 execution.Sample(lb, ub, withReplacement, seed, planLater(child)) 
:: Nil
   case logical.LocalRelation(output, data) =>
 LocalTableScan(output, data) :: Nil
+  case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), 
child)) =>
+execution.CollectLimit(limit, planLater(child)) :: Nil
   case logical.Limit(IntegerLiteral(limit), child) =>
-execution.Limit(limit, planLater(child)) :: Nil
+val perPartitionLimit = execution.LocalLimit(limit, 
planLater(child))
--- End diff --

Today this will be planned as a `scan -> local limit -> global limit` for 
each branch of the union, which I believe is correct: this query should return 
at most 20 results.

What do you think this test should be asserting?
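A quick sketch of the "at most 20 results" claim, using plain Scala collections as stand-ins for the two branches (a toy model, not Spark code):

```scala
// Each branch applies its own limit of 10 before the UNION ALL,
// so the concatenated result can never exceed 10 + 10 = 20 rows.
val tableA = (1 to 100).toSeq
val tableB = (201 to 300).toSeq
val unionAll = tableA.take(10) ++ tableB.take(10)
// unionAll has exactly 20 rows here, and at most 20 in general
```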





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51958945
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,116 @@
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in an
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = 
sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

For this case, should we use GlobalLimit?

Otherwise it will waste time collecting rows to the driver and then sending them back to the executors.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51971509
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,116 @@
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in an
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = 
sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

That's a really good idea. I think we could just match on `CachePlan(CollectLimit(...))` and replace it with `GlobalLimit`. Let me go ahead and try this.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51958741
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -337,8 +337,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 execution.Sample(lb, ub, withReplacement, seed, planLater(child)) 
:: Nil
   case logical.LocalRelation(output, data) =>
 LocalTableScan(output, data) :: Nil
+  case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), 
child)) =>
+execution.CollectLimit(limit, planLater(child)) :: Nil
   case logical.Limit(IntegerLiteral(limit), child) =>
-execution.Limit(limit, planLater(child)) :: Nil
+val perPartitionLimit = execution.LocalLimit(limit, 
planLater(child))
--- End diff --

Should we have a test for this?

```
select * from (select * from A limit 10) t1 UNION ALL (select * from B limit 10) t2
```





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51960794
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,116 @@
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the 
final operator in an
+ * logical plan, which happens when the user is collecting results back to 
the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = 
sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

Similar to `ReturnAnswer`, we could put a `CachePlan()` at the root (see `CacheManager.cacheQuery`) and add a special case for it.

If that's not trivial, we could just leave a TODO for this.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51962883
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -337,8 +337,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 execution.Sample(lb, ub, withReplacement, seed, planLater(child)) 
:: Nil
   case logical.LocalRelation(output, data) =>
 LocalTableScan(output, data) :: Nil
+  case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), 
child)) =>
+execution.CollectLimit(limit, planLater(child)) :: Nil
   case logical.Limit(IntegerLiteral(limit), child) =>
-execution.Limit(limit, planLater(child)) :: Nil
+val perPartitionLimit = execution.LocalLimit(limit, 
planLater(child))
--- End diff --

This is not allowed. See my PR: https://github.com/apache/spark/pull/10689





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51970296
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -337,8 +337,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 execution.Sample(lb, ub, withReplacement, seed, planLater(child)) 
:: Nil
   case logical.LocalRelation(output, data) =>
 LocalTableScan(output, data) :: Nil
+  case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), 
child)) =>
+execution.CollectLimit(limit, planLater(child)) :: Nil
   case logical.Limit(IntegerLiteral(limit), child) =>
-execution.Limit(limit, planLater(child)) :: Nil
+val perPartitionLimit = execution.LocalLimit(limit, 
planLater(child))
--- End diff --

@davies, to clarify, are you proposing that we need to add a test to ensure 
that global limits below union are not pulled up?





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51971305
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -337,8 +337,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 execution.Sample(lb, ub, withReplacement, seed, planLater(child)) 
:: Nil
   case logical.LocalRelation(output, data) =>
 LocalTableScan(output, data) :: Nil
+  case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), 
child)) =>
+execution.CollectLimit(limit, planLater(child)) :: Nil
   case logical.Limit(IntegerLiteral(limit), child) =>
-execution.Limit(limit, planLater(child)) :: Nil
+val perPartitionLimit = execution.LocalLimit(limit, 
planLater(child))
--- End diff --

@gatorsmile: I think we're in agreement. To recap:

Before this patch, `select * from (select * from A limit 10) t1 UNION ALL (select * from B limit 10) t2` should get planned as:

```
        ┌─────────┐
        │  Union  │
        └─────────┘
         ▲       ▲
         │       │
┌──────────┐ ┌──────────┐
│ Limit 10 │ │ Limit 10 │
└──────────┘ └──────────┘
      ▲           ▲
      │           │
┌──────────┐ ┌──────────┐
│  Scan A  │ │  Scan B  │
└──────────┘ └──────────┘
```

Afterwards, this becomes

```
            ┌─────────┐
            │  Union  │
            └─────────┘
             ▲       ▲
             │       │
┌────────────────┐ ┌────────────────┐
│ GlobalLimit 10 │ │ GlobalLimit 10 │
└────────────────┘ └────────────────┘
         ▲                  ▲
         │                  │
┌────────────────┐ ┌────────────────┐
│ LocalLimit 10  │ │ LocalLimit 10  │
└────────────────┘ └────────────────┘
         ▲                  ▲
         │                  │
   ┌──────────┐       ┌──────────┐
   │  Scan A  │       │  Scan B  │
   └──────────┘       └──────────┘
```

What is **not** legal to do here is to pull the `GlobalLimit` up, so the 
following would be wrong:

```
      ┌────────────────┐
      │ GlobalLimit 10 │
      └────────────────┘
              ▲
              │
          ┌─────────┐
          │  Union  │
          └─────────┘
           ▲       ▲
           │       │
┌───────────────┐ ┌───────────────┐
│ LocalLimit 10 │ │ LocalLimit 10 │
└───────────────┘ └───────────────┘
        ▲                 ▲
        │                 │
  ┌──────────┐      ┌──────────┐
  │  Scan A  │      │  Scan B  │
  └──────────┘      └──────────┘
```

That plan would be semantically equivalent to executing

```
  select * from (select * from A) t1 UNION ALL (select * from B) t2 LIMIT 10
```
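The difference between the two plans can be sketched with plain Scala collections (a toy model of `LocalLimit`/`GlobalLimit` over partitions; the names mirror the operators but this is not the Spark API):

```scala
// Toy model (not Spark code): a "table" is a Seq of partitions.
def localLimit(parts: Seq[Seq[Int]], limit: Int): Seq[Seq[Int]] =
  parts.map(_.take(limit)) // per-partition limit, no shuffle needed

def globalLimit(parts: Seq[Seq[Int]], limit: Int): Seq[Int] =
  parts.flatten.take(limit) // after coalescing to a single partition

val a = Seq(Seq(1, 2, 3), Seq(4, 5))
val b = Seq(Seq(6, 7), Seq(8, 9))

// Correct plan: one GlobalLimit per branch, below the Union.
val correct = globalLimit(localLimit(a, 2), 2) ++ globalLimit(localLimit(b, 2), 2)

// Incorrect plan: GlobalLimit pulled above the Union changes the answer.
val wrong = globalLimit(Seq(localLimit(a, 2).flatten ++ localLimit(b, 2).flatten), 2)
```

The correct plan returns up to 2 rows per branch (4 total), while the pulled-up limit returns only 2 rows overall, matching the trailing `LIMIT` rewrite above.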

@davies, were you suggesting that the current planning is wrong? Or that we 
need more tests to guard against incorrect changes to limit planning? I don't 
believe that the changes in this patch will affect the planning 

[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51973696
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -337,8 +337,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 execution.Sample(lb, ub, withReplacement, seed, planLater(child)) 
:: Nil
   case logical.LocalRelation(output, data) =>
 LocalTableScan(output, data) :: Nil
+  case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), 
child)) =>
+execution.CollectLimit(limit, planLater(child)) :: Nil
   case logical.Limit(IntegerLiteral(limit), child) =>
-execution.Limit(limit, planLater(child)) :: Nil
+val perPartitionLimit = execution.LocalLimit(limit, 
planLater(child))
--- End diff --

@JoshRosen Thank you for the explanation! It's great that my previous PRs are useful. So far I haven't found any more operators that allow limit push-down besides outer join and union all: https://github.com/apache/spark/pull/10454 and https://github.com/apache/spark/pull/10451





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179727593
  
**[Test build #2514 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2514/consoleFull)**
 for PR 7334 at commit 
[`55e27af`](https://github.com/apache/spark/commit/55e27af45f4a39bb1e86425be5687af3608fd4f3).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179700823
  
**[Test build #2514 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2514/consoleFull)**
 for PR 7334 at commit 
[`55e27af`](https://github.com/apache/spark/commit/55e27af45f4a39bb1e86425be5687af3608fd4f3).





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r51839591
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -44,7 +44,7 @@ class QueryExecution(val sqlContext: SQLContext, val 
logical: LogicalPlan) {
 
   lazy val sparkPlan: SparkPlan = {
 SQLContext.setActive(sqlContext)
-sqlContext.planner.plan(optimizedPlan).next()
+sqlContext.planner.plan(ReturnAnswer(optimizedPlan)).next()
--- End diff --

cc @marmbrus here to make sure it is safe





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179518293
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50694/
Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179507588
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179508983
  
**[Test build #50691 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50691/consoleFull)**
 for PR 7334 at commit 
[`b4de467`](https://github.com/apache/spark/commit/b4de46737e393f67bac114b6343f968df6bba733).





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179538112
  
**[Test build #50691 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50691/consoleFull)**
 for PR 7334 at commit 
[`b4de467`](https://github.com/apache/spark/commit/b4de46737e393f67bac114b6343f968df6bba733).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class CollectLimit(limit: Int, child: SparkPlan) extends 
UnaryNode `
  * `case class LocalLimit(limit: Int, child: SparkPlan) extends BaseLimit `





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179538618
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179538621
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50691/
Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179511191
  
/cc @rxin @nongli for review. This patch is the first step towards better 
limit planning and execution.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179518288
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179551660
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50702/
Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179551561
  
**[Test build #50702 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50702/consoleFull)**
 for PR 7334 at commit 
[`924925f`](https://github.com/apache/spark/commit/924925f881c78c19bc792582fb9f606ba1441422).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179549214
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179549218
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50710/
Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179551658
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179605639
  
**[Test build #50730 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50730/consoleFull)**
 for PR 7334 at commit 
[`55e27af`](https://github.com/apache/spark/commit/55e27af45f4a39bb1e86425be5687af3608fd4f3).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179582764
  
**[Test build #50730 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50730/consoleFull)**
 for PR 7334 at commit 
[`55e27af`](https://github.com/apache/spark/commit/55e27af45f4a39bb1e86425be5687af3608fd4f3).





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179578858
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179606351
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179606358
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50730/
Test PASSed.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread gatorsmile
Github user gatorsmile commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179526045
  
retest it please





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2016-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-179528938
  
**[Test build #50702 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50702/consoleFull)**
 for PR 7334 at commit 
[`924925f`](https://github.com/apache/spark/commit/924925f881c78c19bc792582fb9f606ba1441422).





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2015-07-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-120166220
  
  [Test build #36977 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36977/consoleFull)
 for   PR 7334 at commit 
[`dfe6ff1`](https://github.com/apache/spark/commit/dfe6ff100ae2efcb1778d93bf828f9e2e8c46e18).





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2015-07-09 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-120169614
  
One question that I have: will the change here introduce a performance 
regression in cases where Limit is the final operator in the plan?  I think 
that we may want to continue to do the `executeTake()` for those cases, which 
may require us to use a different physical plan for `Limit` in cases where it's 
the terminal operator.
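
The trade-off in the comment above can be illustrated with a small,
Spark-free sketch in plain Scala (the object and method names here are
hypothetical, and partitions are modeled as plain sequences): a terminal
limit can simply pull rows on the driver, while a non-terminal limit applies
a per-partition limit, exchanges everything into one partition, and limits
again.

```scala
// Hypothetical sketch (plain Scala, no Spark): a terminal limit takes rows
// directly on the driver, while a non-terminal limit uses the
// local-limit -> exchange-to-one-partition -> local-limit plan.
object LimitSketch {
  type Partition = Seq[Int]

  // Terminal limit: collect the first `limit` rows on the driver,
  // analogous in spirit to executeTake().
  def terminalLimit(partitions: Seq[Partition], limit: Int): Seq[Int] =
    partitions.flatten.take(limit)

  // Non-terminal limit: limit each partition, "shuffle" everything into a
  // single partition, then apply the limit once more for the global result.
  def globalLimit(partitions: Seq[Partition], limit: Int): Seq[Int] = {
    val locallyLimited = partitions.map(_.take(limit)) // partition-local limit
    val singlePartition = locallyLimited.flatten       // exchange to 1 partition
    singlePartition.take(limit)                        // local limit again
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6, 7, 8, 9))
    println(terminalLimit(parts, 4)) // List(1, 2, 3, 4)
    println(globalLimit(parts, 4))   // List(1, 2, 3, 4)
  }
}
```

Both paths return the same rows here; the point is that the terminal path
never needs the exchange step, which is what motivates keeping a separate
physical operator for the final-operator case.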





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2015-07-09 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r34313535
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -108,48 +105,24 @@ case class Union(children: Seq[SparkPlan]) extends 
SparkPlan {
 
 /**
  * :: DeveloperApi ::
- * Take the first limit elements. Note that the implementation is 
different depending on whether
- * this is a terminal operator or not. If it is terminal and is invoked 
using executeCollect,
- * this operator uses something similar to Spark's take method on the 
Spark driver. If it is not
- * terminal or is invoked using execute, we first take the limit on each 
partition, and then
- * repartition all the data to a single partition to compute the global 
limit.
+ * Take the first `limit` elements from each partition.
  */
 @DeveloperApi
-case class Limit(limit: Int, child: SparkPlan)
+case class PartitionLocalLimit(limit: Int, child: SparkPlan)
   extends UnaryNode {
-  // TODO: Implement a partition local limit, and use a strategy to 
generate the proper limit plan:
-  // partition local limit -> exchange into one partition -> partition 
local limit again
-
-  /** We must copy rows when sort based shuffle is on */
-  private def sortBasedShuffleOn = 
SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-
   override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = SinglePartition
 
   override def executeCollect(): Array[Row] = child.executeTake(limit)
--- End diff --

Whoops, forgot to remove this.  This should probably be split to a separate 
operator in order to handle the cases where Limit is the final operator.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2015-07-09 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7334#discussion_r34313763
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -108,48 +105,24 @@ case class Union(children: Seq[SparkPlan]) extends 
SparkPlan {
 
 /**
  * :: DeveloperApi ::
- * Take the first limit elements. Note that the implementation is 
different depending on whether
- * this is a terminal operator or not. If it is terminal and is invoked 
using executeCollect,
- * this operator uses something similar to Spark's take method on the 
Spark driver. If it is not
- * terminal or is invoked using execute, we first take the limit on each 
partition, and then
- * repartition all the data to a single partition to compute the global 
limit.
+ * Take the first `limit` elements from each partition.
  */
 @DeveloperApi
-case class Limit(limit: Int, child: SparkPlan)
+case class PartitionLocalLimit(limit: Int, child: SparkPlan)
   extends UnaryNode {
-  // TODO: Implement a partition local limit, and use a strategy to 
generate the proper limit plan:
-  // partition local limit -> exchange into one partition -> partition 
local limit again
-
-  /** We must copy rows when sort based shuffle is on */
-  private def sortBasedShuffleOn = 
SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-
   override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = SinglePartition
 
   override def executeCollect(): Array[Row] = child.executeTake(limit)
--- End diff --

I looked at the code for `executeTake` and it looks like it's also doing 
some potentially-unnecessary copying and inefficient serialization of result 
rows (AFAIK it's not using SqlSerializer2).
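
For context on what `executeTake`-style collection does, here is a
simplified, Spark-free sketch: scan a few partitions at a time, growing the
batch geometrically, and stop as soon as `limit` rows have been gathered.
The growth factor of 4 and all names here are assumptions for illustration,
not the actual Spark implementation.

```scala
// Simplified sketch of an executeTake-style strategy: rather than scanning
// every partition up front, scan partitions in geometrically growing
// batches and stop once `limit` rows have been collected.
object TakeSketch {
  def take[T](partitions: Seq[Seq[T]], limit: Int): Seq[T] = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[T]
    var scanned = 0 // partitions consumed so far
    var batch = 1   // partitions to scan in the next round
    while (buf.size < limit && scanned < partitions.size) {
      partitions.slice(scanned, scanned + batch).foreach { p =>
        buf ++= p.take(limit - buf.size) // copy no more rows than needed
      }
      scanned += batch
      batch *= 4 // assumed growth factor: scan more partitions each round
    }
    buf.toSeq
  }
}
```

The efficiency concern raised above is orthogonal to this scanning strategy:
even with incremental scanning, each collected row still has to be
serialized to the driver, which is where an inefficient serializer or extra
row copies would hurt.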





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2015-07-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-120166038
  
Merged build started.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2015-07-09 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-120165330
  
Ping @rxin, since this is addressing an old TODO that you added.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2015-07-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-120166014
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2015-07-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-120177806
  
  [Test build #36977 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36977/console)
 for   PR 7334 at commit 
[`dfe6ff1`](https://github.com/apache/spark/commit/dfe6ff100ae2efcb1778d93bf828f9e2e8c46e18).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class KafkaRDD(RDD):`
  * `class KafkaDStream(DStream):`
  * `class KafkaTransformedDStream(TransformedDStream):`
  * `class GenericInternalRowWithSchema(values: Array[Any], override val 
schema: StructType)`
  * `case class PartitionLocalLimit(limit: Int, child: SparkPlan)`
  * `case class StreamInputInfo(`






[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...

2015-07-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7334#issuecomment-120177823
  
Merged build finished. Test FAILed.

