svn commit: r24128 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_22_01-317b0aa-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Thu Jan 11 06:16:07 2018
New Revision: 24128

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_10_22_01-317b0aa docs

[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]
svn commit: r24127 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_20_01-a6647ff-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Thu Jan 11 04:15:17 2018
New Revision: 24127

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_10_20_01-a6647ff docs

[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-22587] Spark job fails if fs.defaultFS and application jar are different URLs
Repository: spark
Updated Branches: refs/heads/branch-2.3 551ccfba5 -> 317b0aaed

[SPARK-22587] Spark job fails if fs.defaultFS and application jar are different URLs

## What changes were proposed in this pull request?

The comparison of two filesystems does not consider the authority of the URI. This matters for the WASB file storage system, where the userInfo component is honored to differentiate filesystems. For example, wasbs://user1xyz.net and wasbs://user2xyz.net should be considered two different filesystems. Therefore, we have to include the authority when comparing two filesystems, and two filesystems with different authorities cannot be the same FS.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Mingjie Tang

Closes #19885 from merlintang/EAR-7377.

(cherry picked from commit a6647ffbf7a312a3e119a9beef90880cc915aa60)
Signed-off-by: jerryshao

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/317b0aae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/317b0aae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/317b0aae

Branch: refs/heads/branch-2.3
Commit: 317b0aaed83e4bbf66f63ddc0d618da9f1f85085
Parents: 551ccfb
Author: Mingjie Tang
Authored: Thu Jan 11 11:51:03 2018 +0800
Committer: jerryshao
Committed: Thu Jan 11 11:51:34 2018 +0800

 .../org/apache/spark/deploy/yarn/Client.scala  | 24 +++---
 .../apache/spark/deploy/yarn/ClientSuite.scala | 33
 2 files changed, 53 insertions(+), 4 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/317b0aae/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 15328d0..8cd3cd9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1421,15 +1421,20 @@ private object Client extends Logging {
   }

   /**
-   * Return whether the two file systems are the same.
+   * Return whether the URIs of two file systems refer to the same file system.
    */
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-    val srcUri = srcFs.getUri()
-    val dstUri = destFs.getUri()
+  private[spark] def compareUri(srcUri: URI, dstUri: URI): Boolean = {
+    if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
+      return false
+    }
+
+    val srcAuthority = srcUri.getAuthority()
+    val dstAuthority = dstUri.getAuthority()
+    if (srcAuthority != null && !srcAuthority.equalsIgnoreCase(dstAuthority)) {
+      return false
+    }

     var srcHost = srcUri.getHost()
     var dstHost = dstUri.getHost()

@@ -1447,6 +1452,17 @@ private object Client extends Logging {
     }

     Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
+
+  }
+
+  /**
+   * Return whether the two file systems are the same.
+   */
+  protected def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+
+    compareUri(srcUri, dstUri)
   }

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/317b0aae/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9d5f5eb..7fa5971 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -357,6 +357,39 @@ class ClientSuite extends SparkFunSuite with Matchers {
     sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
   }

+  private val matching = Seq(
+    ("files URI match test1", "file:///file1", "file:///file2"),
+    ("files URI match test2", "file:///c:file1", "file://c:file2"),
+    ("files URI match test3", "file://host/file1", "file://host/file2"),
+    ("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"),
+    ("hdfs URI match test", "hdfs:/path1", "hdfs:/path1")
+  )
+
+  matching.foreach { t =>
+    test(t._1) {
+      assert(Client.compareUri(new URI(t._2), new URI(t._3)),
+        s"No match between ${t._2} and ${t._3}")
+    }
+  }
+
+  private val unmatching = Seq(
+    ("files URI unmatch test1", "file:///file1", "file://host/file2"),
+    ("files URI un
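The heart of this change is easy to try outside Spark. Below is a minimal, self-contained Scala sketch of the scheme and authority checks added above; `CompareUriSketch` and `sameFileSystem` are illustrative names, not Spark's API, and the sketch omits Spark's additional host/port normalization:

```scala
import java.net.URI

object CompareUriSketch {
  def sameFileSystem(src: URI, dst: URI): Boolean = {
    // Different (or missing) schemes can never belong to the same filesystem.
    if (src.getScheme == null || src.getScheme != dst.getScheme) {
      return false
    }
    // SPARK-22587: the authority (userInfo@host:port) must match, case-insensitively.
    // equalsIgnoreCase returns false (not NPE) when the other authority is null.
    val srcAuthority = src.getAuthority
    if (srcAuthority != null && !srcAuthority.equalsIgnoreCase(dst.getAuthority)) {
      return false
    }
    true
  }

  def main(args: Array[String]): Unit = {
    // Same scheme but different authorities: distinct filesystems after the fix.
    println(sameFileSystem(new URI("wasbs://c@user1xyz.net/a"), new URI("wasbs://c@user2xyz.net/a"))) // false
    // Same scheme and authority: treated as the same filesystem.
    println(sameFileSystem(new URI("hdfs://nn:8020/a"), new URI("hdfs://nn:8020/b"))) // true
  }
}
```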
spark git commit: [SPARK-22587] Spark job fails if fs.defaultFS and application jar are different URLs
Repository: spark
Updated Branches: refs/heads/master 9b33dfc40 -> a6647ffbf

[SPARK-22587] Spark job fails if fs.defaultFS and application jar are different URLs

## What changes were proposed in this pull request?

The comparison of two filesystems does not consider the authority of the URI. This matters for the WASB file storage system, where the userInfo component is honored to differentiate filesystems. For example, wasbs://user1xyz.net and wasbs://user2xyz.net should be considered two different filesystems. Therefore, we have to include the authority when comparing two filesystems, and two filesystems with different authorities cannot be the same FS.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Mingjie Tang

Closes #19885 from merlintang/EAR-7377.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6647ffb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6647ffb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6647ffb

Branch: refs/heads/master
Commit: a6647ffbf7a312a3e119a9beef90880cc915aa60
Parents: 9b33dfc
Author: Mingjie Tang
Authored: Thu Jan 11 11:51:03 2018 +0800
Committer: jerryshao
Committed: Thu Jan 11 11:51:03 2018 +0800

 .../org/apache/spark/deploy/yarn/Client.scala  | 24 +++---
 .../apache/spark/deploy/yarn/ClientSuite.scala | 33
 2 files changed, 53 insertions(+), 4 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6647ffb/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 15328d0..8cd3cd9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1421,15 +1421,20 @@ private object Client extends Logging {
   }

   /**
-   * Return whether the two file systems are the same.
+   * Return whether the URIs of two file systems refer to the same file system.
    */
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-    val srcUri = srcFs.getUri()
-    val dstUri = destFs.getUri()
+  private[spark] def compareUri(srcUri: URI, dstUri: URI): Boolean = {
+    if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
+      return false
+    }
+
+    val srcAuthority = srcUri.getAuthority()
+    val dstAuthority = dstUri.getAuthority()
+    if (srcAuthority != null && !srcAuthority.equalsIgnoreCase(dstAuthority)) {
+      return false
+    }

     var srcHost = srcUri.getHost()
     var dstHost = dstUri.getHost()

@@ -1447,6 +1452,17 @@ private object Client extends Logging {
     }

     Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
+
+  }
+
+  /**
+   * Return whether the two file systems are the same.
+   */
+  protected def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+
+    compareUri(srcUri, dstUri)
   }

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a6647ffb/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9d5f5eb..7fa5971 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -357,6 +357,39 @@ class ClientSuite extends SparkFunSuite with Matchers {
     sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
   }

+  private val matching = Seq(
+    ("files URI match test1", "file:///file1", "file:///file2"),
+    ("files URI match test2", "file:///c:file1", "file://c:file2"),
+    ("files URI match test3", "file://host/file1", "file://host/file2"),
+    ("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"),
+    ("hdfs URI match test", "hdfs:/path1", "hdfs:/path1")
+  )
+
+  matching.foreach { t =>
+    test(t._1) {
+      assert(Client.compareUri(new URI(t._2), new URI(t._3)),
+        s"No match between ${t._2} and ${t._3}")
+    }
+  }
+
+  private val unmatching = Seq(
+    ("files URI unmatch test1", "file:///file1", "file://host/file2"),
+    ("files URI unmatch test2", "file://host/file1", "file:///file2"),
+    ("files URI unmatch test3", "file://host/file1
svn commit: r24124 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_18_01-551ccfb-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Thu Jan 11 02:15:27 2018
New Revision: 24124

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_10_18_01-551ccfb docs

[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-23009][PYTHON] Fix for non-str col names to createDataFrame from Pandas
Repository: spark
Updated Branches: refs/heads/branch-2.3 eb4fa551e -> 551ccfba5

[SPARK-23009][PYTHON] Fix for non-str col names to createDataFrame from Pandas

## What changes were proposed in this pull request?

This fixes the case of calling `SparkSession.createDataFrame` with a Pandas DataFrame that has non-str column labels. The column-name conversion logic for handling non-string or unicode labels in Python 2 is:

```
if not isinstance(column, basestring):  # not any type of string
    name = str(column)
elif isinstance(column, unicode):       # unicode in Python 2
    name = column.encode('utf-8')
```

## How was this patch tested?

Added a new test with a Pandas DataFrame that has int column labels.

Author: Bryan Cutler

Closes #20210 from BryanCutler/python-createDataFrame-int-col-error-SPARK-23009.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/551ccfba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/551ccfba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/551ccfba

Branch: refs/heads/branch-2.3
Commit: 551ccfba529996e987c4d2e8d4dd61c4ab9a2e95
Parents: eb4fa55
Author: Bryan Cutler
Authored: Wed Jan 10 14:55:24 2018 +0900
Committer: hyukjinkwon
Committed: Thu Jan 11 09:46:50 2018 +0900

 python/pyspark/sql/session.py | 4 +++-
 python/pyspark/sql/tests.py   | 9 +
 2 files changed, 12 insertions(+), 1 deletion(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/551ccfba/python/pyspark/sql/session.py

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 3e45747..604021c 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -648,7 +648,9 @@ class SparkSession(object):

         # If no schema supplied by user then get the names of columns only
         if schema is None:
-            schema = [x.encode('utf-8') if not isinstance(x, str) else x for x in data.columns]
+            schema = [str(x) if not isinstance(x, basestring) else
+                      (x.encode('utf-8') if not isinstance(x, str) else x)
+                      for x in data.columns]

         if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
                 and len(data) > 0:

http://git-wip-us.apache.org/repos/asf/spark/blob/551ccfba/python/pyspark/sql/tests.py

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 13576ff..80a94a9 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3532,6 +3532,15 @@ class ArrowTests(ReusedSQLTestCase):
                 self.assertTrue(expected[r][e] == result_arrow[r][e] and
                                 result[r][e] == result_arrow[r][e])

+    def test_createDataFrame_with_int_col_names(self):
+        import numpy as np
+        import pandas as pd
+        pdf = pd.DataFrame(np.random.rand(4, 2))
+        df, df_arrow = self._createDataFrame_toggle(pdf)
+        pdf_col_names = [str(c) for c in pdf.columns]
+        self.assertEqual(pdf_col_names, df.columns)
+        self.assertEqual(pdf_col_names, df_arrow.columns)
+

 @unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
 class PandasUDFTests(ReusedSQLTestCase):
svn commit: r24120 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_16_01-9b33dfc-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Thu Jan 11 00:15:19 2018
New Revision: 24120

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_10_16_01-9b33dfc docs

[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames
Repository: spark
Updated Branches: refs/heads/master 344e3aab8 -> 9b33dfc40

[SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames

## What changes were proposed in this pull request?

(courtesy of liancheng)

Spark SQL supports both global aggregation and grouping aggregation. Global aggregation always returns a single row with the initial aggregation state as the output, even if there are zero input rows. Spark implements this by simply checking the number of grouping keys and treating an aggregation as a global aggregation if it has zero grouping keys.

However, this simple principle drops the ball in the following case:

```scala
spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
// +---+
// | c |
// +---+
// | 1 |
// +---+
```

The reason is that:

1. `df.dropDuplicates()` is roughly translated into something equivalent to:

```scala
val allColumns = df.columns.map { col }
df.groupBy(allColumns: _*).agg(allColumns.head, allColumns.tail: _*)
```

This translation is implemented in the rule `ReplaceDeduplicateWithAggregate`.

2. `spark.emptyDataFrame` contains zero columns and zero rows. Therefore, rule `ReplaceDeduplicateWithAggregate` makes a confusing transformation roughly equivalent to the following one:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy().agg(Map.empty[String, String])
```

The above transformation is confusing because the resulting aggregate operator contains no grouping keys (because `emptyDataFrame` contains no columns) and gets recognized as a global aggregation. As a result, Spark SQL allocates a single row filled with the initial aggregation state, uses it as the output, and returns a wrong result.

To fix this issue, this PR tweaks `ReplaceDeduplicateWithAggregate` by appending a literal `1` to the grouping key list of the resulting `Aggregate` operator when the input plan contains zero output columns. In this way, `spark.emptyDataFrame.dropDuplicates()` is now translated into a grouping aggregation, roughly depicted as:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy(lit(1)).agg(Map.empty[String, String])
```

which is now properly treated as a grouping aggregation and returns the correct answer.

## How was this patch tested?

New unit tests added.

Author: Feng Liu

Closes #20174 from liufengdb/fix-duplicate.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b33dfc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b33dfc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b33dfc4

Branch: refs/heads/master
Commit: 9b33dfc408de986f4203bb0ac0c3f5c56effd69d
Parents: 344e3aa
Author: Feng Liu
Authored: Wed Jan 10 14:25:04 2018 -0800
Committer: Cheng Lian
Committed: Wed Jan 10 14:25:04 2018 -0800

 .../sql/catalyst/optimizer/Optimizer.scala  |  8 ++-
 .../optimizer/ReplaceOperatorSuite.scala    | 10 +++-
 .../spark/sql/DataFrameAggregateSuite.scala | 24 ++--
 3 files changed, 38 insertions(+), 4 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/9b33dfc4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index df0af82..c794ba8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1222,7 +1222,13 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
           Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
         }
       }
-      Aggregate(keys, aggCols, child)
+      // SPARK-22951: Physical aggregate operators distinguish global aggregation and grouping
+      // aggregations by checking the number of grouping keys. The key difference here is that a
+      // global aggregation always returns at least one row even if there are no input rows. Here
+      // we append a literal when the grouping key list is empty so that the result aggregate
+      // operator is properly treated as a grouping aggregation.
+      val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
+      Aggregate(nonemptyKeys, aggCols, child)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9b33dfc4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/c
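For a quick sanity check of the corrected behavior, here is a hedged sketch of the repro from the description, with the post-fix output in comments. It assumes a local SparkSession on a build that includes this patch; the object name is illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

object EmptyDedupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-22951").getOrCreate()
    import spark.implicits._
    // dropDuplicates() on the zero-column, zero-row frame is now a grouping
    // aggregation, so it yields zero rows; the outer global count is then 0
    // (instead of the wrong 1 shown before the fix).
    spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
    // +---+
    // |  c|
    // +---+
    // |  0|
    // +---+
    spark.stop()
  }
}
```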
spark git commit: [SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames
Repository: spark
Updated Branches: refs/heads/branch-2.3 5b5851cb6 -> eb4fa551e

[SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames

## What changes were proposed in this pull request?

(courtesy of liancheng)

Spark SQL supports both global aggregation and grouping aggregation. Global aggregation always returns a single row with the initial aggregation state as the output, even if there are zero input rows. Spark implements this by simply checking the number of grouping keys and treating an aggregation as a global aggregation if it has zero grouping keys.

However, this simple principle drops the ball in the following case:

```scala
spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
// +---+
// | c |
// +---+
// | 1 |
// +---+
```

The reason is that:

1. `df.dropDuplicates()` is roughly translated into something equivalent to:

```scala
val allColumns = df.columns.map { col }
df.groupBy(allColumns: _*).agg(allColumns.head, allColumns.tail: _*)
```

This translation is implemented in the rule `ReplaceDeduplicateWithAggregate`.

2. `spark.emptyDataFrame` contains zero columns and zero rows. Therefore, rule `ReplaceDeduplicateWithAggregate` makes a confusing transformation roughly equivalent to the following one:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy().agg(Map.empty[String, String])
```

The above transformation is confusing because the resulting aggregate operator contains no grouping keys (because `emptyDataFrame` contains no columns) and gets recognized as a global aggregation. As a result, Spark SQL allocates a single row filled with the initial aggregation state, uses it as the output, and returns a wrong result.

To fix this issue, this PR tweaks `ReplaceDeduplicateWithAggregate` by appending a literal `1` to the grouping key list of the resulting `Aggregate` operator when the input plan contains zero output columns. In this way, `spark.emptyDataFrame.dropDuplicates()` is now translated into a grouping aggregation, roughly depicted as:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy(lit(1)).agg(Map.empty[String, String])
```

which is now properly treated as a grouping aggregation and returns the correct answer.

## How was this patch tested?

New unit tests added.

Author: Feng Liu

Closes #20174 from liufengdb/fix-duplicate.

(cherry picked from commit 9b33dfc408de986f4203bb0ac0c3f5c56effd69d)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb4fa551
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb4fa551
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb4fa551

Branch: refs/heads/branch-2.3
Commit: eb4fa551e60800269a939b2c1c0ad69e3a801264
Parents: 5b5851c
Author: Feng Liu
Authored: Wed Jan 10 14:25:04 2018 -0800
Committer: Cheng Lian
Committed: Wed Jan 10 14:25:33 2018 -0800

 .../sql/catalyst/optimizer/Optimizer.scala  |  8 ++-
 .../optimizer/ReplaceOperatorSuite.scala    | 10 +++-
 .../spark/sql/DataFrameAggregateSuite.scala | 24 ++--
 3 files changed, 38 insertions(+), 4 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/eb4fa551/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index df0af82..c794ba8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1222,7 +1222,13 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
           Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
         }
       }
-      Aggregate(keys, aggCols, child)
+      // SPARK-22951: Physical aggregate operators distinguish global aggregation and grouping
+      // aggregations by checking the number of grouping keys. The key difference here is that a
+      // global aggregation always returns at least one row even if there are no input rows. Here
+      // we append a literal when the grouping key list is empty so that the result aggregate
+      // operator is properly treated as a grouping aggregation.
+      val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
+      Aggregate(nonemptyKeys, aggCols, child)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/eb4fa551/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
svn commit: r24116 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_12_01-344e3aa-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Wed Jan 10 20:15:34 2018
New Revision: 24116

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_10_12_01-344e3aa docs

[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]
svn commit: r24115 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_10_01-5b5851c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Wed Jan 10 18:15:40 2018
New Revision: 24115

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_10_10_01-5b5851c docs

[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-23019][CORE] Wait until SparkContext.stop() finished in SparkLauncherSuite
Repository: spark
Updated Branches: refs/heads/master f340b6b30 -> 344e3aab8

[SPARK-23019][CORE] Wait until SparkContext.stop() finished in SparkLauncherSuite

## What changes were proposed in this pull request?

In the current code, the `waitFor` call at
https://github.com/apache/spark/blob/cfcd746689c2b84824745fa6d327ffb584c7a17d/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java#L155
only waits until the DAGScheduler is stopped, while SparkContext.clearActiveContext may not have been called yet:
https://github.com/apache/spark/blob/1c9f95cb771ac78775a77edd1abfeb2d8ae2a124/core/src/main/scala/org/apache/spark/SparkContext.scala#L1924

Thus, in the Jenkins test
https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-2.3-test-maven-hadoop-2.6/ ,
`JdbcRDDSuite` failed because the previous test `SparkLauncherSuite` exited before SparkContext.stop() had finished.

To reproduce:

```
$ build/sbt
> project core
> testOnly *SparkLauncherSuite *JavaJdbcRDDSuite
```

To fix: wait for a reasonable amount of time in SparkLauncherSuite to avoid creating two active SparkContexts in the JVM. Can't come up with any better solution for now.

## How was this patch tested?

Unit test

Author: Wang Gengliang

Closes #20221 from gengliangwang/SPARK-23019.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/344e3aab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/344e3aab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/344e3aab

Branch: refs/heads/master
Commit: 344e3aab87178e45957333479a07e07f202ca1fd
Parents: f340b6b
Author: Wang Gengliang
Authored: Wed Jan 10 09:44:30 2018 -0800
Committer: Marcelo Vanzin
Committed: Wed Jan 10 09:44:30 2018 -0800

 .../test/java/org/apache/spark/launcher/SparkLauncherSuite.java | 5 +
 1 file changed, 5 insertions(+)

http://git-wip-us.apache.org/repos/asf/spark/blob/344e3aab/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java

diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index c2261c2..9d2f563 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;

 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -133,6 +134,10 @@ public class SparkLauncherSuite extends BaseSuite {
       p.put(e.getKey(), e.getValue());
     }
     System.setProperties(p);
+    // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
+    // Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
+    // See SPARK-23019 and SparkContext.stop() for details.
+    TimeUnit.MILLISECONDS.sleep(500);
   }
 }
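Since the patch itself settles for a fixed `TimeUnit.MILLISECONDS.sleep(500)`, one natural alternative is a bounded polling wait that wakes up as soon as cleanup finishes. Below is a hedged Scala sketch of that pattern; `awaitCondition` is an illustrative helper, not part of Spark, and the caller would have to supply its own check (e.g. "no active SparkContext remains"):

```scala
import java.util.concurrent.TimeUnit

object AwaitSketch {
  // Poll a caller-supplied condition until it holds or the deadline passes.
  def awaitCondition(timeoutMs: Long, pollMs: Long = 10)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    while (System.nanoTime() < deadline) {
      if (condition) {
        return true // woke up early: cleanup finished before the deadline
      }
      TimeUnit.MILLISECONDS.sleep(pollMs)
    }
    condition // one last check at the deadline
  }
}
```

The tradeoff versus the fixed sleep is that fast runs finish sooner and slow runs get the full timeout, at the cost of needing a reliable condition to poll.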
spark git commit: [SPARK-23019][CORE] Wait until SparkContext.stop() finished in SparkLauncherSuite
Repository: spark
Updated Branches: refs/heads/branch-2.3 60d4d79bb -> 5b5851cb6

[SPARK-23019][CORE] Wait until SparkContext.stop() finished in SparkLauncherSuite

## What changes were proposed in this pull request?

In the current code, the `waitFor` call at
https://github.com/apache/spark/blob/cfcd746689c2b84824745fa6d327ffb584c7a17d/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java#L155
only waits until the DAGScheduler is stopped, while SparkContext.clearActiveContext may not have been called yet:
https://github.com/apache/spark/blob/1c9f95cb771ac78775a77edd1abfeb2d8ae2a124/core/src/main/scala/org/apache/spark/SparkContext.scala#L1924

Thus, in the Jenkins test
https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-2.3-test-maven-hadoop-2.6/ ,
`JdbcRDDSuite` failed because the previous test `SparkLauncherSuite` exited before SparkContext.stop() had finished.

To reproduce:

```
$ build/sbt
> project core
> testOnly *SparkLauncherSuite *JavaJdbcRDDSuite
```

To fix: wait for a reasonable amount of time in SparkLauncherSuite to avoid creating two active SparkContexts in the JVM. Can't come up with any better solution for now.

## How was this patch tested?

Unit test

Author: Wang Gengliang

Closes #20221 from gengliangwang/SPARK-23019.

(cherry picked from commit 344e3aab87178e45957333479a07e07f202ca1fd)
Signed-off-by: Marcelo Vanzin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b5851cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b5851cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b5851cb

Branch: refs/heads/branch-2.3
Commit: 5b5851cb685f395574c94174d45a47c4fbf946c8
Parents: 60d4d79
Author: Wang Gengliang
Authored: Wed Jan 10 09:44:30 2018 -0800
Committer: Marcelo Vanzin
Committed: Wed Jan 10 09:44:50 2018 -0800

 .../test/java/org/apache/spark/launcher/SparkLauncherSuite.java | 5 +
 1 file changed, 5 insertions(+)

http://git-wip-us.apache.org/repos/asf/spark/blob/5b5851cb/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java

diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index c2261c2..9d2f563 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;

 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -133,6 +134,10 @@ public class SparkLauncherSuite extends BaseSuite {
       p.put(e.getKey(), e.getValue());
     }
     System.setProperties(p);
+    // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
+    // Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
+    // See SPARK-23019 and SparkContext.stop() for details.
+    TimeUnit.MILLISECONDS.sleep(500);
   }
 }
spark git commit: [SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc
Repository: spark
Updated Branches: refs/heads/branch-2.2 24f1f2a54 -> 0d943d96b

[SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc

## What changes were proposed in this pull request?

Fix the warning "Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc". This PR is for branch-2.2 and is a cherry-pick of
https://github.com/apache/spark/commit/8032cf852fccd0ab8754f633affdc9ba8fc99e58.
The original PR is https://github.com/apache/spark/pull/20165.

## How was this patch tested?

Please see test("SPARK-22972: hive orc source").

Author: xubo245 <601450...@qq.com>

Closes #20195 from xubo245/HiveSerDeForBranch2.2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d943d96
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d943d96
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d943d96

Branch: refs/heads/branch-2.2
Commit: 0d943d96b3e2cbd663177e8cab2829fefd18411a
Parents: 24f1f2a
Author: xubo245 <601450...@qq.com>
Authored: Wed Jan 10 23:27:45 2018 +0800
Committer: gatorsmile
Committed: Wed Jan 10 23:27:45 2018 +0800

 .../apache/spark/sql/internal/HiveSerDe.scala | 1 +
 .../spark/sql/hive/orc/OrcSourceSuite.scala   | 32 +++-
 2 files changed, 32 insertions(+), 1 deletion(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/0d943d96/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index b9515ec..dac4636 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -73,6 +73,7 @@ object HiveSerDe {
     val key = source.toLowerCase(Locale.ROOT) match {
       case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
       case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
+      case s if s.startsWith("org.apache.spark.sql.hive.orc") => "orc"
       case s if s.equals("orcfile") => "orc"
       case s if s.equals("parquetfile") => "parquet"
       case s if s.equals("avrofile") => "avro"

http://git-wip-us.apache.org/repos/asf/spark/blob/0d943d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 6bfb88c..a562de4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,9 +22,12 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

@@ -197,7 +200,7 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
   }
 }

-class OrcSourceSuite extends OrcSuite {
+class OrcSourceSuite extends OrcSuite with SQLTestUtils {

   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -250,4 +253,31 @@ class OrcSourceSuite extends OrcSuite {
     )).get.toString
   }
 }
+
+  test("SPARK-22972: hive orc source") {
+    val tableName = "normal_orc_as_source_hive"
+    withTable(tableName) {
+      spark.sql(
+        s"""
+          |CREATE TABLE $tableName
+          |USING org.apache.spark.sql.hive.orc
+          |OPTIONS (
+          |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
+          |)
+        """.stripMargin)
+
+      val tableMetadata = spark.sessionState.catalog.getTableMetadata(
+        TableIdentifier(tableName))
+      assert(tableMetadata.storage.inputFormat ==
+        Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
+      assert(tableMetadata.storage.outputFormat ==
+        Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+      assert(tableMetadata.storage.serde ==
+        Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+      assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc")
+        .equals(HiveSerDe.sourceToSerDe("orc")))
+      assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.orc")
+        .equals(
svn commit: r24114 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_04_01-f340b6b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Wed Jan 10 12:15:16 2018
New Revision: 24114

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_10_04_01-f340b6b docs

[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-22997] Add additional defenses against use of freed MemoryBlocks
Repository: spark
Updated Branches: refs/heads/branch-2.3 2db523959 -> 60d4d79bb

[SPARK-22997] Add additional defenses against use of freed MemoryBlocks

## What changes were proposed in this pull request?

This patch modifies Spark's `MemoryAllocator` implementations so that `free(MemoryBlock)` mutates the passed block to clear pointers (in the off-heap case) or null out references to backing `long[]` arrays (in the on-heap case). The goal of this change is to add an extra layer of defense against use-after-free bugs, because currently it's hard to detect corruption caused by blind writes to freed memory blocks.

## How was this patch tested?

New unit tests in `PlatformSuite`, including new tests for existing functionality, because we did not have sufficient mutation coverage of the on-heap memory allocator's pooling logic.

Author: Josh Rosen

Closes #20191 from JoshRosen/SPARK-22997-add-defenses-against-use-after-free-bugs-in-memory-allocator.

(cherry picked from commit f340b6b3066033d40b7e163fd5fb68e9820adfb1)
Signed-off-by: Josh Rosen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60d4d79b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60d4d79b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60d4d79b

Branch: refs/heads/branch-2.3
Commit: 60d4d79bb40f13c68773a0224f2003cdca28c138
Parents: 2db5239
Author: Josh Rosen
Authored: Wed Jan 10 00:45:47 2018 -0800
Committer: Josh Rosen
Committed: Wed Jan 10 00:46:27 2018 -0800

 .../unsafe/memory/HeapMemoryAllocator.java      | 35 ++
 .../apache/spark/unsafe/memory/MemoryBlock.java | 21 +++-
 .../unsafe/memory/UnsafeMemoryAllocator.java    | 11 +
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 50 +++-
 .../apache/spark/memory/TaskMemoryManager.java  | 13 -
 .../spark/memory/TaskMemoryManagerSuite.java    | 29
 6 files changed, 146 insertions(+), 13 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/60d4d79b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java

diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index cc9cc42..3acfe36 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -31,8 +31,7 @@ import org.apache.spark.unsafe.Platform;
 public class HeapMemoryAllocator implements MemoryAllocator {

   @GuardedBy("this")
-  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
-    new HashMap<>();
+  private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();

   private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;

@@ -49,13 +48,14 @@ public class HeapMemoryAllocator implements MemoryAllocator {
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
     if (shouldPool(size)) {
       synchronized (this) {
-        final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
         if (pool != null) {
           while (!pool.isEmpty()) {
-            final WeakReference<MemoryBlock> blockReference = pool.pop();
-            final MemoryBlock memory = blockReference.get();
-            if (memory != null) {
-              assert (memory.size() == size);
+            final WeakReference<long[]> arrayReference = pool.pop();
+            final long[] array = arrayReference.get();
+            if (array != null) {
+              assert (array.length * 8L >= size);
+              MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
               if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
                 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
               }
@@ -76,18 +76,35 @@ public class HeapMemoryAllocator implements MemoryAllocator {

   @Override
   public void free(MemoryBlock memory) {
+    assert (memory.obj != null) :
+      "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
+    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+      "page has already been freed";
+    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+            || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+      "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator free()";
+
     final long size = memory.size();
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
     }
+
+    // Mark the page a
spark git commit: [SPARK-22997] Add additional defenses against use of freed MemoryBlocks
Repository: spark
Updated Branches: refs/heads/master 70bcc9d5a -> f340b6b30

[SPARK-22997] Add additional defenses against use of freed MemoryBlocks

## What changes were proposed in this pull request?

This patch modifies Spark's `MemoryAllocator` implementations so that `free(MemoryBlock)` mutates the passed block to clear pointers (in the off-heap case) or null out references to backing `long[]` arrays (in the on-heap case). The goal of this change is to add an extra layer of defense against use-after-free bugs, because currently it's hard to detect corruption caused by blind writes to freed memory blocks.

## How was this patch tested?

New unit tests in `PlatformSuite`, including new tests for existing functionality, because we did not have sufficient mutation coverage of the on-heap memory allocator's pooling logic.

Author: Josh Rosen

Closes #20191 from JoshRosen/SPARK-22997-add-defenses-against-use-after-free-bugs-in-memory-allocator.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f340b6b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f340b6b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f340b6b3

Branch: refs/heads/master
Commit: f340b6b3066033d40b7e163fd5fb68e9820adfb1
Parents: 70bcc9d
Author: Josh Rosen
Authored: Wed Jan 10 00:45:47 2018 -0800
Committer: Josh Rosen
Committed: Wed Jan 10 00:45:47 2018 -0800

 .../unsafe/memory/HeapMemoryAllocator.java      | 35 ++
 .../apache/spark/unsafe/memory/MemoryBlock.java | 21 +++-
 .../unsafe/memory/UnsafeMemoryAllocator.java    | 11 +
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 50 +++-
 .../apache/spark/memory/TaskMemoryManager.java  | 13 -
 .../spark/memory/TaskMemoryManagerSuite.java    | 29
 6 files changed, 146 insertions(+), 13 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/f340b6b3/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java

diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index cc9cc42..3acfe36 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -31,8 +31,7 @@ import org.apache.spark.unsafe.Platform;
 public class HeapMemoryAllocator implements MemoryAllocator {

   @GuardedBy("this")
-  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
-    new HashMap<>();
+  private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();

   private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;

@@ -49,13 +48,14 @@ public class HeapMemoryAllocator implements MemoryAllocator {
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
     if (shouldPool(size)) {
       synchronized (this) {
-        final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
         if (pool != null) {
           while (!pool.isEmpty()) {
-            final WeakReference<MemoryBlock> blockReference = pool.pop();
-            final MemoryBlock memory = blockReference.get();
-            if (memory != null) {
-              assert (memory.size() == size);
+            final WeakReference<long[]> arrayReference = pool.pop();
+            final long[] array = arrayReference.get();
+            if (array != null) {
+              assert (array.length * 8L >= size);
+              MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
               if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
                 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
               }
@@ -76,18 +76,35 @@ public class HeapMemoryAllocator implements MemoryAllocator {

   @Override
   public void free(MemoryBlock memory) {
+    assert (memory.obj != null) :
+      "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
+    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+      "page has already been freed";
+    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+            || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+      "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator free()";
+
     final long size = memory.size();
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
     }
+
+    // Mark the page as freed (so we can detect double-frees).
+    memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUM
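The defensive pattern here (poison freed memory, drop the backing reference, and assert on reuse) generalizes beyond Spark. Below is a minimal Scala sketch of the same idea under simplified assumptions; `PooledBlock`, `PoisonValue`, and the boolean `freed` flag are illustrative stand-ins for Spark's Java `MemoryBlock` and its page-number sentinels:

```scala
// A toy block: a backing long[] array plus a freed flag.
final class PooledBlock(var data: Array[Long], var freed: Boolean = false)

object PoisonOnFreeSketch {
  // A recognizable bit pattern so blind writes to freed memory are easy to spot.
  val PoisonValue: Long = 0x5a5a5a5a5a5a5a5aL

  def free(block: PooledBlock): Unit = {
    assert(!block.freed, "page has already been freed") // catch double-frees
    java.util.Arrays.fill(block.data, PoisonValue)      // overwrite freed contents
    block.data = null                                   // drop the backing array reference
    block.freed = true                                  // later frees trip the assert above
  }
}
```

The design choice mirrors the patch: corruption from use-after-free is turned from silent data damage into either an immediate assertion failure or a conspicuous poison pattern in test runs.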
svn commit: r24111 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_00_01-70bcc9d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Wed Jan 10 08:16:20 2018
New Revision: 24111

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_10_00_01-70bcc9d docs

[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]