[2/2] spark git commit: Preparing Spark release v1.2.0-snapshot1
Preparing Spark release v1.2.0-snapshot1 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38c1fbd9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38c1fbd9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38c1fbd9 Branch: refs/heads/branch-1.2 Commit: 38c1fbd9694430cefd962c90bc36b0d108c6124b Parents: e1339da Author: Ubuntu Authored: Mon Nov 17 06:37:44 2014 + Committer: Ubuntu Committed: Mon Nov 17 06:37:44 2014 + -- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- core/src/main/scala/org/apache/spark/package.scala | 2 +- examples/pom.xml | 2 +- external/flume-sink/pom.xml| 2 +- external/flume/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml| 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml| 2 +- network/yarn/pom.xml | 2 +- pom.xml| 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/alpha/pom.xml | 2 +- yarn/pom.xml | 2 +- yarn/stable/pom.xml| 2 +- 30 files changed, 30 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38c1fbd9/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index c65192b..5af17c5 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0-SNAPSHOT +1.2.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/38c1fbd9/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 93db0d5..1f43fcc 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0-SNAPSHOT +1.2.0 ../pom.xml 
http://git-wip-us.apache.org/repos/asf/spark/blob/38c1fbd9/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 492eddd..15cb382 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0-SNAPSHOT +1.2.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/38c1fbd9/core/src/main/scala/org/apache/spark/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index e2fc9c6..873ec3c 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -44,5 +44,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.2.0-SNAPSHOT" + val SPARK_VERSION = "1.2.0" } http://git-wip-us.apache.org/repos/asf/spark/blob/38c1fbd9/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 2752ce3..bd3b317 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0-SNAPSHOT +1.2.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/38c1fbd9/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index ac291bd..ff7babc 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.2.0-snapshot1 [created] 38c1fbd96 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark git commit: Preparing development version 1.2.1-SNAPSHOT
Repository: spark Updated Branches: refs/heads/branch-1.2 e1339daec -> d7ac60134 Preparing development version 1.2.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7ac6013 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7ac6013 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7ac6013 Branch: refs/heads/branch-1.2 Commit: d7ac6013483e83caff8ea54c228f37aeca159db8 Parents: 38c1fbd Author: Ubuntu Authored: Mon Nov 17 06:37:44 2014 + Committer: Ubuntu Committed: Mon Nov 17 06:37:44 2014 + -- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- core/src/main/scala/org/apache/spark/package.scala | 2 +- examples/pom.xml | 2 +- external/flume-sink/pom.xml| 2 +- external/flume/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml| 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml| 2 +- network/yarn/pom.xml | 2 +- pom.xml| 4 ++-- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/alpha/pom.xml | 2 +- yarn/pom.xml | 2 +- yarn/stable/pom.xml| 2 +- 30 files changed, 31 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d7ac6013/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 5af17c5..65e3ddf 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0 +1.2.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/d7ac6013/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 1f43fcc..4ead7aa 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark 
spark-parent -1.2.0 +1.2.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/d7ac6013/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 15cb382..155b4c9 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0 +1.2.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/d7ac6013/core/src/main/scala/org/apache/spark/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 873ec3c..b1a511f 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -44,5 +44,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.2.0" + val SPARK_VERSION = "1.2.1-SNAPSHOT" } http://git-wip-us.apache.org/repos/asf/spark/blob/d7ac6013/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index bd3b317..5bbc9bd 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0 +1.2.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/d7ac6013/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index ff7babc..fe1c8fb 100644 --- a/external/flume-sink/pom.xml +++ b
[2/2] spark git commit: Revert "Preparing Spark release v1.2.0-snapshot0"
Revert "Preparing Spark release v1.2.0-snapshot0" This reverts commit bc09875799aa373f4320d38b02618173ffa4c96f. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1339dae Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1339dae Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1339dae Branch: refs/heads/branch-1.2 Commit: e1339daec59ff57cdcbccd9073e9dd5f0ac9d3df Parents: c3fd9ae Author: Patrick Wendell Authored: Sun Nov 16 22:13:40 2014 -0800 Committer: Patrick Wendell Committed: Sun Nov 16 22:13:40 2014 -0800 -- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- core/src/main/scala/org/apache/spark/package.scala | 2 +- examples/pom.xml | 2 +- external/flume-sink/pom.xml| 2 +- external/flume/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml| 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml| 2 +- network/yarn/pom.xml | 2 +- pom.xml| 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/alpha/pom.xml | 2 +- yarn/pom.xml | 2 +- yarn/stable/pom.xml| 2 +- 30 files changed, 30 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e1339dae/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 5af17c5..c65192b 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0 +1.2.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/e1339dae/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 1f43fcc..93db0d5 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ 
org.apache.spark spark-parent -1.2.0 +1.2.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/e1339dae/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 15cb382..492eddd 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0 +1.2.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/e1339dae/core/src/main/scala/org/apache/spark/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 873ec3c..e2fc9c6 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -44,5 +44,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.2.0" + val SPARK_VERSION = "1.2.0-SNAPSHOT" } http://git-wip-us.apache.org/repos/asf/spark/blob/e1339dae/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index bd3b317..2752ce3 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0 +1.2.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/e1339dae/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index ff7babc..ac291bd 100644 --- a/external/flume-sink/pom.xml +++ b/
[1/2] spark git commit: Revert "Preparing development version 1.2.1-SNAPSHOT"
Repository: spark Updated Branches: refs/heads/branch-1.2 8305e803e -> e1339daec Revert "Preparing development version 1.2.1-SNAPSHOT" This reverts commit 6c6fd218c83a049c874b8a0ea737333c1899c94a. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3fd9aef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3fd9aef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3fd9aef Branch: refs/heads/branch-1.2 Commit: c3fd9aef99134f3f649285c5f013f81b3e8e697e Parents: 8305e80 Author: Patrick Wendell Authored: Sun Nov 16 22:13:29 2014 -0800 Committer: Patrick Wendell Committed: Sun Nov 16 22:13:29 2014 -0800 -- assembly/pom.xml | 4 ++-- bagel/pom.xml | 4 ++-- core/pom.xml | 4 ++-- core/src/main/scala/org/apache/spark/package.scala | 2 +- examples/pom.xml | 4 ++-- external/flume-sink/pom.xml| 4 ++-- external/flume/pom.xml | 4 ++-- external/kafka/pom.xml | 4 ++-- external/mqtt/pom.xml | 4 ++-- external/twitter/pom.xml | 4 ++-- external/zeromq/pom.xml| 4 ++-- extras/java8-tests/pom.xml | 4 ++-- extras/kinesis-asl/pom.xml | 4 ++-- extras/spark-ganglia-lgpl/pom.xml | 4 ++-- graphx/pom.xml | 4 ++-- mllib/pom.xml | 4 ++-- network/common/pom.xml | 4 ++-- network/shuffle/pom.xml| 4 ++-- network/yarn/pom.xml | 4 ++-- pom.xml| 4 ++-- repl/pom.xml | 4 ++-- sql/catalyst/pom.xml | 4 ++-- sql/core/pom.xml | 4 ++-- sql/hive-thriftserver/pom.xml | 4 ++-- sql/hive/pom.xml | 4 ++-- streaming/pom.xml | 4 ++-- tools/pom.xml | 4 ++-- yarn/alpha/pom.xml | 6 +++--- yarn/pom.xml | 4 ++-- yarn/stable/pom.xml| 6 +++--- 30 files changed, 61 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c3fd9aef/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 0c79f10..5af17c5 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,12 +21,12 @@ org.apache.spark spark-parent -1.2.1-SNAPSHOT +1.2.0 ../pom.xml org.apache.spark - spark-assembly_2.11 + spark-assembly_2.10 Spark 
Project Assembly http://spark.apache.org/ pom http://git-wip-us.apache.org/repos/asf/spark/blob/c3fd9aef/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index c968da9..1f43fcc 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,12 +21,12 @@ org.apache.spark spark-parent -1.2.1-SNAPSHOT +1.2.0 ../pom.xml org.apache.spark - spark-bagel_2.11 + spark-bagel_2.10 bagel http://git-wip-us.apache.org/repos/asf/spark/blob/c3fd9aef/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 83c0307..15cb382 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,12 +21,12 @@ org.apache.spark spark-parent -1.2.1-SNAPSHOT +1.2.0 ../pom.xml org.apache.spark - spark-core_2.11 + spark-core_2.10 core http://git-wip-us.apache.org/repos/asf/spark/blob/c3fd9aef/core/src/main/scala/org/apache/spark/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index b1a511f..873ec3c 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -44,5 +44,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.2.1-SNAPSHOT" + val SPARK_VERSION = "1.2.0" } http://git-wip-us.apache.org/repos/asf/spark/blob/c3fd9aef/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 3bc1149..bd3b317 100644 --- a/examples/pom.xml
spark git commit: [SPARK-4410][SQL] Add support for external sort
Repository: spark Updated Branches: refs/heads/branch-1.2 f3b93c1ba -> 8305e803e [SPARK-4410][SQL] Add support for external sort Adds a new operator that uses Spark's `ExternalSort` class. It is off by default now, but we might consider making it the default if benchmarks show that it does not regress performance. Author: Michael Armbrust Closes #3268 from marmbrus/externalSort and squashes the following commits: 48b9726 [Michael Armbrust] comments b98799d [Michael Armbrust] Add test afd7562 [Michael Armbrust] Add support for external sort. (cherry picked from commit 64c6b9bad559c21f25cd9fbe37c8813cdab939f2) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8305e803 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8305e803 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8305e803 Branch: refs/heads/branch-1.2 Commit: 8305e803e23808507b68fa9a6876ee455e58ac27 Parents: f3b93c1 Author: Michael Armbrust Authored: Sun Nov 16 21:55:57 2014 -0800 Committer: Reynold Xin Committed: Sun Nov 16 21:56:30 2014 -0800 -- .../scala/org/apache/spark/sql/SQLConf.scala| 7 .../spark/sql/execution/SparkStrategies.scala | 5 ++- .../spark/sql/execution/basicOperators.scala| 37 +--- .../org/apache/spark/sql/SQLQuerySuite.scala| 16 - 4 files changed, 59 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8305e803/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index cd7d78e..9697beb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -39,6 +39,10 @@ private[spark] object SQLConf { val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord" + // Options that control which operators can be chosen by the query 
planner. These should be + // considered hints and may be ignored by future versions of Spark SQL. + val EXTERNAL_SORT = "spark.sql.planner.externalSort" + // This is only used for the thriftserver val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool" @@ -96,6 +100,9 @@ private[sql] trait SQLConf { private[spark] def parquetFilterPushDown = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean + /** When true the planner will use the external sort, which may spill to disk. */ + private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean + /** * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode * that evaluates expressions found in queries. In general this custom code runs much faster http://git-wip-us.apache.org/repos/asf/spark/blob/8305e803/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 03cd5bd..7ef1f9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -263,9 +263,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Distinct(child) => execution.Distinct(partial = false, execution.Distinct(partial = true, planLater(child))) :: Nil + + case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled => +execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil case logical.Sort(sortExprs, child) => -// This sort is a global sort. Its requiredDistribution will be an OrderedDistribution. execution.Sort(sortExprs, global = true, planLater(child)):: Nil + case logical.SortPartitions(sortExprs, child) => // This sort only sorts tuples within a partition. 
Its requiredDistribution will be // an UnspecifiedDistribution. http://git-wip-us.apache.org/repos/asf/spark/blob/8305e803/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 1b8ba3a..e53723c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperator
spark git commit: [SPARK-4410][SQL] Add support for external sort
Repository: spark Updated Branches: refs/heads/master 5168c6ca9 -> 64c6b9bad [SPARK-4410][SQL] Add support for external sort Adds a new operator that uses Spark's `ExternalSort` class. It is off by default now, but we might consider making it the default if benchmarks show that it does not regress performance. Author: Michael Armbrust Closes #3268 from marmbrus/externalSort and squashes the following commits: 48b9726 [Michael Armbrust] comments b98799d [Michael Armbrust] Add test afd7562 [Michael Armbrust] Add support for external sort. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64c6b9ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64c6b9ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64c6b9ba Branch: refs/heads/master Commit: 64c6b9bad559c21f25cd9fbe37c8813cdab939f2 Parents: 5168c6c Author: Michael Armbrust Authored: Sun Nov 16 21:55:57 2014 -0800 Committer: Reynold Xin Committed: Sun Nov 16 21:55:57 2014 -0800 -- .../scala/org/apache/spark/sql/SQLConf.scala| 7 .../spark/sql/execution/SparkStrategies.scala | 5 ++- .../spark/sql/execution/basicOperators.scala| 37 +--- .../org/apache/spark/sql/SQLQuerySuite.scala| 16 - 4 files changed, 59 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/64c6b9ba/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index cd7d78e..9697beb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -39,6 +39,10 @@ private[spark] object SQLConf { val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord" + // Options that control which operators can be chosen by the query planner. These should be + // considered hints and may be ignored by future versions of Spark SQL. 
+ val EXTERNAL_SORT = "spark.sql.planner.externalSort" + // This is only used for the thriftserver val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool" @@ -96,6 +100,9 @@ private[sql] trait SQLConf { private[spark] def parquetFilterPushDown = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean + /** When true the planner will use the external sort, which may spill to disk. */ + private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean + /** * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode * that evaluates expressions found in queries. In general this custom code runs much faster http://git-wip-us.apache.org/repos/asf/spark/blob/64c6b9ba/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 03cd5bd..7ef1f9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -263,9 +263,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Distinct(child) => execution.Distinct(partial = false, execution.Distinct(partial = true, planLater(child))) :: Nil + + case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled => +execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil case logical.Sort(sortExprs, child) => -// This sort is a global sort. Its requiredDistribution will be an OrderedDistribution. execution.Sort(sortExprs, global = true, planLater(child)):: Nil + case logical.SortPartitions(sortExprs, child) => // This sort only sorts tuples within a partition. Its requiredDistribution will be // an UnspecifiedDistribution. 
http://git-wip-us.apache.org/repos/asf/spark/blob/64c6b9ba/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 1b8ba3a..e53723c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -29,6 +29,7
spark git commit: [SPARK-4422][MLLIB]In some cases, Vectors.fromBreeze get wrong results.
Repository: spark Updated Branches: refs/heads/branch-1.2 6c6fd218c -> f3b93c1ba [SPARK-4422][MLLIB]In some cases, Vectors.fromBreeze get wrong results. cc mengxr Author: GuoQiang Li Closes #3281 from witgo/SPARK-4422 and squashes the following commits: 5f1fa5e [GuoQiang Li] import order 50783bd [GuoQiang Li] review commits 7a10123 [GuoQiang Li] In some cases, Vectors.fromBreeze get wrong results. (cherry picked from commit 5168c6ca9f0008027d688661bae57c28cf386b54) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3b93c1b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3b93c1b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3b93c1b Branch: refs/heads/branch-1.2 Commit: f3b93c1bac292fccb05bf16d1da4b1863b3031fd Parents: 6c6fd21 Author: GuoQiang Li Authored: Sun Nov 16 21:31:51 2014 -0800 Committer: Xiangrui Meng Committed: Sun Nov 16 21:32:09 2014 -0800 -- .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala| 2 +- .../scala/org/apache/spark/mllib/linalg/VectorsSuite.scala| 7 +++ 2 files changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f3b93c1b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 9fccd63..60ab2aa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -237,7 +237,7 @@ object Vectors { private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = { breezeVector match { case v: BDV[Double] => -if (v.offset == 0 && v.stride == 1) { +if (v.offset == 0 && v.stride == 1 && v.length == v.data.length) { new DenseVector(v.data) } else { new DenseVector(v.toArray) // Can't use underlying array directly, so make a new one 
http://git-wip-us.apache.org/repos/asf/spark/blob/f3b93c1b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 93a84fe..59cd85e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.linalg +import breeze.linalg.{DenseMatrix => BDM} import org.scalatest.FunSuite import org.apache.spark.SparkException @@ -166,4 +167,10 @@ class VectorsSuite extends FunSuite { assert(v === udt.deserialize(udt.serialize(v))) } } + + test("fromBreeze") { +val x = BDM.zeros[Double](10, 10) +val v = Vectors.fromBreeze(x(::, 0)) +assert(v.size === x.rows) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4422][MLLIB]In some cases, Vectors.fromBreeze get wrong results.
Repository: spark Updated Branches: refs/heads/master 45ce3273c -> 5168c6ca9 [SPARK-4422][MLLIB]In some cases, Vectors.fromBreeze get wrong results. cc mengxr Author: GuoQiang Li Closes #3281 from witgo/SPARK-4422 and squashes the following commits: 5f1fa5e [GuoQiang Li] import order 50783bd [GuoQiang Li] review commits 7a10123 [GuoQiang Li] In some cases, Vectors.fromBreeze get wrong results. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5168c6ca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5168c6ca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5168c6ca Branch: refs/heads/master Commit: 5168c6ca9f0008027d688661bae57c28cf386b54 Parents: 45ce327 Author: GuoQiang Li Authored: Sun Nov 16 21:31:51 2014 -0800 Committer: Xiangrui Meng Committed: Sun Nov 16 21:31:51 2014 -0800 -- .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala| 2 +- .../scala/org/apache/spark/mllib/linalg/VectorsSuite.scala| 7 +++ 2 files changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5168c6ca/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 9fccd63..60ab2aa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -237,7 +237,7 @@ object Vectors { private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = { breezeVector match { case v: BDV[Double] => -if (v.offset == 0 && v.stride == 1) { +if (v.offset == 0 && v.stride == 1 && v.length == v.data.length) { new DenseVector(v.data) } else { new DenseVector(v.toArray) // Can't use underlying array directly, so make a new one 
http://git-wip-us.apache.org/repos/asf/spark/blob/5168c6ca/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 93a84fe..59cd85e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.linalg +import breeze.linalg.{DenseMatrix => BDM} import org.scalatest.FunSuite import org.apache.spark.SparkException @@ -166,4 +167,10 @@ class VectorsSuite extends FunSuite { assert(v === udt.deserialize(udt.serialize(v))) } } + + test("fromBreeze") { +val x = BDM.zeros[Double](10, 10) +val v = Vectors.fromBreeze(x(::, 0)) +assert(v.size === x.rows) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark git commit: Preparing Spark release v1.2.0-snapshot0
Repository: spark Updated Branches: refs/heads/branch-1.2 70d037168 -> 6c6fd218c Preparing Spark release v1.2.0-snapshot0 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc098757 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc098757 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc098757 Branch: refs/heads/branch-1.2 Commit: bc09875799aa373f4320d38b02618173ffa4c96f Parents: 70d0371 Author: Ubuntu Authored: Mon Nov 17 02:10:59 2014 + Committer: Ubuntu Committed: Mon Nov 17 02:10:59 2014 + -- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- core/src/main/scala/org/apache/spark/package.scala | 2 +- examples/pom.xml | 2 +- external/flume-sink/pom.xml| 2 +- external/flume/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml| 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml| 2 +- network/yarn/pom.xml | 2 +- pom.xml| 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/alpha/pom.xml | 2 +- yarn/pom.xml | 2 +- yarn/stable/pom.xml| 2 +- 30 files changed, 30 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc098757/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index c65192b..5af17c5 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0-SNAPSHOT +1.2.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/bc098757/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 93db0d5..1f43fcc 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark 
spark-parent -1.2.0-SNAPSHOT +1.2.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/bc098757/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 492eddd..15cb382 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0-SNAPSHOT +1.2.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/bc098757/core/src/main/scala/org/apache/spark/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index e2fc9c6..873ec3c 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -44,5 +44,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.2.0-SNAPSHOT" + val SPARK_VERSION = "1.2.0" } http://git-wip-us.apache.org/repos/asf/spark/blob/bc098757/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 2752ce3..bd3b317 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.2.0-SNAPSHOT +1.2.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/bc098757/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index ac291bd..ff7babc 100644 --- a/external/flume-sink/pom.xml +++ b/exter
[2/2] spark git commit: Preparing development version 1.2.1-SNAPSHOT
Preparing development version 1.2.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c6fd218 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c6fd218 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c6fd218 Branch: refs/heads/branch-1.2 Commit: 6c6fd218c83a049c874b8a0ea737333c1899c94a Parents: bc09875 Author: Ubuntu Authored: Mon Nov 17 03:09:19 2014 + Committer: Ubuntu Committed: Mon Nov 17 03:09:19 2014 + -- assembly/pom.xml | 4 ++-- bagel/pom.xml | 4 ++-- core/pom.xml | 4 ++-- core/src/main/scala/org/apache/spark/package.scala | 2 +- examples/pom.xml | 4 ++-- external/flume-sink/pom.xml| 4 ++-- external/flume/pom.xml | 4 ++-- external/kafka/pom.xml | 4 ++-- external/mqtt/pom.xml | 4 ++-- external/twitter/pom.xml | 4 ++-- external/zeromq/pom.xml| 4 ++-- extras/java8-tests/pom.xml | 4 ++-- extras/kinesis-asl/pom.xml | 4 ++-- extras/spark-ganglia-lgpl/pom.xml | 4 ++-- graphx/pom.xml | 4 ++-- mllib/pom.xml | 4 ++-- network/common/pom.xml | 4 ++-- network/shuffle/pom.xml| 4 ++-- network/yarn/pom.xml | 4 ++-- pom.xml| 4 ++-- repl/pom.xml | 4 ++-- sql/catalyst/pom.xml | 4 ++-- sql/core/pom.xml | 4 ++-- sql/hive-thriftserver/pom.xml | 4 ++-- sql/hive/pom.xml | 4 ++-- streaming/pom.xml | 4 ++-- tools/pom.xml | 4 ++-- yarn/alpha/pom.xml | 6 +++--- yarn/pom.xml | 4 ++-- yarn/stable/pom.xml| 6 +++--- 30 files changed, 61 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6c6fd218/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 5af17c5..0c79f10 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,12 +21,12 @@ org.apache.spark spark-parent -1.2.0 +1.2.1-SNAPSHOT ../pom.xml org.apache.spark - spark-assembly_2.10 + spark-assembly_2.11 Spark Project Assembly http://spark.apache.org/ pom http://git-wip-us.apache.org/repos/asf/spark/blob/6c6fd218/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 
1f43fcc..c968da9 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,12 +21,12 @@ org.apache.spark spark-parent -1.2.0 +1.2.1-SNAPSHOT ../pom.xml org.apache.spark - spark-bagel_2.10 + spark-bagel_2.11 bagel http://git-wip-us.apache.org/repos/asf/spark/blob/6c6fd218/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 15cb382..83c0307 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,12 +21,12 @@ org.apache.spark spark-parent -1.2.0 +1.2.1-SNAPSHOT ../pom.xml org.apache.spark - spark-core_2.10 + spark-core_2.11 core http://git-wip-us.apache.org/repos/asf/spark/blob/6c6fd218/core/src/main/scala/org/apache/spark/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 873ec3c..b1a511f 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -44,5 +44,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.2.0" + val SPARK_VERSION = "1.2.1-SNAPSHOT" } http://git-wip-us.apache.org/repos/asf/spark/blob/6c6fd218/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index bd3b317..3bc1149 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,12 +21,12 @@ org.apache.spark spark-parent -1.2.0 +1.2.1-SNAPSHOT ../pom.xml org.apache.spark - spark-examp
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.2.0-snapshot0 [created] bc0987579 - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
Git Push Summary
Repository: spark Updated Tags: refs/tags/test [deleted] 70d037168 - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
Git Push Summary
Repository: spark Updated Tags: refs/tags/test [created] 70d037168 - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
spark git commit: Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types"
Repository: spark Updated Branches: refs/heads/branch-1.2 8b83a34fa -> 70d037168 Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types" Author: Michael Armbrust Closes #3292 from marmbrus/revert4309 and squashes the following commits: 808e96e [Michael Armbrust] Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types" (cherry picked from commit 45ce3273cb618d14ec4d20c4c95699634b951086) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70d03716 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70d03716 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70d03716 Branch: refs/heads/branch-1.2 Commit: 70d0371683a56059a7b4c4ebdab6e2fe055b9a76 Parents: 8b83a34 Author: Michael Armbrust Authored: Sun Nov 16 15:05:04 2014 -0800 Committer: Michael Armbrust Committed: Sun Nov 16 15:05:30 2014 -0800 -- .../thriftserver/HiveThriftServer2Suite.scala | 90 - .../spark/sql/hive/thriftserver/Shim12.scala| 11 +- .../spark/sql/hive/thriftserver/Shim13.scala| 29 +++-- .../org/apache/spark/sql/hive/HiveContext.scala | 127 ++- 4 files changed, 115 insertions(+), 142 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/70d03716/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index 23d12cb..bba29b2 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.hive.thriftserver import 
java.io.File import java.net.ServerSocket -import java.sql.{Date, DriverManager, Statement} +import java.sql.{DriverManager, Statement} import java.util.concurrent.TimeoutException -import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.concurrent.{Await, Promise} @@ -52,15 +51,6 @@ import org.apache.spark.sql.hive.HiveShim class HiveThriftServer2Suite extends FunSuite with Logging { Class.forName(classOf[HiveDriver].getCanonicalName) - object TestData { -def getTestDataFilePath(name: String) = { - Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name") -} - -val smallKv = getTestDataFilePath("small_kv.txt") -val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt") - } - def randomListeningPort = { // Let the system to choose a random available port to avoid collision with other parallel // builds. @@ -155,8 +145,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } -// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths -val env = Seq("SPARK_TESTING" -> "0") +val env = Seq( + // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths + "SPARK_TESTING" -> "0", + // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read + // proper version information from the jar manifest. 
+ "SPARK_PREPEND_CLASSES" -> "") Process(command, None, env: _*).run(ProcessLogger( captureThriftServerOutput("stdout"), @@ -200,12 +194,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging { test("Test JDBC query execution") { withJdbcStatement() { statement => - val queries = Seq( -"SET spark.sql.shuffle.partitions=3", -"DROP TABLE IF EXISTS test", -"CREATE TABLE test(key INT, val STRING)", -s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", -"CACHE TABLE test") + val dataFilePath = + Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") + + val queries = +s"""SET spark.sql.shuffle.partitions=3; + |CREATE TABLE test(key INT, val STRING); + |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test; + |CACHE TABLE test; + """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty) queries.foreach(statement.execute) @@ -219,10 +216,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging { test("SPARK-3004 regression: result set containing NULL") { wit
spark git commit: Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types"
Repository: spark Updated Branches: refs/heads/master cb6bd83a9 -> 45ce3273c Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types" Author: Michael Armbrust Closes #3292 from marmbrus/revert4309 and squashes the following commits: 808e96e [Michael Armbrust] Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types" Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45ce3273 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45ce3273 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45ce3273 Branch: refs/heads/master Commit: 45ce3273cb618d14ec4d20c4c95699634b951086 Parents: cb6bd83 Author: Michael Armbrust Authored: Sun Nov 16 15:05:04 2014 -0800 Committer: Michael Armbrust Committed: Sun Nov 16 15:05:08 2014 -0800 -- .../thriftserver/HiveThriftServer2Suite.scala | 90 - .../spark/sql/hive/thriftserver/Shim12.scala| 11 +- .../spark/sql/hive/thriftserver/Shim13.scala| 29 +++-- .../org/apache/spark/sql/hive/HiveContext.scala | 127 ++- 4 files changed, 115 insertions(+), 142 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/45ce3273/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index 23d12cb..bba29b2 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.hive.thriftserver import java.io.File import java.net.ServerSocket -import java.sql.{Date, DriverManager, Statement} +import 
java.sql.{DriverManager, Statement} import java.util.concurrent.TimeoutException -import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.concurrent.{Await, Promise} @@ -52,15 +51,6 @@ import org.apache.spark.sql.hive.HiveShim class HiveThriftServer2Suite extends FunSuite with Logging { Class.forName(classOf[HiveDriver].getCanonicalName) - object TestData { -def getTestDataFilePath(name: String) = { - Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name") -} - -val smallKv = getTestDataFilePath("small_kv.txt") -val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt") - } - def randomListeningPort = { // Let the system to choose a random available port to avoid collision with other parallel // builds. @@ -155,8 +145,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } -// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths -val env = Seq("SPARK_TESTING" -> "0") +val env = Seq( + // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths + "SPARK_TESTING" -> "0", + // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read + // proper version information from the jar manifest. 
+ "SPARK_PREPEND_CLASSES" -> "") Process(command, None, env: _*).run(ProcessLogger( captureThriftServerOutput("stdout"), @@ -200,12 +194,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging { test("Test JDBC query execution") { withJdbcStatement() { statement => - val queries = Seq( -"SET spark.sql.shuffle.partitions=3", -"DROP TABLE IF EXISTS test", -"CREATE TABLE test(key INT, val STRING)", -s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", -"CACHE TABLE test") + val dataFilePath = + Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") + + val queries = +s"""SET spark.sql.shuffle.partitions=3; + |CREATE TABLE test(key INT, val STRING); + |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test; + |CACHE TABLE test; + """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty) queries.foreach(statement.execute) @@ -219,10 +216,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging { test("SPARK-3004 regression: result set containing NULL") { withJdbcStatement() { statement => + val dataFilePath = +Thread.currentThread().getContextClassLoader
spark git commit: [SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types
Repository: spark Updated Branches: refs/heads/branch-1.2 2200de635 -> 8b83a34fa [SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types SPARK-4407 was detected while working on SPARK-4309. Merged these two into a single PR since 1.2.0 RC is approaching. [https://reviewable.io/review_button.png"; height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3178) Author: Cheng Lian Closes #3178 from liancheng/date-for-thriftserver and squashes the following commits: 6f71d0b [Cheng Lian] Makes toHiveString static 26fa955 [Cheng Lian] Fixes complex type support in Hive 0.13.1 shim a92882a [Cheng Lian] Updates HiveShim for 0.13.1 73f442b [Cheng Lian] Adds Date support for HiveThriftServer2 (Hive 0.12.0) (cherry picked from commit cb6bd83a91d9b4a227dc6467255231869c1820e2) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b83a34f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b83a34f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b83a34f Branch: refs/heads/branch-1.2 Commit: 8b83a34fa310f4e6802c5ef32dcc737f6fb4903f Parents: 2200de6 Author: Cheng Lian Authored: Sun Nov 16 14:26:41 2014 -0800 Committer: Michael Armbrust Committed: Sun Nov 16 14:26:55 2014 -0800 -- .../thriftserver/HiveThriftServer2Suite.scala | 90 + .../spark/sql/hive/thriftserver/Shim12.scala| 11 +- .../spark/sql/hive/thriftserver/Shim13.scala| 29 ++--- .../org/apache/spark/sql/hive/HiveContext.scala | 127 +-- 4 files changed, 142 insertions(+), 115 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8b83a34f/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index bba29b2..23d12cb 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.thriftserver import java.io.File import java.net.ServerSocket -import java.sql.{DriverManager, Statement} +import java.sql.{Date, DriverManager, Statement} import java.util.concurrent.TimeoutException +import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.concurrent.{Await, Promise} @@ -51,6 +52,15 @@ import org.apache.spark.sql.hive.HiveShim class HiveThriftServer2Suite extends FunSuite with Logging { Class.forName(classOf[HiveDriver].getCanonicalName) + object TestData { +def getTestDataFilePath(name: String) = { + Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name") +} + +val smallKv = getTestDataFilePath("small_kv.txt") +val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt") + } + def randomListeningPort = { // Let the system to choose a random available port to avoid collision with other parallel // builds. @@ -145,12 +155,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } -val env = Seq( - // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths - "SPARK_TESTING" -> "0", - // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read - // proper version information from the jar manifest. 
- "SPARK_PREPEND_CLASSES" -> "") +// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths +val env = Seq("SPARK_TESTING" -> "0") Process(command, None, env: _*).run(ProcessLogger( captureThriftServerOutput("stdout"), @@ -194,15 +200,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging { test("Test JDBC query execution") { withJdbcStatement() { statement => - val dataFilePath = - Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") - - val queries = -s"""SET spark.sql.shuffle.partitions=3; - |CREATE TABLE test(key INT, val STRING); - |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test; - |CACHE TABLE test; - """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty) + val queries = Seq( +"SET spark.sql.shuffle.partitions=3", +"DROP TABLE IF EXISTS test", +"CRE
spark git commit: [SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types
Repository: spark Updated Branches: refs/heads/master 7850e0c70 -> cb6bd83a9 [SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types SPARK-4407 was detected while working on SPARK-4309. Merged these two into a single PR since 1.2.0 RC is approaching. [https://reviewable.io/review_button.png"; height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3178) Author: Cheng Lian Closes #3178 from liancheng/date-for-thriftserver and squashes the following commits: 6f71d0b [Cheng Lian] Makes toHiveString static 26fa955 [Cheng Lian] Fixes complex type support in Hive 0.13.1 shim a92882a [Cheng Lian] Updates HiveShim for 0.13.1 73f442b [Cheng Lian] Adds Date support for HiveThriftServer2 (Hive 0.12.0) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb6bd83a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb6bd83a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb6bd83a Branch: refs/heads/master Commit: cb6bd83a91d9b4a227dc6467255231869c1820e2 Parents: 7850e0c Author: Cheng Lian Authored: Sun Nov 16 14:26:41 2014 -0800 Committer: Michael Armbrust Committed: Sun Nov 16 14:26:41 2014 -0800 -- .../thriftserver/HiveThriftServer2Suite.scala | 90 + .../spark/sql/hive/thriftserver/Shim12.scala| 11 +- .../spark/sql/hive/thriftserver/Shim13.scala| 29 ++--- .../org/apache/spark/sql/hive/HiveContext.scala | 127 +-- 4 files changed, 142 insertions(+), 115 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb6bd83a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index bba29b2..23d12cb 100644 --- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.thriftserver import java.io.File import java.net.ServerSocket -import java.sql.{DriverManager, Statement} +import java.sql.{Date, DriverManager, Statement} import java.util.concurrent.TimeoutException +import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.concurrent.{Await, Promise} @@ -51,6 +52,15 @@ import org.apache.spark.sql.hive.HiveShim class HiveThriftServer2Suite extends FunSuite with Logging { Class.forName(classOf[HiveDriver].getCanonicalName) + object TestData { +def getTestDataFilePath(name: String) = { + Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name") +} + +val smallKv = getTestDataFilePath("small_kv.txt") +val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt") + } + def randomListeningPort = { // Let the system to choose a random available port to avoid collision with other parallel // builds. @@ -145,12 +155,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } -val env = Seq( - // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths - "SPARK_TESTING" -> "0", - // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read - // proper version information from the jar manifest. 
- "SPARK_PREPEND_CLASSES" -> "") +// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths +val env = Seq("SPARK_TESTING" -> "0") Process(command, None, env: _*).run(ProcessLogger( captureThriftServerOutput("stdout"), @@ -194,15 +200,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging { test("Test JDBC query execution") { withJdbcStatement() { statement => - val dataFilePath = - Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") - - val queries = -s"""SET spark.sql.shuffle.partitions=3; - |CREATE TABLE test(key INT, val STRING); - |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test; - |CACHE TABLE test; - """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty) + val queries = Seq( +"SET spark.sql.shuffle.partitions=3", +"DROP TABLE IF EXISTS test", +"CREATE TABLE test(key INT, val STRING)", +s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TA
spark git commit: [SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer
Repository: spark Updated Branches: refs/heads/branch-1.2 24287014f -> 2200de635 [SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer This patch is intended to fix a subtle memory leak in ConnectionManager's ACK timeout TimerTasks: in the old code, each TimerTask held a reference to the message being sent and a cancelled TimerTask won't necessarily be garbage-collected until it's scheduled to run, so this caused huge buildups of messages that weren't garbage collected until their timeouts expired, leading to OOMs. This patch addresses this problem by capturing only the message ID in the TimerTask instead of the whole message, and by keeping a WeakReference to the promise in the TimerTask. I've also modified this code to use Netty's HashedWheelTimer, whose performance characteristics should be better for this use-case. Thanks to cristianopris for narrowing down this issue! Author: Josh Rosen Closes #3259 from JoshRosen/connection-manager-timeout-bugfix and squashes the following commits: afcc8d6 [Josh Rosen] Address rxin's review feedback. 2a2e92d [Josh Rosen] Keep only WeakReference to promise in TimerTask; 0f0913b [Josh Rosen] Spelling fix: timout => timeout 3200c33 [Josh Rosen] Use Netty HashedWheelTimer f847dd4 [Josh Rosen] Don't capture entire message in ACK timeout task. 
(cherry picked from commit 7850e0c707affd5eafd570fb43716753396cf479) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2200de63 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2200de63 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2200de63 Branch: refs/heads/branch-1.2 Commit: 2200de6352fdc1000908554003912303edc3d160 Parents: 2428701 Author: Josh Rosen Authored: Sun Nov 16 00:44:15 2014 -0800 Committer: Reynold Xin Committed: Sun Nov 16 00:44:31 2014 -0800 -- .../spark/network/nio/ConnectionManager.scala | 47 +++- 1 file changed, 35 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2200de63/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index f198aa8..df4b085 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -18,13 +18,13 @@ package org.apache.spark.network.nio import java.io.IOException +import java.lang.ref.WeakReference import java.net._ import java.nio._ import java.nio.channels._ import java.nio.channels.spi._ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit} -import java.util.{Timer, TimerTask} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, SynchronizedMap, SynchronizedQueue} import scala.concurrent.duration._ @@ -32,6 +32,7 @@ import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.language.postfixOps import com.google.common.base.Charsets.UTF_8 +import io.netty.util.{Timeout, TimerTask, HashedWheelTimer} import org.apache.spark._ import org.apache.spark.network.sasl.{SparkSaslClient, 
SparkSaslServer} @@ -77,7 +78,8 @@ private[nio] class ConnectionManager( } private val selector = SelectorProvider.provider.openSelector() - private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true) + private val ackTimeoutMonitor = +new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor")) private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60) @@ -139,7 +141,10 @@ private[nio] class ConnectionManager( new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection] private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection] - private val messageStatuses = new HashMap[Int, MessageStatus] + // Tracks sent messages for which we are awaiting acknowledgements. Entries are added to this + // map when messages are sent and are removed when acknowledgement messages are received or when + // acknowledgement timeouts expire + private val messageStatuses = new HashMap[Int, MessageStatus] // [MessageId, MessageStatus] private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] private val registerRequests = new SynchronizedQueue[SendingConnection] @@ -899,22 +904,41 @@ private[nio] class ConnectionManager( : Future[Message] = { val prom
spark git commit: [SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer
Repository: spark Updated Branches: refs/heads/master 84468b2e2 -> 7850e0c70 [SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer This patch is intended to fix a subtle memory leak in ConnectionManager's ACK timeout TimerTasks: in the old code, each TimerTask held a reference to the message being sent and a cancelled TimerTask won't necessarily be garbage-collected until it's scheduled to run, so this caused huge buildups of messages that weren't garbage collected until their timeouts expired, leading to OOMs. This patch addresses this problem by capturing only the message ID in the TimerTask instead of the whole message, and by keeping a WeakReference to the promise in the TimerTask. I've also modified this code to use Netty's HashedWheelTimer, whose performance characteristics should be better for this use-case. Thanks to cristianopris for narrowing down this issue! Author: Josh Rosen Closes #3259 from JoshRosen/connection-manager-timeout-bugfix and squashes the following commits: afcc8d6 [Josh Rosen] Address rxin's review feedback. 2a2e92d [Josh Rosen] Keep only WeakReference to promise in TimerTask; 0f0913b [Josh Rosen] Spelling fix: timout => timeout 3200c33 [Josh Rosen] Use Netty HashedWheelTimer f847dd4 [Josh Rosen] Don't capture entire message in ACK timeout task. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7850e0c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7850e0c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7850e0c7 Branch: refs/heads/master Commit: 7850e0c707affd5eafd570fb43716753396cf479 Parents: 84468b2 Author: Josh Rosen Authored: Sun Nov 16 00:44:15 2014 -0800 Committer: Reynold Xin Committed: Sun Nov 16 00:44:15 2014 -0800 -- .../spark/network/nio/ConnectionManager.scala | 47 +++- 1 file changed, 35 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7850e0c7/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index f198aa8..df4b085 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -18,13 +18,13 @@ package org.apache.spark.network.nio import java.io.IOException +import java.lang.ref.WeakReference import java.net._ import java.nio._ import java.nio.channels._ import java.nio.channels.spi._ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit} -import java.util.{Timer, TimerTask} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, SynchronizedMap, SynchronizedQueue} import scala.concurrent.duration._ @@ -32,6 +32,7 @@ import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.language.postfixOps import com.google.common.base.Charsets.UTF_8 +import io.netty.util.{Timeout, TimerTask, HashedWheelTimer} import org.apache.spark._ import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer} @@ -77,7 +78,8 @@ private[nio] class ConnectionManager( } private val selector = 
SelectorProvider.provider.openSelector() - private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true) + private val ackTimeoutMonitor = +new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor")) private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60) @@ -139,7 +141,10 @@ private[nio] class ConnectionManager( new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection] private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection] - private val messageStatuses = new HashMap[Int, MessageStatus] + // Tracks sent messages for which we are awaiting acknowledgements. Entries are added to this + // map when messages are sent and are removed when acknowledgement messages are received or when + // acknowledgement timeouts expire + private val messageStatuses = new HashMap[Int, MessageStatus] // [MessageId, MessageStatus] private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] private val registerRequests = new SynchronizedQueue[SendingConnection] @@ -899,22 +904,41 @@ private[nio] class ConnectionManager( : Future[Message] = { val promise = Promise[Message]() -val timeoutTask = new TimerTask { - override def run(): Unit = { +