[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/7334#discussion_r52209170

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+/**
+ * Take the first `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Limit` operation is the final operator in a
+ * logical plan, which happens when the user is collecting results back to the driver.
+ */
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  protected override def doExecute(): RDD[InternalRow] = {
+    val shuffled = new ShuffledRowRDD(
+      Exchange.prepareShuffleDependency(child.execute(), child.output, SinglePartition, serializer))
--- End diff --

In this case, the plan shown in the Spark UI (no exchange) will not match the actual plan (which has an exchange).

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
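The shape of the operator under discussion can be modeled without Spark: each input partition is limited locally, every partition's rows are then shuffled to a single target partition (as with `SinglePartition`), and the limit is applied once more on the merged result. This is an illustrative sketch only; the function names are hypothetical and do not reflect Spark's actual API.

```python
def local_limit(partitions, limit):
    """Apply the limit within each partition (the map side of the shuffle)."""
    return [part[:limit] for part in partitions]

def shuffle_to_single_partition(partitions):
    """All rows land in one partition, as with a SinglePartition target."""
    merged = []
    for part in partitions:
        merged.extend(part)
    return [merged]

def collect_limit(partitions, limit):
    """Local limit, single-partition shuffle, then the final global limit."""
    [single] = shuffle_to_single_partition(local_limit(partitions, limit))
    return single[:limit]

rows = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
print(collect_limit(rows, 4))  # [1, 2, 3, 4]
```

The local limit keeps the amount of shuffled data bounded by `limit * numPartitions` rather than the full input size, which is the reason the map-side truncation matters.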
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/7334#discussion_r52213727

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+  protected override def doExecute(): RDD[InternalRow] = {
+    val shuffled = new ShuffledRowRDD(
+      Exchange.prepareShuffleDependency(child.execute(), child.output, SinglePartition, serializer))
--- End diff --

Since the current implementation also adds a ShuffledRowRDD without an Exchange, this is not a regression.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-181533917

LGTM, merging this into master.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/7334
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180568890

**[Test build #50844 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50844/consoleFull)** for PR 7334 at commit [`c4b0a53`](https://github.com/apache/spark/commit/c4b0a5303b4de328e11e8bfb79e8b3e795bd1174).
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180608495

Merged build finished. Test FAILed.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180608496

Test FAILed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50844/
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/7334#discussion_r52056901

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

As discussed upthread, simply always having `GlobalLimit` would lead to regressions when calling `df.limit(10).collect()`, since it would now have to compute all partitions of `df` whereas before it might be able to compute only a subset.
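The early-stopping behavior behind this point can be sketched without Spark: a driver-side take scans partitions one at a time and stops as soon as enough rows have been collected, so later partitions are never computed. (Spark's real `executeTake` is more sophisticated, growing the number of partitions tried per round; this is a simplified illustration, and the returned partition counter exists only for demonstration.)

```python
def take(partitions, limit):
    """Collect up to `limit` rows, computing as few partitions as possible.

    Each entry in `partitions` is a zero-argument callable that computes
    one partition's rows when invoked.
    """
    collected, computed = [], 0
    for compute in partitions:
        if len(collected) >= limit:
            break  # enough rows already: skip the remaining partitions
        collected.extend(compute())
        computed += 1
    return collected[:limit], computed

parts = [lambda: [1, 2, 3], lambda: [4, 5, 6], lambda: [7, 8, 9]]
rows, computed = take(parts, 4)
print(rows, computed)  # [1, 2, 3, 4] 2
```

With the limit satisfied after two partitions, the third is never evaluated; an unconditional single-partition shuffle would have computed all three.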
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/7334#discussion_r52056035

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

Maybe having a special case for a Limit at the root is not a good idea; I think we could always have `GlobalLimit` and call `executeTake(limit)` in it.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/7334#discussion_r52053966

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

I've started working on this and ran into one minor snag: what happens if a user has a DataFrame whose root logical plan is a Limit, then calls `.rdd()` on it? In that case, `CollectLimit` will still be the root and we'll need it to have a functioning `doExecute()`. On reflection, I agree that the current `doExecute()` here is bad because it breaks lineage and laziness. I'll see about switching back to an approach that invokes Exchange directly.
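The laziness concern can be illustrated with a toy model (not Spark code): building the output from pre-collected rows runs the computation at plan-construction time, while a properly deferred plan runs it only when the result is actually consumed. The `log` list below exists only to make the evaluation order observable.

```python
log = []

def compute_rows():
    """Stand-in for executing the child plan; records that work happened."""
    log.append("computed")
    return [1, 2, 3]

def eager_plan():
    # Like sparkContext.makeRDD(executeCollect(), 1): the rows are
    # materialized immediately, before anyone asks for them.
    rows = compute_rows()
    return lambda: rows

def lazy_plan():
    # Like deferring to child.execute(): work happens only on demand.
    return lambda: compute_rows()

p = eager_plan()
print(log)   # ["computed"] -- work already done at construction time

log.clear()
q = lazy_plan()
print(log)   # [] -- nothing computed yet
q()
print(log)   # ["computed"] -- computed only when consumed
```

The eager version also severs lineage: if the consumer later needs to recompute (for example after a cached partition is lost), there is no dependency chain to replay.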
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180598004

LGTM, pending tests.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180608312

**[Test build #50844 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50844/consoleFull)** for PR 7334 at commit [`c4b0a53`](https://github.com/apache/spark/commit/c4b0a5303b4de328e11e8bfb79e8b3e795bd1174).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180527530

Alright, just pushed an update which I think should fix the laziness problems with `.rdd` and `.cache`. The key idea is to move the `ReturnAnswer` injection into `DataFrame.collect()` rather than always putting it at the root of the logical plan before invoking the planner.
Github user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180561524

Jenkins, retest this please.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180534132

**[Test build #50837 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50837/consoleFull)** for PR 7334 at commit [`c4b0a53`](https://github.com/apache/spark/commit/c4b0a5303b4de328e11e8bfb79e8b3e795bd1174).
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180554345

Merged build finished. Test FAILed.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180554215

**[Test build #50837 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50837/consoleFull)** for PR 7334 at commit [`c4b0a53`](https://github.com/apache/spark/commit/c4b0a5303b4de328e11e8bfb79e8b3e795bd1174).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180554346

Test FAILed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50837/
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180629560

**[Test build #50850 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50850/consoleFull)** for PR 7334 at commit [`b8c9e47`](https://github.com/apache/spark/commit/b8c9e47d5b0220c1074d686fcc62b2bb547f61ec).
Github user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180658750

This should now be ready for a final sign-off.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180646598

Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180646223

**[Test build #50850 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50850/consoleFull)** for PR 7334 at commit [`b8c9e47`](https://github.com/apache/spark/commit/b8c9e47d5b0220c1074d686fcc62b2bb547f61ec).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180646611

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50850/
Github user rxin commented on the pull request:
https://github.com/apache/spark/pull/7334#issuecomment-180062121

cc @davies for a more detailed review
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/7334#discussion_r51939570

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -44,7 +44,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
   lazy val sparkPlan: SparkPlan = {
     SQLContext.setActive(sqlContext)
-    sqlContext.planner.plan(optimizedPlan).next()
+    sqlContext.planner.plan(ReturnAnswer(optimizedPlan)).next()
--- End diff --

Yeah, this looks good to me.
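The `ReturnAnswer` idea discussed above can be modeled in miniature: wrapping the optimized plan in a marker node lets a planning rule treat a `Limit` that sits at the root (whose result goes straight back to the driver) differently from a `Limit` buried deeper in the plan. This is a toy model of the pattern, not Spark's actual planner; the class and string names are illustrative.

```python
class Scan:
    """Leaf node standing in for any data source."""

class Limit:
    def __init__(self, n, child):
        self.n, self.child = n, child

class ReturnAnswer:
    """Marker wrapped around the root when results go to the driver."""
    def __init__(self, child):
        self.child = child

def plan(node):
    """Choose a physical operator based on where the Limit appears."""
    if isinstance(node, ReturnAnswer) and isinstance(node.child, Limit):
        return "CollectLimit"   # root limit: collect via a driver-side take
    if isinstance(node, ReturnAnswer):
        return plan(node.child)  # marker over anything else is transparent
    if isinstance(node, Limit):
        return "GlobalLimit"    # interior limit: stays distributed
    return "ScanExec"

print(plan(ReturnAnswer(Limit(10, Scan()))))  # CollectLimit
print(plan(Limit(10, Scan())))                # GlobalLimit
```

Because the marker is injected only on the collect path (per the earlier comment, in `DataFrame.collect()` rather than unconditionally), paths like `.rdd` or `.cache()` plan the same tree without the marker and so get the distributed `GlobalLimit` variant.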
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/7334#discussion_r51957998

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1)
--- End diff --

This actually _can_ get called if you call `df.limit(10).cache()`.
[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51959428 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- + protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1) --- End diff -- This is actually a tricky problem to address.
If we want to actually plan `GlobalLimit` here instead then we need to be able to distinguish between the collect-to-driver and cache cases earlier in the planning process, which I think is going to be a large and non-trivial change. If we want to avoid collecting back to the driver and re-broadcasting then I suppose we can do something similar to what the old `Limit` operator's `doExecute` did. The problem with the old `doExecute()` was that it involved a lot of inefficient copying and serialization. Instead, it might be worth seeing whether the inner logic of the `Exchange` operator could be called from `CollectLimit.doExecute`.
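A minimal sketch of what calling the `Exchange` shuffle machinery from `CollectLimit.doExecute` might look like, assuming the internal APIs quoted elsewhere in this review (`Exchange.prepareShuffleDependency`, `ShuffledRowRDD`, `UnsafeRowSerializer`); the per-partition `take(limit)` before the shuffle is an assumption, not part of the quoted diff:

```scala
// Sketch only: reuse Exchange's shuffle path instead of collecting rows to
// the driver and re-parallelizing them with sparkContext.makeRDD.
case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override def outputPartitioning: Partitioning = SinglePartition
  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)

  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

  protected override def doExecute(): RDD[InternalRow] = {
    // Take at most `limit` rows from each input partition before shuffling,
    // then apply the global limit on the single post-shuffle partition.
    val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
    val shuffled = new ShuffledRowRDD(
      Exchange.prepareShuffleDependency(
        locallyLimited, child.output, SinglePartition, serializer))
    shuffled.mapPartitionsInternal(_.take(limit))
  }
}
```

This avoids the driver round-trip at the cost of one shuffle; the rows never leave the executors.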
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51959415 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- + val perPartitionLimit = execution.LocalLimit(limit, planLater(child)) --- End diff -- I'm thinking that we did not have this kind of test case, right?
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51957757 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- + protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1) --- End diff -- This should not be called; should we throw `UnsupportedOperationException`?
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51959015 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- + val perPartitionLimit = execution.LocalLimit(limit, planLater(child)) --- End diff -- Today this will be planned as a `scan -> local limit -> global limit` for each branch of the union, which I believe is correct: this query should return at most 20 results. What do you think this test should be asserting?
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51958945 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- + protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1) --- End diff -- For this case, should we use GlobalLimit?
Otherwise it will waste some time collecting rows to the driver and then sending them back to the executors.
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51971509 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- + protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1) --- End diff -- That's a really good idea.
I think we could just match on `CachePlan(CollectLimit(...))` and replace it by `GlobalLimit`. Let me go ahead and try this.
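A sketch of the strategy case being proposed here; note that `CachePlan` is a hypothetical marker node from this discussion (see `CacheManager.cacheQuery`), not an existing operator:

```scala
// Hypothetical planner rule: when a limited plan is being cached rather than
// collected to the driver, keep the limit distributed instead of planning
// the driver-side CollectLimit operator.
case CachePlan(logical.Limit(IntegerLiteral(limit), child)) =>
  execution.GlobalLimit(limit,
    execution.LocalLimit(limit, planLater(child))) :: Nil
```

With a rule like this, `df.limit(10).cache()` would never reach `CollectLimit.doExecute` at all.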
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51958741 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- + val perPartitionLimit = execution.LocalLimit(limit, planLater(child)) --- End diff -- Should we have a test for this? ``` select * from (select * from A limit 10 ) t1 UNION ALL (select * from B limit 10) t2 ```
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51960794 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- + protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1) --- End diff -- Similar to ReturnAnswer, we could put a CachePlan() as the root (see CacheManager.cacheQuery) and have a special case for it. If it's not trivial, we could just leave a TODO for this.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51962883 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- + val perPartitionLimit = execution.LocalLimit(limit, planLater(child)) --- End diff -- This is not allowed. See my PR: https://github.com/apache/spark/pull/10689
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51970296 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- + val perPartitionLimit = execution.LocalLimit(limit, planLater(child)) --- End diff -- @davies, to clarify, are you proposing that we need to add a test to ensure that global limits below union are not pulled up?
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51971305 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- + val perPartitionLimit = execution.LocalLimit(limit, planLater(child)) --- End diff -- @gatorsmile: I think we're in agreement. To recap: Before this patch, `select * from (select * from A limit 10 ) t1 UNION ALL (select * from B limit 10) t2` should get planned as:
```
Union
+- Limit 10
|  +- Scan A
+- Limit 10
   +- Scan B
```
Afterwards, this becomes:
```
Union
+- GlobalLimit 10
|  +- LocalLimit 10
|     +- Scan A
+- GlobalLimit 10
   +- LocalLimit 10
      +- Scan B
```
What is **not** legal to do here is to pull the `GlobalLimit` up, so the following would be wrong:
```
GlobalLimit 10
+- Union
   +- LocalLimit 10
   |  +- Scan A
   +- LocalLimit 10
      +- Scan B
```
That plan would be semantically equivalent to executing
```
select * from (select * from A ) t1 UNION ALL (select * from B) t2 LIMIT 10
```
@davies, were you suggesting that the current planning is wrong? Or that we need more tests to guard against incorrect changes to limit planning? I don't believe that the changes in this patch will affect the planning
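The semantic difference discussed above can be illustrated with plain Scala collections (illustrative only; `a` and `b` stand in for tables A and B):

```scala
val a = (1 to 100).toSeq  // rows of table A
val b = (1 to 100).toSeq  // rows of table B

// (A limit 10) UNION ALL (B limit 10): each branch is limited, so up to 20 rows.
val perBranch = a.take(10) ++ b.take(10)

// (A UNION ALL B) limit 10: the limit applies once, so up to 10 rows --
// a different query with a different answer.
val pulledUp = (a ++ b).take(10)

assert(perBranch.size == 20 && pulledUp.size == 10)
```

This is why a `GlobalLimit` below a `Union` must not be pulled above it.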
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51973696 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- + val perPartitionLimit = execution.LocalLimit(limit, planLater(child)) --- End diff -- @JoshRosen Thank you for your explanation! It is great that my previous PRs are useful. So far, I am unable to find more operators for limit push-down, except outer join and union all: https://github.com/apache/spark/pull/10454 and https://github.com/apache/spark/pull/10451
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179727593 **[Test build #2514 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2514/consoleFull)** for PR 7334 at commit [`55e27af`](https://github.com/apache/spark/commit/55e27af45f4a39bb1e86425be5687af3608fd4f3). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179700823 **[Test build #2514 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2514/consoleFull)** for PR 7334 at commit [`55e27af`](https://github.com/apache/spark/commit/55e27af45f4a39bb1e86425be5687af3608fd4f3).
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r51839591 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala --- @@ -44,7 +44,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { lazy val sparkPlan: SparkPlan = { SQLContext.setActive(sqlContext) -sqlContext.planner.plan(optimizedPlan).next() +sqlContext.planner.plan(ReturnAnswer(optimizedPlan)).next() --- End diff -- cc @marmbrus here to make sure it is safe
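For context, `ReturnAnswer` is a marker wrapped around the optimized plan so that planner strategies can special-case the root of a query whose result goes back to the driver. A sketch of such a node (mirroring the quoted diff; the exact definition is an assumption):

```scala
// Marker logical node: signals that this subtree's result is returned to the
// driver, letting the planner choose driver-side operators such as
// CollectLimit only at the top of the plan.
case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}
```

Because only the root is wrapped, a `Limit` nested inside a larger plan still gets the distributed `LocalLimit`/`GlobalLimit` treatment.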
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179518293 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50694/ Test FAILed.
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179507588 Jenkins, retest this please.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179508983 **[Test build #50691 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50691/consoleFull)** for PR 7334 at commit [`b4de467`](https://github.com/apache/spark/commit/b4de46737e393f67bac114b6343f968df6bba733).
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179538112 **[Test build #50691 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50691/consoleFull)** for PR 7334 at commit [`b4de467`](https://github.com/apache/spark/commit/b4de46737e393f67bac114b6343f968df6bba733). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode ` * `case class LocalLimit(limit: Int, child: SparkPlan) extends BaseLimit `
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179538618 Merged build finished. Test FAILed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179538621 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50691/ Test FAILed.
[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179511191 /cc @rxin @nongli for review. This patch is the first step towards better limit planning and execution. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179518288 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8964] [SQL] Use Exchange to perform shu...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179551660 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50702/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179551561 **[Test build #50702 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50702/consoleFull)** for PR 7334 at commit [`924925f`](https://github.com/apache/spark/commit/924925f881c78c19bc792582fb9f606ba1441422).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.

Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179549214 Merged build finished. Test FAILed.

Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179549218 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50710/

Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179551658 Merged build finished. Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179605639 **[Test build #50730 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50730/consoleFull)** for PR 7334 at commit [`55e27af`](https://github.com/apache/spark/commit/55e27af45f4a39bb1e86425be5687af3608fd4f3).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.

Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179582764 **[Test build #50730 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50730/consoleFull)** for PR 7334 at commit [`55e27af`](https://github.com/apache/spark/commit/55e27af45f4a39bb1e86425be5687af3608fd4f3).

Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179578858 Jenkins, retest this please.

Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179606351 Merged build finished. Test PASSed.

Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179606358 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50730/
Github user gatorsmile commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179526045 retest it please

Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-179528938 **[Test build #50702 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50702/consoleFull)** for PR 7334 at commit [`924925f`](https://github.com/apache/spark/commit/924925f881c78c19bc792582fb9f606ba1441422).

Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-120166220 [Test build #36977 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36977/consoleFull) for PR 7334 at commit [`dfe6ff1`](https://github.com/apache/spark/commit/dfe6ff100ae2efcb1778d93bf828f9e2e8c46e18).
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-120169614 One question that I have: will the change here introduce a performance regression in cases where Limit is the final operator in the plan? I think that we may want to continue to do the `executeTake()` for those cases, which may require us to use a different physical plan for `Limit` in cases where it's the terminal operator.
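The terminal-vs-non-terminal split discussed above can be sketched as a tiny standalone planning rule. This is a hypothetical toy model, not Spark's actual planner: the names `CollectLimit`, `ShuffledLimit`, and the `isTerminal` flag are assumptions standing in for the PR's real operators and strategy logic.

```scala
// Toy model of the limit-planning decision (hypothetical names, not Spark's
// real planner): a terminal Limit keeps the cheap driver-side executeTake()
// path, while a non-terminal Limit becomes
// per-partition limit -> single-partition exchange -> per-partition limit.
sealed trait PhysicalLimit
case class CollectLimit(limit: Int) extends PhysicalLimit  // driver-side take, no shuffle
case class ShuffledLimit(limit: Int) extends PhysicalLimit // local limit + exchange + limit

def planLimit(limit: Int, isTerminal: Boolean): PhysicalLimit =
  if (isTerminal) CollectLimit(limit) // e.g. df.limit(10).collect()
  else ShuffledLimit(limit)           // e.g. limit feeding a downstream join

// A terminal limit avoids the shuffle entirely:
assert(planLimit(10, isTerminal = true) == CollectLimit(10))
assert(planLimit(10, isTerminal = false) == ShuffledLimit(10))
```

This mirrors the concern in the comment: without the split, every limit would pay the shuffle cost even when the result is immediately collected to the driver.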
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r34313535
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -108,48 +105,24 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
 /**
  * :: DeveloperApi ::
- * Take the first limit elements. Note that the implementation is different depending on whether
- * this is a terminal operator or not. If it is terminal and is invoked using executeCollect,
- * this operator uses something similar to Spark's take method on the Spark driver. If it is not
- * terminal or is invoked using execute, we first take the limit on each partition, and then
- * repartition all the data to a single partition to compute the global limit.
+ * Take the first `limit` elements from each partition.
  */
 @DeveloperApi
-case class Limit(limit: Int, child: SparkPlan)
+case class PartitionLocalLimit(limit: Int, child: SparkPlan)
   extends UnaryNode {
-  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
-  // partition local limit - exchange into one partition - partition local limit again
-
-  /** We must copy rows when sort based shuffle is on */
-  private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-
   override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[Row] = child.executeTake(limit)
--- End diff --
Whoops, forgot to remove this. This should probably be split into a separate operator in order to handle the cases where Limit is the final operator.
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7334#discussion_r34313763
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -108,48 +105,24 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
 /**
  * :: DeveloperApi ::
- * Take the first limit elements. Note that the implementation is different depending on whether
- * this is a terminal operator or not. If it is terminal and is invoked using executeCollect,
- * this operator uses something similar to Spark's take method on the Spark driver. If it is not
- * terminal or is invoked using execute, we first take the limit on each partition, and then
- * repartition all the data to a single partition to compute the global limit.
+ * Take the first `limit` elements from each partition.
  */
 @DeveloperApi
-case class Limit(limit: Int, child: SparkPlan)
+case class PartitionLocalLimit(limit: Int, child: SparkPlan)
   extends UnaryNode {
-  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
-  // partition local limit - exchange into one partition - partition local limit again
-
-  /** We must copy rows when sort based shuffle is on */
-  private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-
   override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[Row] = child.executeTake(limit)
--- End diff --
I looked at the code for `executeTake` and it looks like it's also doing some potentially-unnecessary copying and inefficient serialization of result rows (AFAIK it's not using SqlSerializer2).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-120166038 Merged build started.

Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-120165330 Ping @rxin, since this is addressing an old TODO that you added.

Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-120166014 Merged build triggered.

Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-120177806 [Test build #36977 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36977/console) for PR 7334 at commit [`dfe6ff1`](https://github.com/apache/spark/commit/dfe6ff100ae2efcb1778d93bf828f9e2e8c46e18).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class KafkaRDD(RDD):`
  * `class KafkaDStream(DStream):`
  * `class KafkaTransformedDStream(TransformedDStream):`
  * `class GenericInternalRowWithSchema(values: Array[Any], override val schema: StructType)`
  * `case class PartitionLocalLimit(limit: Int, child: SparkPlan)`
  * `case class StreamInputInfo(`
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-120177823 Merged build finished. Test FAILed.