spark git commit: [SPARK-14932][SQL] Allow DataFrame.replace() to replace values with None
Repository: spark Updated Branches: refs/heads/master c06f3f5ac -> 84454d7d3 [SPARK-14932][SQL] Allow DataFrame.replace() to replace values with None ## What changes were proposed in this pull request? Currently `df.na.replace("*", Map[String, String]("NULL" -> null))` will produce exception. This PR enables passing null/None as value in the replacement map in DataFrame.replace(). Note that the replacement map keys and values should still be the same type, while the values can have a mix of null/None and that type. This PR enables following operations for example: `df.na.replace("*", Map[String, String]("NULL" -> null))`(scala) `df.na.replace("*", Map[Any, Any](60 -> null, 70 -> 80))`(scala) `df.na.replace('Alice', None)`(python) `df.na.replace([10, 20])`(python, replacing with None is by default) One use case could be: I want to replace all the empty strings with null/None because they were incorrectly generated and then drop all null/None data `df.na.replace("*", Map("" -> null)).na.drop()`(scala) `df.replace(u'', None).dropna()`(python) ## How was this patch tested? Scala unit test. Python doctest and unit test. Author: bravo-zhangCloses #18820 from bravo-zhang/spark-14932. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84454d7d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84454d7d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84454d7d Branch: refs/heads/master Commit: 84454d7d33363a41adf242c8a81ffca20769c55c Parents: c06f3f5 Author: bravo-zhang Authored: Wed Aug 9 17:42:21 2017 -0700 Committer: gatorsmile Committed: Wed Aug 9 17:42:21 2017 -0700 -- python/pyspark/sql/dataframe.py | 35 +++- python/pyspark/sql/tests.py | 15 ++ .../apache/spark/sql/DataFrameNaFunctions.scala | 57 +++- .../spark/sql/DataFrameNaFunctionsSuite.scala | 43 +++ 4 files changed, 113 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/84454d7d/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 944739b..edc7ca6 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1362,8 +1362,8 @@ class DataFrame(object): """Returns a new :class:`DataFrame` replacing a value with another value. :func:`DataFrame.replace` and :func:`DataFrameNaFunctions.replace` are aliases of each other. -Values to_replace and value should contain either all numerics, all booleans, -or all strings. When replacing, the new value will be cast +Values to_replace and value must have the same type and can only be numerics, booleans, +or strings. Value can have None. When replacing, the new value will be cast to the type of the existing column. For numeric replacements all values to be replaced should have unique floating point representation. In case of conflicts (for example with `{42: -1, 42.0: 1}`) @@ -1373,8 +1373,8 @@ class DataFrame(object): Value to be replaced. If the value is a dict, then `value` is ignored and `to_replace` must be a mapping between a value and a replacement. -:param value: int, long, float, string, or list. -The replacement value must be an int, long, float, or string. If `value` is a +:param value: bool, int, long, float, string, list or None. +The replacement value must be a bool, int, long, float, string or None. If `value` is a list, `value` should be of the same length and type as `to_replace`. 
If `value` is a scalar and `to_replace` is a sequence, then `value` is used as a replacement for each item in `to_replace`. @@ -1393,6 +1393,16 @@ class DataFrame(object): |null| null| null| ++--+-+ +>>> df4.na.replace('Alice', None).show() +++--++ +| age|height|name| +++--++ +| 10|80|null| +| 5| null| Bob| +|null| null| Tom| +|null| null|null| +++--++ + >>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show() ++--++ | age|height|name| @@ -1425,12 +1435,13 @@ class DataFrame(object): valid_types = (bool, float, int, long, basestring, list, tuple) if not isinstance(to_replace, valid_types + (dict, )): raise ValueError( -"to_replace should be a float, int, long, string, list, tuple, or dict. " +
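To make the behaviour this change enables concrete, here is a minimal Scala sketch of the use case described above (the DataFrame, its columns, and the SparkSession named `spark` are invented for illustration and are not part of the patch):

    import spark.implicits._  // assumes an existing SparkSession named `spark`

    // Empty strings were generated incorrectly; map them to null, then drop those rows.
    val df = Seq(("Alice", ""), ("Bob", "NY"), ("", "SF")).toDF("name", "city")
    val cleaned = df.na.replace("*", Map("" -> null)).na.drop()
    cleaned.show()  // only ("Bob", "NY") survives: the other rows gained a null and were dropped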
spark git commit: [SPARK-21551][PYTHON] Increase timeout for PythonRDD.serveIterator
Repository: spark Updated Branches: refs/heads/master 0fb73253f -> c06f3f5ac [SPARK-21551][PYTHON] Increase timeout for PythonRDD.serveIterator ## What changes were proposed in this pull request? This modification increases the timeout for `serveIterator` (which is not dynamically configurable). This fixes timeout issues in pyspark when using `collect` and similar functions, in cases where Python may take more than a couple of seconds to connect. See https://issues.apache.org/jira/browse/SPARK-21551 ## How was this patch tested? Ran the tests. cc rxin Author: peay Closes #18752 from peay/spark-21551. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c06f3f5a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c06f3f5a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c06f3f5a Branch: refs/heads/master Commit: c06f3f5ac500b02d38ca7ec5fcb33085e07f2f75 Parents: 0fb7325 Author: peay Authored: Wed Aug 9 14:03:18 2017 -0700 Committer: Reynold Xin Committed: Wed Aug 9 14:03:18 2017 -0700 -- .../src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 6 +++--- python/pyspark/rdd.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c06f3f5a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 6a81752..3377101 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -683,7 +683,7 @@ private[spark] object PythonRDD extends Logging { * Create a socket server and a background thread to serve the data in `items`, * * The socket server can only accept one connection, or close if no connection - * in 3 seconds. + * in 15 seconds. * * Once a connection comes in, it tries to serialize all the data in `items` * and send them into this connection. @@ -692,8 +692,8 @@ private[spark] object PythonRDD extends Logging { */ def serveIterator[T](items: Iterator[T], threadName: String): Int = { val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost")) -// Close the socket if no connection in 3 seconds -serverSocket.setSoTimeout(3000) +// Close the socket if no connection in 15 seconds +serverSocket.setSoTimeout(15000) new Thread(threadName) { setDaemon(true) http://git-wip-us.apache.org/repos/asf/spark/blob/c06f3f5a/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 3325b65..ea993c5 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -127,7 +127,7 @@ def _load_from_socket(port, serializer): af, socktype, proto, canonname, sa = res sock = socket.socket(af, socktype, proto) try: -sock.settimeout(3) +sock.settimeout(15) sock.connect(sa) except socket.error: sock.close()
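For background on the mechanism being tuned here, `ServerSocket.setSoTimeout` bounds how long `accept()` may block before raising `SocketTimeoutException`. The Scala sketch below is generic JDK socket usage to illustrate that semantics, not the Spark code itself:

    import java.net.{InetAddress, ServerSocket, SocketTimeoutException}

    val server = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
    server.setSoTimeout(15000)          // accept() now waits up to 15 s for a client
    try {
      val conn = server.accept()        // blocks until a client connects or the timeout fires
      // ... serve data over `conn` ...
      conn.close()
    } catch {
      case _: SocketTimeoutException => // no client connected in time; give up
    } finally {
      server.close()
    }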
spark git commit: [SPARK-21587][SS] Added filter pushdown through watermarks.
Repository: spark Updated Branches: refs/heads/master 2d799d080 -> 0fb73253f [SPARK-21587][SS] Added filter pushdown through watermarks. ## What changes were proposed in this pull request? Push filter predicates through EventTimeWatermark if they're deterministic and do not reference the watermarked attribute. (This is similar but not identical to the logic for pushing through UnaryNode.) ## How was this patch tested? unit tests Author: Jose TorresCloses #18790 from joseph-torres/SPARK-21587. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fb73253 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fb73253 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fb73253 Branch: refs/heads/master Commit: 0fb73253fc832361d5d89ba85692ae653961e104 Parents: 2d799d0 Author: Jose Torres Authored: Wed Aug 9 12:50:04 2017 -0700 Committer: Tathagata Das Committed: Wed Aug 9 12:50:04 2017 -0700 -- .../sql/catalyst/optimizer/Optimizer.scala | 19 +++ .../optimizer/FilterPushdownSuite.scala | 57 2 files changed, 76 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0fb73253/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d82af94..a51b385 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -867,6 +867,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { filter } +case filter @ Filter(condition, watermark: EventTimeWatermark) => + // We can only push deterministic predicates which don't reference the watermark attribute. + // We could in theory span() only on determinism and pull out deterministic predicates + // on the watermark separately. But it seems unnecessary and a bit confusing to not simply + // use the prefix as we do for nondeterminism in other cases. + + val (pushDown, stayUp) = splitConjunctivePredicates(condition).span( +p => p.deterministic && !p.references.contains(watermark.eventTime)) + + if (pushDown.nonEmpty) { +val pushDownPredicate = pushDown.reduceLeft(And) +val newWatermark = watermark.copy(child = Filter(pushDownPredicate, watermark.child)) +// If there is no more filter to stay up, just eliminate the filter. +// Otherwise, create "Filter(stayUp) <- watermark <- Filter(pushDownPredicate)". 
+if (stayUp.isEmpty) newWatermark else Filter(stayUp.reduceLeft(And), newWatermark) + } else { +filter + } + case filter @ Filter(_, u: UnaryNode) if canPushThrough(u) && u.expressions.forall(_.deterministic) => pushDownPredicate(filter, u.child) { predicate => http://git-wip-us.apache.org/repos/asf/spark/blob/0fb73253/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 3553d23..582b3ea 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types.IntegerType +import org.apache.spark.unsafe.types.CalendarInterval class FilterPushdownSuite extends PlanTest { @@ -1134,4 +1135,60 @@ class FilterPushdownSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, checkAnalysis = false) } + + test("watermark pushdown: no pushdown on watermark attribute") { +val interval = new CalendarInterval(2, 2000L) + +// Verify that all conditions preceding the first watermark touching condition are pushed down +// by the optimizer and others are not. +val originalQuery = EventTimeWatermark('b, interval, testRelation) + .where('a === 5 && 'b === 10 && 'c === 5) +val correctAnswer = EventTimeWatermark( +
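As a rough illustration of when the new rule applies, consider the hedged Scala sketch below (the source, column names, and thresholds are invented). The predicate on `level` is deterministic and does not touch the watermarked column, so it can now be pushed below the watermark; the predicate on `eventTime` itself still stays above it:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.lit

    val spark = SparkSession.builder.master("local[*]").appName("watermark-pushdown-sketch").getOrCreate()
    import spark.implicits._

    val events = spark.readStream
      .format("rate")                                  // stand-in streaming source
      .load()
      .withColumnRenamed("timestamp", "eventTime")
      .withColumn("level", lit("INFO"))

    val filtered = events
      .withWatermark("eventTime", "10 minutes")
      .filter($"level" === "ERROR")                    // pushed through the watermark after this patch
      .filter($"eventTime" > "2017-08-09 00:00:00")    // references the watermark column, so it stays above it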
spark git commit: [SPARK-21504][SQL] Add spark version info into table metadata
Repository: spark Updated Branches: refs/heads/master b78cf13bf -> 2d799d080 [SPARK-21504][SQL] Add spark version info into table metadata ## What changes were proposed in this pull request? This PR is to add the spark version info in the table metadata. When creating the table, this value is assigned. It can help users find which version of Spark was used to create the table. ## How was this patch tested? N/A Author: gatorsmileCloses #18709 from gatorsmile/addVersion. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d799d08 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d799d08 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d799d08 Branch: refs/heads/master Commit: 2d799d08081032828cc2c95cbf58a268653c7a05 Parents: b78cf13 Author: gatorsmile Authored: Wed Aug 9 08:46:25 2017 -0700 Committer: gatorsmile Committed: Wed Aug 9 08:46:25 2017 -0700 -- .../sql/catalyst/catalog/ExternalCatalog.scala| 4 +++- .../spark/sql/catalyst/catalog/interface.scala| 7 ++- .../spark/sql/catalyst/trees/TreeNodeSuite.scala | 4 +++- .../describe-table-after-alter-table.sql.out | 15 ++- .../resources/sql-tests/results/describe.sql.out | 18 -- .../sql-tests/results/show-tables.sql.out | 9 ++--- .../org/apache/spark/sql/SQLQueryTestSuite.scala | 3 ++- .../spark/sql/execution/command/DDLSuite.scala| 1 + .../spark/sql/hive/HiveExternalCatalog.scala | 12 +++- .../spark/sql/hive/execution/HiveDDLSuite.scala | 1 + 10 files changed, 55 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2d799d08/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 6000d48..68644f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -106,8 +106,10 @@ abstract class ExternalCatalog final def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { val db = tableDefinition.database val name = tableDefinition.identifier.table +val tableDefinitionWithVersion = + tableDefinition.copy(createVersion = org.apache.spark.SPARK_VERSION) postToAll(CreateTablePreEvent(db, name)) -doCreateTable(tableDefinition, ignoreIfExists) +doCreateTable(tableDefinitionWithVersion, ignoreIfExists) postToAll(CreateTableEvent(db, name)) } http://git-wip-us.apache.org/repos/asf/spark/blob/2d799d08/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 9531456..f865106 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -205,6 +205,9 @@ case class BucketSpec( * configured. * @param ignoredProperties is a list of table properties that are used by the underlying table * but ignored by Spark SQL yet. + * @param createVersion records the version of Spark that created this table metadata. The default + * is an empty string. We expect it will be read from the catalog or filled by + * ExternalCatalog.createTable. 
For temporary views, the value will be empty. */ case class CatalogTable( identifier: TableIdentifier, @@ -217,6 +220,7 @@ case class CatalogTable( owner: String = "", createTime: Long = System.currentTimeMillis, lastAccessTime: Long = -1, +createVersion: String = "", properties: Map[String, String] = Map.empty, stats: Option[CatalogStatistics] = None, viewText: Option[String] = None, @@ -302,8 +306,9 @@ case class CatalogTable( identifier.database.foreach(map.put("Database", _)) map.put("Table", identifier.table) if (owner.nonEmpty) map.put("Owner", owner) -map.put("Created", new Date(createTime).toString) +map.put("Created Time", new Date(createTime).toString) map.put("Last Access", new Date(lastAccessTime).toString) +map.put("Created By", "Spark "
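From the user's side, the new field shows up in the extended table description. A hedged sketch of how one might check it (the table name is arbitrary, and the exact version string depends on the Spark build):

    import org.apache.spark.sql.functions.col

    spark.sql("CREATE TABLE IF NOT EXISTS t (id INT) USING parquet")

    // After this change, the 'Created By' row should read something like 'Spark 2.3.0'.
    spark.sql("DESCRIBE TABLE EXTENDED t")
      .filter(col("col_name") === "Created By")
      .show(truncate = false)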
spark git commit: [SPARK-21276][CORE] Update lz4-java to the latest (v1.4.0)
Repository: spark Updated Branches: refs/heads/master 83fe3b5e1 -> b78cf13bf [SPARK-21276][CORE] Update lz4-java to the latest (v1.4.0) ## What changes were proposed in this pull request? This pr updated `lz4-java` to the latest (v1.4.0) and removed custom `LZ4BlockInputStream`. We currently use custom `LZ4BlockInputStream` to read concatenated byte stream in shuffle. But, this functionality has been implemented in the latest lz4-java (https://github.com/lz4/lz4-java/pull/105). So, we might update the latest to remove the custom `LZ4BlockInputStream`. Major diffs between the latest release and v1.3.0 in the master are as follows (https://github.com/lz4/lz4-java/compare/62f7547abb0819d1ca1e669645ee1a9d26cd60b0...6d4693f56253fcddfad7b441bb8d917b182efa2d); - fixed NPE in XXHashFactory similarly - Don't place resources in default package to support shading - Fixes ByteBuffer methods failing to apply arrayOffset() for array-backed - Try to load lz4-java from java.library.path, then fallback to bundled - Add ppc64le binary - Add s390x JNI binding - Add basic LZ4 Frame v1.5.0 support - enable aarch64 support for lz4-java - Allow unsafeInstance() for ppc64le archiecture - Add unsafeInstance support for AArch64 - Support 64-bit JNI build on Solaris - Avoid over-allocating a buffer - Allow EndMark to be incompressible for LZ4FrameInputStream. - Concat byte stream ## How was this patch tested? Existing tests. Author: Takeshi YamamuroCloses #18883 from maropu/SPARK-21276. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b78cf13b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b78cf13b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b78cf13b Branch: refs/heads/master Commit: b78cf13bf05f0eadd7ae97df84b6e1505dc5ff9f Parents: 83fe3b5 Author: Takeshi Yamamuro Authored: Wed Aug 9 17:31:52 2017 +0200 Committer: Sean Owen Committed: Wed Aug 9 17:31:52 2017 +0200 -- core/pom.xml| 4 +- .../apache/spark/io/LZ4BlockInputStream.java| 260 --- .../org/apache/spark/io/CompressionCodec.scala | 7 +- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- external/kafka-0-10-assembly/pom.xml| 4 +- external/kafka-0-8-assembly/pom.xml | 4 +- pom.xml | 6 +- project/MimaExcludes.scala | 5 +- 9 files changed, 20 insertions(+), 274 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index bc6b1c4..431967e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -190,8 +190,8 @@ snappy-java - net.jpountz.lz4 - lz4 + org.lz4 + lz4-java org.roaringbitmap http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java -- diff --git a/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java b/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java deleted file mode 100644 index 9d6f06e..000 --- a/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.io; - -import java.io.EOFException; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.zip.Checksum; - -import net.jpountz.lz4.LZ4Exception; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.lz4.LZ4FastDecompressor; -import net.jpountz.util.SafeUtils; -import net.jpountz.xxhash.XXHashFactory; - -/** - * {@link InputStream} implementation to decode data written with - * {@link net.jpountz.lz4.LZ4BlockOutputStream}. This class is not thread-safe and does not - * support {@link #mark(int)}/{@link #reset()}. - * @see net.jpountz.lz4.LZ4BlockOutputStream - * - * This is based on net.jpountz.lz4.LZ4BlockInputStream - * - * changes:
spark git commit: [SPARK-21665][CORE] Need to close resources after use
Repository: spark Updated Branches: refs/heads/master 6426adffa -> 83fe3b5e1 [SPARK-21665][CORE] Need to close resources after use ## What changes were proposed in this pull request? Resources in Core - SparkSubmitArguments.scala, Spark-launcher - AbstractCommandBuilder.java, resource-managers- YARN - Client.scala are released ## How was this patch tested? No new test cases added, Unit test have been passed Author: vinodkcCloses #18880 from vinodkc/br_fixresouceleak. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83fe3b5e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83fe3b5e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83fe3b5e Branch: refs/heads/master Commit: 83fe3b5e10f8dc62245ea37143abb96be0f39805 Parents: 6426adf Author: vinodkc Authored: Wed Aug 9 15:18:38 2017 +0200 Committer: Sean Owen Committed: Wed Aug 9 15:18:38 2017 +0200 -- .../apache/spark/deploy/SparkSubmitArguments.scala | 9 + .../spark/launcher/AbstractCommandBuilder.java | 16 +++- 2 files changed, 8 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/83fe3b5e/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 3721b98..a7722e4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -207,11 +207,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S uriScheme match { case "file" => try { -val jar = new JarFile(uri.getPath) -// Note that this might still return null if no main-class is set; we catch that later -mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class") +Utils.tryWithResource(new JarFile(uri.getPath)) { jar => + // Note that this might still return null if no main-class is set; we catch that later + mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class") +} } catch { -case e: Exception => +case _: Exception => SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource") } case _ => http://git-wip-us.apache.org/repos/asf/spark/blob/83fe3b5e/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java -- diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 860ab35..44028c5 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -291,24 +291,14 @@ abstract class AbstractCommandBuilder { } if (propsFile.isFile()) { - FileInputStream fd = null; - try { -fd = new FileInputStream(propsFile); -props.load(new InputStreamReader(fd, StandardCharsets.UTF_8)); + try (InputStreamReader isr = new InputStreamReader( + new FileInputStream(propsFile), StandardCharsets.UTF_8)) { +props.load(isr); for (Map.Entry
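The underlying pattern in all three call sites is the same: open the resource, use it, and guarantee it is closed even when the body throws. A generic, illustrative Scala sketch of that pattern follows (Spark's internal Utils.tryWithResource plays this role in the patch; the jar path below is hypothetical):

    def withResource[R <: AutoCloseable, T](resource: R)(body: R => T): T =
      try body(resource) finally resource.close()   // close() runs whether or not body throws

    import java.util.jar.JarFile
    // Read a jar's Main-Class without leaking the file handle.
    val mainClass = withResource(new JarFile("/tmp/app.jar")) { jar =>
      jar.getManifest.getMainAttributes.getValue("Main-Class")   // may still be null if unset
    }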
spark git commit: [SPARK-21663][TESTS] test("remote fetch below max RPC message size") should call masterTracker.stop() in MapOutputTrackerSuite
Repository: spark Updated Branches: refs/heads/master b35660dd0 -> 6426adffa [SPARK-21663][TESTS] test("remote fetch below max RPC message size") should call masterTracker.stop() in MapOutputTrackerSuite Signed-off-by: 10087686 ## What changes were proposed in this pull request? After the unit test ends, masterTracker.stop() should be called to free its resources. ## How was this patch tested? Ran the unit tests. Author: 10087686 Closes #18867 from wangjiaochun/mapout. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6426adff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6426adff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6426adff Branch: refs/heads/master Commit: 6426adffaf152651c30d481bb925d5025fd6130a Parents: b35660d Author: 10087686 Authored: Wed Aug 9 18:45:38 2017 +0800 Committer: Wenchen Fan Committed: Wed Aug 9 18:45:38 2017 +0800 -- .../src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6426adff/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index bc3d23e..493ae51 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -176,7 +176,8 @@ class MapOutputTrackerSuite extends SparkFunSuite { val masterTracker = newTrackerMaster(newConf) val rpcEnv = createRpcEnv("spark") val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf) -rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) +masterTracker.trackerEndpoint = + rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) // Message size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) @@ -191,7 +192,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { verify(rpcCallContext, timeout(3)).reply(any()) assert(0 == masterTracker.getNumCachedSerializedBroadcast) -//masterTracker.stop() // this throws an exception +masterTracker.stop() rpcEnv.shutdown() }
spark git commit: [SPARK-21663][TESTS] test("remote fetch below max RPC message size") should call masterTracker.stop() in MapOutputTrackerSuite
Repository: spark Updated Branches: refs/heads/branch-2.2 f6d56d2f1 -> 3ca55eaaf [SPARK-21663][TESTS] test("remote fetch below max RPC message size") should call masterTracker.stop() in MapOutputTrackerSuite Signed-off-by: 10087686 ## What changes were proposed in this pull request? After the unit test ends, masterTracker.stop() should be called to free its resources. ## How was this patch tested? Ran the unit tests. Author: 10087686 Closes #18867 from wangjiaochun/mapout. (cherry picked from commit 6426adffaf152651c30d481bb925d5025fd6130a) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ca55eaa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ca55eaa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ca55eaa Branch: refs/heads/branch-2.2 Commit: 3ca55eaafee8f4216eb5466021a97604713033a1 Parents: f6d56d2 Author: 10087686 Authored: Wed Aug 9 18:45:38 2017 +0800 Committer: Wenchen Fan Committed: Wed Aug 9 18:45:53 2017 +0800 -- .../src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3ca55eaa/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 71bedda..ca94fd1 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -175,7 +175,8 @@ class MapOutputTrackerSuite extends SparkFunSuite { val masterTracker = newTrackerMaster(newConf) val rpcEnv = createRpcEnv("spark") val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf) -rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) +masterTracker.trackerEndpoint = + rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) // Message size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) @@ -190,7 +191,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { verify(rpcCallContext, timeout(3)).reply(any()) assert(0 == masterTracker.getNumCachedSerializedBroadcast) -//masterTracker.stop() // this throws an exception +masterTracker.stop() rpcEnv.shutdown() }
spark git commit: [SPARK-21596][SS] Ensure places calling HDFSMetadataLog.get check the return value
Repository: spark Updated Branches: refs/heads/branch-2.2 7446be332 -> f6d56d2f1 [SPARK-21596][SS] Ensure places calling HDFSMetadataLog.get check the return value Same PR as #18799 but for branch 2.2. Main discussion the other PR. When I was investigating a flaky test, I realized that many places don't check the return value of `HDFSMetadataLog.get(batchId: Long): Option[T]`. When a batch is supposed to be there, the caller just ignores None rather than throwing an error. If some bug causes a query doesn't generate a batch metadata file, this behavior will hide it and allow the query continuing to run and finally delete metadata logs and make it hard to debug. This PR ensures that places calling HDFSMetadataLog.get always check the return value. Jenkins Author: Shixiong ZhuCloses #18890 from tdas/SPARK-21596-2.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6d56d2f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6d56d2f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6d56d2f Branch: refs/heads/branch-2.2 Commit: f6d56d2f1c377000921effea2b1faae15f9cae82 Parents: 7446be3 Author: Shixiong Zhu Authored: Tue Aug 8 23:49:33 2017 -0700 Committer: Shixiong Zhu Committed: Tue Aug 8 23:49:33 2017 -0700 -- .../streaming/CompactibleFileStreamLog.scala| 24 ++--- .../streaming/FileStreamSourceLog.scala | 5 +- .../execution/streaming/HDFSMetadataLog.scala | 57 ++-- .../execution/streaming/StreamExecution.scala | 17 -- .../streaming/HDFSMetadataLogSuite.scala| 17 ++ .../sql/streaming/FileStreamSourceSuite.scala | 1 + 6 files changed, 104 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6d56d2f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 408c8f8..77bc0ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -169,13 +169,15 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( */ private def compact(batchId: Long, logs: Array[T]): Boolean = { val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) -val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs -if (super.add(batchId, compactLogs(allLogs).toArray)) { - true -} else { - // Return false as there is another writer. - false -} +val allLogs = validBatches.map { id => + super.get(id).getOrElse { +throw new IllegalStateException( + s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " + +s"(compactInterval: $compactInterval)") + } +}.flatten ++ logs +// Return false as there is another writer. 
+super.add(batchId, compactLogs(allLogs).toArray) } /** @@ -190,7 +192,13 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( if (latestId >= 0) { try { val logs = -getAllValidBatches(latestId, compactInterval).flatMap(id => super.get(id)).flatten +getAllValidBatches(latestId, compactInterval).map { id => + super.get(id).getOrElse { +throw new IllegalStateException( + s"${batchIdToPath(id)} doesn't exist " + +s"(latestId: $latestId, compactInterval: $compactInterval)") + } +}.flatten return compactLogs(logs).toArray } catch { case e: IOException => http://git-wip-us.apache.org/repos/asf/spark/blob/f6d56d2f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index 33e6a1d..8628471 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -115,7 +115,10 @@ class FileStreamSourceLog( Map.empty[Long, Option[Array[FileEntry]]]
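The idiom this patch introduces is simply to treat an unexpectedly missing batch as an error rather than silently skipping it. A stand-alone Scala sketch of that idiom on a plain Option (not the actual HDFSMetadataLog API; the batch values here are made up):

    // Stand-in for metadataLog.get(batchId); pretend batch 2 is missing.
    def getBatch(batchId: Long): Option[String] =
      if (batchId != 2L) Some(s"batch-$batchId") else None

    // Old behaviour: the missing batch is silently dropped from the result.
    val silentlyDropped = Seq(1L, 2L, 3L).flatMap(getBatch)   // Seq("batch-1", "batch-3")

    // Behaviour after this patch: a batch that should exist but does not fails loudly.
    def getOrFail(batchId: Long): String =
      getBatch(batchId).getOrElse(
        throw new IllegalStateException(s"batch $batchId doesn't exist"))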
spark git commit: [SPARK-21523][ML] update breeze to 0.13.2 for an emergency bugfix in strong wolfe line search
Repository: spark Updated Branches: refs/heads/branch-2.2 d02331452 -> 7446be332 [SPARK-21523][ML] update breeze to 0.13.2 for an emergency bugfix in strong wolfe line search ## What changes were proposed in this pull request? Update breeze to 0.13.1 for an emergency bugfix in strong wolfe line search https://github.com/scalanlp/breeze/pull/651 ## How was this patch tested? N/A Author: WeichenXuCloses #18797 from WeichenXu123/update-breeze. (cherry picked from commit b35660dd0e930f4b484a079d9e2516b0a7dacf1d) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7446be33 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7446be33 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7446be33 Branch: refs/heads/branch-2.2 Commit: 7446be3328ea75a5197b2587e3a8e2ca7977726b Parents: d023314 Author: WeichenXu Authored: Wed Aug 9 14:44:10 2017 +0800 Committer: Yanbo Liang Committed: Wed Aug 9 14:44:39 2017 +0800 -- dev/deps/spark-deps-hadoop-2.6| 4 ++-- dev/deps/spark-deps-hadoop-2.7| 4 ++-- .../spark/ml/regression/AFTSurvivalRegression.scala | 2 ++ .../ml/regression/AFTSurvivalRegressionSuite.scala| 1 - .../org/apache/spark/ml/util/MLTestingUtils.scala | 1 - .../apache/spark/mllib/optimization/LBFGSSuite.scala | 4 ++-- pom.xml | 2 +- python/pyspark/ml/regression.py | 14 +++--- 8 files changed, 16 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7446be33/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 9287bd4..02c0b21 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -19,8 +19,8 @@ avro-mapred-1.7.7-hadoop2.jar base64-2.3.8.jar bcprov-jdk15on-1.51.jar bonecp-0.8.0.RELEASE.jar -breeze-macros_2.11-0.13.1.jar -breeze_2.11-0.13.1.jar +breeze-macros_2.11-0.13.2.jar +breeze_2.11-0.13.2.jar calcite-avatica-1.2.0-incubating.jar calcite-core-1.2.0-incubating.jar calcite-linq4j-1.2.0-incubating.jar http://git-wip-us.apache.org/repos/asf/spark/blob/7446be33/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index ab1de3d..47e28de 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -19,8 +19,8 @@ avro-mapred-1.7.7-hadoop2.jar base64-2.3.8.jar bcprov-jdk15on-1.51.jar bonecp-0.8.0.RELEASE.jar -breeze-macros_2.11-0.13.1.jar -breeze_2.11-0.13.1.jar +breeze-macros_2.11-0.13.2.jar +breeze_2.11-0.13.2.jar calcite-avatica-1.2.0-incubating.jar calcite-core-1.2.0-incubating.jar calcite-linq4j-1.2.0-incubating.jar http://git-wip-us.apache.org/repos/asf/spark/blob/7446be33/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 094853b..0891994 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -553,6 +553,8 @@ private class AFTAggregator( val ti = data.label val delta = data.censor +require(ti > 0.0, "The lifetime or label should be greater than 0.") + val localFeaturesStd = bcFeaturesStd.value val margin = { http://git-wip-us.apache.org/repos/asf/spark/blob/7446be33/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala -- diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala index fb39e50..02e5c6d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala @@ -364,7 +364,6 @@ class AFTSurvivalRegressionSuite test("should support all NumericType censors, and not support other types") { val df = spark.createDataFrame(Seq( - (0, Vectors.dense(0)), (1, Vectors.dense(1)), (2, Vectors.dense(2)), (3, Vectors.dense(3)),
spark git commit: [SPARK-21523][ML] update breeze to 0.13.2 for an emergency bugfix in strong wolfe line search
Repository: spark Updated Branches: refs/heads/master ae8a2b149 -> b35660dd0 [SPARK-21523][ML] update breeze to 0.13.2 for an emergency bugfix in strong wolfe line search ## What changes were proposed in this pull request? Update breeze to 0.13.1 for an emergency bugfix in strong wolfe line search https://github.com/scalanlp/breeze/pull/651 ## How was this patch tested? N/A Author: WeichenXuCloses #18797 from WeichenXu123/update-breeze. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b35660dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b35660dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b35660dd Branch: refs/heads/master Commit: b35660dd0e930f4b484a079d9e2516b0a7dacf1d Parents: ae8a2b1 Author: WeichenXu Authored: Wed Aug 9 14:44:10 2017 +0800 Committer: Yanbo Liang Committed: Wed Aug 9 14:44:10 2017 +0800 -- dev/deps/spark-deps-hadoop-2.6| 4 ++-- dev/deps/spark-deps-hadoop-2.7| 4 ++-- .../spark/ml/regression/AFTSurvivalRegression.scala | 2 ++ .../ml/regression/AFTSurvivalRegressionSuite.scala| 1 - .../org/apache/spark/ml/util/MLTestingUtils.scala | 1 - .../apache/spark/mllib/optimization/LBFGSSuite.scala | 4 ++-- pom.xml | 2 +- python/pyspark/ml/regression.py | 14 +++--- 8 files changed, 16 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b35660dd/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index a41183a..d7587fb 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -22,8 +22,8 @@ avro-mapred-1.7.7-hadoop2.jar base64-2.3.8.jar bcprov-jdk15on-1.51.jar bonecp-0.8.0.RELEASE.jar -breeze-macros_2.11-0.13.1.jar -breeze_2.11-0.13.1.jar +breeze-macros_2.11-0.13.2.jar +breeze_2.11-0.13.2.jar calcite-avatica-1.2.0-incubating.jar calcite-core-1.2.0-incubating.jar calcite-linq4j-1.2.0-incubating.jar http://git-wip-us.apache.org/repos/asf/spark/blob/b35660dd/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index 5e1321b..887eeca 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -22,8 +22,8 @@ avro-mapred-1.7.7-hadoop2.jar base64-2.3.8.jar bcprov-jdk15on-1.51.jar bonecp-0.8.0.RELEASE.jar -breeze-macros_2.11-0.13.1.jar -breeze_2.11-0.13.1.jar +breeze-macros_2.11-0.13.2.jar +breeze_2.11-0.13.2.jar calcite-avatica-1.2.0-incubating.jar calcite-core-1.2.0-incubating.jar calcite-linq4j-1.2.0-incubating.jar http://git-wip-us.apache.org/repos/asf/spark/blob/b35660dd/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 094853b..0891994 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -553,6 +553,8 @@ private class AFTAggregator( val ti = data.label val delta = data.censor +require(ti > 0.0, "The lifetime or label should be greater than 0.") + val localFeaturesStd = bcFeaturesStd.value val margin = { http://git-wip-us.apache.org/repos/asf/spark/blob/b35660dd/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala index fb39e50..02e5c6d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala @@ -364,7 +364,6 @@ class AFTSurvivalRegressionSuite test("should support all NumericType censors, and not support other types") { val df = spark.createDataFrame(Seq( - (0, Vectors.dense(0)), (1, Vectors.dense(1)), (2, Vectors.dense(2)), (3, Vectors.dense(3)), http://git-wip-us.apache.org/repos/asf/spark/blob/b35660dd/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
spark git commit: [SPARK-21176][WEB UI] Use a single ProxyServlet to proxy all workers and applications
Repository: spark Updated Branches: refs/heads/master f016f5c8f -> ae8a2b149 [SPARK-21176][WEB UI] Use a single ProxyServlet to proxy all workers and applications ## What changes were proposed in this pull request? Currently, each application and each worker creates their own proxy servlet. Each proxy servlet is backed by its own HTTP client and a relatively large number of selector threads. This is excessive but was fixed (to an extent) by https://github.com/apache/spark/pull/18437. However, a single HTTP client (backed by a single selector thread) should be enough to handle all proxy requests. This PR creates a single proxy servlet no matter how many applications and workers there are. ## How was this patch tested? . The unit tests for rewriting proxied locations and headers were updated. I then spun up a 100 node cluster to ensure that proxy'ing worked correctly jiangxb1987 Please let me know if there's anything else I can do to help push this thru. Thanks! Author: Anderson OsagieCloses #18499 from aosagie/fix/minimize-proxy-threads. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae8a2b14 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae8a2b14 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae8a2b14 Branch: refs/heads/master Commit: ae8a2b14966b1dfa10e620bb24ca6560778c20e7 Parents: f016f5c Author: Anderson Osagie Authored: Wed Aug 9 14:35:27 2017 +0800 Committer: Wenchen Fan Committed: Wed Aug 9 14:35:27 2017 +0800 -- .../org/apache/spark/deploy/master/Master.scala | 15 ++- .../spark/deploy/master/ui/MasterWebUI.scala| 21 + .../scala/org/apache/spark/ui/JettyUtils.scala | 45 +++- .../scala/org/apache/spark/ui/UISuite.scala | 20 - 4 files changed, 46 insertions(+), 55 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4cc580e..e030cac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -133,6 +133,7 @@ private[deploy] class Master( masterWebUiUrl = "http://; + masterPublicAddress + ":" + webUi.boundPort if (reverseProxy) { masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl) + webUi.addProxy() logInfo(s"Spark Master is acting as a reverse proxy. 
Master, Workers and " + s"Applications UIs are available at $masterWebUiUrl") } @@ -769,9 +770,6 @@ private[deploy] class Master( workers += worker idToWorker(worker.id) = worker addressToWorker(workerAddress) = worker -if (reverseProxy) { - webUi.addProxyTargets(worker.id, worker.webUiAddress) -} true } @@ -780,9 +778,7 @@ private[deploy] class Master( worker.setState(WorkerState.DEAD) idToWorker -= worker.id addressToWorker -= worker.endpoint.address -if (reverseProxy) { - webUi.removeProxyTargets(worker.id) -} + for (exec <- worker.executors.values) { logInfo("Telling app of lost executor: " + exec.id) exec.application.driver.send(ExecutorUpdated( @@ -844,9 +840,6 @@ private[deploy] class Master( endpointToApp(app.driver) = app addressToApp(appAddress) = app waitingApps += app -if (reverseProxy) { - webUi.addProxyTargets(app.id, app.desc.appUiUrl) -} } private def finishApplication(app: ApplicationInfo) { @@ -860,9 +853,7 @@ private[deploy] class Master( idToApp -= app.id endpointToApp -= app.driver addressToApp -= app.driver.address - if (reverseProxy) { -webUi.removeProxyTargets(app.id) - } + if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach { a => http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index e42f41b..35b7ddd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,10 +17,7 @@ package org.apache.spark.deploy.master.ui -import